Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions pkg/bloombuild/planner/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,17 @@ func NewQueue(
limits Limits,
metrics *Metrics,
storeMetrics storage.ClientMetrics,
) (*Queue, error) {
return newQueue(logger, cfg, limits, metrics, storeMetrics, 5*time.Minute, 1*time.Hour)
}

func newQueue(
logger log.Logger,
cfg Config,
limits Limits,
metrics *Metrics,
storeMetrics storage.ClientMetrics,
activeUsersCleanupInterval, activeUsersInactiveTimeout time.Duration,
) (*Queue, error) {
// Configure the filesystem client if we are storing tasks on disk.
var diskClient client.ObjectClient
Expand All @@ -73,8 +84,15 @@ func NewQueue(

tasksQueue := queue.NewRequestQueue(cfg.MaxQueuedTasksPerTenant, 0, limits, metrics)

// Clean metrics for inactive users: do not have added tasks to the queue in the last 1 hour
activeUsers := util.NewActiveUsersCleanupService(5*time.Minute, 1*time.Hour, func(user string) {
// Re-register the user when the queue is non-empty: PurgeInactiveUsers has already
// removed it from its tracking map by the time this callback runs, so without this
// the metrics would never be cleaned up once the backlog drains.
var activeUsers *util.ActiveUsersCleanupService
activeUsers = util.NewActiveUsersCleanupService(activeUsersCleanupInterval, activeUsersInactiveTimeout, func(user string) {
if tasksQueue.GetUserQueueLength(user) > 0 {
activeUsers.UpdateUserTimestamp(user, time.Now())
return
}
metrics.Cleanup(user)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder whether it would make more sense to check the queue length of an inactive user before cleaning up the their metrics.
It could still be the case that there haven't been any dequeue operations for more than the inactive period.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, took the suggestion in 22c0eb4.

})

Expand Down
54 changes: 54 additions & 0 deletions pkg/bloombuild/planner/queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ import (
"fmt"
"os"
"path/filepath"
"strings"
"testing"
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -209,3 +212,54 @@ type fakeLimits struct{}
func (f fakeLimits) MaxConsumers(_ string, _ int) int {
return 0 // Unlimited
}

func TestActiveUsersCleanupSkippedWhileQueueNotEmpty(t *testing.T) {
const (
cleanupInterval = 10 * time.Millisecond
inactiveTimeout = 50 * time.Millisecond
numTasks = 5
)

reg := prometheus.NewPedanticRegistry()
queueMetrics := NewMetrics(reg, "test", "queue")
clientMetrics := storage.NewClientMetrics()
defer clientMetrics.Unregister()

cfg := Config{MaxQueuedTasksPerTenant: 1000}
q, err := newQueue(log.NewNopLogger(), cfg, fakeLimits{}, queueMetrics, clientMetrics, cleanupInterval, inactiveTimeout)
require.NoError(t, err)

require.NoError(t, services.StartAndAwaitRunning(context.Background(), q))
t.Cleanup(func() { _ = services.StopAndAwaitTerminated(context.Background(), q) })

const consumer = "fakeConsumer"
q.RegisterConsumerConnection(consumer)
defer q.UnregisterConsumerConnection(consumer)

tasks := createTasks(numTasks)
for _, task := range tasks {
require.NoError(t, q.Enqueue(task.ProtoTask, task.taskMeta, nil))
}

time.Sleep(3 * inactiveTimeout)

expected := fmt.Sprintf(`# HELP test_queue_queue_length Number of queries in the queue.
# TYPE test_queue_queue_length gauge
test_queue_queue_length{user="fakeTenant"} %d
`, numTasks)
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expected), "test_queue_queue_length"))

idx := StartIndex
for i := 0; i < numTasks; i++ {
var task *protos.ProtoTask
task, _, idx, err = q.Dequeue(context.Background(), idx, consumer)
require.NoError(t, err)
require.NotNil(t, task)
q.Release(task)
}

require.Eventually(t, func() bool {
count, err := testutil.GatherAndCount(reg, "test_queue_queue_length")
return err == nil && count == 0
}, 5*inactiveTimeout, cleanupInterval, "queue_length series should be cleaned up after drain")
}
10 changes: 10 additions & 0 deletions pkg/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,16 @@ func (q *RequestQueue) GetConnectedConsumersMetric() float64 {
return float64(q.connectedConsumers.Load())
}

// GetUserQueueLength returns the number of pending requests for the given tenant.
func (q *RequestQueue) GetUserQueueLength(tenant string) int {
q.mtx.Lock()
defer q.mtx.Unlock()
if ptr, ok := q.queues.perUserQueueLen[tenant]; ok {
return *ptr
}
return 0
}

// contextCond is a *sync.Cond with Wait() method overridden to support context-based waiting.
type contextCond struct {
*sync.Cond
Expand Down