Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 39 additions & 4 deletions internal/kube/adaptor/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func siteCollector(ctx context.Context, cli *internalclient.KubeClient) {
}

func startFlowController(ctx context.Context, cli *internalclient.KubeClient) error {
deployment, err := cli.Kube.AppsV1().Deployments(cli.Namespace).Get(context.TODO(), deploymentName(), metav1.GetOptions{})
deployment, err := cli.Kube.AppsV1().Deployments(cli.Namespace).Get(ctx, deploymentName(), metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get transport deployment: %s", err)
}
Expand Down Expand Up @@ -119,6 +119,43 @@ func startFlowController(ctx context.Context, cli *internalclient.KubeClient) er
return nil
}

func ensureStartFlowController(ctx context.Context, cli *internalclient.KubeClient) {
go func() {
b := backoff.NewExponentialBackOff()
b.InitialInterval = time.Millisecond * 250
b.MaxInterval = time.Second * 30
b.MaxElapsedTime = 0
b.Reset()

attempt := 0
err := backoff.RetryNotify(
func() error {
attempt++
if err := startFlowController(ctx, cli); err != nil {
return err
}
if attempt > 1 {
slog.Info("COLLECTOR: Site flow controller started after retry", slog.Int("attempt", attempt))
}
return nil
},
backoff.WithContext(b, ctx),
func(err error, d time.Duration) {
if ctx.Err() != nil {
return
}
slog.Error("COLLECTOR: Failed to start controller for emitting site events, retrying",
slog.Any("error", err),
slog.Duration("retryAfter", d),
)
},
)
if err != nil && ctx.Err() == nil {
slog.Error("COLLECTOR: Stopped retrying site flow controller start", slog.Any("error", err))
}
}()
}

func runLeaderElection(lock *resourcelock.LeaseLock, id string, cli *internalclient.KubeClient) {
var (
mu sync.Mutex
Expand All @@ -141,9 +178,7 @@ func runLeaderElection(lock *resourcelock.LeaseLock, id string, cli *internalcli
leaderCtx, leaderCtxCancel = context.WithCancel(ctx)
slog.Info("COLLECTOR: Became leader. Starting status sync and site controller", slog.Any("elapsedTime", strategy.GetElapsedTime()))
siteCollector(leaderCtx, cli)
if err := startFlowController(leaderCtx, cli); err != nil {
slog.Error("COLLECTOR: Failed to start controller for emitting site events", slog.Any("error", err))
}
ensureStartFlowController(leaderCtx, cli)
},
OnStoppedLeading: func() {
slog.Info("COLLECTOR: Lost leader lock. Stopping status sync and site controller", slog.Any("elapsedTime", strategy.GetElapsedTime()))
Expand Down
Loading