From 011dc2162ef51a87f8def8749c67d095739da80b Mon Sep 17 00:00:00 2001
From: Asish Kumar
Date: Thu, 9 Apr 2026 17:16:55 +0000
Subject: [PATCH] Synchronize gateway liveness (#15096)

Problem

RemoteClusterServiceWatcher updates gateway liveness from a background
goroutine while mirror endpoint reconciliation reads the same state
without synchronization. This is a data race: it trips the Go race
detector and can produce inconsistent readiness updates while the
gateway flaps.

Solution

Protect gateway liveness with RWMutex-backed accessors
(getGatewayAlive/setGatewayAlive), move the liveness loop into a
watchGatewayLiveness helper that uses those accessors, and update the
existing tests to use the synchronized setter. Add a race-focused
regression test, TestGatewayAliveSynchronization, that exercises the
liveness watcher concurrently with endpoint readiness updates.

Validation

- go test ./multicluster/service-mirror/...
- go test -race ./multicluster/service-mirror/...
- go test -race ./multicluster/service-mirror -run TestGatewayAliveSynchronization -count=1

Fixes #15096

Signed-off-by: Asish Kumar
---
 .../service-mirror/cluster_watcher.go         | 54 ++++++++++++-------
 .../cluster_watcher_mirroring_test.go         |  4 +-
 .../cluster_watcher_race_test.go              | 48 +++++++++++++++++
 3 files changed, 86 insertions(+), 20 deletions(-)
 create mode 100644 multicluster/service-mirror/cluster_watcher_race_test.go

diff --git a/multicluster/service-mirror/cluster_watcher.go b/multicluster/service-mirror/cluster_watcher.go
index 8a7a1d415f556..a151e947352ed 100644
--- a/multicluster/service-mirror/cluster_watcher.go
+++ b/multicluster/service-mirror/cluster_watcher.go
@@ -10,6 +10,7 @@ import (
 	"sort"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/linkerd/linkerd2/controller/gen/apis/link/v1alpha3"
@@ -60,6 +61,7 @@ type (
 		eventsQueue             workqueue.TypedRateLimitingInterface[any]
 		requeueLimit            int
 		repairPeriod            time.Duration
+		gatewayAliveMu          sync.RWMutex
 		gatewayAlive            bool
 		liveness                chan bool
 		headlessServicesEnabled bool
@@ -371,6 +373,18 @@ func (rcsw *RemoteClusterServiceWatcher) getMirrorServiceAnnotations(remoteServi
 	return annotations
 }
 
+func (rcsw *RemoteClusterServiceWatcher) getGatewayAlive() bool {
+	rcsw.gatewayAliveMu.RLock()
+	defer rcsw.gatewayAliveMu.RUnlock()
+	return rcsw.gatewayAlive
+}
+
+func (rcsw *RemoteClusterServiceWatcher) setGatewayAlive(alive bool) {
+	rcsw.gatewayAliveMu.Lock()
+	defer rcsw.gatewayAliveMu.Unlock()
+	rcsw.gatewayAlive = alive
+}
+
 // Provides annotations for federated service
 func (rcsw *RemoteClusterServiceWatcher) getFederatedServiceAnnotations(remoteService *corev1.Service) map[string]string {
 	annotations := rcsw.getCommonServiceAnnotations(remoteService)
@@ -1468,27 +1482,31 @@ func (rcsw *RemoteClusterServiceWatcher) Start(ctx context.Context) error {
 
 	ev := RepairEndpoints{}
 	rcsw.eventsQueue.Add(&ev)
 
-	go func() {
-		ticker := time.NewTicker(rcsw.repairPeriod)
-		for {
-			select {
-			case <-ticker.C:
-				ev := RepairEndpoints{}
-				rcsw.eventsQueue.Add(&ev)
-			case alive := <-rcsw.liveness:
-				rcsw.log.Debugf("gateway liveness change from %t to %t", rcsw.gatewayAlive, alive)
-				rcsw.gatewayAlive = alive
-				ev := RepairEndpoints{}
-				rcsw.eventsQueue.Add(&ev)
-			case <-rcsw.stopper:
-				return
-			}
-		}
-	}()
+	go rcsw.watchGatewayLiveness()
 
 	return nil
 }
 
+func (rcsw *RemoteClusterServiceWatcher) watchGatewayLiveness() {
+	ticker := time.NewTicker(rcsw.repairPeriod)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			ev := RepairEndpoints{}
+			rcsw.eventsQueue.Add(&ev)
+		case alive := <-rcsw.liveness:
+			rcsw.log.Debugf("gateway liveness change from %t to %t", rcsw.getGatewayAlive(), alive)
+			rcsw.setGatewayAlive(alive)
+			ev := RepairEndpoints{}
+			rcsw.eventsQueue.Add(&ev)
+		case <-rcsw.stopper:
+			return
+		}
+	}
+}
+
 // Stop stops watching the cluster and cleans up all mirrored resources
 func (rcsw *RemoteClusterServiceWatcher) Stop(cleanupState bool) {
 	close(rcsw.stopper)
@@ -1786,7 +1804,7 @@ func (rcsw *RemoteClusterServiceWatcher) updateMirrorEndpoints(ctx context.Conte
 }
 
 func (rcsw *RemoteClusterServiceWatcher) updateReadiness(endpoints *corev1.Endpoints) {
-	if !rcsw.gatewayAlive {
+	if !rcsw.getGatewayAlive() {
 		rcsw.log.Warnf("gateway for %s/%s does not have ready addresses; setting addresses to not ready", endpoints.Namespace, endpoints.Name)
 		for i := range endpoints.Subsets {
 			endpoints.Subsets[i].NotReadyAddresses = append(endpoints.Subsets[i].NotReadyAddresses, endpoints.Subsets[i].Addresses...)
diff --git a/multicluster/service-mirror/cluster_watcher_mirroring_test.go b/multicluster/service-mirror/cluster_watcher_mirroring_test.go
index 58ed875c942bf..08c65e95dfdc4 100644
--- a/multicluster/service-mirror/cluster_watcher_mirroring_test.go
+++ b/multicluster/service-mirror/cluster_watcher_mirroring_test.go
@@ -567,7 +567,7 @@ func TestServiceCreatedGatewayAlive(t *testing.T) {
 
 	// The gateway is now down which triggers repairing Endpoints on the local
 	// cluster.
-	watcher.gatewayAlive = false
+	watcher.setGatewayAlive(false)
 	events.Add(&RepairEndpoints{})
 	for events.Len() > 0 {
 		watcher.processNextEvent(context.Background())
@@ -725,7 +725,7 @@ func TestServiceCreatedGatewayDown(t *testing.T) {
 
 	// The gateway is now alive which triggers repairing Endpoints on the
 	// local cluster.
-	watcher.gatewayAlive = true
+	watcher.setGatewayAlive(true)
 	events.Add(&RepairEndpoints{})
 	for events.Len() > 0 {
 		watcher.processNextEvent(context.Background())
diff --git a/multicluster/service-mirror/cluster_watcher_race_test.go b/multicluster/service-mirror/cluster_watcher_race_test.go
new file mode 100644
index 0000000000000..dde1185c66f23
--- /dev/null
+++ b/multicluster/service-mirror/cluster_watcher_race_test.go
@@ -0,0 +1,48 @@
+package servicemirror
+
+import (
+	"testing"
+	"time"
+
+	logging "github.com/sirupsen/logrus"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/client-go/util/workqueue"
+)
+
+func TestGatewayAliveSynchronization(t *testing.T) {
+	stopper := make(chan struct{})
+	queue := workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any]())
+	defer queue.ShutDown()
+
+	watcher := &RemoteClusterServiceWatcher{
+		stopper:      stopper,
+		log:          logging.NewEntry(logging.New()),
+		eventsQueue:  queue,
+		repairPeriod: time.Hour,
+		gatewayAlive: true,
+		liveness:     make(chan bool, 1024),
+	}
+
+	done := make(chan struct{})
+	go func() {
+		defer close(done)
+		watcher.watchGatewayLiveness()
+	}()
+
+	endpoints := &corev1.Endpoints{
+		Subsets: []corev1.EndpointSubset{{
+			Addresses: []corev1.EndpointAddress{{
+				IP: "192.0.2.1",
+			}},
+		}},
+	}
+
+	for i := 0; i < 1024; i++ {
+		watcher.liveness <- i%2 == 0
+		ep := endpoints.DeepCopy()
+		watcher.updateReadiness(ep)
+	}
+
+	close(stopper)
+	<-done
+}