diff --git a/interfaces/data_source_status_provider.go b/interfaces/data_source_status_provider.go index 78d5a3ea..999e2f20 100644 --- a/interfaces/data_source_status_provider.go +++ b/interfaces/data_source_status_provider.go @@ -88,9 +88,9 @@ type DataSourceStatusProvider interface { // // Synchronizers are expected to emit these status updates on the channel returned by Sync(). type DataSynchronizerStatus struct { - State DataSourceState - Error DataSourceErrorInfo - RevertToFDv1 bool + State DataSourceState + Error DataSourceErrorInfo + FallbackToFDv1 bool } // DataSourceStatus is information about the data source's status and the last status change. diff --git a/internal/datasourcev2/helpers.go b/internal/datasourcev2/helpers.go index 54a02311..d7442420 100644 --- a/internal/datasourcev2/helpers.go +++ b/internal/datasourcev2/helpers.go @@ -12,6 +12,16 @@ import ( "github.com/launchdarkly/go-sdk-common/v3/ldlog" ) +// fdv1FallbackHeader is the response header LaunchDarkly uses to instruct an SDK to abandon the +// FDv2 protocol and fall back to FDv1. +const fdv1FallbackHeader = "X-LD-FD-Fallback" + +// isFDv1FallbackRequested reports whether the response headers signal that the SDK should fall +// back to the FDv1 protocol. +func isFDv1FallbackRequested(h http.Header) bool { + return h.Get(fdv1FallbackHeader) == "true" +} + type httpStatusError struct { Message string Code int diff --git a/internal/datasourcev2/polling_data_source.go b/internal/datasourcev2/polling_data_source.go index 253fd2c0..27c6edcf 100644 --- a/internal/datasourcev2/polling_data_source.go +++ b/internal/datasourcev2/polling_data_source.go @@ -68,13 +68,14 @@ func (pp *PollingProcessor) Name() string { } //nolint:revive // DataInitializer method. -func (pp *PollingProcessor) Fetch(ds subsystems.DataSelector, ctx context.Context) (*subsystems.Basis, error) { +func (pp *PollingProcessor) Fetch(ds subsystems.DataSelector, ctx context.Context) (*subsystems.Basis, bool, error) { changeSet, headers, err := pp.requester.Request(ctx, ds.Selector()) + fallback := isFDv1FallbackRequested(headers) if err != nil { - return nil, err + return nil, fallback, err } environmentID := internal.NewInitMetadataFromHeaders(headers).GetEnvironmentID() - return &subsystems.Basis{ChangeSet: *changeSet, Persist: true, EnvironmentID: environmentID}, nil + return &subsystems.Basis{ChangeSet: *changeSet, Persist: true, EnvironmentID: environmentID}, fallback, nil } //nolint:revive // DataSynchronizer method. @@ -106,64 +107,41 @@ func (pp *PollingProcessor) Sync(ds subsystems.DataSelector) <-chan subsystems.D close(resultChan) return case <-ticker.C: - if err := pp.poll(ctx, ds, resultChan); err != nil { - if hse, ok := err.(httpStatusError); ok { - environmentID := internal.NewInitMetadataFromHeaders(hse.Header).GetEnvironmentID() - - errorInfo := interfaces.DataSourceErrorInfo{ - Kind: interfaces.DataSourceErrorKindErrorResponse, - StatusCode: hse.Code, - Time: time.Now(), - } - - if hse.Header.Get("X-LD-FD-Fallback") == "true" { - resultChan <- subsystems.DataSynchronizerResult{ - State: interfaces.DataSourceStateOff, - Error: errorInfo, - RevertToFDv1: true, - EnvironmentID: environmentID, - } - return - } - - recoverable := checkIfErrorIsRecoverableAndLog( - pp.loggers, - httpErrorDescription(hse.Code), - pollingErrorContext, - hse.Code, - pollingWillRetryMessage, - ) - if recoverable { - resultChan <- subsystems.DataSynchronizerResult{ - State: interfaces.DataSourceStateInterrupted, - Error: errorInfo, - EnvironmentID: environmentID, - } - } else { - resultChan <- subsystems.DataSynchronizerResult{ - State: interfaces.DataSourceStateOff, - Error: errorInfo, - EnvironmentID: environmentID, - } - return - } - } else { - errorInfo := interfaces.DataSourceErrorInfo{ - Kind: interfaces.DataSourceErrorKindNetworkError, - Message: err.Error(), - Time: time.Now(), - } - if _, ok := err.(malformedJSONError); ok { - errorInfo.Kind = interfaces.DataSourceErrorKindInvalidData - } - checkIfErrorIsRecoverableAndLog(pp.loggers, err.Error(), pollingErrorContext, 0, pollingWillRetryMessage) - resultChan <- subsystems.DataSynchronizerResult{ - State: interfaces.DataSourceStateInterrupted, - Error: errorInfo, - } - } + result, err := pp.poll(ctx, ds) + + // When the server requested FDv1 fallback, dispatch the result as-is -- poll has + // already populated State (Valid on success, Off on error) and FallbackToFDv1=true. + if result.FallbackToFDv1 { + resultChan <- result + return + } + + if err == nil { + resultChan <- result continue } + + // Non-fallback error: the caller may downgrade Off --> Interrupted when the error + // is recoverable. Log at the appropriate level. + if hse, ok := err.(httpStatusError); ok { + if checkIfErrorIsRecoverableAndLog( + pp.loggers, + httpErrorDescription(hse.Code), + pollingErrorContext, + hse.Code, + pollingWillRetryMessage, + ) { + result.State = interfaces.DataSourceStateInterrupted + resultChan <- result + continue + } + resultChan <- result // poll set State=Off + return + } + + checkIfErrorIsRecoverableAndLog(pp.loggers, err.Error(), pollingErrorContext, 0, pollingWillRetryMessage) + result.State = interfaces.DataSourceStateInterrupted + resultChan <- result } } }() @@ -171,21 +149,51 @@ func (pp *PollingProcessor) Sync(ds subsystems.DataSelector) <-chan subsystems.D return resultChan } +// poll performs a single polling request and builds a DataSynchronizerResult describing the +// outcome. The result's FallbackToFDv1 flag is always populated from the x-ld-fd-fallback response +// header, whether or not the request succeeded -- a 500 or a malformed-JSON body can still carry +// the fallback signal. +// +// On success: result.State = Valid, result.ChangeSet populated, err = nil. +// On error: result.State = Off (the safer default), result.Error populated with Kind/Message/ +// StatusCode as appropriate, err returned so the caller can apply context-specific logic +// (e.g. downgrade Off --> Interrupted when the HTTP error is recoverable). +// +// The caller is responsible for publishing the result to its channel; poll does not touch any +// resultChan so it can be unit-tested in isolation. func (pp *PollingProcessor) poll( - ctx context.Context, ds subsystems.DataSelector, resultChan chan<- subsystems.DataSynchronizerResult, -) error { + ctx context.Context, ds subsystems.DataSelector, +) (subsystems.DataSynchronizerResult, error) { changeSet, headers, err := pp.requester.Request(ctx, ds.Selector()) - if err != nil { - return err + result := subsystems.DataSynchronizerResult{ + EnvironmentID: internal.NewInitMetadataFromHeaders(headers).GetEnvironmentID(), + FallbackToFDv1: isFDv1FallbackRequested(headers), } - resultChan <- subsystems.DataSynchronizerResult{ - ChangeSet: changeSet, - State: interfaces.DataSourceStateValid, - EnvironmentID: internal.NewInitMetadataFromHeaders(headers).GetEnvironmentID(), + if err == nil { + result.ChangeSet = changeSet + result.State = interfaces.DataSourceStateValid + return result, nil } - return nil + result.State = interfaces.DataSourceStateOff + if hse, ok := err.(httpStatusError); ok { + result.Error = interfaces.DataSourceErrorInfo{ + Kind: interfaces.DataSourceErrorKindErrorResponse, + StatusCode: hse.Code, + Time: time.Now(), + } + } else { + result.Error = interfaces.DataSourceErrorInfo{ + Kind: interfaces.DataSourceErrorKindNetworkError, + Message: err.Error(), + Time: time.Now(), + } + if _, ok := err.(malformedJSONError); ok { + result.Error.Kind = interfaces.DataSourceErrorKindInvalidData + } + } + return result, err } //nolint:revive // no doc comment for standard method diff --git a/internal/datasourcev2/polling_data_source_test.go b/internal/datasourcev2/polling_data_source_test.go index c2534571..e9b4cf5c 100644 --- a/internal/datasourcev2/polling_data_source_test.go +++ b/internal/datasourcev2/polling_data_source_test.go @@ -21,6 +21,7 @@ import ( "github.com/launchdarkly/go-server-sdk/v7/testhelpers/ldservicesv2" "github.com/launchdarkly/go-test-helpers/v3/httphelpers" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) var alwaysTrueFlag = ldbuilders.NewFlagBuilder("always-true-flag").SingleVariation(ldvalue.Bool(true)).Build() @@ -40,7 +41,7 @@ func TestPollingProcessorInitializerCanMakeSuccessfulRequest(t *testing.T) { ) defer processor.Close() - basis, err := processor.Fetch(ds, context.Background()) + basis, _, err := processor.Fetch(ds, context.Background()) assert.NoError(t, err) assert.Len(t, basis.ChangeSet.Changes(), 1) assert.Equal(t, basis.ChangeSet.IntentCode(), subsystems.IntentTransferFull) @@ -67,7 +68,7 @@ func TestPollingProcessorInitializerAppendsFilterParameter(t *testing.T) { }, ) defer processor.Close() - _, err := processor.Fetch(ds, context.Background()) + _, _, err := processor.Fetch(ds, context.Background()) assert.NoError(t, err) r := <-requestsCh @@ -90,7 +91,7 @@ func TestPollingProcessorInitializerAppendsBasisParameter(t *testing.T) { defer processor.Close() ds := mocks.NewMockDataSelector(subsystems.NewSelector("test-state", 1)) - _, err := processor.Fetch(ds, context.Background()) + _, _, err := processor.Fetch(ds, context.Background()) assert.NoError(t, err) r := <-requestsCh @@ -115,7 +116,7 @@ func TestPollingProcessorSynchronizerAppendsFilterParameter(t *testing.T) { ) defer processor.Close() - _, err := processor.Fetch(ds, context.Background()) + _, _, err := processor.Fetch(ds, context.Background()) assert.NoError(t, err) r := <-requestsCh @@ -235,6 +236,67 @@ func TestPollingProcessorSynchronizerHandlesInvalidJSON(t *testing.T) { }) } +func TestPollingProcessorSynchronizerHandlesFallbackOnSuccessfulResponse(t *testing.T) { + ds := mocks.NewMockDataSelector(subsystems.NoSelector()) + data := ldservicesv2.NewServerSDKData().Flags(alwaysTrueFlag).ToInitializerPayload(subsystems.NewSelector("test-state", 1)) + + // Wrap the valid 200 handler so the response also carries x-ld-fd-fallback: true. + underlying := ldservices.ServerSidePollingV2ServiceHandler(data) + fallbackOnSuccess := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("X-LD-FD-Fallback", "true") + underlying.ServeHTTP(w, r) + }) + + httphelpers.WithServer(fallbackOnSuccess, func(ts *httptest.Server) { + processor := NewPollingProcessor( + sharedtest.BasicClientContext(), + datasource.PollingConfig{ + BaseURI: ts.URL, + PollInterval: time.Minute * 30, + }, + ) + defer processor.Close() + + resultChan := processor.Sync(ds) + + // A single Valid result carries both the payload and the FallbackToFDv1 signal -- the + // consumer applies the ChangeSet first, then switches to the FDv1 synchronizer. + result := <-resultChan + assert.Equal(t, interfaces.DataSourceStateValid, result.State) + require.NotNil(t, result.ChangeSet) + assert.Len(t, result.ChangeSet.Changes(), 1) + assert.True(t, result.FallbackToFDv1) + }) +} + +func TestPollingProcessorSynchronizerHandlesFallbackOnMalformedBody(t *testing.T) { + // 200 OK with invalid JSON and the fallback header -- a non-httpStatusError error path that + // must still honor the fallback signal rather than treating the parse failure as a retry. + ds := mocks.NewMockDataSelector(subsystems.NoSelector()) + + fallbackHeader := http.Header{ + "X-LD-FD-Fallback": []string{"true"}, + } + handler, _ := httphelpers.RecordingHandler(httphelpers.HandlerWithResponse(200, fallbackHeader, []byte("not json"))) + httphelpers.WithServer(handler, func(ts *httptest.Server) { + processor := NewPollingProcessor( + sharedtest.BasicClientContext(), + datasource.PollingConfig{ + BaseURI: ts.URL, + PollInterval: time.Minute * 30, + }, + ) + defer processor.Close() + + resultChan := processor.Sync(ds) + result := <-resultChan + + assert.Equal(t, interfaces.DataSourceStateOff, result.State) + assert.Equal(t, interfaces.DataSourceErrorKindInvalidData, result.Error.Kind) + assert.True(t, result.FallbackToFDv1) + }) +} + func TestPollingProcessorSynchronizerHandlesFallbackToFDv2(t *testing.T) { ds := mocks.NewMockDataSelector(subsystems.NoSelector()) @@ -257,7 +319,84 @@ func TestPollingProcessorSynchronizerHandlesFallbackToFDv2(t *testing.T) { assert.Equal(t, result.State, interfaces.DataSourceStateOff) assert.Equal(t, result.Error.Kind, interfaces.DataSourceErrorKindErrorResponse) - assert.True(t, result.RevertToFDv1) + assert.True(t, result.FallbackToFDv1) + }) +} + +func TestPollingProcessorInitializerHandlesFallbackOnSuccessfulResponse(t *testing.T) { + ds := mocks.NewMockDataSelector(subsystems.NoSelector()) + data := ldservicesv2.NewServerSDKData().Flags(alwaysTrueFlag).ToInitializerPayload(subsystems.NewSelector("test-state", 1)) + + // Wrap the valid 200 handler to inject X-LD-FD-Fallback alongside a well-formed payload. + underlying := ldservices.ServerSidePollingV2ServiceHandler(data) + fallbackOn200 := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("X-LD-FD-Fallback", "true") + underlying.ServeHTTP(w, r) + }) + + httphelpers.WithServer(fallbackOn200, func(ts *httptest.Server) { + processor := NewPollingProcessor( + sharedtest.BasicClientContext(), + datasource.PollingConfig{ + BaseURI: ts.URL, + PollInterval: time.Minute * 30, + }, + ) + defer processor.Close() + + basis, fallback, err := processor.Fetch(ds, context.Background()) + assert.True(t, fallback) + assert.NoError(t, err) + // Even when the server signals fallback, a valid payload in the same response + // should still be surfaced so the caller can apply it before switching protocols. + require.NotNil(t, basis) + assert.Len(t, basis.ChangeSet.Changes(), 1) + assert.Equal(t, "test-state", basis.ChangeSet.Selector().State()) + }) +} + +func TestPollingProcessorInitializerHandlesFallbackOnErrorResponse(t *testing.T) { + ds := mocks.NewMockDataSelector(subsystems.NoSelector()) + + fallbackHeader := http.Header{ + "X-LD-FD-Fallback": []string{"true"}, + } + handler, _ := httphelpers.RecordingHandler(httphelpers.HandlerWithResponse(500, fallbackHeader, nil)) + httphelpers.WithServer(handler, func(ts *httptest.Server) { + processor := NewPollingProcessor( + sharedtest.BasicClientContext(), + datasource.PollingConfig{ + BaseURI: ts.URL, + PollInterval: time.Minute * 30, + }, + ) + defer processor.Close() + + basis, fallback, err := processor.Fetch(ds, context.Background()) + assert.Nil(t, basis) + assert.True(t, fallback) + assert.Error(t, err, "underlying HTTP error should still be surfaced alongside the fallback signal") + }) +} + +func TestPollingProcessorInitializerErrorWithoutFallbackHeaderReturnsRegularError(t *testing.T) { + ds := mocks.NewMockDataSelector(subsystems.NoSelector()) + + handler, _ := httphelpers.RecordingHandler(httphelpers.HandlerWithStatus(500)) + httphelpers.WithServer(handler, func(ts *httptest.Server) { + processor := NewPollingProcessor( + sharedtest.BasicClientContext(), + datasource.PollingConfig{ + BaseURI: ts.URL, + PollInterval: time.Minute * 30, + }, + ) + defer processor.Close() + + basis, fallback, err := processor.Fetch(ds, context.Background()) + assert.Nil(t, basis) + assert.False(t, fallback) + assert.Error(t, err) }) } diff --git a/internal/datasourcev2/streaming_data_source.go b/internal/datasourcev2/streaming_data_source.go index d76d7cd8..60eab05c 100644 --- a/internal/datasourcev2/streaming_data_source.go +++ b/internal/datasourcev2/streaming_data_source.go @@ -106,8 +106,8 @@ func (sp *StreamProcessor) Name() string { } //nolint:revive // DataInitializer method. -func (sp *StreamProcessor) Fetch(ds subsystems.DataSelector, _ context.Context) (*subsystems.Basis, error) { - return nil, errors.New("StreamProcessor does not implement Fetch capability") +func (sp *StreamProcessor) Fetch(ds subsystems.DataSelector, _ context.Context) (*subsystems.Basis, bool, error) { + return nil, false, errors.New("StreamProcessor does not implement Fetch capability") } //nolint:revive // DataSynchronizer method. @@ -126,6 +126,38 @@ func (sp *StreamProcessor) Sync(ds subsystems.DataSelector) <-chan subsystems.Da return resultChan } +// reportMalformedEvent logs a malformed-event error and pushes an Interrupted result on resultChan. +// Callers are responsible for resetting their own change-set builder and triggering a restart. +func (sp *StreamProcessor) reportMalformedEvent( + event es.Event, + err error, + environmentID ldvalue.OptionalString, + resultChan chan<- subsystems.DataSynchronizerResult, +) { + if event == nil { + sp.loggers.Errorf( + "Received streaming events with malformed JSON data (%s); will restart stream", + err, + ) + } else { + sp.loggers.Errorf( + "Received streaming \"%s\" event with malformed JSON data (%s); will restart stream", + event.Event(), + err, + ) + } + + resultChan <- subsystems.DataSynchronizerResult{ + State: interfaces.DataSourceStateInterrupted, + Error: interfaces.DataSourceErrorInfo{ + Kind: interfaces.DataSourceErrorKindInvalidData, + Message: err.Error(), + Time: time.Now(), + }, + EnvironmentID: environmentID, + } +} + func (sp *StreamProcessor) consumeStream(stream *es.Stream, resultChan chan<- subsystems.DataSynchronizerResult) { // Consume remaining Events and Errors so we can garbage collect defer func() { @@ -139,6 +171,10 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, resultChan chan<- su changeSetBuilder := subsystems.NewChangeSetBuilder() environmentID := ldvalue.OptionalString{} + // fallbackRequested is set when the server's response headers carry x-ld-fd-fallback: true. + // We finish applying the current payload before emitting the fallback signal, so evaluations + // can serve the server-provided data while FDv1 takes over. + fallbackRequested := false for { select { @@ -156,39 +192,20 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, resultChan chan<- su sp.logConnectionResult(true) shouldRestart := false + payloadApplied := false if eventWithHeaders, ok := event.(es.EventWithHeaders); ok { - environmentID = internal.NewInitMetadataFromHeaders(eventWithHeaders.Headers()).GetEnvironmentID() + headers := eventWithHeaders.Headers() + environmentID = internal.NewInitMetadataFromHeaders(headers).GetEnvironmentID() + if isFDv1FallbackRequested(headers) { + fallbackRequested = true + } } gotMalformedEvent := func(event es.Event, err error) { // The protocol should "forget" anything that happens upon receiving an error. changeSetBuilder = subsystems.NewChangeSetBuilder() - - if event == nil { - sp.loggers.Errorf( - "Received streaming events with malformed JSON data (%s); will restart stream", - err, - ) - } else { - sp.loggers.Errorf( - "Received streaming \"%s\" event with malformed JSON data (%s); will restart stream", - event.Event(), - err, - ) - } - - errorInfo := interfaces.DataSourceErrorInfo{ - Kind: interfaces.DataSourceErrorKindInvalidData, - Message: err.Error(), - Time: time.Now(), - } - resultChan <- subsystems.DataSynchronizerResult{ - State: interfaces.DataSourceStateInterrupted, - Error: errorInfo, - EnvironmentID: environmentID, - } - + sp.reportMalformedEvent(event, err, environmentID, resultChan) shouldRestart = true // scenario 1 in error handling comments at top of file } @@ -215,9 +232,11 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, resultChan chan<- su } resultChan <- subsystems.DataSynchronizerResult{ - State: interfaces.DataSourceStateValid, - EnvironmentID: environmentID, + State: interfaces.DataSourceStateValid, + EnvironmentID: environmentID, + FallbackToFDv1: fallbackRequested, } + payloadApplied = true break } @@ -282,10 +301,12 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, resultChan chan<- su } resultChan <- subsystems.DataSynchronizerResult{ - ChangeSet: changeSet, - State: interfaces.DataSourceStateValid, - EnvironmentID: environmentID, + ChangeSet: changeSet, + State: interfaces.DataSourceStateValid, + EnvironmentID: environmentID, + FallbackToFDv1: fallbackRequested, } + payloadApplied = true default: sp.loggers.Infof("Unexpected event found in stream: %s", event.Event()) @@ -295,6 +316,15 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, resultChan chan<- su stream.Restart() } + // Once a payload has been applied with a pending FDv1 fallback signal, the Valid + // result emitted above carries FallbackToFDv1=true; close the stream so we stop + // consuming. Events that don't complete a payload leave payloadApplied false so we + // keep consuming (fallbackRequested persists across iterations). + if fallbackRequested && payloadApplied { + stream.Close() + return + } + case <-sp.halt: stream.Close() return @@ -353,12 +383,12 @@ func (sp *StreamProcessor) subscribe(ds subsystems.DataSelector, resultChan chan Time: time.Now(), } - if se.Header.Get("X-LD-FD-Fallback") == "true" { + if isFDv1FallbackRequested(se.Header) { resultChan <- subsystems.DataSynchronizerResult{ - State: interfaces.DataSourceStateOff, - Error: errorInfo, - RevertToFDv1: true, - EnvironmentID: environmentID, + State: interfaces.DataSourceStateOff, + Error: errorInfo, + FallbackToFDv1: true, + EnvironmentID: environmentID, } return es.StreamErrorHandlerResult{CloseNow: true} } diff --git a/internal/datasourcev2/streaming_data_source_test.go b/internal/datasourcev2/streaming_data_source_test.go index 5bcc1ef8..1acb905e 100644 --- a/internal/datasourcev2/streaming_data_source_test.go +++ b/internal/datasourcev2/streaming_data_source_test.go @@ -60,8 +60,9 @@ func TestStreamingDoesNotWorkAsInitializer(t *testing.T) { ) defer sp.Close() - basis, err := sp.Fetch(ds, context.Background()) + basis, fallback, err := sp.Fetch(ds, context.Background()) assert.Nil(t, basis) + assert.False(t, fallback) assert.NotNil(t, err) } @@ -181,7 +182,46 @@ func TestStreamingProcessorHandlesFallbackToFDv1(t *testing.T) { assert.Equal(t, result.State, interfaces.DataSourceStateOff) assert.Equal(t, result.Error.Kind, interfaces.DataSourceErrorKindErrorResponse) - assert.True(t, result.RevertToFDv1) + assert.True(t, result.FallbackToFDv1) + }) +} + +func TestStreamingProcessorHandlesFallbackOnSuccessfulResponse(t *testing.T) { + data := ldservicesv2.NewServerSDKData().Flags(alwaysTrueFlag) + protocol := ldservicesv2.NewStreamingProtocol(). + WithIntent(subsystems.ServerIntent{Payload: subsystems.Payload{ + ID: "something-id", Target: 0, Code: subsystems.IntentTransferFull, Reason: "payload-missing", + }}). + WithPutObjects(data.ToPutObjects()). + WithTransferred("updated-state", 2) + streamHandler, _ := ldservices.ServerSideStreamingV2ServiceProtocolHandler(protocol) + + // Wrap the valid SSE handler so the response carries x-ld-fd-fallback: true. + fallbackOnSuccess := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("X-LD-FD-Fallback", "true") + streamHandler.ServeHTTP(w, r) + }) + + ds := mocks.NewMockDataSelector(subsystems.NoSelector()) + httphelpers.WithServer(fallbackOnSuccess, func(ts *httptest.Server) { + sp := NewStreamProcessor( + sharedtest.BasicClientContext(), + datasource.StreamConfig{ + URI: ts.URL, + InitialReconnectDelay: time.Millisecond * 50, + }, + ) + + defer sp.Close() + resultChan := sp.Sync(ds) + + // A single Valid result carries both the payload and the FallbackToFDv1 signal -- the + // consumer applies the ChangeSet first, then switches to the FDv1 synchronizer. + result := <-resultChan + assert.Equal(t, interfaces.DataSourceStateValid, result.State) + assert.NotNil(t, result.ChangeSet) + assert.Len(t, result.ChangeSet.Changes(), 1) + assert.True(t, result.FallbackToFDv1) }) } diff --git a/internal/datasystem/fdv2_datasystem.go b/internal/datasystem/fdv2_datasystem.go index 8d279ab1..68df7675 100644 --- a/internal/datasystem/fdv2_datasystem.go +++ b/internal/datasystem/fdv2_datasystem.go @@ -44,11 +44,11 @@ type FDv2 struct { initializers []subsystems.DataInitializer // Mutable list of synchronizer builders. Items are removed when they permanently fail. - // When reverting to FDv1, this list is replaced with a single FDv1 synchronizer. + // When falling back to FDv1, this list is replaced with a single FDv1 synchronizer. synchronizerBuilders []func() (subsystems.DataSynchronizer, error) currentSyncIndex int - // FDv1 fallback builder, used only when a synchronizer requests revert to FDv1 + // FDv1 fallback builder, used only when a synchronizer requests fallback to FDv1 fdv1FallbackBuilder func() (subsystems.DataSynchronizer, error) // Boolean used to track whether the datasystem was originally configured @@ -205,7 +205,17 @@ func (f *FDv2) launchTask(task func()) { func (f *FDv2) run(ctx context.Context, closeWhenReady chan struct{}) { f.UpdateStatus(interfaces.DataSourceStateInitializing, interfaces.DataSourceErrorInfo{}) - f.runInitializers(ctx, closeWhenReady) + if fallback, errorInfo := f.runInitializers(ctx, closeWhenReady); fallback { + if f.fdv1FallbackBuilder != nil { + f.loggers.Warn("Falling back to FDv1 protocol") + f.synchronizerBuilders = []func() (subsystems.DataSynchronizer, error){f.fdv1FallbackBuilder} + f.currentSyncIndex = 0 + } else { + f.loggers.Warn("Initializer requested FDv1 fallback but none configured") + f.synchronizerBuilders = nil + f.UpdateStatus(interfaces.DataSourceStateOff, errorInfo) + } + } if f.configuredWithDataSources && f.dataStoreStatusProvider.IsStatusMonitoringEnabled() { f.launchTask(func() { @@ -235,12 +245,44 @@ func (f *FDv2) runPersistentStoreOutageRecovery(ctx context.Context, statuses <- } } -func (f *FDv2) runInitializers(ctx context.Context, closeWhenReady chan struct{}) { +// runInitializers runs each configured initializer in order until one succeeds, the context is +// cancelled, or an initializer signals a fallback to FDv1. Returns (fallbackToFDv1, errorInfo): +// fallbackToFDv1 is true when an initializer asked the SDK to switch to FDv1; errorInfo describes +// the underlying error for status reporting when no FDv1 fallback is configured (empty when the +// fallback accompanied a successful response). If fallback is signalled alongside a valid Basis, +// that Basis is applied before returning so evaluations can serve the server-provided data while +// the FDv1 synchronizer spins up. +func (f *FDv2) runInitializers( + ctx context.Context, closeWhenReady chan struct{}, +) (fallbackToFDv1 bool, errorInfo interfaces.DataSourceErrorInfo) { for _, initializer := range f.initializers { f.loggers.Infof("Attempting to initialize via %s", initializer.Name()) - basis, err := initializer.Fetch(f.store, ctx) + basis, fallback, err := initializer.Fetch(f.store, ctx) if errors.Is(err, context.Canceled) { - return + return false, interfaces.DataSourceErrorInfo{} + } + if fallback { + if err != nil { + f.loggers.Warnf("Initializer %s requested fallback to FDv1 protocol: %v", initializer.Name(), err) + errorInfo = interfaces.DataSourceErrorInfo{ + Kind: interfaces.DataSourceErrorKindUnknown, + Message: err.Error(), + Time: time.Now(), + } + } else { + f.loggers.Warnf("Initializer %s requested fallback to FDv1 protocol", initializer.Name()) + } + if basis != nil { + f.environmentIDProvider.SetEnvironmentID(basis.EnvironmentID) + f.store.Apply(basis.ChangeSet, basis.Persist) + if basis.ChangeSet.Selector().IsDefined() { + f.loggers.Infof("Applied payload from %s before falling back to FDv1", initializer.Name()) + f.readyOnce.Do(func() { + close(closeWhenReady) + }) + } + } + return true, errorInfo } if err != nil { f.loggers.Warnf("Initializer %s failed: %v", initializer.Name(), err) @@ -253,9 +295,10 @@ func (f *FDv2) runInitializers(ctx context.Context, closeWhenReady chan struct{} f.readyOnce.Do(func() { close(closeWhenReady) }) - return + return false, interfaces.DataSourceErrorInfo{} } } + return false, interfaces.DataSourceErrorInfo{} } func (f *FDv2) runSynchronizers(ctx context.Context, closeWhenReady chan struct{}) { @@ -314,7 +357,7 @@ func (f *FDv2) runSynchronizers(ctx context.Context, closeWhenReady chan struct{ switch action { case syncFDv1: if f.fdv1FallbackBuilder != nil { - f.loggers.Warn("Reverting to FDv1 protocol") + f.loggers.Warn("Falling back to FDv1 protocol") // Replace entire list with single FDv1 synchronizer f.synchronizerBuilders = []func() (subsystems.DataSynchronizer, error){f.fdv1FallbackBuilder} f.currentSyncIndex = 0 @@ -398,11 +441,19 @@ func (f *FDv2) consumeSynchronizerResults( f.UpdateStatus(result.State, result.Error) case interfaces.DataSourceStateOff: f.UpdateStatus(interfaces.DataSourceStateInterrupted, result.Error) - if result.RevertToFDv1 { + if result.FallbackToFDv1 { return syncFDv1, nil } return syncRemove, nil } + + // FallbackToFDv1 may ride along on a Valid or Interrupted result too -- e.g. a + // successful response whose headers also requested the fallback. The Valid/ + // Interrupted branches above already applied any ChangeSet and updated status; + // now hand control to the FDv1 fallback synchronizer. + if result.FallbackToFDv1 { + return syncFDv1, nil + } case <-ticker.C: // If there's only one synchronizer, don't check conditions if len(f.synchronizerBuilders) == 1 { diff --git a/ldclient_end_to_end_fdv2_test.go b/ldclient_end_to_end_fdv2_test.go index 32c7a92f..e3b5dcc2 100644 --- a/ldclient_end_to_end_fdv2_test.go +++ b/ldclient_end_to_end_fdv2_test.go @@ -116,6 +116,158 @@ func TestFDV2CanFallBackToV1(t *testing.T) { }) } +// When an initializer requests FDv1 fallback but no FDv1 fallback is configured, the data source +// status must transition to Off rather than staying stuck at Initializing. This mirrors the +// synchronizer-triggered path when fdv1FallbackBuilder is nil. +func TestFDV2InitializerFallbackWithoutFDv1FallbackTransitionsToOff(t *testing.T) { + header := http.Header{ + "X-LD-FD-Fallback": []string{"true"}, + } + handler, requestsCh := httphelpers.RecordingHandler(httphelpers.HandlerWithResponse(500, header, nil)) + + httphelpers.WithServer(handler, func(server *httptest.Server) { + logCapture := ldlogtest.NewMockLog() + + // Custom data system: a polling initializer, no synchronizers, no FDv1 fallback. + config := Config{ + Events: ldcomponents.NoEvents(), + Logging: ldcomponents.Logging().Loggers(logCapture.Loggers), + DataSystem: ldcomponents.DataSystem().Custom().Initializers( + ldcomponents.PollingDataSourceV2().BaseURI(server.URL).AsInitializer(), + ), + } + + client, err := MakeCustomClient(testSdkKey, config, time.Second*5) + require.Error(t, err) + require.NotNil(t, client) + defer client.Close() + + <-requestsCh + + // With no FDv1 fallback configured, an initializer-triggered fallback must transition the + // status to Off — if it stays at Initializing, MakeCustomClient treats it as an init + // failure and we see initializationFailedErrorMessage here. Either way the status field + // should end up Off, so assert that directly. + status := client.GetDataSourceStatusProvider().GetStatus() + assert.Equal(t, + interfaces.DataSourceStateOff, + status.State, + "status should transition to Off when initializer fallback requested but no FDv1 fallback configured") + // The underlying initializer error must be preserved on the Off status so programmatic + // monitors can see why the data source shut down, not just that it did. + assert.NotEqual(t, interfaces.DataSourceErrorInfo{}, status.LastError, + "LastError should carry the initializer error that accompanied the fallback signal") + assert.Equal(t, initializationFailedErrorMessage, err.Error()) + + assert.Contains(t, logCapture.GetOutput(ldlog.Warn), + "Initializer requested FDv1 fallback but none configured") + }) +} + +// When the streaming synchronizer receives a 200 response that carries both a valid SSE payload +// AND the x-ld-fd-fallback header, the SDK should apply the payload and then fall back to FDv1. +// Without this behavior, the stream stays open against the FDv2 endpoint indefinitely. +func TestFDV2CanFallBackToV1FromStreamingSuccess(t *testing.T) { + dataV1 := ldservices.NewServerSDKData().Flags(alwaysFalseFlag) + dataV2 := ldservicesv2.NewServerSDKData().Flags(alwaysTrueFlag) + + protocol := ldservicesv2.NewStreamingProtocol(). + WithIntent(subsystems.ServerIntent{Payload: subsystems.Payload{ + ID: "fake-id", Target: 0, Code: subsystems.IntentTransferFull, Reason: "payload-missing", + }}). + WithPutObjects(dataV2.ToPutObjects()). + WithTransferred("state", 1) + + streamV2Handler, _ := ldservices.ServerSideStreamingV2ServiceProtocolHandler(protocol) + streamV2WithFallback := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("X-LD-FD-Fallback", "true") + streamV2Handler.ServeHTTP(w, r) + }) + + // Init phase: FDv2 poll returns 500. Sync phase: FDv2 stream returns valid SSE + fallback + // header. FDv1 fallback phase: FDv1 poll returns the V1 data (always-false flag). + pollV2InitHandler, _ := httphelpers.RecordingHandler(httphelpers.HandlerWithStatus(500)) + streamRecordingHandler, streamV2ReqCh := httphelpers.RecordingHandler(streamV2WithFallback) + pollV1SyncHandler, pollV1SyncReqCh := httphelpers.RecordingHandler(ldservices.ServerSidePollingServiceHandler(dataV1)) + + handler := httphelpers.SequentialHandler(pollV2InitHandler, streamRecordingHandler, pollV1SyncHandler) + + httphelpers.WithServer(handler, func(server *httptest.Server) { + logCapture := ldlogtest.NewMockLog() + + config := Config{ + Events: ldcomponents.NoEvents(), + Logging: ldcomponents.Logging().Loggers(logCapture.Loggers), + DataSystem: ldcomponents.DataSystem().WithRelayProxyEndpoints(server.URL).Default(), + } + + client, err := MakeCustomClient(testSdkKey, config, time.Second*5) + <-streamV2ReqCh + <-pollV1SyncReqCh + require.NoError(t, err) + defer client.Close() + + reached := client.GetDataSourceStatusProvider().WaitFor(interfaces.DataSourceStateValid, time.Second*5) + require.True(t, reached, "timed out waiting for data source to reach VALID state") + + // Status becomes Valid as soon as the FDv2 stream applies its payload (with FallbackToFDv1 + // riding along on the same result), which happens before FDv1 has fetched its own data. + // Poll until the flag value reflects FDv1 data to verify the handoff completed. + assert.Eventually(t, func() bool { + value, _ := client.BoolVariation(alwaysFalseFlag.Key, testUser, true) + return value == false + }, time.Second*2, time.Millisecond*10, "expected FDv1 data (value=false) to replace FDv2 data") + }) +} + +// When the polling initializer receives x-ld-fd-fallback from the server, the SDK should skip any +// remaining FDv2 synchronizers and switch to the FDv1 polling synchronizer directly — without ever +// attempting the FDv2 streaming synchronizer. +func TestFDV2CanFallBackToV1FromInitializer(t *testing.T) { + dataV1 := ldservices.NewServerSDKData().Flags(alwaysFalseFlag) + + header := http.Header{ + "X-LD-FD-Fallback": []string{"true"}, + } + + // FDv2 polling initializer: returns 500 + fallback header. Must trigger fallback to FDv1 before + // the FDv2 streaming synchronizer is ever dialed. + pollV2InitRecordingHandler, pollV2InitReqCh := httphelpers.RecordingHandler(httphelpers.HandlerWithResponse(500, header, nil)) + // FDv1 polling synchronizer: returns valid FDv1 data. + pollV1SyncRecordingHandler, pollV1SyncReqCh := httphelpers.RecordingHandler(ldservices.ServerSidePollingServiceHandler(dataV1)) + // FDv2 streaming synchronizer: should never be hit. If it is, the test will surface it via + // streamV2SyncReqCh. + streamHandler, streamV2SyncReqCh := httphelpers.RecordingHandler(httphelpers.HandlerWithResponse(500, header, nil)) + + handler := httphelpers.SequentialHandler(pollV2InitRecordingHandler, pollV1SyncRecordingHandler, streamHandler) + + httphelpers.WithServer(handler, func(server *httptest.Server) { + logCapture := ldlogtest.NewMockLog() + + config := Config{ + Events: ldcomponents.NoEvents(), + Logging: ldcomponents.Logging().Loggers(logCapture.Loggers), + DataSystem: ldcomponents.DataSystem().WithRelayProxyEndpoints(server.URL).Default(), + } + + client, err := MakeCustomClient(testSdkKey, config, time.Second*5) + + <-pollV2InitReqCh + <-pollV1SyncReqCh + + require.NoError(t, err) + defer client.Close() + + reached := client.GetDataSourceStatusProvider().WaitFor(interfaces.DataSourceStateValid, time.Second*5) + require.True(t, reached, "timed out waiting for data source to reach VALID state") + + assertNoMoreRequests(t, streamV2SyncReqCh) + + value, _ := client.BoolVariation(alwaysFalseFlag.Key, testUser, true) + assert.False(t, value) + }) +} + func TestFDV2StreamingSynchronizer(t *testing.T) { data := ldservicesv2.NewServerSDKData().Flags(alwaysTrueFlag) diff --git a/ldfiledatav2/file_data_source_impl.go b/ldfiledatav2/file_data_source_impl.go index e1496a31..fb177d55 100644 --- a/ldfiledatav2/file_data_source_impl.go +++ b/ldfiledatav2/file_data_source_impl.go @@ -135,7 +135,7 @@ func (fs *fileDataSource) Sync(ds subsystems.DataSelector) <-chan subsystems.Dat return resultChan } -func (fs *fileDataSource) Fetch(ds subsystems.DataSelector, ctx context.Context) (*subsystems.Basis, error) { +func (fs *fileDataSource) Fetch(ds subsystems.DataSelector, ctx context.Context) (*subsystems.Basis, bool, error) { changeSetChan := fs.changeSetBroadcaster.AddListener() statusChan := fs.statusBroadcaster.AddListener() @@ -178,7 +178,7 @@ func (fs *fileDataSource) Fetch(ds subsystems.DataSelector, ctx context.Context) wg.Wait() - return basis, err + return basis, false, err } // Reload tells the data source to immediately attempt to reread all of the configured source files @@ -223,7 +223,7 @@ func (fs *fileDataSource) reload() { Message: err.Error(), Time: time.Time{}, }, - RevertToFDv1: false, + FallbackToFDv1: false, }) fs.loggers.Error(err) } diff --git a/ldfiledatav2/file_data_source_test.go b/ldfiledatav2/file_data_source_test.go index 1f50945f..a5d42d09 100644 --- a/ldfiledatav2/file_data_source_test.go +++ b/ldfiledatav2/file_data_source_test.go @@ -284,8 +284,9 @@ func TestSuccessfullyLoadsJsonFlagsAsInitializer(t *testing.T) { initializer, err := factory.AsInitializer().Build(subsystems.BasicClientContext{}) assert.NoError(t, err) - basis, err := initializer.Fetch(mocks.NewMockDataSelector(subsystems.NoSelector()), context.Background()) + basis, fallback, err := initializer.Fetch(mocks.NewMockDataSelector(subsystems.NoSelector()), context.Background()) assert.NoError(t, err) + assert.False(t, fallback) assert.NotNil(t, basis) assert.Len(t, basis.ChangeSet.Changes(), 2) @@ -310,7 +311,7 @@ func TestInitializerReturnsErrorIfFileDoesNotExist(t *testing.T) { initializer, err := factory.AsInitializer().Build(subsystems.BasicClientContext{}) assert.NoError(t, err) - _, err = initializer.Fetch(mocks.NewMockDataSelector(subsystems.NoSelector()), context.Background()) + _, _, err = initializer.Fetch(mocks.NewMockDataSelector(subsystems.NoSelector()), context.Background()) assert.Error(t, err) }) } diff --git a/subsystems/data_source.go b/subsystems/data_source.go index 82f0cb29..e6bcc102 100644 --- a/subsystems/data_source.go +++ b/subsystems/data_source.go @@ -45,18 +45,21 @@ type DataSelector interface { type DataInitializer interface { // Name returns the name of the data initializer. Name() string - // Fetch returns a Basis, or an error if the Basis could not be retrieved. If the context has expired, - // return the context's error. - Fetch(ds DataSelector, ctx context.Context) (*Basis, error) + // Fetch returns a Basis, or an error if the Basis could not be retrieved. If the context has + // expired, it returns the context's error. If the LaunchDarkly server has instructed the SDK + // to fall back to the FDv1 protocol (via the x-ld-fd-fallback response header), fallbackToFDv1 + // is true; in that case the Basis is nil and err may be nil or carry the underlying HTTP + // error for logging. Callers should branch on fallbackToFDv1 before err. + Fetch(ds DataSelector, ctx context.Context) (basis *Basis, fallbackToFDv1 bool, err error) } // DataSynchronizerResult represents the results of a Synchronizer's ongoing Sync method. type DataSynchronizerResult struct { - ChangeSet *ChangeSet - State interfaces.DataSourceState - Error interfaces.DataSourceErrorInfo - RevertToFDv1 bool - EnvironmentID ldvalue.OptionalString + ChangeSet *ChangeSet + State interfaces.DataSourceState + Error interfaces.DataSourceErrorInfo + FallbackToFDv1 bool + EnvironmentID ldvalue.OptionalString } // DataSynchronizer represents a component capable of obtaining a Basis and subsequent delta updates asynchronously. diff --git a/subsystems/datasystem_configuration.go b/subsystems/datasystem_configuration.go index 473b3937..b7f0e688 100644 --- a/subsystems/datasystem_configuration.go +++ b/subsystems/datasystem_configuration.go @@ -8,7 +8,7 @@ type SynchronizersConfiguration struct { SynchronizerBuilders []func() (DataSynchronizer, error) // FDv1FallbackBuilder is a special fallback used only when a synchronizer - // returns RevertToFDv1=true. When activated, the system abandons the synchronizer list + // returns FallbackToFDv1=true. When activated, the system abandons the synchronizer list // and switches to FDv1-only mode. FDv1FallbackBuilder func() (DataSynchronizer, error) } diff --git a/testhelpers/ldtestdatav2/test_data_source.go b/testhelpers/ldtestdatav2/test_data_source.go index f252ef3e..90a17566 100644 --- a/testhelpers/ldtestdatav2/test_data_source.go +++ b/testhelpers/ldtestdatav2/test_data_source.go @@ -316,8 +316,9 @@ func (d *testDataSourceImpl) Name() string { return "TestDataSynchronizer" } -func (d *testDataSourceImpl) Fetch(ds subsystems.DataSelector, ctx context.Context) (*subsystems.Basis, error) { - return d.owner.makeBasis() +func (d *testDataSourceImpl) Fetch(ds subsystems.DataSelector, ctx context.Context) (*subsystems.Basis, bool, error) { + basis, err := d.owner.makeBasis() + return basis, false, err } func (d *testDataSourceImpl) Sync(ds subsystems.DataSelector) <-chan subsystems.DataSynchronizerResult { @@ -327,8 +328,8 @@ func (d *testDataSourceImpl) Sync(ds subsystems.DataSelector) <-chan subsystems. statusChan := d.owner.statusBroadcaster.AddListener() result := subsystems.DataSynchronizerResult{ - State: interfaces.DataSourceStateInitializing, - RevertToFDv1: false, + State: interfaces.DataSourceStateInitializing, + FallbackToFDv1: false, } go func() { diff --git a/testhelpers/ldtestdatav2/test_data_source_test.go b/testhelpers/ldtestdatav2/test_data_source_test.go index 0710800a..e518ce61 100644 --- a/testhelpers/ldtestdatav2/test_data_source_test.go +++ b/testhelpers/ldtestdatav2/test_data_source_test.go @@ -53,7 +53,7 @@ func TestCanBeUsedAsInitializer(t *testing.T) { assert.NoError(t, err) selector := mocks.NewMockDataSelector(subsystems.NoSelector()) - basis, err := initializer.Fetch(selector, context.Background()) + basis, _, err := initializer.Fetch(selector, context.Background()) assert.NoError(t, err) changes := basis.ChangeSet.Changes() @@ -70,7 +70,7 @@ func TestCanBeUsedAsInitializer(t *testing.T) { td.Update(td.Flag("flag1").On(false)) - basis, err = initializer.Fetch(selector, context.Background()) + basis, _, err = initializer.Fetch(selector, context.Background()) assert.NoError(t, err) changes = basis.ChangeSet.Changes()