diff --git a/pkg/ring/model.go b/pkg/ring/model.go index 82d0f9ccb3..c416439a93 100644 --- a/pkg/ring/model.go +++ b/pkg/ring/model.go @@ -241,7 +241,6 @@ func (d *Desc) mergeWithTime(mergeable memberlist.Mergeable, localCAS bool, now otherIngesterMap := other.Ingesters var updated []string - tokensChanged := false maxFutureLimit := now.Add(30 * time.Minute).Unix() for name, oing := range otherIngesterMap { @@ -252,9 +251,6 @@ func (d *Desc) mergeWithTime(mergeable memberlist.Mergeable, localCAS bool, now ting := thisIngesterMap[name] // ting.Timestamp will be 0, if there was no such ingester in our version if oing.Timestamp > ting.Timestamp { - if !tokensEqual(ting.Tokens, oing.Tokens) { - tokensChanged = true - } oing.Tokens = append([]uint32(nil), oing.Tokens...) // make a copy of tokens thisIngesterMap[name] = oing updated = append(updated, name) @@ -289,7 +285,7 @@ func (d *Desc) mergeWithTime(mergeable memberlist.Mergeable, localCAS bool, now } // resolveConflicts allocates lot of memory, so if we can avoid it, do that. - if tokensChanged && conflictingTokensExist(thisIngesterMap) { + if conflictingTokensExist(thisIngesterMap) { resolveConflicts(thisIngesterMap) } @@ -738,7 +734,6 @@ func (d *Desc) FindDifference(o codec.MultiKey) (any, []string, error) { toUpdated := NewDesc() toDelete := make([]string, 0) - tokensChanged := false // If both are null if d == nil && out == nil { @@ -762,7 +757,6 @@ func (d *Desc) FindDifference(o codec.MultiKey) (any, []string, error) { //If new added for name, oing := range out.Ingesters { if _, ok := d.Ingesters[name]; !ok { - tokensChanged = true toUpdated.Ingesters[name] = oing } } @@ -775,29 +769,30 @@ func (d *Desc) FindDifference(o codec.MultiKey) (any, []string, error) { } else if !ing.Equal(oing) { if oing.Timestamp > ing.Timestamp { toUpdated.Ingesters[name] = oing - if !tokensEqual(ing.Tokens, oing.Tokens) { - tokensChanged = true - } } else if oing.Timestamp == ing.Timestamp && ing.State != LEFT && oing.State == LEFT { // we accept LEFT even if timestamp hasn't changed toUpdated.Ingesters[name] = oing - if !tokensEqual(ing.Tokens, oing.Tokens) { - tokensChanged = true - } } } } // resolveConflicts allocates a lot of memory, so if we can avoid it, do that. - if tokensChanged && conflictingTokensExist(out.Ingesters) { + if conflictingTokensExist(out.Ingesters) { resolveConflicts(out.Ingesters) - //Recheck if any instance was updated by the resolveConflict - //All ingesters in toUpdated have already passed the timestamp check, so we can skip checking again + // Refresh all entries already in toUpdated (their tokens may have changed). for name := range toUpdated.Ingesters { - //name must appear in out Ingesters, so we can skip the contains key check toUpdated.Ingesters[name] = out.Ingesters[name] } + // Also include any existing ingester whose tokens were changed by resolveConflicts. + for name, oing := range out.Ingesters { + if _, alreadyUpdated := toUpdated.Ingesters[name]; alreadyUpdated { + continue + } + if ing, ok := d.Ingesters[name]; ok && !tokensEqual(ing.Tokens, oing.Tokens) { + toUpdated.Ingesters[name] = oing + } + } } return toUpdated, toDelete, nil diff --git a/pkg/ring/model_test.go b/pkg/ring/model_test.go index 16295ff354..974bb1cac7 100644 --- a/pkg/ring/model_test.go +++ b/pkg/ring/model_test.go @@ -5,6 +5,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestInstanceDesc_IsHealthy_ForIngesterOperations(t *testing.T) { @@ -746,6 +747,50 @@ func TestDesc_FindDifference(t *testing.T) { } } +func TestDesc_FindDifference_ConcurrentConflictResolutionIsDeterministic(t *testing.T) { + // Simulate two ingesters with duplicate tokens both doing CAS at the same time. + // Both read the same DDB state (current) and produce the same new state (out). + // FindDifference must produce identical toUpdate results so they don't write + // conflicting resolutions to DDB. + current := &Desc{Ingesters: map[string]InstanceDesc{ + "ing-A": {Addr: "addr-A", Tokens: []uint32{1, 2, 3, 10, 20}, Timestamp: 100, State: ACTIVE}, + "ing-B": {Addr: "addr-B", Tokens: []uint32{1, 2, 3, 30, 40}, Timestamp: 100, State: ACTIVE}, + }} + + // ing-A does a heartbeat CAS (only timestamp changes) + outA := &Desc{Ingesters: map[string]InstanceDesc{ + "ing-A": {Addr: "addr-A", Tokens: []uint32{1, 2, 3, 10, 20}, Timestamp: 110, State: ACTIVE}, + "ing-B": {Addr: "addr-B", Tokens: []uint32{1, 2, 3, 30, 40}, Timestamp: 100, State: ACTIVE}, + }} + + // ing-B does a heartbeat CAS (only timestamp changes) + outB := &Desc{Ingesters: map[string]InstanceDesc{ + "ing-A": {Addr: "addr-A", Tokens: []uint32{1, 2, 3, 10, 20}, Timestamp: 100, State: ACTIVE}, + "ing-B": {Addr: "addr-B", Tokens: []uint32{1, 2, 3, 30, 40}, Timestamp: 110, State: ACTIVE}, + }} + + toUpdateA, _, errA := current.FindDifference(outA) + toUpdateB, _, errB := current.FindDifference(outB) + + require.NoError(t, errA) + require.NoError(t, errB) + + // Both must resolve the conflict: ing-A wins tokens 1,2,3 (lower name), ing-B loses them. + updatedA := toUpdateA.(*Desc) + updatedB := toUpdateB.(*Desc) + + // ing-A's resolution should strip tokens 1,2,3 from ing-B + require.Contains(t, updatedA.Ingesters, "ing-B") + assert.Equal(t, []uint32{30, 40}, updatedA.Ingesters["ing-B"].Tokens) + + // ing-B's resolution should also strip tokens 1,2,3 from ing-B + require.Contains(t, updatedB.Ingesters, "ing-B") + assert.Equal(t, []uint32{30, 40}, updatedB.Ingesters["ing-B"].Tokens) + + // Both produce the same token assignment for ing-B — deterministic resolution. + assert.Equal(t, updatedA.Ingesters["ing-B"].Tokens, updatedB.Ingesters["ing-B"].Tokens) +} + func Test_resolveConflicts(t *testing.T) { tests := []struct { name string