Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 26 additions & 37 deletions felix/calc/active_rules_calculator.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ type FelixSender interface {
type PolicyMatchListener interface {
OnPolicyMatch(policyKey model.PolicyKey, endpointKey model.EndpointKey)
OnPolicyMatchStopped(policyKey model.PolicyKey, endpointKey model.EndpointKey)
OnComputedSelectorMatch(cs string, endpointKey model.EndpointKey)
OnComputedSelectorMatchStopped(cs string, endpointKey model.EndpointKey)
OnComputedSelectorMatch(cs string, caller any, endpointKey model.EndpointKey)
OnComputedSelectorMatchStopped(cs string, caller any, endpointKey model.EndpointKey)
}

// ActiveRulesCalculator calculates the set of policies and profiles (i.e. the rules) that
Expand Down Expand Up @@ -91,9 +91,6 @@ type ActiveRulesCalculator struct {
// log out those profiles at the end of the resync.
missingProfiles set.Set[string]

// Tracks components that have called AddExtraComputedSelector.
computedSelectorCallers map[string]set.Set[any]

// Callback objects.
RuleScanner ruleScanner
PolicyMatchListeners []PolicyMatchListener
Expand All @@ -102,7 +99,10 @@ type ActiveRulesCalculator struct {
OnAlive func()
}

type computedSelector string
type computedSelectorKey struct {
selector string
caller any
}

Comment thread
nelljerram marked this conversation as resolved.
func NewActiveRulesCalculator() *ActiveRulesCalculator {
arc := &ActiveRulesCalculator{
Expand All @@ -120,8 +120,6 @@ func NewActiveRulesCalculator() *ActiveRulesCalculator {

// Cache of profile IDs by local endpoint.
endpointKeyToProfileIDs: NewEndpointKeyToProfileIDMap(),

computedSelectorCallers: make(map[string]set.Set[any]),
}
arc.labelIndex = labelindex.NewInheritIndex(arc.onMatchStarted, arc.onMatchStopped)
return arc
Expand Down Expand Up @@ -297,39 +295,30 @@ func (arc *ActiveRulesCalculator) OnUpdate(update api.Update) (_ bool) {
// OnComputedSelectorMatch and OnComputedSelectorMatchStopped callbacks when that selector
// matches/stops matching local endpoints, allowing the expensive selector index to be shared.
//
// Registration is tracked per caller. The caller identity must therefore be stable, and the
// same caller value (including the same inferred type T) must be passed to
// RemoveExtraComputedSelector to remove that caller's registration. Repeated adds from the same
// caller are deduplicated.
// Registration is tracked per caller. The caller identity must therefore be stable, and the same
// caller value must be passed to RemoveExtraComputedSelector to remove that caller's registration.
// Repeated adds from the same caller are deduplicated.
//
// The underlying selector is added to the label index when the first caller registers it, and it
// is only removed after the last caller removes its registration. Callbacks for matches/stops
// matching continue to be delivered to all registered PolicyMatchListeners while the selector is
// present.
func AddExtraComputedSelector[T comparable](arc *ActiveRulesCalculator, cs string, caller T) {
callers := arc.computedSelectorCallers[cs]
if callers == nil {
callers = set.New[any]()
arc.computedSelectorCallers[cs] = callers
sel, err := selector.Parse(cs)
if err != nil {
log.WithError(err).Panicf("Failed to parse computed selector %#v", cs)
}
arc.labelIndex.UpdateSelector(computedSelector(cs), sel)
func (arc *ActiveRulesCalculator) AddExtraComputedSelector(cs string, caller any) {
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docstring claims the underlying selector is added once (first caller) and only removed after the last caller unregisters, but the current implementation adds/removes a distinct selector ID per (cs, caller). That changes semantics (e.g. removing one caller will trigger match-stopped for that caller even if other callers still want the selector) and defeats the stated goal of sharing the expensive selector index. Consider restoring per-selector ref-counting (track callers per cs) and only UpdateSelector/DeleteSelector for the shared selector when transitioning 0->1 and 1->0 callers, while still passing caller through the callbacks.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Semantics and commenting here have now changed.

sel, err := selector.Parse(cs)
if err != nil {
log.WithError(err).Panicf("Failed to parse computed selector %#v", cs)
}
callers.Add(any(caller))
arc.labelIndex.UpdateSelector(computedSelectorKey{
selector: cs,
caller: caller,
}, sel)
}
Comment on lines +308 to 312
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using computedSelectorKey{selector: cs, caller: caller} as the selector ID means the same selector string will be evaluated separately for every caller (scanAllLabels/scanAllSelectors run per selector ID). If multiple components register the same selector, this will multiply selector evaluation work and increase memory usage. A shared selector ID plus a separate callers set would keep evaluation cost constant while still allowing per-caller callback attribution.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but this is the same as the existing situation for policy selectors.


func RemoveExtraComputedSelector[T comparable](arc *ActiveRulesCalculator, cs string, caller T) {
callers := arc.computedSelectorCallers[cs]
if callers == nil {
return
}
callers.Discard(any(caller))
if callers.Len() == 0 {
arc.labelIndex.DeleteSelector(computedSelector(cs))
delete(arc.computedSelectorCallers, cs)
}
func (arc *ActiveRulesCalculator) RemoveExtraComputedSelector(cs string, caller any) {
arc.labelIndex.DeleteSelector(computedSelectorKey{
selector: cs,
caller: caller,
})
}

func policyForceProgrammed(policy *model.Policy) bool {
Expand Down Expand Up @@ -394,10 +383,10 @@ func (arc *ActiveRulesCalculator) updateEndpointProfileIDs(key model.Key, profil
}

func (arc *ActiveRulesCalculator) onMatchStarted(selID, labelId any) {
if cs, ok := selID.(computedSelector); ok {
if key, ok := selID.(computedSelectorKey); ok {
for _, l := range arc.PolicyMatchListeners {
if labelId, ok := labelId.(model.EndpointKey); ok {
l.OnComputedSelectorMatch(string(cs), labelId)
l.OnComputedSelectorMatch(key.selector, key.caller, labelId)
}
}
return
Expand Down Expand Up @@ -426,10 +415,10 @@ func (arc *ActiveRulesCalculator) onMatchStarted(selID, labelId any) {
}

func (arc *ActiveRulesCalculator) onMatchStopped(selID, labelId any) {
if cs, ok := selID.(computedSelector); ok {
if key, ok := selID.(computedSelectorKey); ok {
for _, l := range arc.PolicyMatchListeners {
if labelId, ok := labelId.(model.EndpointKey); ok {
l.OnComputedSelectorMatchStopped(string(cs), labelId)
l.OnComputedSelectorMatchStopped(key.selector, key.caller, labelId)
}
}
return
Expand Down
53 changes: 27 additions & 26 deletions felix/calc/active_rules_calculator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type policyMatchEvent struct {

type computedSelectorMatchEvent struct {
Selector string
Caller any
EndpointKey model.EndpointKey
}

Expand All @@ -48,12 +49,12 @@ func (t *testPolicyMatchListener) OnPolicyMatchStopped(policyKey model.PolicyKey
t.policyMatchStops = append(t.policyMatchStops, policyMatchEvent{policyKey, endpointKey})
}

func (t *testPolicyMatchListener) OnComputedSelectorMatch(cs string, endpointKey model.EndpointKey) {
t.computedSelectorMatches = append(t.computedSelectorMatches, computedSelectorMatchEvent{cs, endpointKey})
func (t *testPolicyMatchListener) OnComputedSelectorMatch(cs string, caller any, endpointKey model.EndpointKey) {
t.computedSelectorMatches = append(t.computedSelectorMatches, computedSelectorMatchEvent{cs, caller, endpointKey})
}

func (t *testPolicyMatchListener) OnComputedSelectorMatchStopped(cs string, endpointKey model.EndpointKey) {
t.computedSelectorMatchStops = append(t.computedSelectorMatchStops, computedSelectorMatchEvent{cs, endpointKey})
func (t *testPolicyMatchListener) OnComputedSelectorMatchStopped(cs string, caller any, endpointKey model.EndpointKey) {
t.computedSelectorMatchStops = append(t.computedSelectorMatchStops, computedSelectorMatchEvent{cs, caller, endpointKey})
}

// noopRuleScanner satisfies the ruleScanner interface required by ActiveRulesCalculator.
Expand Down Expand Up @@ -95,7 +96,7 @@ func deleteEndpoint(arc *ActiveRulesCalculator, key model.WorkloadEndpointKey) {
func TestARC_ComputedSelector_MatchOnEndpointAdd(t *testing.T) {
arc, listener := createARC()

AddExtraComputedSelector(arc, "has(foo)", t)
arc.AddExtraComputedSelector("has(foo)", t)

epKey := model.WorkloadEndpointKey{
Hostname: "host1",
Expand All @@ -120,7 +121,7 @@ func TestARC_ComputedSelector_MatchOnEndpointAdd(t *testing.T) {
func TestARC_ComputedSelector_MatchStoppedOnEndpointRemove(t *testing.T) {
arc, listener := createARC()

AddExtraComputedSelector(arc, "has(foo)", t)
arc.AddExtraComputedSelector("has(foo)", t)

epKey := model.WorkloadEndpointKey{
Hostname: "host1",
Expand All @@ -147,7 +148,7 @@ func TestARC_ComputedSelector_MatchStoppedOnEndpointRemove(t *testing.T) {
func TestARC_ComputedSelector_NoMatchForNonMatchingEndpoint(t *testing.T) {
arc, listener := createARC()

AddExtraComputedSelector(arc, "has(foo)", t)
arc.AddExtraComputedSelector("has(foo)", t)

epKey := model.WorkloadEndpointKey{
Hostname: "host1",
Expand All @@ -169,7 +170,7 @@ func TestARC_ComputedSelector_NoMatchForNonMatchingEndpoint(t *testing.T) {
func TestARC_RemoveComputedSelector(t *testing.T) {
arc, listener := createARC()

AddExtraComputedSelector(arc, "has(foo)", t)
arc.AddExtraComputedSelector("has(foo)", t)

epKey := model.WorkloadEndpointKey{
Hostname: "host1",
Expand All @@ -184,7 +185,7 @@ func TestARC_RemoveComputedSelector(t *testing.T) {
}

// Remove the computed selector — should fire match-stopped.
RemoveExtraComputedSelector(arc, "has(foo)", t)
arc.RemoveExtraComputedSelector("has(foo)", t)

if len(listener.computedSelectorMatchStops) != 1 {
t.Fatalf("expected 1 match stop after removing selector, got %d", len(listener.computedSelectorMatchStops))
Expand Down Expand Up @@ -221,8 +222,8 @@ func TestARC_MultiCaller_BothGetCallbacks(t *testing.T) {
arc.RegisterPolicyMatchListener(listenerB)

// Two different components register the same selector.
AddExtraComputedSelector(arc, "has(foo)", listenerA)
AddExtraComputedSelector(arc, "has(foo)", listenerB)
arc.AddExtraComputedSelector("has(foo)", listenerA)
arc.AddExtraComputedSelector("has(foo)", listenerB)

epKey := model.WorkloadEndpointKey{
Hostname: "host1",
Expand Down Expand Up @@ -250,8 +251,8 @@ func TestARC_MultiCaller_RemoveOneStillActive(t *testing.T) {
arc.RegisterPolicyMatchListener(listenerA)
arc.RegisterPolicyMatchListener(listenerB)

AddExtraComputedSelector(arc, "has(foo)", listenerA)
AddExtraComputedSelector(arc, "has(foo)", listenerB)
arc.AddExtraComputedSelector("has(foo)", listenerA)
arc.AddExtraComputedSelector("has(foo)", listenerB)

epKey := model.WorkloadEndpointKey{
Hostname: "host1",
Expand All @@ -264,7 +265,7 @@ func TestARC_MultiCaller_RemoveOneStillActive(t *testing.T) {
listenerB.computedSelectorMatches = nil

// Remove caller A — B still holds a reference.
RemoveExtraComputedSelector(arc, "has(foo)", listenerA)
arc.RemoveExtraComputedSelector("has(foo)", listenerA)

// No match-stopped should fire on either listener because the selector is still active.
for name, l := range map[string]*testPolicyMatchListener{"A": listenerA, "B": listenerB} {
Expand Down Expand Up @@ -308,8 +309,8 @@ func TestARC_MultiCaller_RemoveBothDeactivates(t *testing.T) {
arc.RegisterPolicyMatchListener(listenerA)
arc.RegisterPolicyMatchListener(listenerB)

AddExtraComputedSelector(arc, "has(foo)", listenerA)
AddExtraComputedSelector(arc, "has(foo)", listenerB)
arc.AddExtraComputedSelector("has(foo)", listenerA)
arc.AddExtraComputedSelector("has(foo)", listenerB)

epKey := model.WorkloadEndpointKey{
Hostname: "host1",
Expand All @@ -320,8 +321,8 @@ func TestARC_MultiCaller_RemoveBothDeactivates(t *testing.T) {
addEndpoint(arc, epKey, map[string]string{"foo": "bar"})

// Remove both callers.
RemoveExtraComputedSelector(arc, "has(foo)", listenerA)
RemoveExtraComputedSelector(arc, "has(foo)", listenerB)
arc.RemoveExtraComputedSelector("has(foo)", listenerA)
arc.RemoveExtraComputedSelector("has(foo)", listenerB)

// Removing the last caller should fire match-stopped on both listeners.
for name, l := range map[string]*testPolicyMatchListener{"A": listenerA, "B": listenerB} {
Expand Down Expand Up @@ -357,8 +358,8 @@ func TestARC_MultiCaller_DuplicateAddFromSameCaller(t *testing.T) {
arc.RegisterPolicyMatchListener(listener)

// Same caller adds the same selector twice.
AddExtraComputedSelector(arc, "has(foo)", listener)
AddExtraComputedSelector(arc, "has(foo)", listener)
arc.AddExtraComputedSelector("has(foo)", listener)
arc.AddExtraComputedSelector("has(foo)", listener)

epKey := model.WorkloadEndpointKey{
Hostname: "host1",
Expand All @@ -373,7 +374,7 @@ func TestARC_MultiCaller_DuplicateAddFromSameCaller(t *testing.T) {
}

// A single Remove should be enough since the set deduplicates the caller.
RemoveExtraComputedSelector(arc, "has(foo)", listener)
arc.RemoveExtraComputedSelector("has(foo)", listener)

if len(listener.computedSelectorMatchStops) != 1 {
t.Fatalf("expected 1 match stop after single remove, got %d", len(listener.computedSelectorMatchStops))
Expand All @@ -386,7 +387,7 @@ func TestARC_MultiCaller_RemoveWithoutAdd(t *testing.T) {
arc.RegisterPolicyMatchListener(listener)

// Removing a selector that was never added should be a no-op.
RemoveExtraComputedSelector(arc, "has(foo)", listener)
arc.RemoveExtraComputedSelector("has(foo)", listener)

epKey := model.WorkloadEndpointKey{
Hostname: "host1",
Expand All @@ -411,7 +412,7 @@ func TestARC_MultiCaller_ReAddAfterFullRemoval(t *testing.T) {
arc.RegisterPolicyMatchListener(listenerA)
arc.RegisterPolicyMatchListener(listenerB)

AddExtraComputedSelector(arc, "has(foo)", listenerA)
arc.AddExtraComputedSelector("has(foo)", listenerA)

epKey := model.WorkloadEndpointKey{
Hostname: "host1",
Expand All @@ -422,7 +423,7 @@ func TestARC_MultiCaller_ReAddAfterFullRemoval(t *testing.T) {
addEndpoint(arc, epKey, map[string]string{"foo": "bar"})

// Remove fully.
RemoveExtraComputedSelector(arc, "has(foo)", listenerA)
arc.RemoveExtraComputedSelector("has(foo)", listenerA)
if len(listenerA.computedSelectorMatchStops) != 1 {
t.Fatalf("expected 1 match stop on listener A, got %d", len(listenerA.computedSelectorMatchStops))
}
Expand All @@ -434,7 +435,7 @@ func TestARC_MultiCaller_ReAddAfterFullRemoval(t *testing.T) {

// Re-add the same selector from a different caller — should re-match the
// existing endpoint, and both listeners should see it.
AddExtraComputedSelector(arc, "has(foo)", listenerB)
arc.AddExtraComputedSelector("has(foo)", listenerB)

for name, l := range map[string]*testPolicyMatchListener{"A": listenerA, "B": listenerB} {
if len(l.computedSelectorMatches) != 1 {
Expand All @@ -449,7 +450,7 @@ func TestARC_MultiCaller_ReAddAfterFullRemoval(t *testing.T) {
func TestARC_ComputedSelector_DoesNotTriggerPolicyCallbacks(t *testing.T) {
arc, listener := createARC()

AddExtraComputedSelector(arc, "has(foo)", t)
arc.AddExtraComputedSelector("has(foo)", t)

epKey := model.WorkloadEndpointKey{
Hostname: "host1",
Expand Down
6 changes: 3 additions & 3 deletions felix/calc/istio_calculator.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func NewIstioCalculator(
// IP set. This will include local and remote endpoints.
ipSetCallbacks.OnIPSetAdded(rules.IPSetIDAllIstioWEPs, proto.IPSetUpdate_IP)
ipsetMemberIndex.UpdateIPSet(rules.IPSetIDAllIstioWEPs, sel, ipsetmember.ProtocolNone, "")
AddExtraComputedSelector(activeRulesCalc, istioSelector, ic)
activeRulesCalc.AddExtraComputedSelector(istioSelector, ic)
// Piggy-back on the active rules calculator's index of local endpoints
// to give us callbacks when a local endpoint is an Istio endpoint. (The
// index is expensive so we don't want a second copy here.)
Expand All @@ -78,15 +78,15 @@ func NewIstioCalculator(
func (ic *IstioCalculator) OnPolicyMatch(_ model.PolicyKey, _ model.EndpointKey) {}
func (ic *IstioCalculator) OnPolicyMatchStopped(_ model.PolicyKey, _ model.EndpointKey) {}

func (ic *IstioCalculator) OnComputedSelectorMatch(cs string, epKey model.EndpointKey) {
func (ic *IstioCalculator) OnComputedSelectorMatch(cs string, _ any, epKey model.EndpointKey) {
if wepKey, ok := epKey.(model.WorkloadEndpointKey); ok && cs == istioSelector {
// Always pass a newly created or cloned `computedData` instance to the handler.
// This ensures the dataplane never receives a mutable object shared elsewhere.
ic.onEndpointComputedData(wepKey, EPCompDataKindIstio, &ComputedIstioEndpoint{})
}
}

func (ic *IstioCalculator) OnComputedSelectorMatchStopped(cs string, epKey model.EndpointKey) {
func (ic *IstioCalculator) OnComputedSelectorMatchStopped(cs string, _ any, epKey model.EndpointKey) {
if wepKey, ok := epKey.(model.WorkloadEndpointKey); ok && cs == istioSelector {
ic.onEndpointComputedData(wepKey, EPCompDataKindIstio, nil)
}
Expand Down
14 changes: 10 additions & 4 deletions felix/calc/live_migration_calculator.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,10 @@ func (lmc *LiveMigrationCalculator) OnUpdate(update api.Update) (_ bool) {
func (lmc *LiveMigrationCalculator) OnPolicyMatch(_ model.PolicyKey, _ model.EndpointKey) {}
func (lmc *LiveMigrationCalculator) OnPolicyMatchStopped(_ model.PolicyKey, _ model.EndpointKey) {}

func (lmc *LiveMigrationCalculator) OnComputedSelectorMatch(cs string, epKey model.EndpointKey) {
func (lmc *LiveMigrationCalculator) OnComputedSelectorMatch(cs string, caller any, epKey model.EndpointKey) {
if caller != lmc {
return
}
if _, ok := lmc.selectorKeys[cs]; !ok {
return
}
Expand All @@ -242,7 +245,10 @@ func (lmc *LiveMigrationCalculator) OnComputedSelectorMatch(cs string, epKey mod
}
}

func (lmc *LiveMigrationCalculator) OnComputedSelectorMatchStopped(cs string, epKey model.EndpointKey) {
func (lmc *LiveMigrationCalculator) OnComputedSelectorMatchStopped(cs string, caller any, epKey model.EndpointKey) {
if caller != lmc {
return
}
if _, ok := lmc.selectorKeys[cs]; !ok {
return
}
Expand Down Expand Up @@ -429,7 +435,7 @@ func (lmc *LiveMigrationCalculator) refSelector(
}
keys.Add(lmKey)
if keys.Len() == 1 {
AddExtraComputedSelector(lmc.activeRulesCalc, selector, lmc)
lmc.activeRulesCalc.AddExtraComputedSelector(selector, lmc)
}
}

Expand All @@ -441,7 +447,7 @@ func (lmc *LiveMigrationCalculator) unrefSelector(
if keys != nil {
keys.Discard(lmKey)
if keys.Len() == 0 {
RemoveExtraComputedSelector(lmc.activeRulesCalc, selector, lmc)
lmc.activeRulesCalc.RemoveExtraComputedSelector(selector, lmc)
delete(lmc.selectorKeys, selector)
}
}
Expand Down
4 changes: 2 additions & 2 deletions felix/calc/policy_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ func (pr *PolicyResolver) OnPolicyMatchStopped(policyKey model.PolicyKey, endpoi
pr.dirtyEndpoints.Add(endpointKey)
}

func (pr *PolicyResolver) OnComputedSelectorMatch(_ string, _ model.EndpointKey) {}
func (pr *PolicyResolver) OnComputedSelectorMatchStopped(_ string, _ model.EndpointKey) {}
func (pr *PolicyResolver) OnComputedSelectorMatch(_ string, _ any, _ model.EndpointKey) {}
func (pr *PolicyResolver) OnComputedSelectorMatchStopped(_ string, _ any, _ model.EndpointKey) {}

func (pr *PolicyResolver) Flush() {
if !pr.InSync {
Expand Down
Loading