From 4a1a319cf498800c99f7067d62d5e7adf232d2c9 Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Fri, 17 Apr 2026 16:07:42 -0400 Subject: [PATCH 01/10] fix: Honor x-ld-fd-fallback header in fdv2 initializer phase (SDK-2203) Prior to this change, the Go SDK only inspected the x-ld-fd-fallback response header on FDv2 synchronizer responses (streaming and polling). If an initializer received the header, the signal was silently dropped and the SDK would continue attempting subsequent initializers and FDv2 synchronizers rather than reverting to FDv1. This diverged from the spec and from the Node SDK's behavior. DataInitializer.Fetch now returns (basis, fallbackToFDv1, err). The FDv2 data system branches on the bool, applying any accompanying Basis before swapping the synchronizer list for the FDv1 fallback builder -- so evaluations can serve the server-provided payload while FDv1 spins up. When no FDv1 fallback is configured, the data system logs and clears the synchronizer list, mirroring the synchronizer-triggered path. A shared isFDv1FallbackRequested helper and fdv1FallbackHeader constant replace the duplicated header-string checks across streaming and polling data sources. --- internal/datasourcev2/helpers.go | 10 +++ internal/datasourcev2/polling_data_source.go | 9 +- .../datasourcev2/polling_data_source_test.go | 86 ++++++++++++++++++- .../datasourcev2/streaming_data_source.go | 6 +- .../streaming_data_source_test.go | 3 +- internal/datasystem/fdv2_datasystem.go | 41 +++++++-- ldclient_end_to_end_fdv2_test.go | 48 +++++++++++ ldfiledatav2/file_data_source_impl.go | 4 +- ldfiledatav2/file_data_source_test.go | 5 +- subsystems/data_source.go | 9 +- testhelpers/ldtestdatav2/test_data_source.go | 5 +- .../ldtestdatav2/test_data_source_test.go | 4 +- 12 files changed, 202 insertions(+), 28 deletions(-) diff --git a/internal/datasourcev2/helpers.go b/internal/datasourcev2/helpers.go index 54a02311..3053f9ab 100644 --- a/internal/datasourcev2/helpers.go +++ b/internal/datasourcev2/helpers.go @@ -12,6 +12,16 @@ import ( "github.com/launchdarkly/go-sdk-common/v3/ldlog" ) +// fdv1FallbackHeader is the response header LaunchDarkly uses to instruct an SDK to abandon the +// FDv2 protocol and fall back to FDv1. +const fdv1FallbackHeader = "X-LD-FD-Fallback" + +// isFDv1FallbackRequested reports whether the response headers signal that the SDK should revert +// to the FDv1 protocol. +func isFDv1FallbackRequested(h http.Header) bool { + return h.Get(fdv1FallbackHeader) == "true" +} + type httpStatusError struct { Message string Code int diff --git a/internal/datasourcev2/polling_data_source.go b/internal/datasourcev2/polling_data_source.go index 253fd2c0..e3185676 100644 --- a/internal/datasourcev2/polling_data_source.go +++ b/internal/datasourcev2/polling_data_source.go @@ -68,13 +68,14 @@ func (pp *PollingProcessor) Name() string { } //nolint:revive // DataInitializer method. -func (pp *PollingProcessor) Fetch(ds subsystems.DataSelector, ctx context.Context) (*subsystems.Basis, error) { +func (pp *PollingProcessor) Fetch(ds subsystems.DataSelector, ctx context.Context) (*subsystems.Basis, bool, error) { changeSet, headers, err := pp.requester.Request(ctx, ds.Selector()) + fallback := isFDv1FallbackRequested(headers) if err != nil { - return nil, err + return nil, fallback, err } environmentID := internal.NewInitMetadataFromHeaders(headers).GetEnvironmentID() - return &subsystems.Basis{ChangeSet: *changeSet, Persist: true, EnvironmentID: environmentID}, nil + return &subsystems.Basis{ChangeSet: *changeSet, Persist: true, EnvironmentID: environmentID}, fallback, nil } //nolint:revive // DataSynchronizer method. @@ -116,7 +117,7 @@ func (pp *PollingProcessor) Sync(ds subsystems.DataSelector) <-chan subsystems.D Time: time.Now(), } - if hse.Header.Get("X-LD-FD-Fallback") == "true" { + if isFDv1FallbackRequested(hse.Header) { resultChan <- subsystems.DataSynchronizerResult{ State: interfaces.DataSourceStateOff, Error: errorInfo, diff --git a/internal/datasourcev2/polling_data_source_test.go b/internal/datasourcev2/polling_data_source_test.go index c2534571..3c6e10fc 100644 --- a/internal/datasourcev2/polling_data_source_test.go +++ b/internal/datasourcev2/polling_data_source_test.go @@ -21,6 +21,7 @@ import ( "github.com/launchdarkly/go-server-sdk/v7/testhelpers/ldservicesv2" "github.com/launchdarkly/go-test-helpers/v3/httphelpers" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) var alwaysTrueFlag = ldbuilders.NewFlagBuilder("always-true-flag").SingleVariation(ldvalue.Bool(true)).Build() @@ -40,7 +41,7 @@ func TestPollingProcessorInitializerCanMakeSuccessfulRequest(t *testing.T) { ) defer processor.Close() - basis, err := processor.Fetch(ds, context.Background()) + basis, _, err := processor.Fetch(ds, context.Background()) assert.NoError(t, err) assert.Len(t, basis.ChangeSet.Changes(), 1) assert.Equal(t, basis.ChangeSet.IntentCode(), subsystems.IntentTransferFull) @@ -67,7 +68,7 @@ func TestPollingProcessorInitializerAppendsFilterParameter(t *testing.T) { }, ) defer processor.Close() - _, err := processor.Fetch(ds, context.Background()) + _, _, err := processor.Fetch(ds, context.Background()) assert.NoError(t, err) r := <-requestsCh @@ -90,7 +91,7 @@ func TestPollingProcessorInitializerAppendsBasisParameter(t *testing.T) { defer processor.Close() ds := mocks.NewMockDataSelector(subsystems.NewSelector("test-state", 1)) - _, err := processor.Fetch(ds, context.Background()) + _, _, err := processor.Fetch(ds, context.Background()) assert.NoError(t, err) r := <-requestsCh @@ -115,7 +116,7 @@ func TestPollingProcessorSynchronizerAppendsFilterParameter(t *testing.T) { ) defer processor.Close() - _, err := processor.Fetch(ds, context.Background()) + _, _, err := processor.Fetch(ds, context.Background()) assert.NoError(t, err) r := <-requestsCh @@ -261,6 +262,83 @@ func TestPollingProcessorSynchronizerHandlesFallbackToFDv2(t *testing.T) { }) } +func TestPollingProcessorInitializerHandlesFallbackOnSuccessfulResponse(t *testing.T) { + ds := mocks.NewMockDataSelector(subsystems.NoSelector()) + data := ldservicesv2.NewServerSDKData().Flags(alwaysTrueFlag).ToInitializerPayload(subsystems.NewSelector("test-state", 1)) + + // Wrap the valid 200 handler to inject X-LD-FD-Fallback alongside a well-formed payload. + underlying := ldservices.ServerSidePollingV2ServiceHandler(data) + fallbackOn200 := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("X-LD-FD-Fallback", "true") + underlying.ServeHTTP(w, r) + }) + + httphelpers.WithServer(fallbackOn200, func(ts *httptest.Server) { + processor := NewPollingProcessor( + sharedtest.BasicClientContext(), + datasource.PollingConfig{ + BaseURI: ts.URL, + PollInterval: time.Minute * 30, + }, + ) + defer processor.Close() + + basis, fallback, err := processor.Fetch(ds, context.Background()) + assert.True(t, fallback) + assert.NoError(t, err) + // Even when the server signals fallback, a valid payload in the same response + // should still be surfaced so the caller can apply it before switching protocols. + require.NotNil(t, basis) + assert.Len(t, basis.ChangeSet.Changes(), 1) + assert.Equal(t, "test-state", basis.ChangeSet.Selector().State()) + }) +} + +func TestPollingProcessorInitializerHandlesFallbackOnErrorResponse(t *testing.T) { + ds := mocks.NewMockDataSelector(subsystems.NoSelector()) + + fallbackHeader := http.Header{ + "X-LD-FD-Fallback": []string{"true"}, + } + handler, _ := httphelpers.RecordingHandler(httphelpers.HandlerWithResponse(500, fallbackHeader, nil)) + httphelpers.WithServer(handler, func(ts *httptest.Server) { + processor := NewPollingProcessor( + sharedtest.BasicClientContext(), + datasource.PollingConfig{ + BaseURI: ts.URL, + PollInterval: time.Minute * 30, + }, + ) + defer processor.Close() + + basis, fallback, err := processor.Fetch(ds, context.Background()) + assert.Nil(t, basis) + assert.True(t, fallback) + assert.Error(t, err, "underlying HTTP error should still be surfaced alongside the fallback signal") + }) +} + +func TestPollingProcessorInitializerErrorWithoutFallbackHeaderReturnsRegularError(t *testing.T) { + ds := mocks.NewMockDataSelector(subsystems.NoSelector()) + + handler, _ := httphelpers.RecordingHandler(httphelpers.HandlerWithStatus(500)) + httphelpers.WithServer(handler, func(ts *httptest.Server) { + processor := NewPollingProcessor( + sharedtest.BasicClientContext(), + datasource.PollingConfig{ + BaseURI: ts.URL, + PollInterval: time.Minute * 30, + }, + ) + defer processor.Close() + + basis, fallback, err := processor.Fetch(ds, context.Background()) + assert.Nil(t, basis) + assert.False(t, fallback) + assert.Error(t, err) + }) +} + func TestPollingProcessorSynchronizerHandlesRecoverableErrors(t *testing.T) { for _, statusCode := range []int{400, 408, 429, 500, 503} { t.Run(fmt.Sprintf("handles recoverable error %d", statusCode), func(t *testing.T) { diff --git a/internal/datasourcev2/streaming_data_source.go b/internal/datasourcev2/streaming_data_source.go index d76d7cd8..e081b7e8 100644 --- a/internal/datasourcev2/streaming_data_source.go +++ b/internal/datasourcev2/streaming_data_source.go @@ -106,8 +106,8 @@ func (sp *StreamProcessor) Name() string { } //nolint:revive // DataInitializer method. -func (sp *StreamProcessor) Fetch(ds subsystems.DataSelector, _ context.Context) (*subsystems.Basis, error) { - return nil, errors.New("StreamProcessor does not implement Fetch capability") +func (sp *StreamProcessor) Fetch(ds subsystems.DataSelector, _ context.Context) (*subsystems.Basis, bool, error) { + return nil, false, errors.New("StreamProcessor does not implement Fetch capability") } //nolint:revive // DataSynchronizer method. @@ -353,7 +353,7 @@ func (sp *StreamProcessor) subscribe(ds subsystems.DataSelector, resultChan chan Time: time.Now(), } - if se.Header.Get("X-LD-FD-Fallback") == "true" { + if isFDv1FallbackRequested(se.Header) { resultChan <- subsystems.DataSynchronizerResult{ State: interfaces.DataSourceStateOff, Error: errorInfo, diff --git a/internal/datasourcev2/streaming_data_source_test.go b/internal/datasourcev2/streaming_data_source_test.go index 5bcc1ef8..9cf5d4a0 100644 --- a/internal/datasourcev2/streaming_data_source_test.go +++ b/internal/datasourcev2/streaming_data_source_test.go @@ -60,8 +60,9 @@ func TestStreamingDoesNotWorkAsInitializer(t *testing.T) { ) defer sp.Close() - basis, err := sp.Fetch(ds, context.Background()) + basis, fallback, err := sp.Fetch(ds, context.Background()) assert.Nil(t, basis) + assert.False(t, fallback) assert.NotNil(t, err) } diff --git a/internal/datasystem/fdv2_datasystem.go b/internal/datasystem/fdv2_datasystem.go index 8d279ab1..9c32c44d 100644 --- a/internal/datasystem/fdv2_datasystem.go +++ b/internal/datasystem/fdv2_datasystem.go @@ -205,7 +205,16 @@ func (f *FDv2) launchTask(task func()) { func (f *FDv2) run(ctx context.Context, closeWhenReady chan struct{}) { f.UpdateStatus(interfaces.DataSourceStateInitializing, interfaces.DataSourceErrorInfo{}) - f.runInitializers(ctx, closeWhenReady) + if f.runInitializers(ctx, closeWhenReady) { + if f.fdv1FallbackBuilder != nil { + f.loggers.Warn("Reverting to FDv1 protocol") + f.synchronizerBuilders = []func() (subsystems.DataSynchronizer, error){f.fdv1FallbackBuilder} + f.currentSyncIndex = 0 + } else { + f.loggers.Warn("Initializer requested FDv1 fallback but none configured") + f.synchronizerBuilders = nil + } + } if f.configuredWithDataSources && f.dataStoreStatusProvider.IsStatusMonitoringEnabled() { f.launchTask(func() { @@ -235,12 +244,33 @@ func (f *FDv2) runPersistentStoreOutageRecovery(ctx context.Context, statuses <- } } -func (f *FDv2) runInitializers(ctx context.Context, closeWhenReady chan struct{}) { +// runInitializers runs each configured initializer in order until one succeeds, the context is +// cancelled, or an initializer signals a revert to FDv1. Returns true if an initializer signalled +// a revert to FDv1. If fallback is signalled alongside a valid Basis, that Basis is applied before +// returning so evaluations can serve the server-provided data while the FDv1 synchronizer spins up. +func (f *FDv2) runInitializers(ctx context.Context, closeWhenReady chan struct{}) (fallbackToFDv1 bool) { for _, initializer := range f.initializers { f.loggers.Infof("Attempting to initialize via %s", initializer.Name()) - basis, err := initializer.Fetch(f.store, ctx) + basis, fallback, err := initializer.Fetch(f.store, ctx) if errors.Is(err, context.Canceled) { - return + return false + } + if fallback { + if err != nil { + f.loggers.Warnf("Initializer %s requested fallback to FDv1 protocol: %v", initializer.Name(), err) + } else { + f.loggers.Warnf("Initializer %s requested fallback to FDv1 protocol", initializer.Name()) + } + if basis != nil { + f.environmentIDProvider.SetEnvironmentID(basis.EnvironmentID) + f.store.Apply(basis.ChangeSet, basis.Persist) + if basis.ChangeSet.Selector().IsDefined() { + f.readyOnce.Do(func() { + close(closeWhenReady) + }) + } + } + return true } if err != nil { f.loggers.Warnf("Initializer %s failed: %v", initializer.Name(), err) @@ -253,9 +283,10 @@ func (f *FDv2) runInitializers(ctx context.Context, closeWhenReady chan struct{} f.readyOnce.Do(func() { close(closeWhenReady) }) - return + return false } } + return false } func (f *FDv2) runSynchronizers(ctx context.Context, closeWhenReady chan struct{}) { diff --git a/ldclient_end_to_end_fdv2_test.go b/ldclient_end_to_end_fdv2_test.go index 32c7a92f..15506bdc 100644 --- a/ldclient_end_to_end_fdv2_test.go +++ b/ldclient_end_to_end_fdv2_test.go @@ -116,6 +116,54 @@ func TestFDV2CanFallBackToV1(t *testing.T) { }) } +// When the polling initializer receives x-ld-fd-fallback from the server, the SDK should skip any +// remaining FDv2 synchronizers and switch to the FDv1 polling synchronizer directly — without ever +// attempting the FDv2 streaming synchronizer. +func TestFDV2CanFallBackToV1FromInitializer(t *testing.T) { + dataV1 := ldservices.NewServerSDKData().Flags(alwaysFalseFlag) + + header := http.Header{ + "X-LD-FD-Fallback": []string{"true"}, + } + + // FDv2 polling initializer: returns 500 + fallback header. Must trigger revert to FDv1 before + // the FDv2 streaming synchronizer is ever dialed. + pollV2InitRecordingHandler, pollV2InitReqCh := httphelpers.RecordingHandler(httphelpers.HandlerWithResponse(500, header, nil)) + // FDv1 polling synchronizer: returns valid FDv1 data. + pollV1SyncRecordingHandler, pollV1SyncReqCh := httphelpers.RecordingHandler(ldservices.ServerSidePollingServiceHandler(dataV1)) + // FDv2 streaming synchronizer: should never be hit. If it is, the test will surface it via + // streamV2SyncReqCh. + streamHandler, streamV2SyncReqCh := httphelpers.RecordingHandler(httphelpers.HandlerWithResponse(500, header, nil)) + + handler := httphelpers.SequentialHandler(pollV2InitRecordingHandler, pollV1SyncRecordingHandler, streamHandler) + + httphelpers.WithServer(handler, func(server *httptest.Server) { + logCapture := ldlogtest.NewMockLog() + + config := Config{ + Events: ldcomponents.NoEvents(), + Logging: ldcomponents.Logging().Loggers(logCapture.Loggers), + DataSystem: ldcomponents.DataSystem().WithRelayProxyEndpoints(server.URL).Default(), + } + + client, err := MakeCustomClient(testSdkKey, config, time.Second*5) + + <-pollV2InitReqCh + <-pollV1SyncReqCh + + require.NoError(t, err) + defer client.Close() + + reached := client.GetDataSourceStatusProvider().WaitFor(interfaces.DataSourceStateValid, time.Second*5) + require.True(t, reached, "timed out waiting for data source to reach VALID state") + + assertNoMoreRequests(t, streamV2SyncReqCh) + + value, _ := client.BoolVariation(alwaysFalseFlag.Key, testUser, true) + assert.False(t, value) + }) +} + func TestFDV2StreamingSynchronizer(t *testing.T) { data := ldservicesv2.NewServerSDKData().Flags(alwaysTrueFlag) diff --git a/ldfiledatav2/file_data_source_impl.go b/ldfiledatav2/file_data_source_impl.go index e1496a31..21765bfa 100644 --- a/ldfiledatav2/file_data_source_impl.go +++ b/ldfiledatav2/file_data_source_impl.go @@ -135,7 +135,7 @@ func (fs *fileDataSource) Sync(ds subsystems.DataSelector) <-chan subsystems.Dat return resultChan } -func (fs *fileDataSource) Fetch(ds subsystems.DataSelector, ctx context.Context) (*subsystems.Basis, error) { +func (fs *fileDataSource) Fetch(ds subsystems.DataSelector, ctx context.Context) (*subsystems.Basis, bool, error) { changeSetChan := fs.changeSetBroadcaster.AddListener() statusChan := fs.statusBroadcaster.AddListener() @@ -178,7 +178,7 @@ func (fs *fileDataSource) Fetch(ds subsystems.DataSelector, ctx context.Context) wg.Wait() - return basis, err + return basis, false, err } // Reload tells the data source to immediately attempt to reread all of the configured source files diff --git a/ldfiledatav2/file_data_source_test.go b/ldfiledatav2/file_data_source_test.go index 1f50945f..a5d42d09 100644 --- a/ldfiledatav2/file_data_source_test.go +++ b/ldfiledatav2/file_data_source_test.go @@ -284,8 +284,9 @@ func TestSuccessfullyLoadsJsonFlagsAsInitializer(t *testing.T) { initializer, err := factory.AsInitializer().Build(subsystems.BasicClientContext{}) assert.NoError(t, err) - basis, err := initializer.Fetch(mocks.NewMockDataSelector(subsystems.NoSelector()), context.Background()) + basis, fallback, err := initializer.Fetch(mocks.NewMockDataSelector(subsystems.NoSelector()), context.Background()) assert.NoError(t, err) + assert.False(t, fallback) assert.NotNil(t, basis) assert.Len(t, basis.ChangeSet.Changes(), 2) @@ -310,7 +311,7 @@ func TestInitializerReturnsErrorIfFileDoesNotExist(t *testing.T) { initializer, err := factory.AsInitializer().Build(subsystems.BasicClientContext{}) assert.NoError(t, err) - _, err = initializer.Fetch(mocks.NewMockDataSelector(subsystems.NoSelector()), context.Background()) + _, _, err = initializer.Fetch(mocks.NewMockDataSelector(subsystems.NoSelector()), context.Background()) assert.Error(t, err) }) } diff --git a/subsystems/data_source.go b/subsystems/data_source.go index 82f0cb29..1d8ffc2b 100644 --- a/subsystems/data_source.go +++ b/subsystems/data_source.go @@ -45,9 +45,12 @@ type DataSelector interface { type DataInitializer interface { // Name returns the name of the data initializer. Name() string - // Fetch returns a Basis, or an error if the Basis could not be retrieved. If the context has expired, - // return the context's error. - Fetch(ds DataSelector, ctx context.Context) (*Basis, error) + // Fetch returns a Basis, or an error if the Basis could not be retrieved. If the context has + // expired, it returns the context's error. If the LaunchDarkly server has instructed the SDK + // to fall back to the FDv1 protocol (via the x-ld-fd-fallback response header), fallbackToFDv1 + // is true; in that case the Basis is nil and err may be nil or carry the underlying HTTP + // error for logging. Callers should branch on fallbackToFDv1 before err. + Fetch(ds DataSelector, ctx context.Context) (basis *Basis, fallbackToFDv1 bool, err error) } // DataSynchronizerResult represents the results of a Synchronizer's ongoing Sync method. diff --git a/testhelpers/ldtestdatav2/test_data_source.go b/testhelpers/ldtestdatav2/test_data_source.go index f252ef3e..860c5eae 100644 --- a/testhelpers/ldtestdatav2/test_data_source.go +++ b/testhelpers/ldtestdatav2/test_data_source.go @@ -316,8 +316,9 @@ func (d *testDataSourceImpl) Name() string { return "TestDataSynchronizer" } -func (d *testDataSourceImpl) Fetch(ds subsystems.DataSelector, ctx context.Context) (*subsystems.Basis, error) { - return d.owner.makeBasis() +func (d *testDataSourceImpl) Fetch(ds subsystems.DataSelector, ctx context.Context) (*subsystems.Basis, bool, error) { + basis, err := d.owner.makeBasis() + return basis, false, err } func (d *testDataSourceImpl) Sync(ds subsystems.DataSelector) <-chan subsystems.DataSynchronizerResult { diff --git a/testhelpers/ldtestdatav2/test_data_source_test.go b/testhelpers/ldtestdatav2/test_data_source_test.go index 0710800a..e518ce61 100644 --- a/testhelpers/ldtestdatav2/test_data_source_test.go +++ b/testhelpers/ldtestdatav2/test_data_source_test.go @@ -53,7 +53,7 @@ func TestCanBeUsedAsInitializer(t *testing.T) { assert.NoError(t, err) selector := mocks.NewMockDataSelector(subsystems.NoSelector()) - basis, err := initializer.Fetch(selector, context.Background()) + basis, _, err := initializer.Fetch(selector, context.Background()) assert.NoError(t, err) changes := basis.ChangeSet.Changes() @@ -70,7 +70,7 @@ func TestCanBeUsedAsInitializer(t *testing.T) { td.Update(td.Flag("flag1").On(false)) - basis, err = initializer.Fetch(selector, context.Background()) + basis, _, err = initializer.Fetch(selector, context.Background()) assert.NoError(t, err) changes = basis.ChangeSet.Changes() From 79549e314d6186339a1165b3804aa2333e3cf8f1 Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Mon, 20 Apr 2026 08:41:55 -0400 Subject: [PATCH 02/10] fix: Set status to Off when init-triggered fallback has no FDv1 fallback configured If an initializer requested FDv1 fallback but no fdv1FallbackBuilder was configured, the run loop cleared the synchronizer list and logged a warning but never updated the data source status. runSynchronizers then closed closeWhenReady and returned without a status update, leaving the status permanently stuck at Initializing. Mirror the synchronizer-triggered path (syncFDv1 with nil fallback builder) by calling UpdateStatus(DataSourceStateOff, ...) in the initializer-triggered branch as well. Add a regression test that configures a custom data system with only an initializer (no FDv1 fallback) and asserts the status transitions to Off. --- internal/datasystem/fdv2_datasystem.go | 1 + ldclient_end_to_end_fdv2_test.go | 43 ++++++++++++++++++++++++++ 2 files changed, 44 insertions(+) diff --git a/internal/datasystem/fdv2_datasystem.go b/internal/datasystem/fdv2_datasystem.go index 9c32c44d..bb3a9a25 100644 --- a/internal/datasystem/fdv2_datasystem.go +++ b/internal/datasystem/fdv2_datasystem.go @@ -213,6 +213,7 @@ func (f *FDv2) run(ctx context.Context, closeWhenReady chan struct{}) { } else { f.loggers.Warn("Initializer requested FDv1 fallback but none configured") f.synchronizerBuilders = nil + f.UpdateStatus(interfaces.DataSourceStateOff, f.getStatus().LastError) } } diff --git a/ldclient_end_to_end_fdv2_test.go b/ldclient_end_to_end_fdv2_test.go index 15506bdc..f3be9d70 100644 --- a/ldclient_end_to_end_fdv2_test.go +++ b/ldclient_end_to_end_fdv2_test.go @@ -116,6 +116,49 @@ func TestFDV2CanFallBackToV1(t *testing.T) { }) } +// When an initializer requests FDv1 fallback but no FDv1 fallback is configured, the data source +// status must transition to Off rather than staying stuck at Initializing. This mirrors the +// synchronizer-triggered path when fdv1FallbackBuilder is nil. +func TestFDV2InitializerFallbackWithoutFDv1FallbackTransitionsToOff(t *testing.T) { + header := http.Header{ + "X-LD-FD-Fallback": []string{"true"}, + } + handler, requestsCh := httphelpers.RecordingHandler(httphelpers.HandlerWithResponse(500, header, nil)) + + httphelpers.WithServer(handler, func(server *httptest.Server) { + logCapture := ldlogtest.NewMockLog() + + // Custom data system: a polling initializer, no synchronizers, no FDv1 fallback. + config := Config{ + Events: ldcomponents.NoEvents(), + Logging: ldcomponents.Logging().Loggers(logCapture.Loggers), + DataSystem: ldcomponents.DataSystem().Custom().Initializers( + ldcomponents.PollingDataSourceV2().BaseURI(server.URL).AsInitializer(), + ), + } + + client, err := MakeCustomClient(testSdkKey, config, time.Second*5) + require.Error(t, err) + require.NotNil(t, client) + defer client.Close() + + <-requestsCh + + // With no FDv1 fallback configured, an initializer-triggered fallback must transition the + // status to Off — if it stays at Initializing, MakeCustomClient treats it as an init + // failure and we see initializationFailedErrorMessage here. Either way the status field + // should end up Off, so assert that directly. + assert.Equal(t, + interfaces.DataSourceStateOff, + client.GetDataSourceStatusProvider().GetStatus().State, + "status should transition to Off when initializer fallback requested but no FDv1 fallback configured") + assert.Equal(t, initializationFailedErrorMessage, err.Error()) + + assert.Contains(t, logCapture.GetOutput(ldlog.Warn), + "Initializer requested FDv1 fallback but none configured") + }) +} + // When the polling initializer receives x-ld-fd-fallback from the server, the SDK should skip any // remaining FDv2 synchronizers and switch to the FDv1 polling synchronizer directly — without ever // attempting the FDv2 streaming synchronizer. From b62ac9128628b5da1022e937b8ad8e3670bea7ea Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Mon, 20 Apr 2026 11:14:47 -0400 Subject: [PATCH 03/10] chore: Log when initializer payload is applied before FDv1 fallback Prior to this change, when an initializer returned a valid payload alongside the FDv1 fallback signal, the payload was applied to the store silently. Add an Info log so operators can confirm which initializer's data was applied before the protocol switch. --- internal/datasystem/fdv2_datasystem.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/datasystem/fdv2_datasystem.go b/internal/datasystem/fdv2_datasystem.go index bb3a9a25..2392e9ca 100644 --- a/internal/datasystem/fdv2_datasystem.go +++ b/internal/datasystem/fdv2_datasystem.go @@ -266,6 +266,7 @@ func (f *FDv2) runInitializers(ctx context.Context, closeWhenReady chan struct{} f.environmentIDProvider.SetEnvironmentID(basis.EnvironmentID) f.store.Apply(basis.ChangeSet, basis.Persist) if basis.ChangeSet.Selector().IsDefined() { + f.loggers.Infof("Applied payload from %s before falling back to FDv1", initializer.Name()) f.readyOnce.Do(func() { close(closeWhenReady) }) From cf13c4616d6d843d06e19d626c3b7c67dd90bcad Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Mon, 20 Apr 2026 13:46:06 -0400 Subject: [PATCH 04/10] fix: Honor x-ld-fd-fallback on synchronizer success paths The header was checked on initializer responses and on synchronizer error responses, but not when the streaming or polling synchronizer got a 200 with a valid payload. In that case the SDK applied the payload and then kept the FDv2 connection open indefinitely, ignoring the server's request to revert to FDv1. Streaming: consumeStream now tracks whether the response headers carried x-ld-fd-fallback: true, and after any event that emits a Valid result (IntentNone or EventPayloadTransferred) it emits an Off / RevertToFDv1 result and closes the stream. Extracted reportMalformedEvent out of the closure to keep consumeStream within cyclomatic threshold. Polling: the poll() helper now returns (fallback, err). On a successful response whose headers request fallback, it emits the Valid result with any accompanying ChangeSet and then an Off / RevertToFDv1 result; Sync returns from the goroutine. Unit tests cover both the streaming and polling synchronizer success-with-fallback paths. End-to-end test TestFDV2CanFallBackToV1FromStreamingSuccess verifies the full path: streaming sync returns 200 + valid SSE + fallback header, payload is applied, client reverts to FDv1, FDv1 data wins. --- internal/datasourcev2/polling_data_source.go | 29 +++++-- .../datasourcev2/polling_data_source_test.go | 36 ++++++++ .../datasourcev2/streaming_data_source.go | 85 +++++++++++++------ .../streaming_data_source_test.go | 42 +++++++++ ldclient_end_to_end_fdv2_test.go | 52 ++++++++++++ 5 files changed, 213 insertions(+), 31 deletions(-) diff --git a/internal/datasourcev2/polling_data_source.go b/internal/datasourcev2/polling_data_source.go index e3185676..56b1e36a 100644 --- a/internal/datasourcev2/polling_data_source.go +++ b/internal/datasourcev2/polling_data_source.go @@ -107,7 +107,11 @@ func (pp *PollingProcessor) Sync(ds subsystems.DataSelector) <-chan subsystems.D close(resultChan) return case <-ticker.C: - if err := pp.poll(ctx, ds, resultChan); err != nil { + fallback, err := pp.poll(ctx, ds, resultChan) + if err == nil && fallback { + return + } + if err != nil { if hse, ok := err.(httpStatusError); ok { environmentID := internal.NewInitMetadataFromHeaders(hse.Header).GetEnvironmentID() @@ -172,21 +176,36 @@ func (pp *PollingProcessor) Sync(ds subsystems.DataSelector) <-chan subsystems.D return resultChan } +// poll performs a single polling request. It returns (fallback, err). A non-nil err indicates the +// request failed; the caller handles it per the existing error path. fallback=true indicates the +// server requested a revert to FDv1 via x-ld-fd-fallback: true on a SUCCESSFUL response; in that +// case poll has already emitted a Valid result with any accompanying ChangeSet, and has also +// emitted an Off/RevertToFDv1 result. The caller should exit the sync goroutine. func (pp *PollingProcessor) poll( ctx context.Context, ds subsystems.DataSelector, resultChan chan<- subsystems.DataSynchronizerResult, -) error { +) (bool, error) { changeSet, headers, err := pp.requester.Request(ctx, ds.Selector()) if err != nil { - return err + return false, err } + environmentID := internal.NewInitMetadataFromHeaders(headers).GetEnvironmentID() resultChan <- subsystems.DataSynchronizerResult{ ChangeSet: changeSet, State: interfaces.DataSourceStateValid, - EnvironmentID: internal.NewInitMetadataFromHeaders(headers).GetEnvironmentID(), + EnvironmentID: environmentID, } - return nil + if isFDv1FallbackRequested(headers) { + resultChan <- subsystems.DataSynchronizerResult{ + State: interfaces.DataSourceStateOff, + RevertToFDv1: true, + EnvironmentID: environmentID, + } + return true, nil + } + + return false, nil } //nolint:revive // no doc comment for standard method diff --git a/internal/datasourcev2/polling_data_source_test.go b/internal/datasourcev2/polling_data_source_test.go index 3c6e10fc..4fdbce70 100644 --- a/internal/datasourcev2/polling_data_source_test.go +++ b/internal/datasourcev2/polling_data_source_test.go @@ -236,6 +236,42 @@ func TestPollingProcessorSynchronizerHandlesInvalidJSON(t *testing.T) { }) } +func TestPollingProcessorSynchronizerHandlesFallbackOnSuccessfulResponse(t *testing.T) { + ds := mocks.NewMockDataSelector(subsystems.NoSelector()) + data := ldservicesv2.NewServerSDKData().Flags(alwaysTrueFlag).ToInitializerPayload(subsystems.NewSelector("test-state", 1)) + + // Wrap the valid 200 handler so the response also carries x-ld-fd-fallback: true. + underlying := ldservices.ServerSidePollingV2ServiceHandler(data) + fallbackOnSuccess := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("X-LD-FD-Fallback", "true") + underlying.ServeHTTP(w, r) + }) + + httphelpers.WithServer(fallbackOnSuccess, func(ts *httptest.Server) { + processor := NewPollingProcessor( + sharedtest.BasicClientContext(), + datasource.PollingConfig{ + BaseURI: ts.URL, + PollInterval: time.Minute * 30, + }, + ) + defer processor.Close() + + resultChan := processor.Sync(ds) + + // First result: valid payload applied before switching protocols. + first := <-resultChan + assert.Equal(t, interfaces.DataSourceStateValid, first.State) + require.NotNil(t, first.ChangeSet) + assert.Len(t, first.ChangeSet.Changes(), 1) + + // Second result: FDv1 fallback. + second := <-resultChan + assert.Equal(t, interfaces.DataSourceStateOff, second.State) + assert.True(t, second.RevertToFDv1) + }) +} + func TestPollingProcessorSynchronizerHandlesFallbackToFDv2(t *testing.T) { ds := mocks.NewMockDataSelector(subsystems.NoSelector()) diff --git a/internal/datasourcev2/streaming_data_source.go b/internal/datasourcev2/streaming_data_source.go index e081b7e8..0fc759b8 100644 --- a/internal/datasourcev2/streaming_data_source.go +++ b/internal/datasourcev2/streaming_data_source.go @@ -126,6 +126,38 @@ func (sp *StreamProcessor) Sync(ds subsystems.DataSelector) <-chan subsystems.Da return resultChan } +// reportMalformedEvent logs a malformed-event error and pushes an Interrupted result on resultChan. +// Callers are responsible for resetting their own change-set builder and triggering a restart. +func (sp *StreamProcessor) reportMalformedEvent( + event es.Event, + err error, + environmentID ldvalue.OptionalString, + resultChan chan<- subsystems.DataSynchronizerResult, +) { + if event == nil { + sp.loggers.Errorf( + "Received streaming events with malformed JSON data (%s); will restart stream", + err, + ) + } else { + sp.loggers.Errorf( + "Received streaming \"%s\" event with malformed JSON data (%s); will restart stream", + event.Event(), + err, + ) + } + + resultChan <- subsystems.DataSynchronizerResult{ + State: interfaces.DataSourceStateInterrupted, + Error: interfaces.DataSourceErrorInfo{ + Kind: interfaces.DataSourceErrorKindInvalidData, + Message: err.Error(), + Time: time.Now(), + }, + EnvironmentID: environmentID, + } +} + func (sp *StreamProcessor) consumeStream(stream *es.Stream, resultChan chan<- subsystems.DataSynchronizerResult) { // Consume remaining Events and Errors so we can garbage collect defer func() { @@ -139,6 +171,10 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, resultChan chan<- su changeSetBuilder := subsystems.NewChangeSetBuilder() environmentID := ldvalue.OptionalString{} + // fallbackRequested is set when the server's response headers carry x-ld-fd-fallback: true. + // We finish applying the current payload before emitting the fallback signal, so evaluations + // can serve the server-provided data while FDv1 takes over. + fallbackRequested := false for { select { @@ -156,39 +192,20 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, resultChan chan<- su sp.logConnectionResult(true) shouldRestart := false + payloadApplied := false if eventWithHeaders, ok := event.(es.EventWithHeaders); ok { - environmentID = internal.NewInitMetadataFromHeaders(eventWithHeaders.Headers()).GetEnvironmentID() + headers := eventWithHeaders.Headers() + environmentID = internal.NewInitMetadataFromHeaders(headers).GetEnvironmentID() + if isFDv1FallbackRequested(headers) { + fallbackRequested = true + } } gotMalformedEvent := func(event es.Event, err error) { // The protocol should "forget" anything that happens upon receiving an error. changeSetBuilder = subsystems.NewChangeSetBuilder() - - if event == nil { - sp.loggers.Errorf( - "Received streaming events with malformed JSON data (%s); will restart stream", - err, - ) - } else { - sp.loggers.Errorf( - "Received streaming \"%s\" event with malformed JSON data (%s); will restart stream", - event.Event(), - err, - ) - } - - errorInfo := interfaces.DataSourceErrorInfo{ - Kind: interfaces.DataSourceErrorKindInvalidData, - Message: err.Error(), - Time: time.Now(), - } - resultChan <- subsystems.DataSynchronizerResult{ - State: interfaces.DataSourceStateInterrupted, - Error: errorInfo, - EnvironmentID: environmentID, - } - + sp.reportMalformedEvent(event, err, environmentID, resultChan) shouldRestart = true // scenario 1 in error handling comments at top of file } @@ -218,6 +235,7 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, resultChan chan<- su State: interfaces.DataSourceStateValid, EnvironmentID: environmentID, } + payloadApplied = true break } @@ -286,6 +304,7 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, resultChan chan<- su State: interfaces.DataSourceStateValid, EnvironmentID: environmentID, } + payloadApplied = true default: sp.loggers.Infof("Unexpected event found in stream: %s", event.Event()) @@ -295,6 +314,20 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, resultChan chan<- su stream.Restart() } + // Honor a pending FDv1 fallback signal once a payload has been applied this + // iteration — evaluations can now serve the server-provided data while FDv1 takes + // over. Events that don't complete a payload (ServerIntent with a transfer code, + // PutObject, DeleteObject, etc.) leave payloadApplied false so we keep consuming. + if fallbackRequested && payloadApplied { + resultChan <- subsystems.DataSynchronizerResult{ + State: interfaces.DataSourceStateOff, + RevertToFDv1: true, + EnvironmentID: environmentID, + } + stream.Close() + return + } + case <-sp.halt: stream.Close() return diff --git a/internal/datasourcev2/streaming_data_source_test.go b/internal/datasourcev2/streaming_data_source_test.go index 9cf5d4a0..010c145c 100644 --- a/internal/datasourcev2/streaming_data_source_test.go +++ b/internal/datasourcev2/streaming_data_source_test.go @@ -186,6 +186,48 @@ func TestStreamingProcessorHandlesFallbackToFDv1(t *testing.T) { }) } +func TestStreamingProcessorHandlesFallbackOnSuccessfulResponse(t *testing.T) { + data := ldservicesv2.NewServerSDKData().Flags(alwaysTrueFlag) + protocol := ldservicesv2.NewStreamingProtocol(). + WithIntent(subsystems.ServerIntent{Payload: subsystems.Payload{ + ID: "something-id", Target: 0, Code: subsystems.IntentTransferFull, Reason: "payload-missing", + }}). + WithPutObjects(data.ToPutObjects()). + WithTransferred("updated-state", 2) + streamHandler, _ := ldservices.ServerSideStreamingV2ServiceProtocolHandler(protocol) + + // Wrap the valid SSE handler so the response carries x-ld-fd-fallback: true. + fallbackOnSuccess := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("X-LD-FD-Fallback", "true") + streamHandler.ServeHTTP(w, r) + }) + + ds := mocks.NewMockDataSelector(subsystems.NoSelector()) + httphelpers.WithServer(fallbackOnSuccess, func(ts *httptest.Server) { + sp := NewStreamProcessor( + sharedtest.BasicClientContext(), + datasource.StreamConfig{ + URI: ts.URL, + InitialReconnectDelay: time.Millisecond * 50, + }, + ) + + defer sp.Close() + resultChan := sp.Sync(ds) + + // First result: the valid payload is still applied before we switch protocols. + first := <-resultChan + assert.Equal(t, interfaces.DataSourceStateValid, first.State) + assert.NotNil(t, first.ChangeSet) + assert.Len(t, first.ChangeSet.Changes(), 1) + + // Second result: FDv1 fallback. + second := <-resultChan + assert.Equal(t, interfaces.DataSourceStateOff, second.State) + assert.True(t, second.RevertToFDv1) + }) +} + func TestStreamingProcessorPreClosingShouldShutdownImmediately(t *testing.T) { ds := mocks.NewMockDataSelector(subsystems.NoSelector()) handler, _ := httphelpers.RecordingHandler(httphelpers.HandlerWithStatus(401)) // we don't care about getting valid stream data diff --git a/ldclient_end_to_end_fdv2_test.go b/ldclient_end_to_end_fdv2_test.go index f3be9d70..2fae690c 100644 --- a/ldclient_end_to_end_fdv2_test.go +++ b/ldclient_end_to_end_fdv2_test.go @@ -159,6 +159,58 @@ func TestFDV2InitializerFallbackWithoutFDv1FallbackTransitionsToOff(t *testing.T }) } +// When the streaming synchronizer receives a 200 response that carries both a valid SSE payload +// AND the x-ld-fd-fallback header, the SDK should apply the payload and then revert to FDv1. +// Without this behavior, the stream stays open against the FDv2 endpoint indefinitely. +func TestFDV2CanFallBackToV1FromStreamingSuccess(t *testing.T) { + dataV1 := ldservices.NewServerSDKData().Flags(alwaysFalseFlag) + dataV2 := ldservicesv2.NewServerSDKData().Flags(alwaysTrueFlag) + + protocol := ldservicesv2.NewStreamingProtocol(). + WithIntent(subsystems.ServerIntent{Payload: subsystems.Payload{ + ID: "fake-id", Target: 0, Code: subsystems.IntentTransferFull, Reason: "payload-missing", + }}). + WithPutObjects(dataV2.ToPutObjects()). + WithTransferred("state", 1) + + streamV2Handler, _ := ldservices.ServerSideStreamingV2ServiceProtocolHandler(protocol) + streamV2WithFallback := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("X-LD-FD-Fallback", "true") + streamV2Handler.ServeHTTP(w, r) + }) + + // Init phase: FDv2 poll returns 500. Sync phase: FDv2 stream returns valid SSE + fallback + // header. FDv1 fallback phase: FDv1 poll returns the V1 data (always-false flag). + pollV2InitHandler, _ := httphelpers.RecordingHandler(httphelpers.HandlerWithStatus(500)) + streamRecordingHandler, streamV2ReqCh := httphelpers.RecordingHandler(streamV2WithFallback) + pollV1SyncHandler, pollV1SyncReqCh := httphelpers.RecordingHandler(ldservices.ServerSidePollingServiceHandler(dataV1)) + + handler := httphelpers.SequentialHandler(pollV2InitHandler, streamRecordingHandler, pollV1SyncHandler) + + httphelpers.WithServer(handler, func(server *httptest.Server) { + logCapture := ldlogtest.NewMockLog() + + config := Config{ + Events: ldcomponents.NoEvents(), + Logging: ldcomponents.Logging().Loggers(logCapture.Loggers), + DataSystem: ldcomponents.DataSystem().WithRelayProxyEndpoints(server.URL).Default(), + } + + client, err := MakeCustomClient(testSdkKey, config, time.Second*5) + <-streamV2ReqCh + <-pollV1SyncReqCh + require.NoError(t, err) + defer client.Close() + + reached := client.GetDataSourceStatusProvider().WaitFor(interfaces.DataSourceStateValid, time.Second*5) + require.True(t, reached, "timed out waiting for data source to reach VALID state") + + // FDv1 data should win: alwaysFalseFlag is true-defaulted to check it flipped to false. + value, _ := client.BoolVariation(alwaysFalseFlag.Key, testUser, true) + assert.False(t, value) + }) +} + // When the polling initializer receives x-ld-fd-fallback from the server, the SDK should skip any // remaining FDv2 synchronizers and switch to the FDv1 polling synchronizer directly — without ever // attempting the FDv2 streaming synchronizer. From 65901beb40c3e27855e4576ee2c679cb9627f7c2 Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Mon, 20 Apr 2026 14:08:51 -0400 Subject: [PATCH 05/10] refactor: Surface polling success-path fallback emission in Sync goroutine Previously poll() emitted both the Valid result and the Off/RevertToFDv1 result internally, so the Sync goroutine's exit on fallback looked like a bare `return` with no visible status signal. That made the emission easy to miss when reading the diff. Move the Off/RevertToFDv1 emission out of poll() and into the Sync goroutine, next to where the error-path fallback already emits the same signal at line 130. poll() now returns (environmentID, fallback, err): it still emits the Valid result on success, but the fallback emission happens at the Sync call site alongside the return. No behavior change. --- internal/datasourcev2/polling_data_source.go | 34 +++++++++----------- 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/internal/datasourcev2/polling_data_source.go b/internal/datasourcev2/polling_data_source.go index 56b1e36a..d0a056cd 100644 --- a/internal/datasourcev2/polling_data_source.go +++ b/internal/datasourcev2/polling_data_source.go @@ -7,6 +7,7 @@ import ( "time" "github.com/launchdarkly/go-sdk-common/v3/ldlog" + "github.com/launchdarkly/go-sdk-common/v3/ldvalue" "github.com/launchdarkly/go-server-sdk/v7/interfaces" "github.com/launchdarkly/go-server-sdk/v7/internal" "github.com/launchdarkly/go-server-sdk/v7/internal/datasource" @@ -107,8 +108,13 @@ func (pp *PollingProcessor) Sync(ds subsystems.DataSelector) <-chan subsystems.D close(resultChan) return case <-ticker.C: - fallback, err := pp.poll(ctx, ds, resultChan) + fallbackEnvID, fallback, err := pp.poll(ctx, ds, resultChan) if err == nil && fallback { + resultChan <- subsystems.DataSynchronizerResult{ + State: interfaces.DataSourceStateOff, + RevertToFDv1: true, + EnvironmentID: fallbackEnvID, + } return } if err != nil { @@ -176,17 +182,18 @@ func (pp *PollingProcessor) Sync(ds subsystems.DataSelector) <-chan subsystems.D return resultChan } -// poll performs a single polling request. It returns (fallback, err). A non-nil err indicates the -// request failed; the caller handles it per the existing error path. fallback=true indicates the -// server requested a revert to FDv1 via x-ld-fd-fallback: true on a SUCCESSFUL response; in that -// case poll has already emitted a Valid result with any accompanying ChangeSet, and has also -// emitted an Off/RevertToFDv1 result. The caller should exit the sync goroutine. +// poll performs a single polling request. On success it emits a Valid result with any +// accompanying ChangeSet. It returns (environmentID, fallback, err) where fallback=true indicates +// the response headers carried x-ld-fd-fallback: true; in that case the caller is responsible +// for emitting the Off/RevertToFDv1 signal and exiting the sync goroutine. A non-nil err means +// the request failed and no Valid result was emitted; the caller handles it per the existing +// error path. func (pp *PollingProcessor) poll( ctx context.Context, ds subsystems.DataSelector, resultChan chan<- subsystems.DataSynchronizerResult, -) (bool, error) { +) (ldvalue.OptionalString, bool, error) { changeSet, headers, err := pp.requester.Request(ctx, ds.Selector()) if err != nil { - return false, err + return ldvalue.OptionalString{}, false, err } environmentID := internal.NewInitMetadataFromHeaders(headers).GetEnvironmentID() @@ -196,16 +203,7 @@ func (pp *PollingProcessor) poll( EnvironmentID: environmentID, } - if isFDv1FallbackRequested(headers) { - resultChan <- subsystems.DataSynchronizerResult{ - State: interfaces.DataSourceStateOff, - RevertToFDv1: true, - EnvironmentID: environmentID, - } - return true, nil - } - - return false, nil + return environmentID, isFDv1FallbackRequested(headers), nil } //nolint:revive // no doc comment for standard method From 1aec421f2cd0227ab1ab40bacebdc3ae3730842f Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Mon, 20 Apr 2026 16:56:36 -0400 Subject: [PATCH 06/10] fix: Preserve initializer error on Off status during FDv1 fallback When an initializer requested FDv1 fallback but no fdv1FallbackBuilder was configured, the Off status was published with a zero-value DataSourceErrorInfo because the initializer's underlying error was only logged via Warnf, never recorded in status. The synchronizer path does not have this problem: consumeSynchronizerResults calls UpdateStatus(Interrupted, result.Error) before hitting the same UpdateStatus(Off, LastError) call, so LastError is populated. Thread the error info back from runInitializers to the caller so it can be passed directly to UpdateStatus when transitioning to Off. When fallback accompanies a successful response (no HTTP error), errorInfo stays empty, matching the synchronizer path's behavior for that case. Extend TestFDV2InitializerFallbackWithoutFDv1FallbackTransitionsToOff to assert LastError is non-empty so programmatic status monitors can see why the data source shut down. --- internal/datasystem/fdv2_datasystem.go | 30 +++++++++++++++++--------- ldclient_end_to_end_fdv2_test.go | 7 +++++- 2 files changed, 26 insertions(+), 11 deletions(-) diff --git a/internal/datasystem/fdv2_datasystem.go b/internal/datasystem/fdv2_datasystem.go index 2392e9ca..342f9b4d 100644 --- a/internal/datasystem/fdv2_datasystem.go +++ b/internal/datasystem/fdv2_datasystem.go @@ -205,7 +205,7 @@ func (f *FDv2) launchTask(task func()) { func (f *FDv2) run(ctx context.Context, closeWhenReady chan struct{}) { f.UpdateStatus(interfaces.DataSourceStateInitializing, interfaces.DataSourceErrorInfo{}) - if f.runInitializers(ctx, closeWhenReady) { + if fallback, errorInfo := f.runInitializers(ctx, closeWhenReady); fallback { if f.fdv1FallbackBuilder != nil { f.loggers.Warn("Reverting to FDv1 protocol") f.synchronizerBuilders = []func() (subsystems.DataSynchronizer, error){f.fdv1FallbackBuilder} @@ -213,7 +213,7 @@ func (f *FDv2) run(ctx context.Context, closeWhenReady chan struct{}) { } else { f.loggers.Warn("Initializer requested FDv1 fallback but none configured") f.synchronizerBuilders = nil - f.UpdateStatus(interfaces.DataSourceStateOff, f.getStatus().LastError) + f.UpdateStatus(interfaces.DataSourceStateOff, errorInfo) } } @@ -246,19 +246,29 @@ func (f *FDv2) runPersistentStoreOutageRecovery(ctx context.Context, statuses <- } // runInitializers runs each configured initializer in order until one succeeds, the context is -// cancelled, or an initializer signals a revert to FDv1. Returns true if an initializer signalled -// a revert to FDv1. If fallback is signalled alongside a valid Basis, that Basis is applied before -// returning so evaluations can serve the server-provided data while the FDv1 synchronizer spins up. -func (f *FDv2) runInitializers(ctx context.Context, closeWhenReady chan struct{}) (fallbackToFDv1 bool) { +// cancelled, or an initializer signals a revert to FDv1. Returns (fallbackToFDv1, errorInfo): +// fallbackToFDv1 is true when an initializer asked the SDK to switch to FDv1; errorInfo describes +// the underlying error for status reporting when no FDv1 fallback is configured (empty when the +// fallback accompanied a successful response). If fallback is signalled alongside a valid Basis, +// that Basis is applied before returning so evaluations can serve the server-provided data while +// the FDv1 synchronizer spins up. +func (f *FDv2) runInitializers( + ctx context.Context, closeWhenReady chan struct{}, +) (fallbackToFDv1 bool, errorInfo interfaces.DataSourceErrorInfo) { for _, initializer := range f.initializers { f.loggers.Infof("Attempting to initialize via %s", initializer.Name()) basis, fallback, err := initializer.Fetch(f.store, ctx) if errors.Is(err, context.Canceled) { - return false + return false, interfaces.DataSourceErrorInfo{} } if fallback { if err != nil { f.loggers.Warnf("Initializer %s requested fallback to FDv1 protocol: %v", initializer.Name(), err) + errorInfo = interfaces.DataSourceErrorInfo{ + Kind: interfaces.DataSourceErrorKindUnknown, + Message: err.Error(), + Time: time.Now(), + } } else { f.loggers.Warnf("Initializer %s requested fallback to FDv1 protocol", initializer.Name()) } @@ -272,7 +282,7 @@ func (f *FDv2) runInitializers(ctx context.Context, closeWhenReady chan struct{} }) } } - return true + return true, errorInfo } if err != nil { f.loggers.Warnf("Initializer %s failed: %v", initializer.Name(), err) @@ -285,10 +295,10 @@ func (f *FDv2) runInitializers(ctx context.Context, closeWhenReady chan struct{} f.readyOnce.Do(func() { close(closeWhenReady) }) - return false + return false, interfaces.DataSourceErrorInfo{} } } - return false + return false, interfaces.DataSourceErrorInfo{} } func (f *FDv2) runSynchronizers(ctx context.Context, closeWhenReady chan struct{}) { diff --git a/ldclient_end_to_end_fdv2_test.go b/ldclient_end_to_end_fdv2_test.go index 2fae690c..56888b83 100644 --- a/ldclient_end_to_end_fdv2_test.go +++ b/ldclient_end_to_end_fdv2_test.go @@ -148,10 +148,15 @@ func TestFDV2InitializerFallbackWithoutFDv1FallbackTransitionsToOff(t *testing.T // status to Off — if it stays at Initializing, MakeCustomClient treats it as an init // failure and we see initializationFailedErrorMessage here. Either way the status field // should end up Off, so assert that directly. + status := client.GetDataSourceStatusProvider().GetStatus() assert.Equal(t, interfaces.DataSourceStateOff, - client.GetDataSourceStatusProvider().GetStatus().State, + status.State, "status should transition to Off when initializer fallback requested but no FDv1 fallback configured") + // The underlying initializer error must be preserved on the Off status so programmatic + // monitors can see why the data source shut down, not just that it did. + assert.NotEqual(t, interfaces.DataSourceErrorInfo{}, status.LastError, + "LastError should carry the initializer error that accompanied the fallback signal") assert.Equal(t, initializationFailedErrorMessage, err.Error()) assert.Contains(t, logCapture.GetOutput(ldlog.Warn), From 6b6913f265fcee8f50fa41ddb448a1f817293024 Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Tue, 21 Apr 2026 11:04:14 -0400 Subject: [PATCH 07/10] refactor: Bundle success-path RevertToFDv1 onto the Valid result MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Following review feedback, merge the two-emit dance (Valid with ChangeSet, then Off with RevertToFDv1) into a single DataSynchronizerResult that carries both the payload and the fallback signal. This matches how the error-path fallback has always worked — one result with RevertToFDv1 set — and removes an unnecessary status dip through Interrupted. Consumer (consumeSynchronizerResults) now honors RevertToFDv1 on Valid or Interrupted results too, not just Off. On Valid with RevertToFDv1, the ChangeSet is applied, status transitions to Valid, and control returns to runSynchronizers which swaps to the FDv1 fallback builder. Polling poll() returns (fallback, err) and emits a single Valid result carrying the ChangeSet and RevertToFDv1. Streaming consumeStream sets RevertToFDv1 on the Valid emit at IntentNone and EventPayloadTransferred when the response headers requested fallback; the stream is closed at the end of the iteration. Updated unit tests to assert a single result with all three of: State=Valid, ChangeSet populated, RevertToFDv1=true. The e2e test now uses assert.Eventually to wait for FDv1 data to replace FDv2 data, since there's no longer a status transition to synchronize on. --- internal/datasourcev2/polling_data_source.go | 33 +++++++++---------- .../datasourcev2/polling_data_source_test.go | 17 ++++------ .../datasourcev2/streaming_data_source.go | 15 ++++----- .../streaming_data_source_test.go | 17 ++++------ internal/datasystem/fdv2_datasystem.go | 8 +++++ ldclient_end_to_end_fdv2_test.go | 10 ++++-- 6 files changed, 50 insertions(+), 50 deletions(-) diff --git a/internal/datasourcev2/polling_data_source.go b/internal/datasourcev2/polling_data_source.go index d0a056cd..a6639915 100644 --- a/internal/datasourcev2/polling_data_source.go +++ b/internal/datasourcev2/polling_data_source.go @@ -7,7 +7,6 @@ import ( "time" "github.com/launchdarkly/go-sdk-common/v3/ldlog" - "github.com/launchdarkly/go-sdk-common/v3/ldvalue" "github.com/launchdarkly/go-server-sdk/v7/interfaces" "github.com/launchdarkly/go-server-sdk/v7/internal" "github.com/launchdarkly/go-server-sdk/v7/internal/datasource" @@ -108,13 +107,10 @@ func (pp *PollingProcessor) Sync(ds subsystems.DataSelector) <-chan subsystems.D close(resultChan) return case <-ticker.C: - fallbackEnvID, fallback, err := pp.poll(ctx, ds, resultChan) + fallback, err := pp.poll(ctx, ds, resultChan) if err == nil && fallback { - resultChan <- subsystems.DataSynchronizerResult{ - State: interfaces.DataSourceStateOff, - RevertToFDv1: true, - EnvironmentID: fallbackEnvID, - } + // poll already emitted a Valid result carrying RevertToFDv1=true; the + // consumer will apply any ChangeSet and then switch to the FDv1 fallback. return } if err != nil { @@ -182,28 +178,29 @@ func (pp *PollingProcessor) Sync(ds subsystems.DataSelector) <-chan subsystems.D return resultChan } -// poll performs a single polling request. On success it emits a Valid result with any -// accompanying ChangeSet. It returns (environmentID, fallback, err) where fallback=true indicates -// the response headers carried x-ld-fd-fallback: true; in that case the caller is responsible -// for emitting the Off/RevertToFDv1 signal and exiting the sync goroutine. A non-nil err means -// the request failed and no Valid result was emitted; the caller handles it per the existing -// error path. +// poll performs a single polling request and emits a Valid result on success. It returns +// (fallback, err). fallback=true indicates the response headers carried x-ld-fd-fallback: true; +// the emitted Valid result has RevertToFDv1=true set, so the consumer will apply any ChangeSet +// and then switch to the FDv1 fallback synchronizer. The caller should exit the sync goroutine +// on fallback=true. A non-nil err means the request failed and no Valid result was emitted; the +// caller handles it per the existing error path. func (pp *PollingProcessor) poll( ctx context.Context, ds subsystems.DataSelector, resultChan chan<- subsystems.DataSynchronizerResult, -) (ldvalue.OptionalString, bool, error) { +) (bool, error) { changeSet, headers, err := pp.requester.Request(ctx, ds.Selector()) if err != nil { - return ldvalue.OptionalString{}, false, err + return false, err } - environmentID := internal.NewInitMetadataFromHeaders(headers).GetEnvironmentID() + fallback := isFDv1FallbackRequested(headers) resultChan <- subsystems.DataSynchronizerResult{ ChangeSet: changeSet, State: interfaces.DataSourceStateValid, - EnvironmentID: environmentID, + EnvironmentID: internal.NewInitMetadataFromHeaders(headers).GetEnvironmentID(), + RevertToFDv1: fallback, } - return environmentID, isFDv1FallbackRequested(headers), nil + return fallback, nil } //nolint:revive // no doc comment for standard method diff --git a/internal/datasourcev2/polling_data_source_test.go b/internal/datasourcev2/polling_data_source_test.go index 4fdbce70..a300fca5 100644 --- a/internal/datasourcev2/polling_data_source_test.go +++ b/internal/datasourcev2/polling_data_source_test.go @@ -259,16 +259,13 @@ func TestPollingProcessorSynchronizerHandlesFallbackOnSuccessfulResponse(t *test resultChan := processor.Sync(ds) - // First result: valid payload applied before switching protocols. - first := <-resultChan - assert.Equal(t, interfaces.DataSourceStateValid, first.State) - require.NotNil(t, first.ChangeSet) - assert.Len(t, first.ChangeSet.Changes(), 1) - - // Second result: FDv1 fallback. - second := <-resultChan - assert.Equal(t, interfaces.DataSourceStateOff, second.State) - assert.True(t, second.RevertToFDv1) + // A single Valid result carries both the payload and the RevertToFDv1 signal — the + // consumer applies the ChangeSet first, then switches to the FDv1 synchronizer. + result := <-resultChan + assert.Equal(t, interfaces.DataSourceStateValid, result.State) + require.NotNil(t, result.ChangeSet) + assert.Len(t, result.ChangeSet.Changes(), 1) + assert.True(t, result.RevertToFDv1) }) } diff --git a/internal/datasourcev2/streaming_data_source.go b/internal/datasourcev2/streaming_data_source.go index 0fc759b8..ec664c6a 100644 --- a/internal/datasourcev2/streaming_data_source.go +++ b/internal/datasourcev2/streaming_data_source.go @@ -234,6 +234,7 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, resultChan chan<- su resultChan <- subsystems.DataSynchronizerResult{ State: interfaces.DataSourceStateValid, EnvironmentID: environmentID, + RevertToFDv1: fallbackRequested, } payloadApplied = true break @@ -303,6 +304,7 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, resultChan chan<- su ChangeSet: changeSet, State: interfaces.DataSourceStateValid, EnvironmentID: environmentID, + RevertToFDv1: fallbackRequested, } payloadApplied = true @@ -314,16 +316,11 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, resultChan chan<- su stream.Restart() } - // Honor a pending FDv1 fallback signal once a payload has been applied this - // iteration — evaluations can now serve the server-provided data while FDv1 takes - // over. Events that don't complete a payload (ServerIntent with a transfer code, - // PutObject, DeleteObject, etc.) leave payloadApplied false so we keep consuming. + // Once a payload has been applied with a pending FDv1 fallback signal, the Valid + // result emitted above carries RevertToFDv1=true; close the stream so we stop + // consuming. Events that don't complete a payload leave payloadApplied false so we + // keep consuming (fallbackRequested persists across iterations). if fallbackRequested && payloadApplied { - resultChan <- subsystems.DataSynchronizerResult{ - State: interfaces.DataSourceStateOff, - RevertToFDv1: true, - EnvironmentID: environmentID, - } stream.Close() return } diff --git a/internal/datasourcev2/streaming_data_source_test.go b/internal/datasourcev2/streaming_data_source_test.go index 010c145c..81973ca7 100644 --- a/internal/datasourcev2/streaming_data_source_test.go +++ b/internal/datasourcev2/streaming_data_source_test.go @@ -215,16 +215,13 @@ func TestStreamingProcessorHandlesFallbackOnSuccessfulResponse(t *testing.T) { defer sp.Close() resultChan := sp.Sync(ds) - // First result: the valid payload is still applied before we switch protocols. - first := <-resultChan - assert.Equal(t, interfaces.DataSourceStateValid, first.State) - assert.NotNil(t, first.ChangeSet) - assert.Len(t, first.ChangeSet.Changes(), 1) - - // Second result: FDv1 fallback. - second := <-resultChan - assert.Equal(t, interfaces.DataSourceStateOff, second.State) - assert.True(t, second.RevertToFDv1) + // A single Valid result carries both the payload and the RevertToFDv1 signal — the + // consumer applies the ChangeSet first, then switches to the FDv1 synchronizer. + result := <-resultChan + assert.Equal(t, interfaces.DataSourceStateValid, result.State) + assert.NotNil(t, result.ChangeSet) + assert.Len(t, result.ChangeSet.Changes(), 1) + assert.True(t, result.RevertToFDv1) }) } diff --git a/internal/datasystem/fdv2_datasystem.go b/internal/datasystem/fdv2_datasystem.go index 342f9b4d..2cef16fa 100644 --- a/internal/datasystem/fdv2_datasystem.go +++ b/internal/datasystem/fdv2_datasystem.go @@ -446,6 +446,14 @@ func (f *FDv2) consumeSynchronizerResults( } return syncRemove, nil } + + // RevertToFDv1 may ride along on a Valid or Interrupted result too — e.g. a + // successful response whose headers also requested the fallback. The Valid/ + // Interrupted branches above already applied any ChangeSet and updated status; + // now hand control to the FDv1 fallback synchronizer. + if result.RevertToFDv1 { + return syncFDv1, nil + } case <-ticker.C: // If there's only one synchronizer, don't check conditions if len(f.synchronizerBuilders) == 1 { diff --git a/ldclient_end_to_end_fdv2_test.go b/ldclient_end_to_end_fdv2_test.go index 56888b83..46da179e 100644 --- a/ldclient_end_to_end_fdv2_test.go +++ b/ldclient_end_to_end_fdv2_test.go @@ -210,9 +210,13 @@ func TestFDV2CanFallBackToV1FromStreamingSuccess(t *testing.T) { reached := client.GetDataSourceStatusProvider().WaitFor(interfaces.DataSourceStateValid, time.Second*5) require.True(t, reached, "timed out waiting for data source to reach VALID state") - // FDv1 data should win: alwaysFalseFlag is true-defaulted to check it flipped to false. - value, _ := client.BoolVariation(alwaysFalseFlag.Key, testUser, true) - assert.False(t, value) + // Status becomes Valid as soon as the FDv2 stream applies its payload (with RevertToFDv1 + // riding along on the same result), which happens before FDv1 has fetched its own data. + // Poll until the flag value reflects FDv1 data to verify the handoff completed. + assert.Eventually(t, func() bool { + value, _ := client.BoolVariation(alwaysFalseFlag.Key, testUser, true) + return value == false + }, time.Second*2, time.Millisecond*10, "expected FDv1 data (value=false) to replace FDv2 data") }) } From 9b1490027e63833021197463b69168d4b69abdb3 Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Tue, 21 Apr 2026 13:30:24 -0400 Subject: [PATCH 08/10] fix: Honor fallback header on non-HTTP polling errors too MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit poll() only surfaced the fallback header when the request succeeded; a parse failure on a 200 response or any non-httpStatusError path dropped the signal. The initializer code in Fetch already honored the header regardless of error type — the synchronizer is now consistent. poll() now reads isFDv1FallbackRequested(headers) before checking err, so the returned fallback bool reflects the response regardless of parse success. The Sync goroutine's non-httpStatusError branch now emits Off/RevertToFDv1 on fallback instead of treating the parse failure as a regular interruption that would retry into FDv2. The httpStatusError branch switches from its local header check to the same fallback bool for consistency. New test TestPollingProcessorSynchronizerHandlesFallbackOnMalformedBody exercises the 200 + invalid JSON + fallback header path. --- internal/datasourcev2/polling_data_source.go | 26 +++++++++++------ .../datasourcev2/polling_data_source_test.go | 28 +++++++++++++++++++ 2 files changed, 46 insertions(+), 8 deletions(-) diff --git a/internal/datasourcev2/polling_data_source.go b/internal/datasourcev2/polling_data_source.go index a6639915..c31c1992 100644 --- a/internal/datasourcev2/polling_data_source.go +++ b/internal/datasourcev2/polling_data_source.go @@ -123,7 +123,7 @@ func (pp *PollingProcessor) Sync(ds subsystems.DataSelector) <-chan subsystems.D Time: time.Now(), } - if isFDv1FallbackRequested(hse.Header) { + if fallback { resultChan <- subsystems.DataSynchronizerResult{ State: interfaces.DataSourceStateOff, Error: errorInfo, @@ -163,6 +163,14 @@ func (pp *PollingProcessor) Sync(ds subsystems.DataSelector) <-chan subsystems.D if _, ok := err.(malformedJSONError); ok { errorInfo.Kind = interfaces.DataSourceErrorKindInvalidData } + if fallback { + resultChan <- subsystems.DataSynchronizerResult{ + State: interfaces.DataSourceStateOff, + Error: errorInfo, + RevertToFDv1: true, + } + return + } checkIfErrorIsRecoverableAndLog(pp.loggers, err.Error(), pollingErrorContext, 0, pollingWillRetryMessage) resultChan <- subsystems.DataSynchronizerResult{ State: interfaces.DataSourceStateInterrupted, @@ -179,20 +187,22 @@ func (pp *PollingProcessor) Sync(ds subsystems.DataSelector) <-chan subsystems.D } // poll performs a single polling request and emits a Valid result on success. It returns -// (fallback, err). fallback=true indicates the response headers carried x-ld-fd-fallback: true; -// the emitted Valid result has RevertToFDv1=true set, so the consumer will apply any ChangeSet -// and then switch to the FDv1 fallback synchronizer. The caller should exit the sync goroutine -// on fallback=true. A non-nil err means the request failed and no Valid result was emitted; the -// caller handles it per the existing error path. +// (fallback, err), where fallback reflects the x-ld-fd-fallback response header whether or not +// the request succeeded — a 500 response or a malformed-JSON body can still carry the fallback +// signal, and callers must honor it. On success with fallback=true, the emitted Valid result +// has RevertToFDv1=true set so the consumer will apply any ChangeSet before switching to FDv1. +// A non-nil err means the request failed and no Valid result was emitted; the caller handles +// it per the existing error path but should emit an Off/RevertToFDv1 result when fallback=true +// rather than retrying. func (pp *PollingProcessor) poll( ctx context.Context, ds subsystems.DataSelector, resultChan chan<- subsystems.DataSynchronizerResult, ) (bool, error) { changeSet, headers, err := pp.requester.Request(ctx, ds.Selector()) + fallback := isFDv1FallbackRequested(headers) if err != nil { - return false, err + return fallback, err } - fallback := isFDv1FallbackRequested(headers) resultChan <- subsystems.DataSynchronizerResult{ ChangeSet: changeSet, State: interfaces.DataSourceStateValid, diff --git a/internal/datasourcev2/polling_data_source_test.go b/internal/datasourcev2/polling_data_source_test.go index a300fca5..6074cb28 100644 --- a/internal/datasourcev2/polling_data_source_test.go +++ b/internal/datasourcev2/polling_data_source_test.go @@ -269,6 +269,34 @@ func TestPollingProcessorSynchronizerHandlesFallbackOnSuccessfulResponse(t *test }) } +func TestPollingProcessorSynchronizerHandlesFallbackOnMalformedBody(t *testing.T) { + // 200 OK with invalid JSON and the fallback header — a non-httpStatusError error path that + // must still honor the fallback signal rather than treating the parse failure as a retry. + ds := mocks.NewMockDataSelector(subsystems.NoSelector()) + + fallbackHeader := http.Header{ + "X-LD-FD-Fallback": []string{"true"}, + } + handler, _ := httphelpers.RecordingHandler(httphelpers.HandlerWithResponse(200, fallbackHeader, []byte("not json"))) + httphelpers.WithServer(handler, func(ts *httptest.Server) { + processor := NewPollingProcessor( + sharedtest.BasicClientContext(), + datasource.PollingConfig{ + BaseURI: ts.URL, + PollInterval: time.Minute * 30, + }, + ) + defer processor.Close() + + resultChan := processor.Sync(ds) + result := <-resultChan + + assert.Equal(t, interfaces.DataSourceStateOff, result.State) + assert.Equal(t, interfaces.DataSourceErrorKindInvalidData, result.Error.Kind) + assert.True(t, result.RevertToFDv1) + }) +} + func TestPollingProcessorSynchronizerHandlesFallbackToFDv2(t *testing.T) { ds := mocks.NewMockDataSelector(subsystems.NoSelector()) From 624cfb2a96b967514fd7c394059bda281370b2f7 Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Tue, 21 Apr 2026 13:54:16 -0400 Subject: [PATCH 09/10] refactor: Return DataSynchronizerResult from poll() instead of publishing poll() no longer takes resultChan as a parameter. It performs one polling request, builds a DataSynchronizerResult describing the outcome (Valid+ChangeSet on success, Off+Error on failure, with RevertToFDv1 set from the response headers in either case), and returns it alongside the raw error. The caller (Sync's goroutine) decides how to dispatch: emit as-is on success or when fallback was requested, or downgrade the State from Off to Interrupted when the underlying HTTP error is recoverable and no fallback was requested. Recoverability logging stays in Sync where it has the richer "is this transient" context; error classification (Kind, StatusCode) happens in poll where the raw error is observed. poll() is now trivially unit-testable without a channel stub, and Sync's error branches collapse from ~60 lines of inline struct construction to mutation of a pre-built result. --- internal/datasourcev2/polling_data_source.go | 153 +++++++++---------- 1 file changed, 68 insertions(+), 85 deletions(-) diff --git a/internal/datasourcev2/polling_data_source.go b/internal/datasourcev2/polling_data_source.go index c31c1992..2b5df1e1 100644 --- a/internal/datasourcev2/polling_data_source.go +++ b/internal/datasourcev2/polling_data_source.go @@ -107,78 +107,41 @@ func (pp *PollingProcessor) Sync(ds subsystems.DataSelector) <-chan subsystems.D close(resultChan) return case <-ticker.C: - fallback, err := pp.poll(ctx, ds, resultChan) - if err == nil && fallback { - // poll already emitted a Valid result carrying RevertToFDv1=true; the - // consumer will apply any ChangeSet and then switch to the FDv1 fallback. + result, err := pp.poll(ctx, ds) + + // When the server requested FDv1 fallback, dispatch the result as-is — poll has + // already populated State (Valid on success, Off on error) and RevertToFDv1=true. + if result.RevertToFDv1 { + resultChan <- result return } - if err != nil { - if hse, ok := err.(httpStatusError); ok { - environmentID := internal.NewInitMetadataFromHeaders(hse.Header).GetEnvironmentID() - - errorInfo := interfaces.DataSourceErrorInfo{ - Kind: interfaces.DataSourceErrorKindErrorResponse, - StatusCode: hse.Code, - Time: time.Now(), - } - if fallback { - resultChan <- subsystems.DataSynchronizerResult{ - State: interfaces.DataSourceStateOff, - Error: errorInfo, - RevertToFDv1: true, - EnvironmentID: environmentID, - } - return - } + if err == nil { + resultChan <- result + continue + } - recoverable := checkIfErrorIsRecoverableAndLog( - pp.loggers, - httpErrorDescription(hse.Code), - pollingErrorContext, - hse.Code, - pollingWillRetryMessage, - ) - if recoverable { - resultChan <- subsystems.DataSynchronizerResult{ - State: interfaces.DataSourceStateInterrupted, - Error: errorInfo, - EnvironmentID: environmentID, - } - } else { - resultChan <- subsystems.DataSynchronizerResult{ - State: interfaces.DataSourceStateOff, - Error: errorInfo, - EnvironmentID: environmentID, - } - return - } - } else { - errorInfo := interfaces.DataSourceErrorInfo{ - Kind: interfaces.DataSourceErrorKindNetworkError, - Message: err.Error(), - Time: time.Now(), - } - if _, ok := err.(malformedJSONError); ok { - errorInfo.Kind = interfaces.DataSourceErrorKindInvalidData - } - if fallback { - resultChan <- subsystems.DataSynchronizerResult{ - State: interfaces.DataSourceStateOff, - Error: errorInfo, - RevertToFDv1: true, - } - return - } - checkIfErrorIsRecoverableAndLog(pp.loggers, err.Error(), pollingErrorContext, 0, pollingWillRetryMessage) - resultChan <- subsystems.DataSynchronizerResult{ - State: interfaces.DataSourceStateInterrupted, - Error: errorInfo, - } + // Non-fallback error: the caller may downgrade Off → Interrupted when the error + // is recoverable. Log at the appropriate level. + if hse, ok := err.(httpStatusError); ok { + if checkIfErrorIsRecoverableAndLog( + pp.loggers, + httpErrorDescription(hse.Code), + pollingErrorContext, + hse.Code, + pollingWillRetryMessage, + ) { + result.State = interfaces.DataSourceStateInterrupted + resultChan <- result + continue } - continue + resultChan <- result // poll set State=Off + return } + + checkIfErrorIsRecoverableAndLog(pp.loggers, err.Error(), pollingErrorContext, 0, pollingWillRetryMessage) + result.State = interfaces.DataSourceStateInterrupted + resultChan <- result } } }() @@ -186,31 +149,51 @@ func (pp *PollingProcessor) Sync(ds subsystems.DataSelector) <-chan subsystems.D return resultChan } -// poll performs a single polling request and emits a Valid result on success. It returns -// (fallback, err), where fallback reflects the x-ld-fd-fallback response header whether or not -// the request succeeded — a 500 response or a malformed-JSON body can still carry the fallback -// signal, and callers must honor it. On success with fallback=true, the emitted Valid result -// has RevertToFDv1=true set so the consumer will apply any ChangeSet before switching to FDv1. -// A non-nil err means the request failed and no Valid result was emitted; the caller handles -// it per the existing error path but should emit an Off/RevertToFDv1 result when fallback=true -// rather than retrying. +// poll performs a single polling request and builds a DataSynchronizerResult describing the +// outcome. The result's RevertToFDv1 flag is always populated from the x-ld-fd-fallback response +// header, whether or not the request succeeded — a 500 or a malformed-JSON body can still carry +// the fallback signal. +// +// On success: result.State = Valid, result.ChangeSet populated, err = nil. +// On error: result.State = Off (the safer default), result.Error populated with Kind/Message/ +// StatusCode as appropriate, err returned so the caller can apply context-specific logic +// (e.g. downgrade Off → Interrupted when the HTTP error is recoverable). +// +// The caller is responsible for publishing the result to its channel; poll does not touch any +// resultChan so it can be unit-tested in isolation. func (pp *PollingProcessor) poll( - ctx context.Context, ds subsystems.DataSelector, resultChan chan<- subsystems.DataSynchronizerResult, -) (bool, error) { + ctx context.Context, ds subsystems.DataSelector, +) (subsystems.DataSynchronizerResult, error) { changeSet, headers, err := pp.requester.Request(ctx, ds.Selector()) - fallback := isFDv1FallbackRequested(headers) - if err != nil { - return fallback, err + result := subsystems.DataSynchronizerResult{ + EnvironmentID: internal.NewInitMetadataFromHeaders(headers).GetEnvironmentID(), + RevertToFDv1: isFDv1FallbackRequested(headers), } - resultChan <- subsystems.DataSynchronizerResult{ - ChangeSet: changeSet, - State: interfaces.DataSourceStateValid, - EnvironmentID: internal.NewInitMetadataFromHeaders(headers).GetEnvironmentID(), - RevertToFDv1: fallback, + if err == nil { + result.ChangeSet = changeSet + result.State = interfaces.DataSourceStateValid + return result, nil } - return fallback, nil + result.State = interfaces.DataSourceStateOff + if hse, ok := err.(httpStatusError); ok { + result.Error = interfaces.DataSourceErrorInfo{ + Kind: interfaces.DataSourceErrorKindErrorResponse, + StatusCode: hse.Code, + Time: time.Now(), + } + } else { + result.Error = interfaces.DataSourceErrorInfo{ + Kind: interfaces.DataSourceErrorKindNetworkError, + Message: err.Error(), + Time: time.Now(), + } + if _, ok := err.(malformedJSONError); ok { + result.Error.Kind = interfaces.DataSourceErrorKindInvalidData + } + } + return result, err } //nolint:revive // no doc comment for standard method From eb51709f5395164cf23142e472f2dab7b67d5e16 Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Tue, 21 Apr 2026 16:18:46 -0400 Subject: [PATCH 10/10] refactor: Rename RevertToFDv1 to FallbackToFDv1 for terminology consistency MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The codebase used "revert" and "fallback" interchangeably for the same concept — the server instructing the SDK to abandon FDv2 and switch to FDv1. Standardize on "fallback" across field names, comments, and log messages. The x-ld-fd-fallback response header uses the word "fallback," so matching SDK terminology keeps things consistent. Renamed: - subsystems.DataSynchronizerResult.RevertToFDv1 -> FallbackToFDv1 - interfaces.DataSynchronizerStatus.RevertToFDv1 -> FallbackToFDv1 Updated the log line "Reverting to FDv1 protocol" to "Falling back to FDv1 protocol" and reworded doc comments throughout. No behavior change. --- interfaces/data_source_status_provider.go | 6 ++--- internal/datasourcev2/helpers.go | 4 ++-- internal/datasourcev2/polling_data_source.go | 10 ++++---- .../datasourcev2/polling_data_source_test.go | 8 +++---- .../datasourcev2/streaming_data_source.go | 24 +++++++++---------- .../streaming_data_source_test.go | 6 ++--- internal/datasystem/fdv2_datasystem.go | 16 ++++++------- ldclient_end_to_end_fdv2_test.go | 6 ++--- ldfiledatav2/file_data_source_impl.go | 2 +- subsystems/data_source.go | 10 ++++---- subsystems/datasystem_configuration.go | 2 +- testhelpers/ldtestdatav2/test_data_source.go | 4 ++-- 12 files changed, 49 insertions(+), 49 deletions(-) diff --git a/interfaces/data_source_status_provider.go b/interfaces/data_source_status_provider.go index 78d5a3ea..999e2f20 100644 --- a/interfaces/data_source_status_provider.go +++ b/interfaces/data_source_status_provider.go @@ -88,9 +88,9 @@ type DataSourceStatusProvider interface { // // Synchronizers are expected to emit these status updates on the channel returned by Sync(). type DataSynchronizerStatus struct { - State DataSourceState - Error DataSourceErrorInfo - RevertToFDv1 bool + State DataSourceState + Error DataSourceErrorInfo + FallbackToFDv1 bool } // DataSourceStatus is information about the data source's status and the last status change. diff --git a/internal/datasourcev2/helpers.go b/internal/datasourcev2/helpers.go index 3053f9ab..d7442420 100644 --- a/internal/datasourcev2/helpers.go +++ b/internal/datasourcev2/helpers.go @@ -16,8 +16,8 @@ import ( // FDv2 protocol and fall back to FDv1. const fdv1FallbackHeader = "X-LD-FD-Fallback" -// isFDv1FallbackRequested reports whether the response headers signal that the SDK should revert -// to the FDv1 protocol. +// isFDv1FallbackRequested reports whether the response headers signal that the SDK should fall +// back to the FDv1 protocol. func isFDv1FallbackRequested(h http.Header) bool { return h.Get(fdv1FallbackHeader) == "true" } diff --git a/internal/datasourcev2/polling_data_source.go b/internal/datasourcev2/polling_data_source.go index 2b5df1e1..7e9dca3d 100644 --- a/internal/datasourcev2/polling_data_source.go +++ b/internal/datasourcev2/polling_data_source.go @@ -110,8 +110,8 @@ func (pp *PollingProcessor) Sync(ds subsystems.DataSelector) <-chan subsystems.D result, err := pp.poll(ctx, ds) // When the server requested FDv1 fallback, dispatch the result as-is — poll has - // already populated State (Valid on success, Off on error) and RevertToFDv1=true. - if result.RevertToFDv1 { + // already populated State (Valid on success, Off on error) and FallbackToFDv1=true. + if result.FallbackToFDv1 { resultChan <- result return } @@ -150,7 +150,7 @@ func (pp *PollingProcessor) Sync(ds subsystems.DataSelector) <-chan subsystems.D } // poll performs a single polling request and builds a DataSynchronizerResult describing the -// outcome. The result's RevertToFDv1 flag is always populated from the x-ld-fd-fallback response +// outcome. The result's FallbackToFDv1 flag is always populated from the x-ld-fd-fallback response // header, whether or not the request succeeded — a 500 or a malformed-JSON body can still carry // the fallback signal. // @@ -166,8 +166,8 @@ func (pp *PollingProcessor) poll( ) (subsystems.DataSynchronizerResult, error) { changeSet, headers, err := pp.requester.Request(ctx, ds.Selector()) result := subsystems.DataSynchronizerResult{ - EnvironmentID: internal.NewInitMetadataFromHeaders(headers).GetEnvironmentID(), - RevertToFDv1: isFDv1FallbackRequested(headers), + EnvironmentID: internal.NewInitMetadataFromHeaders(headers).GetEnvironmentID(), + FallbackToFDv1: isFDv1FallbackRequested(headers), } if err == nil { diff --git a/internal/datasourcev2/polling_data_source_test.go b/internal/datasourcev2/polling_data_source_test.go index 6074cb28..6f565004 100644 --- a/internal/datasourcev2/polling_data_source_test.go +++ b/internal/datasourcev2/polling_data_source_test.go @@ -259,13 +259,13 @@ func TestPollingProcessorSynchronizerHandlesFallbackOnSuccessfulResponse(t *test resultChan := processor.Sync(ds) - // A single Valid result carries both the payload and the RevertToFDv1 signal — the + // A single Valid result carries both the payload and the FallbackToFDv1 signal — the // consumer applies the ChangeSet first, then switches to the FDv1 synchronizer. result := <-resultChan assert.Equal(t, interfaces.DataSourceStateValid, result.State) require.NotNil(t, result.ChangeSet) assert.Len(t, result.ChangeSet.Changes(), 1) - assert.True(t, result.RevertToFDv1) + assert.True(t, result.FallbackToFDv1) }) } @@ -293,7 +293,7 @@ func TestPollingProcessorSynchronizerHandlesFallbackOnMalformedBody(t *testing.T assert.Equal(t, interfaces.DataSourceStateOff, result.State) assert.Equal(t, interfaces.DataSourceErrorKindInvalidData, result.Error.Kind) - assert.True(t, result.RevertToFDv1) + assert.True(t, result.FallbackToFDv1) }) } @@ -319,7 +319,7 @@ func TestPollingProcessorSynchronizerHandlesFallbackToFDv2(t *testing.T) { assert.Equal(t, result.State, interfaces.DataSourceStateOff) assert.Equal(t, result.Error.Kind, interfaces.DataSourceErrorKindErrorResponse) - assert.True(t, result.RevertToFDv1) + assert.True(t, result.FallbackToFDv1) }) } diff --git a/internal/datasourcev2/streaming_data_source.go b/internal/datasourcev2/streaming_data_source.go index ec664c6a..60eab05c 100644 --- a/internal/datasourcev2/streaming_data_source.go +++ b/internal/datasourcev2/streaming_data_source.go @@ -232,9 +232,9 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, resultChan chan<- su } resultChan <- subsystems.DataSynchronizerResult{ - State: interfaces.DataSourceStateValid, - EnvironmentID: environmentID, - RevertToFDv1: fallbackRequested, + State: interfaces.DataSourceStateValid, + EnvironmentID: environmentID, + FallbackToFDv1: fallbackRequested, } payloadApplied = true break @@ -301,10 +301,10 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, resultChan chan<- su } resultChan <- subsystems.DataSynchronizerResult{ - ChangeSet: changeSet, - State: interfaces.DataSourceStateValid, - EnvironmentID: environmentID, - RevertToFDv1: fallbackRequested, + ChangeSet: changeSet, + State: interfaces.DataSourceStateValid, + EnvironmentID: environmentID, + FallbackToFDv1: fallbackRequested, } payloadApplied = true @@ -317,7 +317,7 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, resultChan chan<- su } // Once a payload has been applied with a pending FDv1 fallback signal, the Valid - // result emitted above carries RevertToFDv1=true; close the stream so we stop + // result emitted above carries FallbackToFDv1=true; close the stream so we stop // consuming. Events that don't complete a payload leave payloadApplied false so we // keep consuming (fallbackRequested persists across iterations). if fallbackRequested && payloadApplied { @@ -385,10 +385,10 @@ func (sp *StreamProcessor) subscribe(ds subsystems.DataSelector, resultChan chan if isFDv1FallbackRequested(se.Header) { resultChan <- subsystems.DataSynchronizerResult{ - State: interfaces.DataSourceStateOff, - Error: errorInfo, - RevertToFDv1: true, - EnvironmentID: environmentID, + State: interfaces.DataSourceStateOff, + Error: errorInfo, + FallbackToFDv1: true, + EnvironmentID: environmentID, } return es.StreamErrorHandlerResult{CloseNow: true} } diff --git a/internal/datasourcev2/streaming_data_source_test.go b/internal/datasourcev2/streaming_data_source_test.go index 81973ca7..06bf39e1 100644 --- a/internal/datasourcev2/streaming_data_source_test.go +++ b/internal/datasourcev2/streaming_data_source_test.go @@ -182,7 +182,7 @@ func TestStreamingProcessorHandlesFallbackToFDv1(t *testing.T) { assert.Equal(t, result.State, interfaces.DataSourceStateOff) assert.Equal(t, result.Error.Kind, interfaces.DataSourceErrorKindErrorResponse) - assert.True(t, result.RevertToFDv1) + assert.True(t, result.FallbackToFDv1) }) } @@ -215,13 +215,13 @@ func TestStreamingProcessorHandlesFallbackOnSuccessfulResponse(t *testing.T) { defer sp.Close() resultChan := sp.Sync(ds) - // A single Valid result carries both the payload and the RevertToFDv1 signal — the + // A single Valid result carries both the payload and the FallbackToFDv1 signal — the // consumer applies the ChangeSet first, then switches to the FDv1 synchronizer. result := <-resultChan assert.Equal(t, interfaces.DataSourceStateValid, result.State) assert.NotNil(t, result.ChangeSet) assert.Len(t, result.ChangeSet.Changes(), 1) - assert.True(t, result.RevertToFDv1) + assert.True(t, result.FallbackToFDv1) }) } diff --git a/internal/datasystem/fdv2_datasystem.go b/internal/datasystem/fdv2_datasystem.go index 2cef16fa..2e9436f7 100644 --- a/internal/datasystem/fdv2_datasystem.go +++ b/internal/datasystem/fdv2_datasystem.go @@ -44,11 +44,11 @@ type FDv2 struct { initializers []subsystems.DataInitializer // Mutable list of synchronizer builders. Items are removed when they permanently fail. - // When reverting to FDv1, this list is replaced with a single FDv1 synchronizer. + // When falling back to FDv1, this list is replaced with a single FDv1 synchronizer. synchronizerBuilders []func() (subsystems.DataSynchronizer, error) currentSyncIndex int - // FDv1 fallback builder, used only when a synchronizer requests revert to FDv1 + // FDv1 fallback builder, used only when a synchronizer requests fallback to FDv1 fdv1FallbackBuilder func() (subsystems.DataSynchronizer, error) // Boolean used to track whether the datasystem was originally configured @@ -207,7 +207,7 @@ func (f *FDv2) run(ctx context.Context, closeWhenReady chan struct{}) { if fallback, errorInfo := f.runInitializers(ctx, closeWhenReady); fallback { if f.fdv1FallbackBuilder != nil { - f.loggers.Warn("Reverting to FDv1 protocol") + f.loggers.Warn("Falling back to FDv1 protocol") f.synchronizerBuilders = []func() (subsystems.DataSynchronizer, error){f.fdv1FallbackBuilder} f.currentSyncIndex = 0 } else { @@ -246,7 +246,7 @@ func (f *FDv2) runPersistentStoreOutageRecovery(ctx context.Context, statuses <- } // runInitializers runs each configured initializer in order until one succeeds, the context is -// cancelled, or an initializer signals a revert to FDv1. Returns (fallbackToFDv1, errorInfo): +// cancelled, or an initializer signals a fallback to FDv1. Returns (fallbackToFDv1, errorInfo): // fallbackToFDv1 is true when an initializer asked the SDK to switch to FDv1; errorInfo describes // the underlying error for status reporting when no FDv1 fallback is configured (empty when the // fallback accompanied a successful response). If fallback is signalled alongside a valid Basis, @@ -357,7 +357,7 @@ func (f *FDv2) runSynchronizers(ctx context.Context, closeWhenReady chan struct{ switch action { case syncFDv1: if f.fdv1FallbackBuilder != nil { - f.loggers.Warn("Reverting to FDv1 protocol") + f.loggers.Warn("Falling back to FDv1 protocol") // Replace entire list with single FDv1 synchronizer f.synchronizerBuilders = []func() (subsystems.DataSynchronizer, error){f.fdv1FallbackBuilder} f.currentSyncIndex = 0 @@ -441,17 +441,17 @@ func (f *FDv2) consumeSynchronizerResults( f.UpdateStatus(result.State, result.Error) case interfaces.DataSourceStateOff: f.UpdateStatus(interfaces.DataSourceStateInterrupted, result.Error) - if result.RevertToFDv1 { + if result.FallbackToFDv1 { return syncFDv1, nil } return syncRemove, nil } - // RevertToFDv1 may ride along on a Valid or Interrupted result too — e.g. a + // FallbackToFDv1 may ride along on a Valid or Interrupted result too — e.g. a // successful response whose headers also requested the fallback. The Valid/ // Interrupted branches above already applied any ChangeSet and updated status; // now hand control to the FDv1 fallback synchronizer. - if result.RevertToFDv1 { + if result.FallbackToFDv1 { return syncFDv1, nil } case <-ticker.C: diff --git a/ldclient_end_to_end_fdv2_test.go b/ldclient_end_to_end_fdv2_test.go index 46da179e..e3b5dcc2 100644 --- a/ldclient_end_to_end_fdv2_test.go +++ b/ldclient_end_to_end_fdv2_test.go @@ -165,7 +165,7 @@ func TestFDV2InitializerFallbackWithoutFDv1FallbackTransitionsToOff(t *testing.T } // When the streaming synchronizer receives a 200 response that carries both a valid SSE payload -// AND the x-ld-fd-fallback header, the SDK should apply the payload and then revert to FDv1. +// AND the x-ld-fd-fallback header, the SDK should apply the payload and then fall back to FDv1. // Without this behavior, the stream stays open against the FDv2 endpoint indefinitely. func TestFDV2CanFallBackToV1FromStreamingSuccess(t *testing.T) { dataV1 := ldservices.NewServerSDKData().Flags(alwaysFalseFlag) @@ -210,7 +210,7 @@ func TestFDV2CanFallBackToV1FromStreamingSuccess(t *testing.T) { reached := client.GetDataSourceStatusProvider().WaitFor(interfaces.DataSourceStateValid, time.Second*5) require.True(t, reached, "timed out waiting for data source to reach VALID state") - // Status becomes Valid as soon as the FDv2 stream applies its payload (with RevertToFDv1 + // Status becomes Valid as soon as the FDv2 stream applies its payload (with FallbackToFDv1 // riding along on the same result), which happens before FDv1 has fetched its own data. // Poll until the flag value reflects FDv1 data to verify the handoff completed. assert.Eventually(t, func() bool { @@ -230,7 +230,7 @@ func TestFDV2CanFallBackToV1FromInitializer(t *testing.T) { "X-LD-FD-Fallback": []string{"true"}, } - // FDv2 polling initializer: returns 500 + fallback header. Must trigger revert to FDv1 before + // FDv2 polling initializer: returns 500 + fallback header. Must trigger fallback to FDv1 before // the FDv2 streaming synchronizer is ever dialed. pollV2InitRecordingHandler, pollV2InitReqCh := httphelpers.RecordingHandler(httphelpers.HandlerWithResponse(500, header, nil)) // FDv1 polling synchronizer: returns valid FDv1 data. diff --git a/ldfiledatav2/file_data_source_impl.go b/ldfiledatav2/file_data_source_impl.go index 21765bfa..fb177d55 100644 --- a/ldfiledatav2/file_data_source_impl.go +++ b/ldfiledatav2/file_data_source_impl.go @@ -223,7 +223,7 @@ func (fs *fileDataSource) reload() { Message: err.Error(), Time: time.Time{}, }, - RevertToFDv1: false, + FallbackToFDv1: false, }) fs.loggers.Error(err) } diff --git a/subsystems/data_source.go b/subsystems/data_source.go index 1d8ffc2b..e6bcc102 100644 --- a/subsystems/data_source.go +++ b/subsystems/data_source.go @@ -55,11 +55,11 @@ type DataInitializer interface { // DataSynchronizerResult represents the results of a Synchronizer's ongoing Sync method. type DataSynchronizerResult struct { - ChangeSet *ChangeSet - State interfaces.DataSourceState - Error interfaces.DataSourceErrorInfo - RevertToFDv1 bool - EnvironmentID ldvalue.OptionalString + ChangeSet *ChangeSet + State interfaces.DataSourceState + Error interfaces.DataSourceErrorInfo + FallbackToFDv1 bool + EnvironmentID ldvalue.OptionalString } // DataSynchronizer represents a component capable of obtaining a Basis and subsequent delta updates asynchronously. diff --git a/subsystems/datasystem_configuration.go b/subsystems/datasystem_configuration.go index 473b3937..b7f0e688 100644 --- a/subsystems/datasystem_configuration.go +++ b/subsystems/datasystem_configuration.go @@ -8,7 +8,7 @@ type SynchronizersConfiguration struct { SynchronizerBuilders []func() (DataSynchronizer, error) // FDv1FallbackBuilder is a special fallback used only when a synchronizer - // returns RevertToFDv1=true. When activated, the system abandons the synchronizer list + // returns FallbackToFDv1=true. When activated, the system abandons the synchronizer list // and switches to FDv1-only mode. FDv1FallbackBuilder func() (DataSynchronizer, error) } diff --git a/testhelpers/ldtestdatav2/test_data_source.go b/testhelpers/ldtestdatav2/test_data_source.go index 860c5eae..90a17566 100644 --- a/testhelpers/ldtestdatav2/test_data_source.go +++ b/testhelpers/ldtestdatav2/test_data_source.go @@ -328,8 +328,8 @@ func (d *testDataSourceImpl) Sync(ds subsystems.DataSelector) <-chan subsystems. statusChan := d.owner.statusBroadcaster.AddListener() result := subsystems.DataSynchronizerResult{ - State: interfaces.DataSourceStateInitializing, - RevertToFDv1: false, + State: interfaces.DataSourceStateInitializing, + FallbackToFDv1: false, } go func() {