diff --git a/multicluster/service-mirror/cluster_watcher.go b/multicluster/service-mirror/cluster_watcher.go index 8a7a1d415f556..a151e947352ed 100644 --- a/multicluster/service-mirror/cluster_watcher.go +++ b/multicluster/service-mirror/cluster_watcher.go @@ -10,6 +10,7 @@ import ( "sort" "strconv" "strings" + "sync" "time" "github.com/linkerd/linkerd2/controller/gen/apis/link/v1alpha3" @@ -60,6 +61,7 @@ type ( eventsQueue workqueue.TypedRateLimitingInterface[any] requeueLimit int repairPeriod time.Duration + gatewayAliveMu sync.RWMutex gatewayAlive bool liveness chan bool headlessServicesEnabled bool @@ -371,6 +373,18 @@ func (rcsw *RemoteClusterServiceWatcher) getMirrorServiceAnnotations(remoteServi return annotations } +func (rcsw *RemoteClusterServiceWatcher) getGatewayAlive() bool { + rcsw.gatewayAliveMu.RLock() + defer rcsw.gatewayAliveMu.RUnlock() + return rcsw.gatewayAlive +} + +func (rcsw *RemoteClusterServiceWatcher) setGatewayAlive(alive bool) { + rcsw.gatewayAliveMu.Lock() + defer rcsw.gatewayAliveMu.Unlock() + rcsw.gatewayAlive = alive +} + // Provides annotations for federated service func (rcsw *RemoteClusterServiceWatcher) getFederatedServiceAnnotations(remoteService *corev1.Service) map[string]string { annotations := rcsw.getCommonServiceAnnotations(remoteService) @@ -1468,27 +1482,31 @@ func (rcsw *RemoteClusterServiceWatcher) Start(ctx context.Context) error { ev := RepairEndpoints{} rcsw.eventsQueue.Add(&ev) - go func() { - ticker := time.NewTicker(rcsw.repairPeriod) - for { - select { - case <-ticker.C: - ev := RepairEndpoints{} - rcsw.eventsQueue.Add(&ev) - case alive := <-rcsw.liveness: - rcsw.log.Debugf("gateway liveness change from %t to %t", rcsw.gatewayAlive, alive) - rcsw.gatewayAlive = alive - ev := RepairEndpoints{} - rcsw.eventsQueue.Add(&ev) - case <-rcsw.stopper: - return - } - } - }() + go rcsw.watchGatewayLiveness() return nil } +func (rcsw *RemoteClusterServiceWatcher) watchGatewayLiveness() { + ticker := time.NewTicker(rcsw.repairPeriod) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + ev := RepairEndpoints{} + rcsw.eventsQueue.Add(&ev) + case alive := <-rcsw.liveness: + rcsw.log.Debugf("gateway liveness change from %t to %t", rcsw.getGatewayAlive(), alive) + rcsw.setGatewayAlive(alive) + ev := RepairEndpoints{} + rcsw.eventsQueue.Add(&ev) + case <-rcsw.stopper: + return + } + } +} + // Stop stops watching the cluster and cleans up all mirrored resources func (rcsw *RemoteClusterServiceWatcher) Stop(cleanupState bool) { close(rcsw.stopper) @@ -1786,7 +1804,7 @@ func (rcsw *RemoteClusterServiceWatcher) updateMirrorEndpoints(ctx context.Conte } func (rcsw *RemoteClusterServiceWatcher) updateReadiness(endpoints *corev1.Endpoints) { - if !rcsw.gatewayAlive { + if !rcsw.getGatewayAlive() { rcsw.log.Warnf("gateway for %s/%s does not have ready addresses; setting addresses to not ready", endpoints.Namespace, endpoints.Name) for i := range endpoints.Subsets { endpoints.Subsets[i].NotReadyAddresses = append(endpoints.Subsets[i].NotReadyAddresses, endpoints.Subsets[i].Addresses...) diff --git a/multicluster/service-mirror/cluster_watcher_mirroring_test.go b/multicluster/service-mirror/cluster_watcher_mirroring_test.go index 58ed875c942bf..08c65e95dfdc4 100644 --- a/multicluster/service-mirror/cluster_watcher_mirroring_test.go +++ b/multicluster/service-mirror/cluster_watcher_mirroring_test.go @@ -567,7 +567,7 @@ func TestServiceCreatedGatewayAlive(t *testing.T) { // The gateway is now down which triggers repairing Endpoints on the local // cluster. - watcher.gatewayAlive = false + watcher.setGatewayAlive(false) events.Add(&RepairEndpoints{}) for events.Len() > 0 { watcher.processNextEvent(context.Background()) @@ -725,7 +725,7 @@ func TestServiceCreatedGatewayDown(t *testing.T) { // The gateway is now alive which triggers repairing Endpoints on the // local cluster. - watcher.gatewayAlive = true + watcher.setGatewayAlive(true) events.Add(&RepairEndpoints{}) for events.Len() > 0 { watcher.processNextEvent(context.Background()) diff --git a/multicluster/service-mirror/cluster_watcher_race_test.go b/multicluster/service-mirror/cluster_watcher_race_test.go new file mode 100644 index 0000000000000..dde1185c66f23 --- /dev/null +++ b/multicluster/service-mirror/cluster_watcher_race_test.go @@ -0,0 +1,48 @@ +package servicemirror + +import ( + "testing" + "time" + + logging "github.com/sirupsen/logrus" + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/util/workqueue" +) + +func TestGatewayAliveSynchronization(t *testing.T) { + stopper := make(chan struct{}) + queue := workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any]()) + defer queue.ShutDown() + + watcher := &RemoteClusterServiceWatcher{ + stopper: stopper, + log: logging.NewEntry(logging.New()), + eventsQueue: queue, + repairPeriod: time.Hour, + gatewayAlive: true, + liveness: make(chan bool, 1024), + } + + done := make(chan struct{}) + go func() { + defer close(done) + watcher.watchGatewayLiveness() + }() + + endpoints := &corev1.Endpoints{ + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{ + IP: "192.0.2.1", + }}, + }}, + } + + for i := 0; i < 1024; i++ { + watcher.liveness <- i%2 == 0 + ep := endpoints.DeepCopy() + watcher.updateReadiness(ep) + } + + close(stopper) + <-done +}