Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions pkg/bloombuild/planner/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,17 @@ func NewQueue(
limits Limits,
metrics *Metrics,
storeMetrics storage.ClientMetrics,
) (*Queue, error) {
return newQueue(logger, cfg, limits, metrics, storeMetrics, 5*time.Minute, 1*time.Hour)
}

func newQueue(
logger log.Logger,
cfg Config,
limits Limits,
metrics *Metrics,
storeMetrics storage.ClientMetrics,
activeUsersCleanupInterval, activeUsersInactiveTimeout time.Duration,
) (*Queue, error) {
// Configure the filesystem client if we are storing tasks on disk.
var diskClient client.ObjectClient
Expand All @@ -73,8 +84,10 @@ func NewQueue(

tasksQueue := queue.NewRequestQueue(cfg.MaxQueuedTasksPerTenant, 0, limits, metrics)

// Clean metrics for inactive users: do not have added tasks to the queue in the last 1 hour
activeUsers := util.NewActiveUsersCleanupService(5*time.Minute, 1*time.Hour, func(user string) {
// Clean metrics for inactive users: those that have not touched the queue in the last activeUsersInactiveTimeout.
// Both Enqueue and Dequeue/Release refresh the user's timestamp so that an in-flight backlog being drained
// by builders keeps the gauge series alive even when no new tasks are enqueued for a while.
activeUsers := util.NewActiveUsersCleanupService(activeUsersCleanupInterval, activeUsersInactiveTimeout, func(user string) {
metrics.Cleanup(user)
Comment thread
chaudum marked this conversation as resolved.
})

Expand Down Expand Up @@ -203,6 +216,7 @@ func (q *Queue) Dequeue(ctx context.Context, last Index, consumerID string) (*pr

if !q.cfg.StoreTasksOnDisk {
val := item.(metaWithTask)
q.activeUsers.UpdateUserTimestamp(val.task.Tenant, time.Now())
return val.task, val.metadata, idx, nil
}

Expand All @@ -213,6 +227,7 @@ func (q *Queue) Dequeue(ctx context.Context, last Index, consumerID string) (*pr
return nil, nil, idx, err
}

q.activeUsers.UpdateUserTimestamp(task.Tenant, time.Now())
return task, meta.metadata, idx, nil
}

Expand All @@ -225,6 +240,8 @@ func (q *Queue) Release(task *protos.ProtoTask) {
return
}

q.activeUsers.UpdateUserTimestamp(task.Tenant, time.Now())

if !q.cfg.StoreTasksOnDisk {
return
}
Expand Down
60 changes: 60 additions & 0 deletions pkg/bloombuild/planner/queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ import (
"fmt"
"os"
"path/filepath"
"strings"
"testing"
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -209,3 +212,60 @@ type fakeLimits struct{}
func (f fakeLimits) MaxConsumers(_ string, _ int) int {
return 0 // Unlimited
}

// TestQueueLengthMetricStaysNonNegativeWhileDraining is a regression test for the
// scenario where a tenant's bloom build backlog is drained by builders for longer
// than the active-users inactivity timeout without any new enqueues.
//
// Previously the cleanup ticker would DeleteLabelValues the per-tenant queue_length
// gauge series mid-drain (because UpdateUserTimestamp was only called from Enqueue),
// after which subsequent Dequeue() calls would lazily recreate the series at zero
// and decrement it into negative values. See https://github.com/grafana/loki/issues/19490.
func TestQueueLengthMetricStaysNonNegativeWhileDraining(t *testing.T) {
	const (
		cleanupInterval = 10 * time.Millisecond
		inactiveTimeout = 50 * time.Millisecond
		dequeueInterval = 30 * time.Millisecond
		numTasks        = 5
	)

	registry := prometheus.NewPedanticRegistry()
	queueMetrics := NewMetrics(registry, "test", "queue")
	clientMetrics := storage.NewClientMetrics()
	defer clientMetrics.Unregister()

	q, err := newQueue(
		log.NewNopLogger(),
		Config{MaxQueuedTasksPerTenant: 1000},
		fakeLimits{},
		queueMetrics,
		clientMetrics,
		cleanupInterval,
		inactiveTimeout,
	)
	require.NoError(t, err)

	require.NoError(t, services.StartAndAwaitRunning(context.Background(), q))
	t.Cleanup(func() { _ = services.StopAndAwaitTerminated(context.Background(), q) })

	const consumer = "fakeConsumer"
	q.RegisterConsumerConnection(consumer)
	defer q.UnregisterConsumerConnection(consumer)

	for _, task := range createTasks(numTasks) {
		require.NoError(t, q.Enqueue(task.ProtoTask, task.taskMeta, nil))
	}

	// Drain the backlog slowly enough that the total drain time exceeds the
	// inactivity timeout: each Dequeue/Release must keep the tenant's series alive.
	lastIdx := StartIndex
	for remaining := numTasks; remaining > 0; remaining-- {
		time.Sleep(dequeueInterval)

		var task *protos.ProtoTask
		task, _, lastIdx, err = q.Dequeue(context.Background(), lastIdx, consumer)
		require.NoError(t, err)
		require.NotNil(t, task)

		q.Release(task)
	}

	// numTasks * dequeueInterval == 150ms, well past inactiveTimeout (50ms), so the
	// cleanup ticker (10ms) had many opportunities to purge "fakeTenant". With the
	// fix in place each Dequeue/Release refreshes the timestamp, so the gauge series
	// for the tenant is preserved and ends at exactly zero after the drain.
	wantMetrics := `# HELP test_queue_queue_length Number of queries in the queue.
# TYPE test_queue_queue_length gauge
test_queue_queue_length{user="fakeTenant"} 0
`
	require.NoError(t, testutil.GatherAndCompare(registry, strings.NewReader(wantMetrics), "test_queue_queue_length"))
}