Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 26 additions & 4 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1343,8 +1343,9 @@ func (gc *groupCostController) acquireTokens(ctx context.Context, delta *rmpb.Co
gc.metrics.runningKVRequestCounter.Inc()
defer gc.metrics.runningKVRequestCounter.Dec()
var (
err error
d time.Duration
err error
d time.Duration
reconfiguredCh <-chan struct{}
)
retryLoop:
for range gc.mainCfg.WaitRetryTimes {
Expand All @@ -1363,6 +1364,7 @@ retryLoop:
case rmpb.GroupMode_RUMode:
res := make([]*Reservation, 0, len(requestUnitLimitTypeList))
for typ, counter := range gc.run.requestUnitTokens {
reconfiguredCh = counter.limiter.GetReconfiguredCh()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we move it to line 1365, since there is only one type?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although there is technically only one type at the moment, I kept it within a for loop to avoid code conflicts or confusing semantics.

if v := getRUValueFromConsumption(delta, typ); v > 0 {
// record the consume token histogram if enable controller debug mode.
if enableControllerTraceLog.Load() {
Expand All @@ -1381,8 +1383,28 @@ retryLoop:
}
}
gc.metrics.requestRetryCounter.Inc()
time.Sleep(gc.mainCfg.WaitRetryInterval)
*waitDuration += gc.mainCfg.WaitRetryInterval
waitStart := time.Now()
waitTimer := time.NewTimer(gc.mainCfg.WaitRetryInterval)
select {
case <-ctx.Done():
if !waitTimer.Stop() {
select {
case <-waitTimer.C:
default:
}
}
*waitDuration += time.Since(waitStart)
return d, ctx.Err()
case <-reconfiguredCh:
if !waitTimer.Stop() {
select {
case <-waitTimer.C:
default:
}
}
case <-waitTimer.C:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to stop the timer here?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For a time.Timer that has already triggered, there is no need to stop it.

}
*waitDuration += time.Since(waitStart)
}
return d, err
}
Expand Down
95 changes: 95 additions & 0 deletions client/resource_group/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,101 @@ func TestResourceGroupThrottledError(t *testing.T) {
re.True(errs.ErrClientResourceGroupThrottled.Equal(err))
}

// TestAcquireTokensSignalAwareWait verifies that a goroutine blocked in the
// acquireTokens retry loop is woken up promptly by a Reconfigure signal
// instead of sleeping out the full WaitRetryInterval (issue #10251).
func TestAcquireTokensSignalAwareWait(t *testing.T) {
	re := require.New(t)

	// Build a controller with a buffered lowRUNotifyChan so the test can
	// observe the notify() call inside reserveN as a synchronization point.
	group := &rmpb.ResourceGroup{
		Name: "test",
		Mode: rmpb.GroupMode_RUMode,
		RUSettings: &rmpb.GroupRequestUnitSettings{
			RU: &rmpb.TokenBucket{
				Settings: &rmpb.TokenLimitSettings{FillRate: 1000},
			},
		},
	}
	lowRUCh := make(chan notifyMsg, 1)
	cfg := DefaultRUConfig()
	cfg.WaitRetryInterval = 5 * time.Second
	cfg.WaitRetryTimes = 3
	gc, err := newGroupCostController(group, cfg, lowRUCh, make(chan *groupCostController, 1))
	re.NoError(err)

	// With fillRate=0 every reservation fails with InfDuration, which is
	// the exact scenario described in issue #10251.
	counter := gc.run.requestUnitTokens[rmpb.RequestUnitType_RU]
	counter.limiter.Reconfigure(time.Now(), tokenBucketReconfigureArgs{
		NewTokens: 1000,
		NewRate:   0,
		NewBurst:  0,
	})

	consumption := &rmpb.Consumption{RRU: 5000}
	type outcome struct {
		err     error
		elapsed time.Duration
	}
	done := make(chan outcome, 1)
	go func() {
		var elapsed time.Duration
		_, acquireErr := gc.acquireTokens(context.Background(), consumption, &elapsed, false)
		done <- outcome{acquireErr, elapsed}
	}()

	// Once the low-RU notification arrives, Reserve has failed and the
	// retry path has been entered.
	select {
	case <-lowRUCh:
	case <-time.After(2 * time.Second):
		t.Fatal("timed out waiting for low-RU notification")
	}

	// Reconfigure with sufficient tokens and a real fillRate: this closes
	// the reconfiguredCh (waking the select) and provides tokens so the
	// next Reserve() succeeds.
	counter.limiter.Reconfigure(time.Now(), tokenBucketReconfigureArgs{
		NewTokens: 100000,
		NewRate:   100000,
		NewBurst:  0,
	})

	select {
	case res := <-done:
		re.NoError(res.err)
		re.Less(res.elapsed, cfg.WaitRetryInterval)
	case <-time.After(cfg.WaitRetryInterval):
		t.Fatal("acquireTokens was not woken up promptly by Reconfigure signal")
	}
}

// TestAcquireTokensFallbackToTimer verifies that, absent any Reconfigure
// signal, the retry loop still falls back to the timer path and eventually
// exhausts all retries with a throttled error.
func TestAcquireTokensFallbackToTimer(t *testing.T) {
	re := require.New(t)
	gc := createTestGroupCostController(re)
	// Keep the retry interval short so the test runs fast.
	gc.mainCfg.WaitRetryInterval = 50 * time.Millisecond
	gc.mainCfg.WaitRetryTimes = 3
	gc.mainCfg.LTBMaxWaitDuration = 100 * time.Millisecond

	// fillRate=0 and no later Reconfigure: no wake-up signal ever arrives.
	counter := gc.run.requestUnitTokens[rmpb.RequestUnitType_RU]
	counter.limiter.Reconfigure(time.Now(), tokenBucketReconfigureArgs{
		NewTokens: 1000,
		NewRate:   0,
		NewBurst:  0,
	})

	var waited time.Duration
	_, err := gc.acquireTokens(context.Background(), &rmpb.Consumption{RRU: 5000}, &waited, false)

	// All retries should be exhausted, yielding a throttled error.
	re.Error(err)
	re.True(errs.ErrClientResourceGroupThrottled.Equal(err))
	// The accumulated wait should be roughly retryTimes * retryInterval.
	re.GreaterOrEqual(waited, gc.mainCfg.WaitRetryInterval*time.Duration(gc.mainCfg.WaitRetryTimes))
}

// MockResourceGroupProvider is a mock implementation of the ResourceGroupProvider interface.
type MockResourceGroupProvider struct {
mock.Mock
Expand Down
23 changes: 23 additions & 0 deletions client/resource_group/controller/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ type Limiter struct {
remainingNotifyTimes int
name string

// reconfiguredCh is closed on every Reconfigure() call to wake up
// goroutines waiting in acquireTokens() retry loops.
reconfiguredCh chan struct{}

// metrics
metrics *limiterMetricsCollection
}
Expand Down Expand Up @@ -114,6 +118,7 @@ func NewLimiter(now time.Time, r Limit, b int64, tokens float64, lowTokensNotify
tokens: tokens,
burst: b,
lowTokensNotifyChan: lowTokensNotifyChan,
reconfiguredCh: make(chan struct{}),
}
log.Debug("new limiter", zap.String("limiter", fmt.Sprintf("%+v", lim)))
return lim
Expand All @@ -130,6 +135,7 @@ func NewLimiterWithCfg(name string, now time.Time, cfg tokenBucketReconfigureArg
burst: cfg.NewBurst,
notifyThreshold: cfg.NotifyThreshold,
lowTokensNotifyChan: lowTokensNotifyChan,
reconfiguredCh: make(chan struct{}),
}
lim.metrics = &limiterMetricsCollection{
lowTokenNotifyCounter: lowTokenRequestNotifyCounter.WithLabelValues(lim.name),
Expand Down Expand Up @@ -253,6 +259,18 @@ func (lim *Limiter) SetName(name string) *Limiter {
return lim
}

// GetReconfiguredCh returns a channel that is closed when the limiter is
// reconfigured. Callers can select on this channel to be woken up as soon
// as new tokens may be available, instead of blind-sleeping.
func (lim *Limiter) GetReconfiguredCh() <-chan struct{} {
	lim.mu.Lock()
	defer lim.mu.Unlock()
	// Lazily allocate so a limiter created without the constructor still
	// hands out a valid (blocking) channel.
	ch := lim.reconfiguredCh
	if ch == nil {
		ch = make(chan struct{})
		lim.reconfiguredCh = ch
	}
	return ch
}

// notify tries to send a non-blocking notification on notifyCh and disables
// further notifications (until the next Reconfigure or StartNotification).
func (lim *Limiter) notify() {
Expand Down Expand Up @@ -351,6 +369,11 @@ func (lim *Limiter) Reconfigure(now time.Time,
opt(lim)
}
lim.maybeNotify()
// Wake up all goroutines waiting in acquireTokens retry loops.
if lim.reconfiguredCh != nil {
close(lim.reconfiguredCh)
}
lim.reconfiguredCh = make(chan struct{})
logControllerTrace("[resource group controller] after reconfigure", zap.String("name", lim.name), zap.Float64("tokens", lim.tokens), zap.Float64("rate", float64(lim.limit)), zap.Float64("notify-threshold", args.NotifyThreshold), zap.Int64("burst", lim.burst))
}

Expand Down
68 changes: 68 additions & 0 deletions client/resource_group/controller/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,74 @@ func TestQPS(t *testing.T) {
}
}

// TestReconfiguredCh verifies the lifecycle of the limiter's reconfigure
// signal channel: it blocks before any Reconfigure, is closed by
// Reconfigure, and is replaced with a fresh (again blocking) channel.
func TestReconfiguredCh(t *testing.T) {
	re := require.New(t)
	nc := make(chan notifyMsg, 1)
	lim := NewLimiter(t0, 1, 0, 0, nc)

	// The channel should block initially.
	ch := lim.GetReconfiguredCh()
	select {
	case <-ch:
		t.Fatal("reconfiguredCh should not be closed before Reconfigure")
	default:
	}

	// After Reconfigure the old channel must be closed.
	args := tokenBucketReconfigureArgs{NewTokens: 10, NewRate: 1}
	lim.Reconfigure(t1, args)
	select {
	case <-ch:
	default:
		t.Fatal("reconfiguredCh should be closed after Reconfigure")
	}

	// A new channel is created; channel values are directly comparable
	// (identity), so no pointer formatting is needed. It should block again.
	ch2 := lim.GetReconfiguredCh()
	re.NotEqual(ch, ch2)
	select {
	case <-ch2:
		t.Fatal("new reconfiguredCh should not be closed yet")
	default:
	}

	// Successive Reconfigure calls each close the current channel.
	lim.Reconfigure(t2, args)
	select {
	case <-ch2:
	default:
		t.Fatal("second reconfiguredCh should be closed after second Reconfigure")
	}
}

func TestReconfiguredChWakesMultipleWaiters(t *testing.T) {
nc := make(chan notifyMsg, 1)
lim := NewLimiter(t0, 1, 0, 0, nc)

const numWaiters = 5
ch := lim.GetReconfiguredCh()
wokenUp := make(chan struct{}, numWaiters)

var wg sync.WaitGroup
for range numWaiters {
wg.Add(1)
go func() {
defer wg.Done()
select {
case <-ch:
wokenUp <- struct{}{}
case <-time.After(2 * time.Second):
}
}()
}

// Close by reconfiguring.
lim.Reconfigure(t1, tokenBucketReconfigureArgs{NewTokens: 10, NewRate: 1})
wg.Wait()

require.Len(t, wokenUp, numWaiters)
}

const testCaseRunTime = 4 * time.Second

func testQPSCase(concurrency int, reserveN int64, limit int64) (qps float64, ru float64, needWait time.Duration) {
Expand Down
Loading