Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 36 additions & 18 deletions multicluster/service-mirror/cluster_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/linkerd/linkerd2/controller/gen/apis/link/v1alpha3"
Expand Down Expand Up @@ -60,6 +61,7 @@ type (
eventsQueue workqueue.TypedRateLimitingInterface[any]
requeueLimit int
repairPeriod time.Duration
gatewayAliveMu sync.RWMutex
gatewayAlive bool
liveness chan bool
headlessServicesEnabled bool
Expand Down Expand Up @@ -371,6 +373,18 @@ func (rcsw *RemoteClusterServiceWatcher) getMirrorServiceAnnotations(remoteServi
return annotations
}

func (rcsw *RemoteClusterServiceWatcher) getGatewayAlive() bool {
rcsw.gatewayAliveMu.RLock()
defer rcsw.gatewayAliveMu.RUnlock()
return rcsw.gatewayAlive
}

func (rcsw *RemoteClusterServiceWatcher) setGatewayAlive(alive bool) {
rcsw.gatewayAliveMu.Lock()
defer rcsw.gatewayAliveMu.Unlock()
rcsw.gatewayAlive = alive
}

// Provides annotations for federated service
func (rcsw *RemoteClusterServiceWatcher) getFederatedServiceAnnotations(remoteService *corev1.Service) map[string]string {
annotations := rcsw.getCommonServiceAnnotations(remoteService)
Expand Down Expand Up @@ -1468,27 +1482,31 @@ func (rcsw *RemoteClusterServiceWatcher) Start(ctx context.Context) error {
ev := RepairEndpoints{}
rcsw.eventsQueue.Add(&ev)

go func() {
ticker := time.NewTicker(rcsw.repairPeriod)
for {
select {
case <-ticker.C:
ev := RepairEndpoints{}
rcsw.eventsQueue.Add(&ev)
case alive := <-rcsw.liveness:
rcsw.log.Debugf("gateway liveness change from %t to %t", rcsw.gatewayAlive, alive)
rcsw.gatewayAlive = alive
ev := RepairEndpoints{}
rcsw.eventsQueue.Add(&ev)
case <-rcsw.stopper:
return
}
}
}()
go rcsw.watchGatewayLiveness()

return nil
}

func (rcsw *RemoteClusterServiceWatcher) watchGatewayLiveness() {
ticker := time.NewTicker(rcsw.repairPeriod)
defer ticker.Stop()

for {
select {
case <-ticker.C:
ev := RepairEndpoints{}
rcsw.eventsQueue.Add(&ev)
case alive := <-rcsw.liveness:
rcsw.log.Debugf("gateway liveness change from %t to %t", rcsw.getGatewayAlive(), alive)
rcsw.setGatewayAlive(alive)
ev := RepairEndpoints{}
rcsw.eventsQueue.Add(&ev)
case <-rcsw.stopper:
return
}
}
}

// Stop stops watching the cluster and cleans up all mirrored resources
func (rcsw *RemoteClusterServiceWatcher) Stop(cleanupState bool) {
close(rcsw.stopper)
Expand Down Expand Up @@ -1786,7 +1804,7 @@ func (rcsw *RemoteClusterServiceWatcher) updateMirrorEndpoints(ctx context.Conte
}

func (rcsw *RemoteClusterServiceWatcher) updateReadiness(endpoints *corev1.Endpoints) {
if !rcsw.gatewayAlive {
if !rcsw.getGatewayAlive() {
rcsw.log.Warnf("gateway for %s/%s does not have ready addresses; setting addresses to not ready", endpoints.Namespace, endpoints.Name)
for i := range endpoints.Subsets {
endpoints.Subsets[i].NotReadyAddresses = append(endpoints.Subsets[i].NotReadyAddresses, endpoints.Subsets[i].Addresses...)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ func TestServiceCreatedGatewayAlive(t *testing.T) {

// The gateway is now down which triggers repairing Endpoints on the local
// cluster.
watcher.gatewayAlive = false
watcher.setGatewayAlive(false)
events.Add(&RepairEndpoints{})
for events.Len() > 0 {
watcher.processNextEvent(context.Background())
Expand Down Expand Up @@ -725,7 +725,7 @@ func TestServiceCreatedGatewayDown(t *testing.T) {

// The gateway is now alive which triggers repairing Endpoints on the
// local cluster.
watcher.gatewayAlive = true
watcher.setGatewayAlive(true)
events.Add(&RepairEndpoints{})
for events.Len() > 0 {
watcher.processNextEvent(context.Background())
Expand Down
48 changes: 48 additions & 0 deletions multicluster/service-mirror/cluster_watcher_race_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package servicemirror

import (
"testing"
"time"

logging "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/util/workqueue"
)

func TestGatewayAliveSynchronization(t *testing.T) {
stopper := make(chan struct{})
queue := workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any]())
defer queue.ShutDown()

watcher := &RemoteClusterServiceWatcher{
stopper: stopper,
log: logging.NewEntry(logging.New()),
eventsQueue: queue,
repairPeriod: time.Hour,
gatewayAlive: true,
liveness: make(chan bool, 1024),
}

done := make(chan struct{})
go func() {
defer close(done)
watcher.watchGatewayLiveness()
}()

endpoints := &corev1.Endpoints{
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{
IP: "192.0.2.1",
}},
}},
}

for i := 0; i < 1024; i++ {
watcher.liveness <- i%2 == 0
ep := endpoints.DeepCopy()
watcher.updateReadiness(ep)
}

close(stopper)
<-done
}
Loading