Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions internal/datasourcev2/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,16 @@ import (
"github.com/launchdarkly/go-sdk-common/v3/ldlog"
)

// fdv1FallbackHeader is the response header LaunchDarkly uses to instruct an SDK to abandon the
// FDv2 protocol and fall back to FDv1.
const fdv1FallbackHeader = "X-LD-FD-Fallback"

// isFDv1FallbackRequested reports whether the response headers signal that the SDK should revert
// to the FDv1 protocol.
func isFDv1FallbackRequested(h http.Header) bool {
return h.Get(fdv1FallbackHeader) == "true"
}

type httpStatusError struct {
Message string
Code int
Expand Down
36 changes: 27 additions & 9 deletions internal/datasourcev2/polling_data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/launchdarkly/go-sdk-common/v3/ldlog"
"github.com/launchdarkly/go-sdk-common/v3/ldvalue"
"github.com/launchdarkly/go-server-sdk/v7/interfaces"
"github.com/launchdarkly/go-server-sdk/v7/internal"
"github.com/launchdarkly/go-server-sdk/v7/internal/datasource"
Expand Down Expand Up @@ -68,13 +69,14 @@ func (pp *PollingProcessor) Name() string {
}

//nolint:revive // DataInitializer method.
func (pp *PollingProcessor) Fetch(ds subsystems.DataSelector, ctx context.Context) (*subsystems.Basis, error) {
func (pp *PollingProcessor) Fetch(ds subsystems.DataSelector, ctx context.Context) (*subsystems.Basis, bool, error) {
changeSet, headers, err := pp.requester.Request(ctx, ds.Selector())
fallback := isFDv1FallbackRequested(headers)
if err != nil {
return nil, err
return nil, fallback, err
}
environmentID := internal.NewInitMetadataFromHeaders(headers).GetEnvironmentID()
return &subsystems.Basis{ChangeSet: *changeSet, Persist: true, EnvironmentID: environmentID}, nil
return &subsystems.Basis{ChangeSet: *changeSet, Persist: true, EnvironmentID: environmentID}, fallback, nil
}

//nolint:revive // DataSynchronizer method.
Expand Down Expand Up @@ -106,7 +108,16 @@ func (pp *PollingProcessor) Sync(ds subsystems.DataSelector) <-chan subsystems.D
close(resultChan)
return
case <-ticker.C:
if err := pp.poll(ctx, ds, resultChan); err != nil {
fallbackEnvID, fallback, err := pp.poll(ctx, ds, resultChan)
if err == nil && fallback {
resultChan <- subsystems.DataSynchronizerResult{
State: interfaces.DataSourceStateOff,
RevertToFDv1: true,
EnvironmentID: fallbackEnvID,
}
return
}
if err != nil {
if hse, ok := err.(httpStatusError); ok {
environmentID := internal.NewInitMetadataFromHeaders(hse.Header).GetEnvironmentID()

Expand All @@ -116,7 +127,7 @@ func (pp *PollingProcessor) Sync(ds subsystems.DataSelector) <-chan subsystems.D
Time: time.Now(),
}

if hse.Header.Get("X-LD-FD-Fallback") == "true" {
if isFDv1FallbackRequested(hse.Header) {
resultChan <- subsystems.DataSynchronizerResult{
State: interfaces.DataSourceStateOff,
Error: errorInfo,
Expand Down Expand Up @@ -171,21 +182,28 @@ func (pp *PollingProcessor) Sync(ds subsystems.DataSelector) <-chan subsystems.D
return resultChan
}

// poll performs a single polling request. On success it emits a Valid result with any
// accompanying ChangeSet. It returns (environmentID, fallback, err) where fallback=true indicates
// the response headers carried x-ld-fd-fallback: true; in that case the caller is responsible
// for emitting the Off/RevertToFDv1 signal and exiting the sync goroutine. A non-nil err means
// the request failed and no Valid result was emitted; the caller handles it per the existing
// error path.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are the pros/cons of returning the bool from this function (and having the polling data source deal with it) vs sending the bool through resultChan via DataSynchronizerResult.RevertToFDv1?

func (pp *PollingProcessor) poll(
ctx context.Context, ds subsystems.DataSelector, resultChan chan<- subsystems.DataSynchronizerResult,
) error {
) (ldvalue.OptionalString, bool, error) {
changeSet, headers, err := pp.requester.Request(ctx, ds.Selector())
if err != nil {
return err
return ldvalue.OptionalString{}, false, err
}

environmentID := internal.NewInitMetadataFromHeaders(headers).GetEnvironmentID()
resultChan <- subsystems.DataSynchronizerResult{
ChangeSet: changeSet,
State: interfaces.DataSourceStateValid,
EnvironmentID: internal.NewInitMetadataFromHeaders(headers).GetEnvironmentID(),
EnvironmentID: environmentID,
}

return nil
return environmentID, isFDv1FallbackRequested(headers), nil
}

//nolint:revive // no doc comment for standard method
Expand Down
122 changes: 118 additions & 4 deletions internal/datasourcev2/polling_data_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/launchdarkly/go-server-sdk/v7/testhelpers/ldservicesv2"
"github.com/launchdarkly/go-test-helpers/v3/httphelpers"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var alwaysTrueFlag = ldbuilders.NewFlagBuilder("always-true-flag").SingleVariation(ldvalue.Bool(true)).Build()
Expand All @@ -40,7 +41,7 @@ func TestPollingProcessorInitializerCanMakeSuccessfulRequest(t *testing.T) {
)
defer processor.Close()

basis, err := processor.Fetch(ds, context.Background())
basis, _, err := processor.Fetch(ds, context.Background())
assert.NoError(t, err)
assert.Len(t, basis.ChangeSet.Changes(), 1)
assert.Equal(t, basis.ChangeSet.IntentCode(), subsystems.IntentTransferFull)
Expand All @@ -67,7 +68,7 @@ func TestPollingProcessorInitializerAppendsFilterParameter(t *testing.T) {
},
)
defer processor.Close()
_, err := processor.Fetch(ds, context.Background())
_, _, err := processor.Fetch(ds, context.Background())
assert.NoError(t, err)

r := <-requestsCh
Expand All @@ -90,7 +91,7 @@ func TestPollingProcessorInitializerAppendsBasisParameter(t *testing.T) {
defer processor.Close()

ds := mocks.NewMockDataSelector(subsystems.NewSelector("test-state", 1))
_, err := processor.Fetch(ds, context.Background())
_, _, err := processor.Fetch(ds, context.Background())
assert.NoError(t, err)

r := <-requestsCh
Expand All @@ -115,7 +116,7 @@ func TestPollingProcessorSynchronizerAppendsFilterParameter(t *testing.T) {
)
defer processor.Close()

_, err := processor.Fetch(ds, context.Background())
_, _, err := processor.Fetch(ds, context.Background())
assert.NoError(t, err)

r := <-requestsCh
Expand Down Expand Up @@ -235,6 +236,42 @@ func TestPollingProcessorSynchronizerHandlesInvalidJSON(t *testing.T) {
})
}

func TestPollingProcessorSynchronizerHandlesFallbackOnSuccessfulResponse(t *testing.T) {
ds := mocks.NewMockDataSelector(subsystems.NoSelector())
data := ldservicesv2.NewServerSDKData().Flags(alwaysTrueFlag).ToInitializerPayload(subsystems.NewSelector("test-state", 1))

// Wrap the valid 200 handler so the response also carries x-ld-fd-fallback: true.
underlying := ldservices.ServerSidePollingV2ServiceHandler(data)
fallbackOnSuccess := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("X-LD-FD-Fallback", "true")
underlying.ServeHTTP(w, r)
})

httphelpers.WithServer(fallbackOnSuccess, func(ts *httptest.Server) {
processor := NewPollingProcessor(
sharedtest.BasicClientContext(),
datasource.PollingConfig{
BaseURI: ts.URL,
PollInterval: time.Minute * 30,
},
)
defer processor.Close()

resultChan := processor.Sync(ds)

// First result: valid payload applied before switching protocols.
first := <-resultChan
assert.Equal(t, interfaces.DataSourceStateValid, first.State)
require.NotNil(t, first.ChangeSet)
assert.Len(t, first.ChangeSet.Changes(), 1)

// Second result: FDv1 fallback.
second := <-resultChan
assert.Equal(t, interfaces.DataSourceStateOff, second.State)
assert.True(t, second.RevertToFDv1)
})
}

func TestPollingProcessorSynchronizerHandlesFallbackToFDv2(t *testing.T) {
ds := mocks.NewMockDataSelector(subsystems.NoSelector())

Expand All @@ -261,6 +298,83 @@ func TestPollingProcessorSynchronizerHandlesFallbackToFDv2(t *testing.T) {
})
}

func TestPollingProcessorInitializerHandlesFallbackOnSuccessfulResponse(t *testing.T) {
ds := mocks.NewMockDataSelector(subsystems.NoSelector())
data := ldservicesv2.NewServerSDKData().Flags(alwaysTrueFlag).ToInitializerPayload(subsystems.NewSelector("test-state", 1))

// Wrap the valid 200 handler to inject X-LD-FD-Fallback alongside a well-formed payload.
underlying := ldservices.ServerSidePollingV2ServiceHandler(data)
fallbackOn200 := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("X-LD-FD-Fallback", "true")
underlying.ServeHTTP(w, r)
})

httphelpers.WithServer(fallbackOn200, func(ts *httptest.Server) {
processor := NewPollingProcessor(
sharedtest.BasicClientContext(),
datasource.PollingConfig{
BaseURI: ts.URL,
PollInterval: time.Minute * 30,
},
)
defer processor.Close()

basis, fallback, err := processor.Fetch(ds, context.Background())
assert.True(t, fallback)
assert.NoError(t, err)
// Even when the server signals fallback, a valid payload in the same response
// should still be surfaced so the caller can apply it before switching protocols.
require.NotNil(t, basis)
assert.Len(t, basis.ChangeSet.Changes(), 1)
assert.Equal(t, "test-state", basis.ChangeSet.Selector().State())
})
}

func TestPollingProcessorInitializerHandlesFallbackOnErrorResponse(t *testing.T) {
ds := mocks.NewMockDataSelector(subsystems.NoSelector())

fallbackHeader := http.Header{
"X-LD-FD-Fallback": []string{"true"},
}
handler, _ := httphelpers.RecordingHandler(httphelpers.HandlerWithResponse(500, fallbackHeader, nil))
httphelpers.WithServer(handler, func(ts *httptest.Server) {
processor := NewPollingProcessor(
sharedtest.BasicClientContext(),
datasource.PollingConfig{
BaseURI: ts.URL,
PollInterval: time.Minute * 30,
},
)
defer processor.Close()

basis, fallback, err := processor.Fetch(ds, context.Background())
assert.Nil(t, basis)
assert.True(t, fallback)
assert.Error(t, err, "underlying HTTP error should still be surfaced alongside the fallback signal")
})
}

func TestPollingProcessorInitializerErrorWithoutFallbackHeaderReturnsRegularError(t *testing.T) {
ds := mocks.NewMockDataSelector(subsystems.NoSelector())

handler, _ := httphelpers.RecordingHandler(httphelpers.HandlerWithStatus(500))
httphelpers.WithServer(handler, func(ts *httptest.Server) {
processor := NewPollingProcessor(
sharedtest.BasicClientContext(),
datasource.PollingConfig{
BaseURI: ts.URL,
PollInterval: time.Minute * 30,
},
)
defer processor.Close()

basis, fallback, err := processor.Fetch(ds, context.Background())
assert.Nil(t, basis)
assert.False(t, fallback)
assert.Error(t, err)
})
}

func TestPollingProcessorSynchronizerHandlesRecoverableErrors(t *testing.T) {
for _, statusCode := range []int{400, 408, 429, 500, 503} {
t.Run(fmt.Sprintf("handles recoverable error %d", statusCode), func(t *testing.T) {
Expand Down
Loading
Loading