Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions interfaces/data_source_status_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,9 @@ type DataSourceStatusProvider interface {
//
// Synchronizers are expected to emit these status updates on the channel returned by Sync().
type DataSynchronizerStatus struct {
State DataSourceState
Error DataSourceErrorInfo
RevertToFDv1 bool
State DataSourceState
Error DataSourceErrorInfo
FallbackToFDv1 bool
}

// DataSourceStatus is information about the data source's status and the last status change.
Expand Down
10 changes: 10 additions & 0 deletions internal/datasourcev2/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,16 @@ import (
"github.com/launchdarkly/go-sdk-common/v3/ldlog"
)

// fdv1FallbackHeader is the response header LaunchDarkly uses to instruct an SDK to abandon the
// FDv2 protocol and fall back to FDv1.
const fdv1FallbackHeader = "X-LD-FD-Fallback"

// isFDv1FallbackRequested reports whether the response headers signal that the SDK should fall
// back to the FDv1 protocol.
func isFDv1FallbackRequested(h http.Header) bool {
return h.Get(fdv1FallbackHeader) == "true"
}

type httpStatusError struct {
Message string
Code int
Expand Down
144 changes: 76 additions & 68 deletions internal/datasourcev2/polling_data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,14 @@ func (pp *PollingProcessor) Name() string {
}

//nolint:revive // DataInitializer method.
func (pp *PollingProcessor) Fetch(ds subsystems.DataSelector, ctx context.Context) (*subsystems.Basis, error) {
func (pp *PollingProcessor) Fetch(ds subsystems.DataSelector, ctx context.Context) (*subsystems.Basis, bool, error) {
changeSet, headers, err := pp.requester.Request(ctx, ds.Selector())
fallback := isFDv1FallbackRequested(headers)
if err != nil {
return nil, err
return nil, fallback, err
}
environmentID := internal.NewInitMetadataFromHeaders(headers).GetEnvironmentID()
return &subsystems.Basis{ChangeSet: *changeSet, Persist: true, EnvironmentID: environmentID}, nil
return &subsystems.Basis{ChangeSet: *changeSet, Persist: true, EnvironmentID: environmentID}, fallback, nil
}

//nolint:revive // DataSynchronizer method.
Expand Down Expand Up @@ -106,86 +107,93 @@ func (pp *PollingProcessor) Sync(ds subsystems.DataSelector) <-chan subsystems.D
close(resultChan)
return
case <-ticker.C:
if err := pp.poll(ctx, ds, resultChan); err != nil {
if hse, ok := err.(httpStatusError); ok {
environmentID := internal.NewInitMetadataFromHeaders(hse.Header).GetEnvironmentID()

errorInfo := interfaces.DataSourceErrorInfo{
Kind: interfaces.DataSourceErrorKindErrorResponse,
StatusCode: hse.Code,
Time: time.Now(),
}

if hse.Header.Get("X-LD-FD-Fallback") == "true" {
resultChan <- subsystems.DataSynchronizerResult{
State: interfaces.DataSourceStateOff,
Error: errorInfo,
RevertToFDv1: true,
EnvironmentID: environmentID,
}
return
}

recoverable := checkIfErrorIsRecoverableAndLog(
pp.loggers,
httpErrorDescription(hse.Code),
pollingErrorContext,
hse.Code,
pollingWillRetryMessage,
)
if recoverable {
resultChan <- subsystems.DataSynchronizerResult{
State: interfaces.DataSourceStateInterrupted,
Error: errorInfo,
EnvironmentID: environmentID,
}
} else {
resultChan <- subsystems.DataSynchronizerResult{
State: interfaces.DataSourceStateOff,
Error: errorInfo,
EnvironmentID: environmentID,
}
return
}
} else {
errorInfo := interfaces.DataSourceErrorInfo{
Kind: interfaces.DataSourceErrorKindNetworkError,
Message: err.Error(),
Time: time.Now(),
}
if _, ok := err.(malformedJSONError); ok {
errorInfo.Kind = interfaces.DataSourceErrorKindInvalidData
}
checkIfErrorIsRecoverableAndLog(pp.loggers, err.Error(), pollingErrorContext, 0, pollingWillRetryMessage)
resultChan <- subsystems.DataSynchronizerResult{
State: interfaces.DataSourceStateInterrupted,
Error: errorInfo,
}
}
result, err := pp.poll(ctx, ds)

// When the server requested FDv1 fallback, dispatch the result as-is -- poll has
// already populated State (Valid on success, Off on error) and FallbackToFDv1=true.
if result.FallbackToFDv1 {
resultChan <- result
return
}

if err == nil {
resultChan <- result
continue
}

// Non-fallback error: the caller may downgrade Off --> Interrupted when the error
// is recoverable. Log at the appropriate level.
if hse, ok := err.(httpStatusError); ok {
if checkIfErrorIsRecoverableAndLog(
pp.loggers,
httpErrorDescription(hse.Code),
pollingErrorContext,
hse.Code,
pollingWillRetryMessage,
) {
result.State = interfaces.DataSourceStateInterrupted
resultChan <- result
continue
}
resultChan <- result // poll set State=Off
return
}

checkIfErrorIsRecoverableAndLog(pp.loggers, err.Error(), pollingErrorContext, 0, pollingWillRetryMessage)
result.State = interfaces.DataSourceStateInterrupted
resultChan <- result
}
}
}()

return resultChan
}

// poll performs a single polling request and builds a DataSynchronizerResult describing the
// outcome. The result's FallbackToFDv1 flag is always populated from the x-ld-fd-fallback response
// header, whether or not the request succeeded -- a 500 or a malformed-JSON body can still carry
// the fallback signal.
//
// On success: result.State = Valid, result.ChangeSet populated, err = nil.
// On error: result.State = Off (the safer default), result.Error populated with Kind/Message/
// StatusCode as appropriate, err returned so the caller can apply context-specific logic
// (e.g. downgrade Off --> Interrupted when the HTTP error is recoverable).
//
// The caller is responsible for publishing the result to its channel; poll does not touch any
// resultChan so it can be unit-tested in isolation.
func (pp *PollingProcessor) poll(
ctx context.Context, ds subsystems.DataSelector, resultChan chan<- subsystems.DataSynchronizerResult,
) error {
ctx context.Context, ds subsystems.DataSelector,
) (subsystems.DataSynchronizerResult, error) {
changeSet, headers, err := pp.requester.Request(ctx, ds.Selector())
if err != nil {
return err
result := subsystems.DataSynchronizerResult{
EnvironmentID: internal.NewInitMetadataFromHeaders(headers).GetEnvironmentID(),
FallbackToFDv1: isFDv1FallbackRequested(headers),
}

resultChan <- subsystems.DataSynchronizerResult{
ChangeSet: changeSet,
State: interfaces.DataSourceStateValid,
EnvironmentID: internal.NewInitMetadataFromHeaders(headers).GetEnvironmentID(),
if err == nil {
result.ChangeSet = changeSet
result.State = interfaces.DataSourceStateValid
return result, nil
}

return nil
result.State = interfaces.DataSourceStateOff
if hse, ok := err.(httpStatusError); ok {
result.Error = interfaces.DataSourceErrorInfo{
Kind: interfaces.DataSourceErrorKindErrorResponse,
StatusCode: hse.Code,
Time: time.Now(),
}
} else {
result.Error = interfaces.DataSourceErrorInfo{
Kind: interfaces.DataSourceErrorKindNetworkError,
Message: err.Error(),
Time: time.Now(),
}
if _, ok := err.(malformedJSONError); ok {
result.Error.Kind = interfaces.DataSourceErrorKindInvalidData
}
}
return result, err
}

//nolint:revive // no doc comment for standard method
Expand Down
Loading
Loading