Skip to content
25 changes: 19 additions & 6 deletions router/core/graph_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ func (b BuildGraphMuxOptions) IsBaseGraph() bool {
type buildMultiGraphHandlerOptions struct {
baseMux *chi.Mux
featureFlagConfigs map[string]*nodev1.FeatureFlagRouterExecutionConfig
hashes map[string]routerconfig.HashInfo
reloadPersistentState *ReloadPersistentState
currentGraphMuxes map[string]*graphMux
changes *routerconfig.Changes
Expand Down Expand Up @@ -347,7 +348,11 @@ func newGraphServer(routerCtx context.Context, r *Router, response *routerconfig

if needNewBaseGraphMux {
// build new base grap mux
s.logger.Debug("Will build a new base graph mux for new graph server")
s.logger.With(
zap.String("old_execution_config_hash", response.Hashes[""].OldHash), // empty on router boot
zap.String("new_execution_config_hash", response.Hashes[""].NewHash),
).Debug("Will build a new base graph mux for new graph server")

gm, err = s.buildGraphMux(BuildGraphMuxOptions{
RouterConfigVersion: s.baseRouterConfigVersion,
EngineConfig: response.Config.GetEngineConfig(),
Expand All @@ -361,7 +366,9 @@ func newGraphServer(routerCtx context.Context, r *Router, response *routerconfig
return nil, fmt.Errorf("failed to build base mux: %w", err)
}
} else {
s.logger.Debug("Will reuse old base graph mux for new graph server")
s.logger.With(zap.String("execution_config_hash", response.Hashes[""].NewHash)).
Debug("Will reuse old base graph mux for new graph server")

gm = mux
reusedMuxes = append(reusedMuxes, reusedGraphMux{key: "", mux: mux})
}
Expand All @@ -377,6 +384,7 @@ func newGraphServer(routerCtx context.Context, r *Router, response *routerconfig
reloadPersistentState: r.reloadPersistentState,
currentGraphMuxes: currentMuxes,
changes: response.Changes,
hashes: response.Hashes,
defaultClientTLS: defaultGRPCClientTLS,
perSubgraphTLS: perSubgraphGRPCTLS,
})
Expand Down Expand Up @@ -581,17 +589,22 @@ func (s *graphServer) buildMultiGraphHandler(
if !hasChanged && !wasAdded {
oldGraphMux, exists := opts.currentGraphMuxes[featureFlagName]
if exists {
s.logger.Debug("will reuse feature flag mux for new graph server",
zap.String("flag", featureFlagName))
s.logger.Debug("Will reuse feature flag mux for new graph server",
zap.String("flag", featureFlagName),
zap.String("execution_config_hash", opts.hashes[featureFlagName].NewHash),
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
featureFlagToMux[featureFlagName] = oldGraphMux.mux
reused = append(reused, reusedGraphMux{key: featureFlagName, mux: oldGraphMux})
continue
}
}
}

s.logger.Debug("will create a new feature flag mux for new graph server",
zap.String("flag", featureFlagName))
s.logger.Debug("Will create a new feature flag mux for new graph server",
zap.String("flag", featureFlagName),
zap.String("old_execution_config_hash", opts.hashes[featureFlagName].OldHash), // empty on router boot
zap.String("new_execution_config_hash", opts.hashes[featureFlagName].NewHash),
)

gm, err := s.buildGraphMux(BuildGraphMuxOptions{
FeatureFlagName: featureFlagName,
Expand Down
29 changes: 26 additions & 3 deletions router/pkg/controlplane/configpoller/split_config_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func (p *splitConfigPoller) fetchAndAssembleAll(ctx context.Context, activeGraph
if hasIgnoredFeatureFlags {
if _, ok := p.configRules.IgnoredFeatureFlags[name]; ok {
p.logger.Info("Feature flag is ignored, skipping", zap.String("feature_flag", name))
delete(activeGraphs, name)
continue
}
}
Expand All @@ -136,6 +137,7 @@ func (p *splitConfigPoller) fetchAndAssembleAll(ctx context.Context, activeGraph
if err != nil {
if p.shouldIgnoreMissingFeatureFlag(err) {
p.logger.Warn("Feature flag config not found, skipping", zap.String("feature_flag", name))
delete(activeGraphs, name)
continue
}
return nil, fmt.Errorf("failed to fetch config for feature flag %q: %w", name, err)
Expand Down Expand Up @@ -177,13 +179,19 @@ func (p *splitConfigPoller) GetRouterConfig(ctx context.Context) (*routerconfig.
return nil, err
}

hashes := make(map[string]routerconfig.HashInfo, len(activeGraphs))
for ffName, hash := range activeGraphs {
hashes[ffName] = routerconfig.HashInfo{NewHash: hash}
}

p.knownHashes = activeGraphs
p.currentConfig = config
p.latestVersion = computeCompositeVersion(activeGraphs)

response := &routerconfig.Response{
Config: config,
Changes: nil, // purposefully nil to tell callers to rebuild everything since this is the initial fetch
Hashes: hashes,
}

return response, nil
Expand Down Expand Up @@ -214,23 +222,34 @@ func (p *splitConfigPoller) Subscribe(ctx context.Context, handler func(response
return
}

newVersion := computeCompositeVersion(mapperGraphs)
if newVersion == p.latestVersion {
Comment thread
dkorittki marked this conversation as resolved.
p.logger.Debug("No changes detected in engine config, keeping existing config")
return
}

// Determine what changed, was added, or was removed.
changes := routerconfig.Changes{
AddedConfigs: make(map[string]struct{}),
RemovedConfigs: make(map[string]struct{}),
ChangedConfigs: make(map[string]struct{}),
}
hashes := make(map[string]routerconfig.HashInfo, len(mapperGraphs))

for name, hash := range mapperGraphs {
if oldHash, exists := p.knownHashes[name]; !exists {
changes.AddedConfigs[name] = struct{}{}
hashes[name] = routerconfig.HashInfo{NewHash: hash}
} else if oldHash != hash {
changes.ChangedConfigs[name] = struct{}{}
hashes[name] = routerconfig.HashInfo{NewHash: hash, OldHash: oldHash}
}
}
for name := range p.knownHashes {
if _, exists := mapperGraphs[name]; !exists {
for name, oldHash := range p.knownHashes {
if newHash, exists := mapperGraphs[name]; !exists {
changes.RemovedConfigs[name] = struct{}{}
} else if oldHash == newHash {
hashes[name] = routerconfig.HashInfo{OldHash: oldHash, NewHash: newHash}
}
}

Expand All @@ -252,6 +271,7 @@ func (p *splitConfigPoller) Subscribe(ctx context.Context, handler func(response
delete(mapperGraphs, name)
delete(changes.ChangedConfigs, name)
delete(changes.AddedConfigs, name)
delete(hashes, name)

continue
}
Expand Down Expand Up @@ -296,7 +316,9 @@ func (p *splitConfigPoller) Subscribe(ctx context.Context, handler func(response
}
}

newVersion := computeCompositeVersion(mapperGraphs)
// Edge case where the fetch loop removes missing feature flags
// from mapperGraphs, which could make the version match again
newVersion = computeCompositeVersion(mapperGraphs)
if newVersion == p.latestVersion {
p.logger.Debug("No changes detected in engine config, keeping existing config")
return
Expand All @@ -305,6 +327,7 @@ func (p *splitConfigPoller) Subscribe(ctx context.Context, handler func(response
response := &routerconfig.Response{
Config: patched,
Changes: &changes,
Hashes: hashes,
}

handlerStart := time.Now()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ func TestSplitGetRouterConfig_BaseOnly(t *testing.T) {
assert.Nil(t, resp.Config.FeatureFlagConfigs)
assert.Equal(t, "hash-base", p.knownHashes[""])
assert.Contains(t, p.latestVersion, "split-")

require.Len(t, resp.Hashes, 1)
assert.Equal(t, routerconfig.HashInfo{NewHash: "hash-base"}, resp.Hashes[""])
}

func TestSplitGetRouterConfig_MissingBaseGraph(t *testing.T) {
Expand Down Expand Up @@ -125,6 +128,10 @@ func TestSplitGetRouterConfig_WithFeatureFlags(t *testing.T) {

assert.Equal(t, "hash-base", p.knownHashes[""])
assert.Equal(t, "hash-ff1", p.knownHashes["ff1"])

require.Len(t, resp.Hashes, 2)
assert.Equal(t, routerconfig.HashInfo{NewHash: "hash-base"}, resp.Hashes[""])
assert.Equal(t, routerconfig.HashInfo{NewHash: "hash-ff1"}, resp.Hashes["ff1"])
}

func TestSplitGetRouterConfig_MapperError(t *testing.T) {
Expand Down Expand Up @@ -249,6 +256,73 @@ func TestSplitGetRouterConfig_SkipMissingFeatureFlag_FileNotFoundSkipped(t *test
require.NotNil(t, resp.Config.FeatureFlagConfigs)
assert.Contains(t, resp.Config.FeatureFlagConfigs.ConfigByFeatureFlagName, "available")
assert.NotContains(t, resp.Config.FeatureFlagConfigs.ConfigByFeatureFlagName, "missing")
assert.Len(t, resp.Hashes, 2)
assert.Empty(t, resp.Hashes["missing"])
assert.NotEmpty(t, resp.Hashes["available"])
}

// TestSplitGetRouterConfig_SkipMissingFF_NotRetainedInKnownHashesOrVersion verifies
// that a feature flag skipped during the initial fetch is not stored in knownHashes
// and does not contribute to latestVersion. If either were true, a subsequent poll
// where the flag becomes available with the same mapper hash would either exit early
// (newVersion == latestVersion) or silently skip the flag (treated as unchanged in
// knownHashes), preventing the router from ever applying a new engine config.
func TestSplitGetRouterConfig_SkipMissingFF_NotRetainedInKnownHashesOrVersion(t *testing.T) {
baseCfg := makeRouterConfig("v1")
missingCfg := makeRouterConfig("missing-v1")

mock := &mockSplitFetcher{
mapperResult: map[string]string{
"": "hash-base",
"missing": "hash-missing",
},
configResults: map[string]*nodev1.RouterConfig{
"": baseCfg,
},
configErrors: map[string]error{
"missing": errs.ErrFileNotFound,
},
}

p := newTestPoller(mock)
p.configRules = ConfigRules{SkipMissingFeatureFlags: true}

_, err := p.GetRouterConfig(context.Background())
require.NoError(t, err)

// The skipped flag must not appear in knownHashes; otherwise a subsequent poll
// with the same mapper hash would see it as "unchanged" and never retry the fetch.
assert.NotContains(t, p.knownHashes, "missing",
"skipped flag must not be recorded in knownHashes")

// latestVersion must not factor in the skipped flag's hash; otherwise the poll
// exits early when the mapper still reports the same hash.
assert.Equal(t, computeCompositeVersion(map[string]string{"": "hash-base"}), p.latestVersion,
"latestVersion must only reflect actually-assembled graphs")

// Simulate the flag becoming available on the CDN with the same hash as before.
// Reset mapperResult explicitly: the fix may delete skipped flags from activeGraphs
// in-place, and since the mock returns that map by reference, it could be mutated.
mock.mapperResult = map[string]string{
"": "hash-base",
"missing": "hash-missing",
}
delete(mock.configErrors, "missing")
mock.configResults["missing"] = missingCfg
mock.fetchConfigCalls = nil

var received *routerconfig.Response
pollOnce(p, func(resp *routerconfig.Response) error {
received = resp
return nil
})

require.NotNil(t, received, "handler must be called when a previously-skipped flag becomes available")
require.NotNil(t, received.Config.FeatureFlagConfigs)
assert.Contains(t, received.Config.FeatureFlagConfigs.ConfigByFeatureFlagName, "missing",
"previously-skipped flag must be assembled into the config once it is available")
assert.Contains(t, mock.fetchConfigCalls, "missing",
"previously-skipped flag must be fetched in the first poll after it becomes available")
}

func TestSplitGetRouterConfig_SkipMissingFeatureFlag_DisabledByDefault(t *testing.T) {
Expand Down Expand Up @@ -411,6 +485,10 @@ func TestSplitSubscribe_SkipMissingFeatureFlag_ExcludedFromChangesAndKnownHashes
"skipped changed flag must keep its previous engine config")
assert.NotContains(t, received.Config.FeatureFlagConfigs.ConfigByFeatureFlagName, "missing-add",
"skipped new flag must not be assembled into the config")

assert.Len(t, received.Hashes, 2)
assert.Equal(t, routerconfig.HashInfo{OldHash: "hash-base", NewHash: "hash-base"}, received.Hashes[""])
assert.Equal(t, routerconfig.HashInfo{OldHash: "hash-keep-old", NewHash: "hash-keep-new"}, received.Hashes["keep"])
}

// ---- Subscribe / polling tests ----
Expand Down
9 changes: 9 additions & 0 deletions router/pkg/routerconfig/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@ type Response struct {
// have changed since the last successful config apply.
// Nil means changes are unknown -> expect everything to be changed.
Changes *Changes
// Hashes holds the execution config hashes of base + feature flag graphs.
// It contains all newly received, changed, unchanged hashes.
// It does not contains hashes that have been removed, ignored or failed to fetch.
Hashes map[string]HashInfo
}

type HashInfo struct {
OldHash string
NewHash string
Comment thread
dkorittki marked this conversation as resolved.
}

type Changes struct {
Expand Down
Loading