Skip to content
25 changes: 19 additions & 6 deletions router/core/graph_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ func (b BuildGraphMuxOptions) IsBaseGraph() bool {
type buildMultiGraphHandlerOptions struct {
baseMux *chi.Mux
featureFlagConfigs map[string]*nodev1.FeatureFlagRouterExecutionConfig
hashes map[string]routerconfig.HashInfo
reloadPersistentState *ReloadPersistentState
currentGraphMuxes map[string]*graphMux
changes *routerconfig.Changes
Expand Down Expand Up @@ -347,7 +348,11 @@ func newGraphServer(routerCtx context.Context, r *Router, response *routerconfig

if needNewBaseGraphMux {
// build new base grap mux
s.logger.Debug("Will build a new base graph mux for new graph server")
s.logger.With(
zap.String("old_execution_config_hash", response.Hashes[""].OldHash), // empty on router boot
zap.String("new_execution_config_hash", response.Hashes[""].NewHash),
).Debug("Will build a new base graph mux for new graph server")

gm, err = s.buildGraphMux(BuildGraphMuxOptions{
RouterConfigVersion: s.baseRouterConfigVersion,
EngineConfig: response.Config.GetEngineConfig(),
Expand All @@ -361,7 +366,9 @@ func newGraphServer(routerCtx context.Context, r *Router, response *routerconfig
return nil, fmt.Errorf("failed to build base mux: %w", err)
}
} else {
s.logger.Debug("Will reuse old base graph mux for new graph server")
s.logger.With(zap.String("execution_config_hash", response.Hashes[""].NewHash)).
Debug("Will reuse old base graph mux for new graph server")

gm = mux
reusedMuxes = append(reusedMuxes, reusedGraphMux{key: "", mux: mux})
}
Expand All @@ -377,6 +384,7 @@ func newGraphServer(routerCtx context.Context, r *Router, response *routerconfig
reloadPersistentState: r.reloadPersistentState,
currentGraphMuxes: currentMuxes,
changes: response.Changes,
hashes: response.Hashes,
defaultClientTLS: defaultGRPCClientTLS,
perSubgraphTLS: perSubgraphGRPCTLS,
})
Expand Down Expand Up @@ -581,17 +589,22 @@ func (s *graphServer) buildMultiGraphHandler(
if !hasChanged && !wasAdded {
oldGraphMux, exists := opts.currentGraphMuxes[featureFlagName]
if exists {
s.logger.Debug("will reuse feature flag mux for new graph server",
zap.String("flag", featureFlagName))
s.logger.Debug("Will reuse feature flag mux for new graph server",
zap.String("flag", featureFlagName),
zap.String("execution_config_hash", opts.hashes[featureFlagName].NewHash),
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
featureFlagToMux[featureFlagName] = oldGraphMux.mux
reused = append(reused, reusedGraphMux{key: featureFlagName, mux: oldGraphMux})
continue
}
}
}

s.logger.Debug("will create a new feature flag mux for new graph server",
zap.String("flag", featureFlagName))
s.logger.Debug("Will create a new feature flag mux for new graph server",
zap.String("flag", featureFlagName),
zap.String("old_execution_config_hash", opts.hashes[featureFlagName].OldHash), // empty on router boot
zap.String("new_execution_config_hash", opts.hashes[featureFlagName].NewHash),
)

gm, err := s.buildGraphMux(BuildGraphMuxOptions{
FeatureFlagName: featureFlagName,
Expand Down
26 changes: 23 additions & 3 deletions router/pkg/controlplane/configpoller/split_config_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,13 +177,19 @@ func (p *splitConfigPoller) GetRouterConfig(ctx context.Context) (*routerconfig.
return nil, err
}

hashes := make(map[string]routerconfig.HashInfo, len(activeGraphs))
for ffName, hash := range activeGraphs {
hashes[ffName] = routerconfig.HashInfo{NewHash: hash}
}

p.knownHashes = activeGraphs
p.currentConfig = config
p.latestVersion = computeCompositeVersion(activeGraphs)

response := &routerconfig.Response{
Config: config,
Changes: nil, // purposefully nil to tell callers to rebuild everything since this is the initial fetch
Hashes: hashes,
}

return response, nil
Expand Down Expand Up @@ -214,23 +220,34 @@ func (p *splitConfigPoller) Subscribe(ctx context.Context, handler func(response
return
}

newVersion := computeCompositeVersion(mapperGraphs)
if newVersion == p.latestVersion {
Comment thread
dkorittki marked this conversation as resolved.
p.logger.Debug("No changes detected in engine config, keeping existing config")
return
}

// Determine what changed, was added, or was removed.
changes := routerconfig.Changes{
AddedConfigs: make(map[string]struct{}),
RemovedConfigs: make(map[string]struct{}),
ChangedConfigs: make(map[string]struct{}),
}
hashes := make(map[string]routerconfig.HashInfo, len(mapperGraphs))

for name, hash := range mapperGraphs {
if oldHash, exists := p.knownHashes[name]; !exists {
changes.AddedConfigs[name] = struct{}{}
hashes[name] = routerconfig.HashInfo{NewHash: hash}
} else if oldHash != hash {
changes.ChangedConfigs[name] = struct{}{}
hashes[name] = routerconfig.HashInfo{NewHash: hash, OldHash: oldHash}
}
}
for name := range p.knownHashes {
if _, exists := mapperGraphs[name]; !exists {
for name, oldHash := range p.knownHashes {
if newHash, exists := mapperGraphs[name]; !exists {
changes.RemovedConfigs[name] = struct{}{}
} else if oldHash == newHash {
hashes[name] = routerconfig.HashInfo{OldHash: oldHash, NewHash: newHash}
}
}

Expand Down Expand Up @@ -296,7 +313,9 @@ func (p *splitConfigPoller) Subscribe(ctx context.Context, handler func(response
}
}

newVersion := computeCompositeVersion(mapperGraphs)
// Edge case where the fetch loop removes missing feature flags
// from mapperGraphs, which could make the version match again
newVersion = computeCompositeVersion(mapperGraphs)
if newVersion == p.latestVersion {
p.logger.Debug("No changes detected in engine config, keeping existing config")
return
Expand All @@ -305,6 +324,7 @@ func (p *splitConfigPoller) Subscribe(ctx context.Context, handler func(response
response := &routerconfig.Response{
Config: patched,
Changes: &changes,
Hashes: hashes,
}

handlerStart := time.Now()
Expand Down
8 changes: 8 additions & 0 deletions router/pkg/routerconfig/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ type Response struct {
// have changed since the last successful config apply.
// Nil means changes are unknown -> expect everything to be changed.
Changes *Changes
// Hashes holds the execution config hashes (map value) of each feature flag (map key)
// currently used by the router as determined by the config poller.
Hashes map[string]HashInfo
}

type HashInfo struct {
OldHash string
NewHash string
Comment thread
dkorittki marked this conversation as resolved.
}

type Changes struct {
Expand Down
Loading