From 449a206b411149f4d77a0b45fc5c067e598c533a Mon Sep 17 00:00:00 2001 From: Alex Le Date: Thu, 21 May 2026 15:29:43 -0700 Subject: [PATCH 1/3] avoid duplicated tokens when using minimize spread token strategy Signed-off-by: Alex Le --- pkg/ring/token_generator.go | 51 +++++++++++++++++++++++++++++--- pkg/ring/token_generator_test.go | 50 +++++++++++++++++++++++++++++-- 2 files changed, 95 insertions(+), 6 deletions(-) diff --git a/pkg/ring/token_generator.go b/pkg/ring/token_generator.go index 59f3db23a3c..7ade2fe802a 100644 --- a/pkg/ring/token_generator.go +++ b/pkg/ring/token_generator.go @@ -2,6 +2,7 @@ package ring import ( "container/heap" + "hash/fnv" "math" "math/rand" "slices" @@ -65,6 +66,15 @@ func (g *RandomTokenGenerator) GenerateTokens(ring *Desc, _, _ string, numTokens return tokens } +// instanceHash returns a deterministic uint32 derived from the ingester ID and a round counter. +// Used to select among candidate gaps so that concurrent ingesters pick different gaps. +func instanceHash(id string, round int) uint32 { + h := fnv.New32a() + h.Write([]byte(id)) + h.Write([]byte{byte(round >> 24), byte(round >> 16), byte(round >> 8), byte(round)}) + return h.Sum32() +} + type MinimizeSpreadTokenGenerator struct { innerGenerator TokenGenerator } @@ -160,6 +170,9 @@ func (g *MinimizeSpreadTokenGenerator) GenerateTokens(ring *Desc, id, zone strin distancesHeap := &tokenDistanceHeap{} for _, perInstance := range tokensPerInstanceWithDistance { + if len(perInstance.tokens) == 0 { + continue + } sort.Slice(perInstance.tokens, func(i, j int) bool { return perInstance.tokens[i].distance > perInstance.tokens[j].distance }) @@ -182,7 +195,27 @@ func (g *MinimizeSpreadTokenGenerator) GenerateTokens(ring *Desc, id, zone strin // Calculating the expected distance per step taking in consideration the tokens already created expectedDistanceStep := (expectedOwnershipDistance - currentInstance.totalDistance) / int64(numTokens-len(r)) - m := heap.Pop(distancesHeap).(*totalTokenPerInstance) + // Collect up to 4 candidates from the top of the heap. Two concurrent ingesters + // use a per-instance hash to pick different candidates, avoiding splitting the + // same instance's gap and causing uneven distribution. + const maxCandidates = 4 + candidates := make([]*totalTokenPerInstance, 0, maxCandidates) + for len(*distancesHeap) > 0 && len(candidates) < maxCandidates { + candidates = append(candidates, heap.Pop(distancesHeap).(*totalTokenPerInstance)) + } + + var pick int + if len(candidates) > 1 { + pick = int(instanceHash(id, len(r))) % len(candidates) + } + m := candidates[pick] + + // Push back the non-selected candidates. + for idx, c := range candidates { + if idx != pick { + heap.Push(distancesHeap, c) + } + } i := findFirst(len(m.tokens), func(x int) bool { return m.tokens[x].distance > expectedDistanceStep @@ -199,10 +232,20 @@ func (g *MinimizeSpreadTokenGenerator) GenerateTokens(ring *Desc, id, zone strin } var newToken uint32 - if int64(tokenToSplit.prev)+expectedDistanceStep > maxTokenValue { - newToken = uint32(int64(tokenToSplit.prev) + expectedDistanceStep - maxTokenValue) + // When only one candidate was available, two concurrent ingesters will pick + // the same instance and gap. Apply a small per-instance jitter to differentiate. + offset := expectedDistanceStep + if len(candidates) <= 1 { + h := instanceHash(id, len(r)) + maxJitter := expectedDistanceStep / 100 + if maxJitter > 0 { + offset += int64(h) % maxJitter + } + } + if int64(tokenToSplit.prev)+offset > maxTokenValue { + newToken = uint32(int64(tokenToSplit.prev) + offset - maxTokenValue) } else { - newToken = uint32(int64(tokenToSplit.prev) + expectedDistanceStep) + newToken = uint32(int64(tokenToSplit.prev) + offset) } if _, ok := usedTokens[newToken]; !ok { diff --git a/pkg/ring/token_generator_test.go b/pkg/ring/token_generator_test.go index ab536f97dbc..b473c91bcc8 100644 --- a/pkg/ring/token_generator_test.go +++ b/pkg/ring/token_generator_test.go @@ -91,9 +91,12 @@ func TestMinimizeSpreadTokenGenerator(t *testing.T) { require.Equal(t, mTokenGenerator.called, len(zones)) // Should Generate tokens based on the ring state + // Tolerance is 5% (vs original 2%) because candidate selection among heap entries + // trades a small amount of optimality for collision avoidance. The impact is only + // visible with very few ingesters; with many ingesters the distribution converges. for i := range 50 { generateTokensForIngesters(t, rindDesc, fmt.Sprintf("minimize-%v", i), zones, minimizeTokenGenerator, dups) - assertDistancePerIngester(t, rindDesc, 0.02) + assertDistancePerIngester(t, rindDesc, 0.05) } require.Equal(t, mTokenGenerator.called, len(zones)) @@ -103,7 +106,7 @@ func TestMinimizeSpreadTokenGenerator(t *testing.T) { rindDesc.AddIngester("partial", "partial", zones[0], rTokens, ACTIVE, time.Now()) nTokens := minimizeTokenGenerator.GenerateTokens(rindDesc, "partial", zones[0], 256, true) rindDesc.AddIngester("partial", "partial", zones[0], append(rTokens, nTokens...), ACTIVE, time.Now()) - assertDistancePerIngester(t, rindDesc, 0.02) + assertDistancePerIngester(t, rindDesc, 0.05) mTokenGenerator.called = 0 // Should fallback to random generator when more than 1 ingester does not have tokens and force flag is set @@ -204,3 +207,46 @@ func assertDistancePerIngester(t testing.TB, d *Desc, tolerance float64) { }, "[%v] expected and real distance error is greater than %v -> %v[%v/%v]", s, tolerance, 1-math.Abs(expectedDistance/realDistance), expectedDistance, realDistance) } } + +func TestMinimizeSpreadTokenGenerator_NoDuplicatesOnConcurrentJoin(t *testing.T) { + // Simulate two ingesters joining concurrently: both see the same ring state + // and generate tokens independently. With candidate gap selection, they must + // produce different tokens. + zones := []string{"zone1", "zone2", "zone3"} + tg := NewMinimizeSpreadTokenGenerator() + + // Set up a ring with existing ingesters so MinimizeSpread uses its deterministic path. + ringDesc := NewDesc() + for i := range 3 { + for _, zone := range zones { + id := fmt.Sprintf("existing-%d-%s", i, zone) + tokens := tg.GenerateTokens(ringDesc, id, zone, 512, true) + ringDesc.AddIngester(id, id, zone, tokens, ACTIVE, time.Now()) + } + } + + // Two new ingesters in the same zone read the same ring state. + // Register both with no tokens so they both attempt MinimizeSpread. + now := time.Now() + ringDesc.AddIngester("new-ingester-A", "new-ingester-A", zones[0], []uint32{}, ACTIVE, now) + ringDesc.AddIngester("new-ingester-B", "new-ingester-B", zones[0], []uint32{}, ACTIVE, now) + + tokensA := tg.GenerateTokens(ringDesc, "new-ingester-A", zones[0], 512, true) + tokensB := tg.GenerateTokens(ringDesc, "new-ingester-B", zones[0], 512, true) + + setA := make(map[uint32]bool, len(tokensA)) + for _, tok := range tokensA { + setA[tok] = true + } + + var duplicates []uint32 + for _, tok := range tokensB { + if setA[tok] { + duplicates = append(duplicates, tok) + } + } + + require.Empty(t, duplicates, "ingesters A and B produced %d duplicate tokens from the same ring state", len(duplicates)) + require.Len(t, tokensA, 512) + require.Len(t, tokensB, 512) +} From db8308bea4aee7655c3f22844ac604cfbcb59e81 Mon Sep 17 00:00:00 2001 From: Alex Le Date: Fri, 22 May 2026 13:21:49 -0700 Subject: [PATCH 2/3] Revert "avoid duplicated tokens when using minimize spread token strategy" This reverts commit aff3b18446d782f98673979fc1e7f39ddaf72c6a. Signed-off-by: Alex Le --- pkg/ring/token_generator.go | 51 +++----------------------------- pkg/ring/token_generator_test.go | 50 ++----------------------------- 2 files changed, 6 insertions(+), 95 deletions(-) diff --git a/pkg/ring/token_generator.go b/pkg/ring/token_generator.go index 7ade2fe802a..59f3db23a3c 100644 --- a/pkg/ring/token_generator.go +++ b/pkg/ring/token_generator.go @@ -2,7 +2,6 @@ package ring import ( "container/heap" - "hash/fnv" "math" "math/rand" "slices" @@ -66,15 +65,6 @@ func (g *RandomTokenGenerator) GenerateTokens(ring *Desc, _, _ string, numTokens return tokens } -// instanceHash returns a deterministic uint32 derived from the ingester ID and a round counter. -// Used to select among candidate gaps so that concurrent ingesters pick different gaps. -func instanceHash(id string, round int) uint32 { - h := fnv.New32a() - h.Write([]byte(id)) - h.Write([]byte{byte(round >> 24), byte(round >> 16), byte(round >> 8), byte(round)}) - return h.Sum32() -} - type MinimizeSpreadTokenGenerator struct { innerGenerator TokenGenerator } @@ -170,9 +160,6 @@ func (g *MinimizeSpreadTokenGenerator) GenerateTokens(ring *Desc, id, zone strin distancesHeap := &tokenDistanceHeap{} for _, perInstance := range tokensPerInstanceWithDistance { - if len(perInstance.tokens) == 0 { - continue - } sort.Slice(perInstance.tokens, func(i, j int) bool { return perInstance.tokens[i].distance > perInstance.tokens[j].distance }) @@ -195,27 +182,7 @@ func (g *MinimizeSpreadTokenGenerator) GenerateTokens(ring *Desc, id, zone strin // Calculating the expected distance per step taking in consideration the tokens already created expectedDistanceStep := (expectedOwnershipDistance - currentInstance.totalDistance) / int64(numTokens-len(r)) - // Collect up to 4 candidates from the top of the heap. Two concurrent ingesters - // use a per-instance hash to pick different candidates, avoiding splitting the - // same instance's gap and causing uneven distribution. - const maxCandidates = 4 - candidates := make([]*totalTokenPerInstance, 0, maxCandidates) - for len(*distancesHeap) > 0 && len(candidates) < maxCandidates { - candidates = append(candidates, heap.Pop(distancesHeap).(*totalTokenPerInstance)) - } - - var pick int - if len(candidates) > 1 { - pick = int(instanceHash(id, len(r))) % len(candidates) - } - m := candidates[pick] - - // Push back the non-selected candidates. - for idx, c := range candidates { - if idx != pick { - heap.Push(distancesHeap, c) - } - } + m := heap.Pop(distancesHeap).(*totalTokenPerInstance) i := findFirst(len(m.tokens), func(x int) bool { return m.tokens[x].distance > expectedDistanceStep @@ -232,20 +199,10 @@ func (g *MinimizeSpreadTokenGenerator) GenerateTokens(ring *Desc, id, zone strin } var newToken uint32 - // When only one candidate was available, two concurrent ingesters will pick - // the same instance and gap. Apply a small per-instance jitter to differentiate. - offset := expectedDistanceStep - if len(candidates) <= 1 { - h := instanceHash(id, len(r)) - maxJitter := expectedDistanceStep / 100 - if maxJitter > 0 { - offset += int64(h) % maxJitter - } - } - if int64(tokenToSplit.prev)+offset > maxTokenValue { - newToken = uint32(int64(tokenToSplit.prev) + offset - maxTokenValue) + if int64(tokenToSplit.prev)+expectedDistanceStep > maxTokenValue { + newToken = uint32(int64(tokenToSplit.prev) + expectedDistanceStep - maxTokenValue) } else { - newToken = uint32(int64(tokenToSplit.prev) + offset) + newToken = uint32(int64(tokenToSplit.prev) + expectedDistanceStep) } if _, ok := usedTokens[newToken]; !ok { diff --git a/pkg/ring/token_generator_test.go b/pkg/ring/token_generator_test.go index b473c91bcc8..ab536f97dbc 100644 --- a/pkg/ring/token_generator_test.go +++ b/pkg/ring/token_generator_test.go @@ -91,12 +91,9 @@ func TestMinimizeSpreadTokenGenerator(t *testing.T) { require.Equal(t, mTokenGenerator.called, len(zones)) // Should Generate tokens based on the ring state - // Tolerance is 5% (vs original 2%) because candidate selection among heap entries - // trades a small amount of optimality for collision avoidance. The impact is only - // visible with very few ingesters; with many ingesters the distribution converges. for i := range 50 { generateTokensForIngesters(t, rindDesc, fmt.Sprintf("minimize-%v", i), zones, minimizeTokenGenerator, dups) - assertDistancePerIngester(t, rindDesc, 0.05) + assertDistancePerIngester(t, rindDesc, 0.02) } require.Equal(t, mTokenGenerator.called, len(zones)) @@ -106,7 +103,7 @@ func TestMinimizeSpreadTokenGenerator(t *testing.T) { rindDesc.AddIngester("partial", "partial", zones[0], rTokens, ACTIVE, time.Now()) nTokens := minimizeTokenGenerator.GenerateTokens(rindDesc, "partial", zones[0], 256, true) rindDesc.AddIngester("partial", "partial", zones[0], append(rTokens, nTokens...), ACTIVE, time.Now()) - assertDistancePerIngester(t, rindDesc, 0.05) + assertDistancePerIngester(t, rindDesc, 0.02) mTokenGenerator.called = 0 // Should fallback to random generator when more than 1 ingester does not have tokens and force flag is set @@ -207,46 +204,3 @@ func assertDistancePerIngester(t testing.TB, d *Desc, tolerance float64) { }, "[%v] expected and real distance error is greater than %v -> %v[%v/%v]", s, tolerance, 1-math.Abs(expectedDistance/realDistance), expectedDistance, realDistance) } } - -func TestMinimizeSpreadTokenGenerator_NoDuplicatesOnConcurrentJoin(t *testing.T) { - // Simulate two ingesters joining concurrently: both see the same ring state - // and generate tokens independently. With candidate gap selection, they must - // produce different tokens. - zones := []string{"zone1", "zone2", "zone3"} - tg := NewMinimizeSpreadTokenGenerator() - - // Set up a ring with existing ingesters so MinimizeSpread uses its deterministic path. - ringDesc := NewDesc() - for i := range 3 { - for _, zone := range zones { - id := fmt.Sprintf("existing-%d-%s", i, zone) - tokens := tg.GenerateTokens(ringDesc, id, zone, 512, true) - ringDesc.AddIngester(id, id, zone, tokens, ACTIVE, time.Now()) - } - } - - // Two new ingesters in the same zone read the same ring state. - // Register both with no tokens so they both attempt MinimizeSpread. - now := time.Now() - ringDesc.AddIngester("new-ingester-A", "new-ingester-A", zones[0], []uint32{}, ACTIVE, now) - ringDesc.AddIngester("new-ingester-B", "new-ingester-B", zones[0], []uint32{}, ACTIVE, now) - - tokensA := tg.GenerateTokens(ringDesc, "new-ingester-A", zones[0], 512, true) - tokensB := tg.GenerateTokens(ringDesc, "new-ingester-B", zones[0], 512, true) - - setA := make(map[uint32]bool, len(tokensA)) - for _, tok := range tokensA { - setA[tok] = true - } - - var duplicates []uint32 - for _, tok := range tokensB { - if setA[tok] { - duplicates = append(duplicates, tok) - } - } - - require.Empty(t, duplicates, "ingesters A and B produced %d duplicate tokens from the same ring state", len(duplicates)) - require.Len(t, tokensA, 512) - require.Len(t, tokensB, 512) -} From b7bd045f81d410215b7d08c2cfddaab81c96c287 Mon Sep 17 00:00:00 2001 From: Alex Le Date: Fri, 22 May 2026 13:45:30 -0700 Subject: [PATCH 3/3] fix resolve token conflict logic under race condition Signed-off-by: Alex Le --- pkg/ring/model.go | 29 +++++++++++---------------- pkg/ring/model_test.go | 45 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 17 deletions(-) diff --git a/pkg/ring/model.go b/pkg/ring/model.go index 82d0f9ccb3f..c416439a938 100644 --- a/pkg/ring/model.go +++ b/pkg/ring/model.go @@ -241,7 +241,6 @@ func (d *Desc) mergeWithTime(mergeable memberlist.Mergeable, localCAS bool, now otherIngesterMap := other.Ingesters var updated []string - tokensChanged := false maxFutureLimit := now.Add(30 * time.Minute).Unix() for name, oing := range otherIngesterMap { @@ -252,9 +251,6 @@ func (d *Desc) mergeWithTime(mergeable memberlist.Mergeable, localCAS bool, now ting := thisIngesterMap[name] // ting.Timestamp will be 0, if there was no such ingester in our version if oing.Timestamp > ting.Timestamp { - if !tokensEqual(ting.Tokens, oing.Tokens) { - tokensChanged = true - } oing.Tokens = append([]uint32(nil), oing.Tokens...) // make a copy of tokens thisIngesterMap[name] = oing updated = append(updated, name) @@ -289,7 +285,7 @@ func (d *Desc) mergeWithTime(mergeable memberlist.Mergeable, localCAS bool, now } // resolveConflicts allocates lot of memory, so if we can avoid it, do that. - if tokensChanged && conflictingTokensExist(thisIngesterMap) { + if conflictingTokensExist(thisIngesterMap) { resolveConflicts(thisIngesterMap) } @@ -738,7 +734,6 @@ func (d *Desc) FindDifference(o codec.MultiKey) (any, []string, error) { toUpdated := NewDesc() toDelete := make([]string, 0) - tokensChanged := false // If both are null if d == nil && out == nil { @@ -762,7 +757,6 @@ func (d *Desc) FindDifference(o codec.MultiKey) (any, []string, error) { //If new added for name, oing := range out.Ingesters { if _, ok := d.Ingesters[name]; !ok { - tokensChanged = true toUpdated.Ingesters[name] = oing } } @@ -775,29 +769,30 @@ func (d *Desc) FindDifference(o codec.MultiKey) (any, []string, error) { } else if !ing.Equal(oing) { if oing.Timestamp > ing.Timestamp { toUpdated.Ingesters[name] = oing - if !tokensEqual(ing.Tokens, oing.Tokens) { - tokensChanged = true - } } else if oing.Timestamp == ing.Timestamp && ing.State != LEFT && oing.State == LEFT { // we accept LEFT even if timestamp hasn't changed toUpdated.Ingesters[name] = oing - if !tokensEqual(ing.Tokens, oing.Tokens) { - tokensChanged = true - } } } } // resolveConflicts allocates a lot of memory, so if we can avoid it, do that. - if tokensChanged && conflictingTokensExist(out.Ingesters) { + if conflictingTokensExist(out.Ingesters) { resolveConflicts(out.Ingesters) - //Recheck if any instance was updated by the resolveConflict - //All ingesters in toUpdated have already passed the timestamp check, so we can skip checking again + // Refresh all entries already in toUpdated (their tokens may have changed). for name := range toUpdated.Ingesters { - //name must appear in out Ingesters, so we can skip the contains key check toUpdated.Ingesters[name] = out.Ingesters[name] } + // Also include any existing ingester whose tokens were changed by resolveConflicts. + for name, oing := range out.Ingesters { + if _, alreadyUpdated := toUpdated.Ingesters[name]; alreadyUpdated { + continue + } + if ing, ok := d.Ingesters[name]; ok && !tokensEqual(ing.Tokens, oing.Tokens) { + toUpdated.Ingesters[name] = oing + } + } } return toUpdated, toDelete, nil diff --git a/pkg/ring/model_test.go b/pkg/ring/model_test.go index 16295ff3541..974bb1cac73 100644 --- a/pkg/ring/model_test.go +++ b/pkg/ring/model_test.go @@ -5,6 +5,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestInstanceDesc_IsHealthy_ForIngesterOperations(t *testing.T) { @@ -746,6 +747,50 @@ func TestDesc_FindDifference(t *testing.T) { } } +func TestDesc_FindDifference_ConcurrentConflictResolutionIsDeterministic(t *testing.T) { + // Simulate two ingesters with duplicate tokens both doing CAS at the same time. + // Both read the same DDB state (current) and produce the same new state (out). + // FindDifference must produce identical toUpdate results so they don't write + // conflicting resolutions to DDB. + current := &Desc{Ingesters: map[string]InstanceDesc{ + "ing-A": {Addr: "addr-A", Tokens: []uint32{1, 2, 3, 10, 20}, Timestamp: 100, State: ACTIVE}, + "ing-B": {Addr: "addr-B", Tokens: []uint32{1, 2, 3, 30, 40}, Timestamp: 100, State: ACTIVE}, + }} + + // ing-A does a heartbeat CAS (only timestamp changes) + outA := &Desc{Ingesters: map[string]InstanceDesc{ + "ing-A": {Addr: "addr-A", Tokens: []uint32{1, 2, 3, 10, 20}, Timestamp: 110, State: ACTIVE}, + "ing-B": {Addr: "addr-B", Tokens: []uint32{1, 2, 3, 30, 40}, Timestamp: 100, State: ACTIVE}, + }} + + // ing-B does a heartbeat CAS (only timestamp changes) + outB := &Desc{Ingesters: map[string]InstanceDesc{ + "ing-A": {Addr: "addr-A", Tokens: []uint32{1, 2, 3, 10, 20}, Timestamp: 100, State: ACTIVE}, + "ing-B": {Addr: "addr-B", Tokens: []uint32{1, 2, 3, 30, 40}, Timestamp: 110, State: ACTIVE}, + }} + + toUpdateA, _, errA := current.FindDifference(outA) + toUpdateB, _, errB := current.FindDifference(outB) + + require.NoError(t, errA) + require.NoError(t, errB) + + // Both must resolve the conflict: ing-A wins tokens 1,2,3 (lower name), ing-B loses them. + updatedA := toUpdateA.(*Desc) + updatedB := toUpdateB.(*Desc) + + // ing-A's resolution should strip tokens 1,2,3 from ing-B + require.Contains(t, updatedA.Ingesters, "ing-B") + assert.Equal(t, []uint32{30, 40}, updatedA.Ingesters["ing-B"].Tokens) + + // ing-B's resolution should also strip tokens 1,2,3 from ing-B + require.Contains(t, updatedB.Ingesters, "ing-B") + assert.Equal(t, []uint32{30, 40}, updatedB.Ingesters["ing-B"].Tokens) + + // Both produce the same token assignment for ing-B — deterministic resolution. + assert.Equal(t, updatedA.Ingesters["ing-B"].Tokens, updatedB.Ingesters["ing-B"].Tokens) +} + func Test_resolveConflicts(t *testing.T) { tests := []struct { name string