From 3213d9c0352a777d46a51bea3975c3edc1a5805b Mon Sep 17 00:00:00 2001
From: Aryan Puttur
Date: Wed, 3 Jun 2026 10:06:09 -0400
Subject: [PATCH] Retry site flow controller start after acquiring leader lease

Fixes #2466
---
 internal/kube/adaptor/collector.go | 43 +++++++++++++++++++++++++++---
 1 file changed, 39 insertions(+), 4 deletions(-)

diff --git a/internal/kube/adaptor/collector.go b/internal/kube/adaptor/collector.go
index f657125bc..670e816c5 100644
--- a/internal/kube/adaptor/collector.go
+++ b/internal/kube/adaptor/collector.go
@@ -83,7 +83,7 @@ func siteCollector(ctx context.Context, cli *internalclient.KubeClient) {
 }
 
 func startFlowController(ctx context.Context, cli *internalclient.KubeClient) error {
-	deployment, err := cli.Kube.AppsV1().Deployments(cli.Namespace).Get(context.TODO(), deploymentName(), metav1.GetOptions{})
+	deployment, err := cli.Kube.AppsV1().Deployments(cli.Namespace).Get(ctx, deploymentName(), metav1.GetOptions{})
 	if err != nil {
 		return fmt.Errorf("failed to get transport deployment: %s", err)
 	}
@@ -119,6 +119,43 @@ func startFlowController(ctx context.Context, cli *internalclient.KubeClient) er
 	return nil
 }
 
+func ensureStartFlowController(ctx context.Context, cli *internalclient.KubeClient) {
+	go func() {
+		b := backoff.NewExponentialBackOff()
+		b.InitialInterval = time.Millisecond * 250
+		b.MaxInterval = time.Second * 30
+		b.MaxElapsedTime = 0
+		b.Reset()
+
+		attempt := 0
+		err := backoff.RetryNotify(
+			func() error {
+				attempt++
+				if err := startFlowController(ctx, cli); err != nil {
+					return err
+				}
+				if attempt > 1 {
+					slog.Info("COLLECTOR: Site flow controller started after retry", slog.Int("attempt", attempt))
+				}
+				return nil
+			},
+			backoff.WithContext(b, ctx),
+			func(err error, d time.Duration) {
+				if ctx.Err() != nil {
+					return
+				}
+				slog.Error("COLLECTOR: Failed to start controller for emitting site events, retrying",
+					slog.Any("error", err),
+					slog.Duration("retryAfter", d),
+				)
+			},
+		)
+		if err != nil && ctx.Err() == nil {
+			slog.Error("COLLECTOR: Stopped retrying site flow controller start", slog.Any("error", err))
+		}
+	}()
+}
+
 func runLeaderElection(lock *resourcelock.LeaseLock, id string, cli *internalclient.KubeClient) {
 	var (
 		mu sync.Mutex
@@ -141,9 +178,7 @@ func runLeaderElection(lock *resourcelock.LeaseLock, id string, cli *internalcli
 				leaderCtx, leaderCtxCancel = context.WithCancel(ctx)
 				slog.Info("COLLECTOR: Became leader. Starting status sync and site controller", slog.Any("elapsedTime", strategy.GetElapsedTime()))
 				siteCollector(leaderCtx, cli)
-				if err := startFlowController(leaderCtx, cli); err != nil {
-					slog.Error("COLLECTOR: Failed to start controller for emitting site events", slog.Any("error", err))
-				}
+				ensureStartFlowController(leaderCtx, cli)
 			},
 			OnStoppedLeading: func() {
 				slog.Info("COLLECTOR: Lost leader lock. Stopping status sync and site controller", slog.Any("elapsedTime", strategy.GetElapsedTime()))