diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index 5fccdd1437f..fa6973a4a70 100644 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -1343,8 +1343,9 @@ func (gc *groupCostController) acquireTokens(ctx context.Context, delta *rmpb.Co gc.metrics.runningKVRequestCounter.Inc() defer gc.metrics.runningKVRequestCounter.Dec() var ( - err error - d time.Duration + err error + d time.Duration + reconfiguredCh <-chan struct{} ) retryLoop: for range gc.mainCfg.WaitRetryTimes { @@ -1363,6 +1364,7 @@ retryLoop: case rmpb.GroupMode_RUMode: res := make([]*Reservation, 0, len(requestUnitLimitTypeList)) for typ, counter := range gc.run.requestUnitTokens { + reconfiguredCh = counter.limiter.GetReconfiguredCh() if v := getRUValueFromConsumption(delta, typ); v > 0 { // record the consume token histogram if enable controller debug mode. if enableControllerTraceLog.Load() { @@ -1381,8 +1383,28 @@ retryLoop: } } gc.metrics.requestRetryCounter.Inc() - time.Sleep(gc.mainCfg.WaitRetryInterval) - *waitDuration += gc.mainCfg.WaitRetryInterval + waitStart := time.Now() + waitTimer := time.NewTimer(gc.mainCfg.WaitRetryInterval) + select { + case <-ctx.Done(): + if !waitTimer.Stop() { + select { + case <-waitTimer.C: + default: + } + } + *waitDuration += time.Since(waitStart) + return d, ctx.Err() + case <-reconfiguredCh: + if !waitTimer.Stop() { + select { + case <-waitTimer.C: + default: + } + } + case <-waitTimer.C: + } + *waitDuration += time.Since(waitStart) } return d, err } diff --git a/client/resource_group/controller/controller_test.go b/client/resource_group/controller/controller_test.go index a59be4d5a2d..99f928fdab8 100644 --- a/client/resource_group/controller/controller_test.go +++ b/client/resource_group/controller/controller_test.go @@ -174,6 +174,101 @@ func TestResourceGroupThrottledError(t *testing.T) { re.True(errs.ErrClientResourceGroupThrottled.Equal(err)) } +func TestAcquireTokensSignalAwareWait(t *testing.T) { + re := require.New(t) + + // Build controller with a buffered lowRUNotifyChan so the test can + // observe the notify() call inside reserveN as a synchronization point. + group := &rmpb.ResourceGroup{ + Name: "test", + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{ + Settings: &rmpb.TokenLimitSettings{FillRate: 1000}, + }, + }, + } + notifyCh := make(chan notifyMsg, 1) + cfg := DefaultRUConfig() + cfg.WaitRetryInterval = 5 * time.Second + cfg.WaitRetryTimes = 3 + gc, err := newGroupCostController(group, cfg, notifyCh, make(chan *groupCostController, 1)) + re.NoError(err) + + // Set fillRate=0 so reservation always fails with InfDuration, + // which is the exact scenario described in issue #10251. + counter := gc.run.requestUnitTokens[rmpb.RequestUnitType_RU] + counter.limiter.Reconfigure(time.Now(), tokenBucketReconfigureArgs{ + NewTokens: 1000, + NewRate: 0, + NewBurst: 0, + }) + + delta := &rmpb.Consumption{RRU: 5000} + type acquireResult struct { + err error + waitDuration time.Duration + } + resultCh := make(chan acquireResult, 1) + go func() { + var waitDuration time.Duration + _, err := gc.acquireTokens(context.Background(), delta, &waitDuration, false) + resultCh <- acquireResult{err, waitDuration} + }() + + // Wait for notify — Reserve has failed and the retry path is entered. + select { + case <-notifyCh: + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for low-RU notification") + } + + // Now reconfigure with enough tokens and a real fillRate. + // This closes the reconfiguredCh (waking the select) and provides + // tokens so the next Reserve() succeeds. + counter.limiter.Reconfigure(time.Now(), tokenBucketReconfigureArgs{ + NewTokens: 100000, + NewRate: 100000, + NewBurst: 0, + }) + + select { + case r := <-resultCh: + re.NoError(r.err) + re.Less(r.waitDuration, cfg.WaitRetryInterval) + case <-time.After(cfg.WaitRetryInterval): + t.Fatal("acquireTokens was not woken up promptly by Reconfigure signal") + } +} + +func TestAcquireTokensFallbackToTimer(t *testing.T) { + re := require.New(t) + gc := createTestGroupCostController(re) + // Short retry interval so the test runs fast. + gc.mainCfg.WaitRetryInterval = 50 * time.Millisecond + gc.mainCfg.WaitRetryTimes = 3 + gc.mainCfg.LTBMaxWaitDuration = 100 * time.Millisecond + + // Set fillRate=0 and never reconfigure — no signal will arrive. + counter := gc.run.requestUnitTokens[rmpb.RequestUnitType_RU] + counter.limiter.Reconfigure(time.Now(), tokenBucketReconfigureArgs{ + NewTokens: 1000, + NewRate: 0, + NewBurst: 0, + }) + + delta := &rmpb.Consumption{RRU: 5000} + ctx := context.Background() + var waitDuration time.Duration + _, err := gc.acquireTokens(ctx, delta, &waitDuration, false) + + // Without a Reconfigure signal, all retries should exhaust and return an error. + re.Error(err) + re.True(errs.ErrClientResourceGroupThrottled.Equal(err)) + // waitDuration should be roughly retryTimes * retryInterval. + re.GreaterOrEqual(waitDuration, gc.mainCfg.WaitRetryInterval*time.Duration(gc.mainCfg.WaitRetryTimes)) +} + // MockResourceGroupProvider is a mock implementation of the ResourceGroupProvider interface. type MockResourceGroupProvider struct { mock.Mock diff --git a/client/resource_group/controller/limiter.go b/client/resource_group/controller/limiter.go index 5d9823312ca..8a6e10186df 100644 --- a/client/resource_group/controller/limiter.go +++ b/client/resource_group/controller/limiter.go @@ -84,6 +84,10 @@ type Limiter struct { remainingNotifyTimes int name string + // reconfiguredCh is closed on every Reconfigure() call to wake up + // goroutines waiting in acquireTokens() retry loops. + reconfiguredCh chan struct{} + // metrics metrics *limiterMetricsCollection } @@ -114,6 +118,7 @@ func NewLimiter(now time.Time, r Limit, b int64, tokens float64, lowTokensNotify tokens: tokens, burst: b, lowTokensNotifyChan: lowTokensNotifyChan, + reconfiguredCh: make(chan struct{}), } log.Debug("new limiter", zap.String("limiter", fmt.Sprintf("%+v", lim))) return lim @@ -130,6 +135,7 @@ func NewLimiterWithCfg(name string, now time.Time, cfg tokenBucketReconfigureArg burst: cfg.NewBurst, notifyThreshold: cfg.NotifyThreshold, lowTokensNotifyChan: lowTokensNotifyChan, + reconfiguredCh: make(chan struct{}), } lim.metrics = &limiterMetricsCollection{ lowTokenNotifyCounter: lowTokenRequestNotifyCounter.WithLabelValues(lim.name), @@ -253,6 +259,18 @@ func (lim *Limiter) SetName(name string) *Limiter { return lim } +// GetReconfiguredCh returns a channel that is closed when the limiter is +// reconfigured. Callers can select on this to be woken up immediately +// when new tokens arrive, instead of blind-sleeping. +func (lim *Limiter) GetReconfiguredCh() <-chan struct{} { + lim.mu.Lock() + defer lim.mu.Unlock() + if lim.reconfiguredCh == nil { + lim.reconfiguredCh = make(chan struct{}) + } + return lim.reconfiguredCh +} + // notify tries to send a non-blocking notification on notifyCh and disables // further notifications (until the next Reconfigure or StartNotification). func (lim *Limiter) notify() { @@ -351,6 +369,11 @@ func (lim *Limiter) Reconfigure(now time.Time, opt(lim) } lim.maybeNotify() + // Wake up all goroutines waiting in acquireTokens retry loops. + if lim.reconfiguredCh != nil { + close(lim.reconfiguredCh) + } + lim.reconfiguredCh = make(chan struct{}) logControllerTrace("[resource group controller] after reconfigure", zap.String("name", lim.name), zap.Float64("tokens", lim.tokens), zap.Float64("rate", float64(lim.limit)), zap.Float64("notify-threshold", args.NotifyThreshold), zap.Int64("burst", lim.burst)) } diff --git a/client/resource_group/controller/limiter_test.go b/client/resource_group/controller/limiter_test.go index 9afebcb3d53..563e6d6235d 100644 --- a/client/resource_group/controller/limiter_test.go +++ b/client/resource_group/controller/limiter_test.go @@ -235,6 +235,74 @@ func TestQPS(t *testing.T) { } } +func TestReconfiguredCh(t *testing.T) { + re := require.New(t) + nc := make(chan notifyMsg, 1) + lim := NewLimiter(t0, 1, 0, 0, nc) + + // The channel should block initially. + ch := lim.GetReconfiguredCh() + select { + case <-ch: + t.Fatal("reconfiguredCh should not be closed before Reconfigure") + default: + } + + // After Reconfigure the old channel must be closed. + args := tokenBucketReconfigureArgs{NewTokens: 10, NewRate: 1} + lim.Reconfigure(t1, args) + select { + case <-ch: + default: + t.Fatal("reconfiguredCh should be closed after Reconfigure") + } + + // A new channel is created; it should block again. + ch2 := lim.GetReconfiguredCh() + re.NotEqual(fmt.Sprintf("%p", ch), fmt.Sprintf("%p", ch2)) + select { + case <-ch2: + t.Fatal("new reconfiguredCh should not be closed yet") + default: + } + + // Successive Reconfigure calls each close the current channel. + lim.Reconfigure(t2, args) + select { + case <-ch2: + default: + t.Fatal("second reconfiguredCh should be closed after second Reconfigure") + } +} + +func TestReconfiguredChWakesMultipleWaiters(t *testing.T) { + nc := make(chan notifyMsg, 1) + lim := NewLimiter(t0, 1, 0, 0, nc) + + const numWaiters = 5 + ch := lim.GetReconfiguredCh() + wokenUp := make(chan struct{}, numWaiters) + + var wg sync.WaitGroup + for range numWaiters { + wg.Add(1) + go func() { + defer wg.Done() + select { + case <-ch: + wokenUp <- struct{}{} + case <-time.After(2 * time.Second): + } + }() + } + + // Close by reconfiguring. + lim.Reconfigure(t1, tokenBucketReconfigureArgs{NewTokens: 10, NewRate: 1}) + wg.Wait() + + require.Len(t, wokenUp, numWaiters) +} + const testCaseRunTime = 4 * time.Second func testQPSCase(concurrency int, reserveN int64, limit int64) (qps float64, ru float64, needWait time.Duration) {