diff --git a/pkg/bloombuild/planner/queue/queue.go b/pkg/bloombuild/planner/queue/queue.go index b36be563068..68c8283ddcc 100644 --- a/pkg/bloombuild/planner/queue/queue.go +++ b/pkg/bloombuild/planner/queue/queue.go @@ -57,6 +57,17 @@ func NewQueue( limits Limits, metrics *Metrics, storeMetrics storage.ClientMetrics, +) (*Queue, error) { + return newQueue(logger, cfg, limits, metrics, storeMetrics, 5*time.Minute, 1*time.Hour) +} + +func newQueue( + logger log.Logger, + cfg Config, + limits Limits, + metrics *Metrics, + storeMetrics storage.ClientMetrics, + activeUsersCleanupInterval, activeUsersInactiveTimeout time.Duration, ) (*Queue, error) { // Configure the filesystem client if we are storing tasks on disk. var diskClient client.ObjectClient @@ -73,8 +84,15 @@ func NewQueue( tasksQueue := queue.NewRequestQueue(cfg.MaxQueuedTasksPerTenant, 0, limits, metrics) - // Clean metrics for inactive users: do not have added tasks to the queue in the last 1 hour - activeUsers := util.NewActiveUsersCleanupService(5*time.Minute, 1*time.Hour, func(user string) { + // Re-register the user when the queue is non-empty: PurgeInactiveUsers has already + // removed it from its tracking map by the time this callback runs, so without this + // the metrics would never be cleaned up once the backlog drains. + var activeUsers *util.ActiveUsersCleanupService + activeUsers = util.NewActiveUsersCleanupService(activeUsersCleanupInterval, activeUsersInactiveTimeout, func(user string) { + if tasksQueue.GetUserQueueLength(user) > 0 { + activeUsers.UpdateUserTimestamp(user, time.Now()) + return + } metrics.Cleanup(user) }) diff --git a/pkg/bloombuild/planner/queue/queue_test.go b/pkg/bloombuild/planner/queue/queue_test.go index 9947ac9381a..13d613ace7d 100644 --- a/pkg/bloombuild/planner/queue/queue_test.go +++ b/pkg/bloombuild/planner/queue/queue_test.go @@ -5,11 +5,14 @@ import ( "fmt" "os" "path/filepath" + "strings" "testing" + "time" "github.com/go-kit/log" "github.com/grafana/dskit/services" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" @@ -209,3 +212,54 @@ type fakeLimits struct{} func (f fakeLimits) MaxConsumers(_ string, _ int) int { return 0 // Unlimited } + +func TestActiveUsersCleanupSkippedWhileQueueNotEmpty(t *testing.T) { + const ( + cleanupInterval = 10 * time.Millisecond + inactiveTimeout = 50 * time.Millisecond + numTasks = 5 + ) + + reg := prometheus.NewPedanticRegistry() + queueMetrics := NewMetrics(reg, "test", "queue") + clientMetrics := storage.NewClientMetrics() + defer clientMetrics.Unregister() + + cfg := Config{MaxQueuedTasksPerTenant: 1000} + q, err := newQueue(log.NewNopLogger(), cfg, fakeLimits{}, queueMetrics, clientMetrics, cleanupInterval, inactiveTimeout) + require.NoError(t, err) + + require.NoError(t, services.StartAndAwaitRunning(context.Background(), q)) + t.Cleanup(func() { _ = services.StopAndAwaitTerminated(context.Background(), q) }) + + const consumer = "fakeConsumer" + q.RegisterConsumerConnection(consumer) + defer q.UnregisterConsumerConnection(consumer) + + tasks := createTasks(numTasks) + for _, task := range tasks { + require.NoError(t, q.Enqueue(task.ProtoTask, task.taskMeta, nil)) + } + + time.Sleep(3 * inactiveTimeout) + + expected := fmt.Sprintf(`# HELP test_queue_queue_length Number of queries in the queue. +# TYPE test_queue_queue_length gauge +test_queue_queue_length{user="fakeTenant"} %d +`, numTasks) + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expected), "test_queue_queue_length")) + + idx := StartIndex + for i := 0; i < numTasks; i++ { + var task *protos.ProtoTask + task, _, idx, err = q.Dequeue(context.Background(), idx, consumer) + require.NoError(t, err) + require.NotNil(t, task) + q.Release(task) + } + + require.Eventually(t, func() bool { + count, err := testutil.GatherAndCount(reg, "test_queue_queue_length") + return err == nil && count == 0 + }, 5*inactiveTimeout, cleanupInterval, "queue_length series should be cleaned up after drain") +} diff --git a/pkg/queue/queue.go b/pkg/queue/queue.go index 7a5f18c1c90..e98669e6e77 100644 --- a/pkg/queue/queue.go +++ b/pkg/queue/queue.go @@ -299,6 +299,16 @@ func (q *RequestQueue) GetConnectedConsumersMetric() float64 { return float64(q.connectedConsumers.Load()) } +// GetUserQueueLength returns the number of pending requests for the given tenant. +func (q *RequestQueue) GetUserQueueLength(tenant string) int { + q.mtx.Lock() + defer q.mtx.Unlock() + if ptr, ok := q.queues.perUserQueueLen[tenant]; ok { + return *ptr + } + return 0 +} + // contextCond is a *sync.Cond with Wait() method overridden to support context-based waiting. type contextCond struct { *sync.Cond