diff --git a/pkg/compactor/retention/expiration.go b/pkg/compactor/retention/expiration.go index 7e04f2133a9..09c7692b540 100644 --- a/pkg/compactor/retention/expiration.go +++ b/pkg/compactor/retention/expiration.go @@ -242,7 +242,10 @@ func findLatestRetentionStartTime(now model.Time, limits Limits) latestRetention defaultLimits := limits.DefaultLimits() smallestDefaultRetentionPeriod := defaultLimits.RetentionPeriod for _, streamRetention := range defaultLimits.StreamRetention { - if streamRetention.Period < smallestDefaultRetentionPeriod { + if streamRetention.Period <= 0 { + continue + } + if smallestDefaultRetentionPeriod <= 0 || streamRetention.Period < smallestDefaultRetentionPeriod { smallestDefaultRetentionPeriod = streamRetention.Period } } @@ -255,21 +258,37 @@ func findLatestRetentionStartTime(now model.Time, limits Limits) latestRetention for userID, limit := range limitsByUserID { smallestRetentionPeriodForUser := limit.RetentionPeriod for _, streamRetention := range limit.StreamRetention { - if streamRetention.Period < smallestRetentionPeriodForUser { + if streamRetention.Period <= 0 { + continue + } + if smallestRetentionPeriodForUser <= 0 || streamRetention.Period < smallestRetentionPeriodForUser { smallestRetentionPeriodForUser = streamRetention.Period } } - // update the overallSmallestRetentionPeriod if this user has smaller value + if smallestRetentionPeriodForUser <= 0 { + smallestRetentionPeriodByUser[userID] = 0 + continue + } + + // update the overallSmallestRetentionPeriod if this user has a smaller positive value smallestRetentionPeriodByUser[userID] = now.Add(time.Duration(-smallestRetentionPeriodForUser)) - if smallestRetentionPeriodForUser < overallSmallestRetentionPeriod { + if overallSmallestRetentionPeriod <= 0 || smallestRetentionPeriodForUser < overallSmallestRetentionPeriod { overallSmallestRetentionPeriod = smallestRetentionPeriodForUser } } + var defaults, overall model.Time + if smallestDefaultRetentionPeriod > 0 { + defaults = now.Add(time.Duration(-smallestDefaultRetentionPeriod)) + } + if overallSmallestRetentionPeriod > 0 { + overall = now.Add(time.Duration(-overallSmallestRetentionPeriod)) + } + return latestRetentionStartTime{ - defaults: now.Add(time.Duration(-smallestDefaultRetentionPeriod)), - overall: now.Add(time.Duration(-overallSmallestRetentionPeriod)), + defaults: defaults, + overall: overall, byUser: smallestRetentionPeriodByUser, } } diff --git a/pkg/compactor/retention/expiration_test.go b/pkg/compactor/retention/expiration_test.go index 3f7d65c6e01..b5d239e74f9 100644 --- a/pkg/compactor/retention/expiration_test.go +++ b/pkg/compactor/retention/expiration_test.go @@ -476,6 +476,107 @@ func TestFindLatestRetentionStartTime(t *testing.T) { }, }, }, + { + name: "default retention period zero disables retention entirely", + limit: fakeLimits{ + defaultLimit: retentionLimit{ + retentionPeriod: 0, + }, + }, + expectedLatestRetentionStartTime: latestRetentionStartTime{ + overall: 0, + defaults: 0, + byUser: map[string]model.Time{}, + }, + }, + { + name: "default retention zero with all stream retentions zero does not scan any tables", + limit: fakeLimits{ + defaultLimit: retentionLimit{ + retentionPeriod: 0, + streamRetention: []validation.StreamRetention{ + {Period: model.Duration(0)}, + }, + }, + }, + expectedLatestRetentionStartTime: latestRetentionStartTime{ + overall: 0, + defaults: 0, + byUser: map[string]model.Time{}, + }, + }, + { + name: "default retention zero with positive stream retention uses stream period", + limit: fakeLimits{ + defaultLimit: retentionLimit{ + retentionPeriod: 0, + streamRetention: []validation.StreamRetention{ + {Period: model.Duration(7 * dayDuration)}, + }, + }, + }, + expectedLatestRetentionStartTime: latestRetentionStartTime{ + overall: now.Add(-7 * dayDuration), + defaults: now.Add(-7 * dayDuration), + byUser: map[string]model.Time{}, + }, + }, + { + name: "stream retention zero does not override positive default", + limit: fakeLimits{ + defaultLimit: retentionLimit{ + retentionPeriod: 30 * dayDuration, + streamRetention: []validation.StreamRetention{ + {Period: model.Duration(0)}, + }, + }, + }, + expectedLatestRetentionStartTime: latestRetentionStartTime{ + overall: now.Add(-30 * dayDuration), + defaults: now.Add(-30 * dayDuration), + byUser: map[string]model.Time{}, + }, + }, + { + name: "per-user retention zero disables retention for user and does not override overall", + limit: fakeLimits{ + defaultLimit: retentionLimit{ + retentionPeriod: 30 * dayDuration, + }, + perTenant: map[string]retentionLimit{ + "0": {retentionPeriod: 0}, + "1": {retentionPeriod: 7 * dayDuration}, + }, + }, + expectedLatestRetentionStartTime: latestRetentionStartTime{ + overall: now.Add(-7 * dayDuration), + defaults: now.Add(-30 * dayDuration), + byUser: map[string]model.Time{ + "0": 0, + "1": now.Add(-7 * dayDuration), + }, + }, + }, + { + name: "default zero with some users having positive retention", + limit: fakeLimits{ + defaultLimit: retentionLimit{ + retentionPeriod: 0, + }, + perTenant: map[string]retentionLimit{ + "0": {retentionPeriod: 30 * dayDuration}, + "1": {retentionPeriod: 7 * dayDuration}, + }, + }, + expectedLatestRetentionStartTime: latestRetentionStartTime{ + overall: now.Add(-7 * dayDuration), + defaults: 0, + byUser: map[string]model.Time{ + "0": now.Add(-30 * dayDuration), + "1": now.Add(-7 * dayDuration), + }, + }, + }, } { t.Run(tc.name, func(t *testing.T) { latestRetentionStartTime := findLatestRetentionStartTime(now, tc.limit) @@ -493,6 +594,7 @@ func TestExpirationChecker_IntervalMayHaveExpiredChunks(t *testing.T) { byUser: map[string]model.Time{ "user0": now.Add(-72 * time.Hour), "user1": now.Add(-24 * time.Hour), + "user2": 0, }, }, } @@ -556,6 +658,16 @@ func TestExpirationChecker_IntervalMayHaveExpiredChunks(t *testing.T) { hasExpiredChunks: true, }, + // user2 has custom retention disabled, so it must not fall back to defaults + { + name: "user2 index - disabled retention", + userID: "user2", + interval: model.Interval{ + Start: now.Add(-49 * time.Hour), + End: now.Add(-47 * time.Hour), + }, + }, + // user3 not having custom retention so using defaultLatestRetentionStartTime { name: "user3 index - not expired",