Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions internal/datasourcev2/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,16 @@ import (
"github.com/launchdarkly/go-sdk-common/v3/ldlog"
)

// fdv1FallbackHeader is the response header LaunchDarkly uses to instruct an SDK to abandon the
// FDv2 protocol and fall back to FDv1.
const fdv1FallbackHeader = "X-LD-FD-Fallback"

// isFDv1FallbackRequested reports whether the response headers signal that the SDK should revert
// to the FDv1 protocol.
func isFDv1FallbackRequested(h http.Header) bool {
return h.Get(fdv1FallbackHeader) == "true"
}

type httpStatusError struct {
Message string
Code int
Expand Down
9 changes: 5 additions & 4 deletions internal/datasourcev2/polling_data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,14 @@ func (pp *PollingProcessor) Name() string {
}

//nolint:revive // DataInitializer method.
func (pp *PollingProcessor) Fetch(ds subsystems.DataSelector, ctx context.Context) (*subsystems.Basis, error) {
func (pp *PollingProcessor) Fetch(ds subsystems.DataSelector, ctx context.Context) (*subsystems.Basis, bool, error) {
changeSet, headers, err := pp.requester.Request(ctx, ds.Selector())
fallback := isFDv1FallbackRequested(headers)
if err != nil {
return nil, err
return nil, fallback, err
}
environmentID := internal.NewInitMetadataFromHeaders(headers).GetEnvironmentID()
return &subsystems.Basis{ChangeSet: *changeSet, Persist: true, EnvironmentID: environmentID}, nil
return &subsystems.Basis{ChangeSet: *changeSet, Persist: true, EnvironmentID: environmentID}, fallback, nil
}

//nolint:revive // DataSynchronizer method.
Expand Down Expand Up @@ -116,7 +117,7 @@ func (pp *PollingProcessor) Sync(ds subsystems.DataSelector) <-chan subsystems.D
Time: time.Now(),
}

if hse.Header.Get("X-LD-FD-Fallback") == "true" {
if isFDv1FallbackRequested(hse.Header) {
resultChan <- subsystems.DataSynchronizerResult{
State: interfaces.DataSourceStateOff,
Error: errorInfo,
Expand Down
86 changes: 82 additions & 4 deletions internal/datasourcev2/polling_data_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/launchdarkly/go-server-sdk/v7/testhelpers/ldservicesv2"
"github.com/launchdarkly/go-test-helpers/v3/httphelpers"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var alwaysTrueFlag = ldbuilders.NewFlagBuilder("always-true-flag").SingleVariation(ldvalue.Bool(true)).Build()
Expand All @@ -40,7 +41,7 @@ func TestPollingProcessorInitializerCanMakeSuccessfulRequest(t *testing.T) {
)
defer processor.Close()

basis, err := processor.Fetch(ds, context.Background())
basis, _, err := processor.Fetch(ds, context.Background())
assert.NoError(t, err)
assert.Len(t, basis.ChangeSet.Changes(), 1)
assert.Equal(t, basis.ChangeSet.IntentCode(), subsystems.IntentTransferFull)
Expand All @@ -67,7 +68,7 @@ func TestPollingProcessorInitializerAppendsFilterParameter(t *testing.T) {
},
)
defer processor.Close()
_, err := processor.Fetch(ds, context.Background())
_, _, err := processor.Fetch(ds, context.Background())
assert.NoError(t, err)

r := <-requestsCh
Expand All @@ -90,7 +91,7 @@ func TestPollingProcessorInitializerAppendsBasisParameter(t *testing.T) {
defer processor.Close()

ds := mocks.NewMockDataSelector(subsystems.NewSelector("test-state", 1))
_, err := processor.Fetch(ds, context.Background())
_, _, err := processor.Fetch(ds, context.Background())
assert.NoError(t, err)

r := <-requestsCh
Expand All @@ -115,7 +116,7 @@ func TestPollingProcessorSynchronizerAppendsFilterParameter(t *testing.T) {
)
defer processor.Close()

_, err := processor.Fetch(ds, context.Background())
_, _, err := processor.Fetch(ds, context.Background())
assert.NoError(t, err)

r := <-requestsCh
Expand Down Expand Up @@ -261,6 +262,83 @@ func TestPollingProcessorSynchronizerHandlesFallbackToFDv2(t *testing.T) {
})
}

func TestPollingProcessorInitializerHandlesFallbackOnSuccessfulResponse(t *testing.T) {
ds := mocks.NewMockDataSelector(subsystems.NoSelector())
data := ldservicesv2.NewServerSDKData().Flags(alwaysTrueFlag).ToInitializerPayload(subsystems.NewSelector("test-state", 1))

// Wrap the valid 200 handler to inject X-LD-FD-Fallback alongside a well-formed payload.
underlying := ldservices.ServerSidePollingV2ServiceHandler(data)
fallbackOn200 := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("X-LD-FD-Fallback", "true")
underlying.ServeHTTP(w, r)
})

httphelpers.WithServer(fallbackOn200, func(ts *httptest.Server) {
processor := NewPollingProcessor(
sharedtest.BasicClientContext(),
datasource.PollingConfig{
BaseURI: ts.URL,
PollInterval: time.Minute * 30,
},
)
defer processor.Close()

basis, fallback, err := processor.Fetch(ds, context.Background())
assert.True(t, fallback)
assert.NoError(t, err)
// Even when the server signals fallback, a valid payload in the same response
// should still be surfaced so the caller can apply it before switching protocols.
require.NotNil(t, basis)
assert.Len(t, basis.ChangeSet.Changes(), 1)
assert.Equal(t, "test-state", basis.ChangeSet.Selector().State())
})
}

func TestPollingProcessorInitializerHandlesFallbackOnErrorResponse(t *testing.T) {
ds := mocks.NewMockDataSelector(subsystems.NoSelector())

fallbackHeader := http.Header{
"X-LD-FD-Fallback": []string{"true"},
}
handler, _ := httphelpers.RecordingHandler(httphelpers.HandlerWithResponse(500, fallbackHeader, nil))
httphelpers.WithServer(handler, func(ts *httptest.Server) {
processor := NewPollingProcessor(
sharedtest.BasicClientContext(),
datasource.PollingConfig{
BaseURI: ts.URL,
PollInterval: time.Minute * 30,
},
)
defer processor.Close()

basis, fallback, err := processor.Fetch(ds, context.Background())
assert.Nil(t, basis)
assert.True(t, fallback)
assert.Error(t, err, "underlying HTTP error should still be surfaced alongside the fallback signal")
})
}

func TestPollingProcessorInitializerErrorWithoutFallbackHeaderReturnsRegularError(t *testing.T) {
ds := mocks.NewMockDataSelector(subsystems.NoSelector())

handler, _ := httphelpers.RecordingHandler(httphelpers.HandlerWithStatus(500))
httphelpers.WithServer(handler, func(ts *httptest.Server) {
processor := NewPollingProcessor(
sharedtest.BasicClientContext(),
datasource.PollingConfig{
BaseURI: ts.URL,
PollInterval: time.Minute * 30,
},
)
defer processor.Close()

basis, fallback, err := processor.Fetch(ds, context.Background())
assert.Nil(t, basis)
assert.False(t, fallback)
assert.Error(t, err)
})
}

func TestPollingProcessorSynchronizerHandlesRecoverableErrors(t *testing.T) {
for _, statusCode := range []int{400, 408, 429, 500, 503} {
t.Run(fmt.Sprintf("handles recoverable error %d", statusCode), func(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions internal/datasourcev2/streaming_data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ func (sp *StreamProcessor) Name() string {
}

//nolint:revive // DataInitializer method.
func (sp *StreamProcessor) Fetch(ds subsystems.DataSelector, _ context.Context) (*subsystems.Basis, error) {
return nil, errors.New("StreamProcessor does not implement Fetch capability")
func (sp *StreamProcessor) Fetch(ds subsystems.DataSelector, _ context.Context) (*subsystems.Basis, bool, error) {
return nil, false, errors.New("StreamProcessor does not implement Fetch capability")
}

//nolint:revive // DataSynchronizer method.
Expand Down Expand Up @@ -353,7 +353,7 @@ func (sp *StreamProcessor) subscribe(ds subsystems.DataSelector, resultChan chan
Time: time.Now(),
}

if se.Header.Get("X-LD-FD-Fallback") == "true" {
if isFDv1FallbackRequested(se.Header) {
resultChan <- subsystems.DataSynchronizerResult{
State: interfaces.DataSourceStateOff,
Error: errorInfo,
Expand Down
3 changes: 2 additions & 1 deletion internal/datasourcev2/streaming_data_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ func TestStreamingDoesNotWorkAsInitializer(t *testing.T) {
)

defer sp.Close()
basis, err := sp.Fetch(ds, context.Background())
basis, fallback, err := sp.Fetch(ds, context.Background())
assert.Nil(t, basis)
assert.False(t, fallback)
assert.NotNil(t, err)
}

Expand Down
42 changes: 37 additions & 5 deletions internal/datasystem/fdv2_datasystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,17 @@ func (f *FDv2) launchTask(task func()) {
func (f *FDv2) run(ctx context.Context, closeWhenReady chan struct{}) {
f.UpdateStatus(interfaces.DataSourceStateInitializing, interfaces.DataSourceErrorInfo{})

f.runInitializers(ctx, closeWhenReady)
if f.runInitializers(ctx, closeWhenReady) {
if f.fdv1FallbackBuilder != nil {
f.loggers.Warn("Reverting to FDv1 protocol")
f.synchronizerBuilders = []func() (subsystems.DataSynchronizer, error){f.fdv1FallbackBuilder}
f.currentSyncIndex = 0
} else {
f.loggers.Warn("Initializer requested FDv1 fallback but none configured")
f.synchronizerBuilders = nil
f.UpdateStatus(interfaces.DataSourceStateOff, f.getStatus().LastError)
}
Comment thread
cursor[bot] marked this conversation as resolved.
Comment thread
cursor[bot] marked this conversation as resolved.
}

if f.configuredWithDataSources && f.dataStoreStatusProvider.IsStatusMonitoringEnabled() {
f.launchTask(func() {
Expand Down Expand Up @@ -235,12 +245,33 @@ func (f *FDv2) runPersistentStoreOutageRecovery(ctx context.Context, statuses <-
}
}

func (f *FDv2) runInitializers(ctx context.Context, closeWhenReady chan struct{}) {
// runInitializers runs each configured initializer in order until one succeeds, the context is
// cancelled, or an initializer signals a revert to FDv1. Returns true if an initializer signalled
// a revert to FDv1. If fallback is signalled alongside a valid Basis, that Basis is applied before
// returning so evaluations can serve the server-provided data while the FDv1 synchronizer spins up.
func (f *FDv2) runInitializers(ctx context.Context, closeWhenReady chan struct{}) (fallbackToFDv1 bool) {
for _, initializer := range f.initializers {
f.loggers.Infof("Attempting to initialize via %s", initializer.Name())
basis, err := initializer.Fetch(f.store, ctx)
basis, fallback, err := initializer.Fetch(f.store, ctx)
if errors.Is(err, context.Canceled) {
return
return false
}
if fallback {
if err != nil {
f.loggers.Warnf("Initializer %s requested fallback to FDv1 protocol: %v", initializer.Name(), err)
} else {
f.loggers.Warnf("Initializer %s requested fallback to FDv1 protocol", initializer.Name())
}
if basis != nil {
f.environmentIDProvider.SetEnvironmentID(basis.EnvironmentID)
f.store.Apply(basis.ChangeSet, basis.Persist)
Copy link
Copy Markdown
Contributor

@tanderson-ld tanderson-ld Apr 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feels like there could be a way to set this up such that the same apply block down below could be reused, but also not gonna hold up the PR on this. This block of code will probably go away long term.

Do you want f.loggers.Infof("Initialized via %s", initializer.Name())?

if basis.ChangeSet.Selector().IsDefined() {
f.readyOnce.Do(func() {
close(closeWhenReady)
})
}
}
return true
}
if err != nil {
f.loggers.Warnf("Initializer %s failed: %v", initializer.Name(), err)
Expand All @@ -253,9 +284,10 @@ func (f *FDv2) runInitializers(ctx context.Context, closeWhenReady chan struct{}
f.readyOnce.Do(func() {
close(closeWhenReady)
})
return
return false
}
}
return false
}

func (f *FDv2) runSynchronizers(ctx context.Context, closeWhenReady chan struct{}) {
Expand Down
91 changes: 91 additions & 0 deletions ldclient_end_to_end_fdv2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,97 @@ func TestFDV2CanFallBackToV1(t *testing.T) {
})
}

// When an initializer requests FDv1 fallback but no FDv1 fallback is configured, the data source
// status must transition to Off rather than staying stuck at Initializing. This mirrors the
// synchronizer-triggered path when fdv1FallbackBuilder is nil.
func TestFDV2InitializerFallbackWithoutFDv1FallbackTransitionsToOff(t *testing.T) {
header := http.Header{
"X-LD-FD-Fallback": []string{"true"},
}
handler, requestsCh := httphelpers.RecordingHandler(httphelpers.HandlerWithResponse(500, header, nil))

httphelpers.WithServer(handler, func(server *httptest.Server) {
logCapture := ldlogtest.NewMockLog()

// Custom data system: a polling initializer, no synchronizers, no FDv1 fallback.
config := Config{
Events: ldcomponents.NoEvents(),
Logging: ldcomponents.Logging().Loggers(logCapture.Loggers),
DataSystem: ldcomponents.DataSystem().Custom().Initializers(
ldcomponents.PollingDataSourceV2().BaseURI(server.URL).AsInitializer(),
),
}

client, err := MakeCustomClient(testSdkKey, config, time.Second*5)
require.Error(t, err)
require.NotNil(t, client)
defer client.Close()

<-requestsCh

// With no FDv1 fallback configured, an initializer-triggered fallback must transition the
// status to Off — if it stays at Initializing, MakeCustomClient treats it as an init
// failure and we see initializationFailedErrorMessage here. Either way the status field
// should end up Off, so assert that directly.
assert.Equal(t,
interfaces.DataSourceStateOff,
client.GetDataSourceStatusProvider().GetStatus().State,
"status should transition to Off when initializer fallback requested but no FDv1 fallback configured")
assert.Equal(t, initializationFailedErrorMessage, err.Error())

assert.Contains(t, logCapture.GetOutput(ldlog.Warn),
"Initializer requested FDv1 fallback but none configured")
})
}

// When the polling initializer receives x-ld-fd-fallback from the server, the SDK should skip any
// remaining FDv2 synchronizers and switch to the FDv1 polling synchronizer directly — without ever
// attempting the FDv2 streaming synchronizer.
func TestFDV2CanFallBackToV1FromInitializer(t *testing.T) {
dataV1 := ldservices.NewServerSDKData().Flags(alwaysFalseFlag)

header := http.Header{
"X-LD-FD-Fallback": []string{"true"},
}

// FDv2 polling initializer: returns 500 + fallback header. Must trigger revert to FDv1 before
// the FDv2 streaming synchronizer is ever dialed.
pollV2InitRecordingHandler, pollV2InitReqCh := httphelpers.RecordingHandler(httphelpers.HandlerWithResponse(500, header, nil))
// FDv1 polling synchronizer: returns valid FDv1 data.
pollV1SyncRecordingHandler, pollV1SyncReqCh := httphelpers.RecordingHandler(ldservices.ServerSidePollingServiceHandler(dataV1))
// FDv2 streaming synchronizer: should never be hit. If it is, the test will surface it via
// streamV2SyncReqCh.
streamHandler, streamV2SyncReqCh := httphelpers.RecordingHandler(httphelpers.HandlerWithResponse(500, header, nil))

handler := httphelpers.SequentialHandler(pollV2InitRecordingHandler, pollV1SyncRecordingHandler, streamHandler)

httphelpers.WithServer(handler, func(server *httptest.Server) {
logCapture := ldlogtest.NewMockLog()

config := Config{
Events: ldcomponents.NoEvents(),
Logging: ldcomponents.Logging().Loggers(logCapture.Loggers),
DataSystem: ldcomponents.DataSystem().WithRelayProxyEndpoints(server.URL).Default(),
}

client, err := MakeCustomClient(testSdkKey, config, time.Second*5)

<-pollV2InitReqCh
<-pollV1SyncReqCh

require.NoError(t, err)
defer client.Close()

reached := client.GetDataSourceStatusProvider().WaitFor(interfaces.DataSourceStateValid, time.Second*5)
require.True(t, reached, "timed out waiting for data source to reach VALID state")

assertNoMoreRequests(t, streamV2SyncReqCh)

value, _ := client.BoolVariation(alwaysFalseFlag.Key, testUser, true)
assert.False(t, value)
})
}

func TestFDV2StreamingSynchronizer(t *testing.T) {
data := ldservicesv2.NewServerSDKData().Flags(alwaysTrueFlag)

Expand Down
4 changes: 2 additions & 2 deletions ldfiledatav2/file_data_source_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (fs *fileDataSource) Sync(ds subsystems.DataSelector) <-chan subsystems.Dat
return resultChan
}

func (fs *fileDataSource) Fetch(ds subsystems.DataSelector, ctx context.Context) (*subsystems.Basis, error) {
func (fs *fileDataSource) Fetch(ds subsystems.DataSelector, ctx context.Context) (*subsystems.Basis, bool, error) {
changeSetChan := fs.changeSetBroadcaster.AddListener()
statusChan := fs.statusBroadcaster.AddListener()

Expand Down Expand Up @@ -178,7 +178,7 @@ func (fs *fileDataSource) Fetch(ds subsystems.DataSelector, ctx context.Context)

wg.Wait()

return basis, err
return basis, false, err
}

// Reload tells the data source to immediately attempt to reread all of the configured source files
Expand Down
Loading
Loading