Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 25 additions & 6 deletions pkg/compactor/retention/expiration.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,10 @@ func findLatestRetentionStartTime(now model.Time, limits Limits) latestRetention
defaultLimits := limits.DefaultLimits()
smallestDefaultRetentionPeriod := defaultLimits.RetentionPeriod
for _, streamRetention := range defaultLimits.StreamRetention {
if streamRetention.Period < smallestDefaultRetentionPeriod {
if streamRetention.Period <= 0 {
continue
}
if smallestDefaultRetentionPeriod <= 0 || streamRetention.Period < smallestDefaultRetentionPeriod {
smallestDefaultRetentionPeriod = streamRetention.Period
}
}
Expand All @@ -255,21 +258,37 @@ func findLatestRetentionStartTime(now model.Time, limits Limits) latestRetention
for userID, limit := range limitsByUserID {
smallestRetentionPeriodForUser := limit.RetentionPeriod
for _, streamRetention := range limit.StreamRetention {
if streamRetention.Period < smallestRetentionPeriodForUser {
if streamRetention.Period <= 0 {
continue
}
if smallestRetentionPeriodForUser <= 0 || streamRetention.Period < smallestRetentionPeriodForUser {
smallestRetentionPeriodForUser = streamRetention.Period
}
}

// update the overallSmallestRetentionPeriod if this user has smaller value
if smallestRetentionPeriodForUser <= 0 {
smallestRetentionPeriodByUser[userID] = 0
continue
}

// update the overallSmallestRetentionPeriod if this user has a smaller positive value
smallestRetentionPeriodByUser[userID] = now.Add(time.Duration(-smallestRetentionPeriodForUser))
if smallestRetentionPeriodForUser < overallSmallestRetentionPeriod {
if overallSmallestRetentionPeriod <= 0 || smallestRetentionPeriodForUser < overallSmallestRetentionPeriod {
overallSmallestRetentionPeriod = smallestRetentionPeriodForUser
}
}

var defaults, overall model.Time
if smallestDefaultRetentionPeriod > 0 {
defaults = now.Add(time.Duration(-smallestDefaultRetentionPeriod))
}
if overallSmallestRetentionPeriod > 0 {
overall = now.Add(time.Duration(-overallSmallestRetentionPeriod))
}

return latestRetentionStartTime{
defaults: now.Add(time.Duration(-smallestDefaultRetentionPeriod)),
overall: now.Add(time.Duration(-overallSmallestRetentionPeriod)),
defaults: defaults,
overall: overall,
byUser: smallestRetentionPeriodByUser,
}
}
112 changes: 112 additions & 0 deletions pkg/compactor/retention/expiration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,107 @@ func TestFindLatestRetentionStartTime(t *testing.T) {
},
},
},
{
name: "default retention period zero disables retention entirely",
limit: fakeLimits{
defaultLimit: retentionLimit{
retentionPeriod: 0,
},
},
expectedLatestRetentionStartTime: latestRetentionStartTime{
overall: 0,
defaults: 0,
byUser: map[string]model.Time{},
},
},
{
name: "default retention zero with all stream retentions zero does not scan any tables",
limit: fakeLimits{
defaultLimit: retentionLimit{
retentionPeriod: 0,
streamRetention: []validation.StreamRetention{
{Period: model.Duration(0)},
},
},
},
expectedLatestRetentionStartTime: latestRetentionStartTime{
overall: 0,
defaults: 0,
byUser: map[string]model.Time{},
},
},
{
name: "default retention zero with positive stream retention uses stream period",
limit: fakeLimits{
defaultLimit: retentionLimit{
retentionPeriod: 0,
streamRetention: []validation.StreamRetention{
{Period: model.Duration(7 * dayDuration)},
},
},
},
expectedLatestRetentionStartTime: latestRetentionStartTime{
overall: now.Add(-7 * dayDuration),
defaults: now.Add(-7 * dayDuration),
byUser: map[string]model.Time{},
},
},
{
name: "stream retention zero does not override positive default",
limit: fakeLimits{
defaultLimit: retentionLimit{
retentionPeriod: 30 * dayDuration,
streamRetention: []validation.StreamRetention{
{Period: model.Duration(0)},
},
},
},
expectedLatestRetentionStartTime: latestRetentionStartTime{
overall: now.Add(-30 * dayDuration),
defaults: now.Add(-30 * dayDuration),
byUser: map[string]model.Time{},
},
},
{
name: "per-user retention zero disables retention for user and does not override overall",
limit: fakeLimits{
defaultLimit: retentionLimit{
retentionPeriod: 30 * dayDuration,
},
perTenant: map[string]retentionLimit{
"0": {retentionPeriod: 0},
"1": {retentionPeriod: 7 * dayDuration},
},
},
expectedLatestRetentionStartTime: latestRetentionStartTime{
overall: now.Add(-7 * dayDuration),
defaults: now.Add(-30 * dayDuration),
byUser: map[string]model.Time{
"0": 0,
"1": now.Add(-7 * dayDuration),
},
},
},
{
name: "default zero with some users having positive retention",
limit: fakeLimits{
defaultLimit: retentionLimit{
retentionPeriod: 0,
},
perTenant: map[string]retentionLimit{
"0": {retentionPeriod: 30 * dayDuration},
"1": {retentionPeriod: 7 * dayDuration},
},
},
expectedLatestRetentionStartTime: latestRetentionStartTime{
overall: now.Add(-7 * dayDuration),
defaults: 0,
byUser: map[string]model.Time{
"0": now.Add(-30 * dayDuration),
"1": now.Add(-7 * dayDuration),
},
},
},
} {
t.Run(tc.name, func(t *testing.T) {
latestRetentionStartTime := findLatestRetentionStartTime(now, tc.limit)
Expand All @@ -493,6 +594,7 @@ func TestExpirationChecker_IntervalMayHaveExpiredChunks(t *testing.T) {
byUser: map[string]model.Time{
"user0": now.Add(-72 * time.Hour),
"user1": now.Add(-24 * time.Hour),
"user2": 0,
},
},
}
Expand Down Expand Up @@ -556,6 +658,16 @@ func TestExpirationChecker_IntervalMayHaveExpiredChunks(t *testing.T) {
hasExpiredChunks: true,
},

// user2 has custom retention disabled, so it must not fall back to defaults
{
name: "user2 index - disabled retention",
userID: "user2",
interval: model.Interval{
Start: now.Add(-49 * time.Hour),
End: now.Add(-47 * time.Hour),
},
},

// user3 not having custom retention so using defaultLatestRetentionStartTime
{
name: "user3 index - not expired",
Expand Down