diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md index cf81dd561cf3..ab9ab42c1870 100644 --- a/docs/sources/configure/_index.md +++ b/docs/sources/configure/_index.md @@ -2487,9 +2487,9 @@ The `compactor` block configures the compactor component, which compacts index s # CLI flag: -compactor.delete-request-cancel-period [delete_request_cancel_period: | default = 24h] -# Constrain the size of any single delete request. When a delete request > -# delete_max_interval is input, the request is sharded into smaller requests of -# no more than delete_max_interval +# Constrain the size of any single delete request with line filters. When a +# delete request > delete_max_interval is input, the request is sharded into +# smaller requests of no more than delete_max_interval # CLI flag: -compactor.delete-max-interval [delete_max_interval: | default = 24h] diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 75bd575e2c77..6521983729b9 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -102,7 +102,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.DeleteRequestStoreKeyPrefix, "compactor.delete-request-store.key-prefix", "index/", "Path prefix for storing delete requests.") f.IntVar(&cfg.DeleteBatchSize, "compactor.delete-batch-size", 70, "The max number of delete requests to run per compaction cycle.") f.DurationVar(&cfg.DeleteRequestCancelPeriod, "compactor.delete-request-cancel-period", 24*time.Hour, "Allow cancellation of delete request until duration after they are created. Data would be deleted only after delete requests have been older than this duration. Ideally this should be set to at least 24h.") - f.DurationVar(&cfg.DeleteMaxInterval, "compactor.delete-max-interval", 24*time.Hour, "Constrain the size of any single delete request. When a delete request > delete_max_interval is input, the request is sharded into smaller requests of no more than delete_max_interval") + f.DurationVar(&cfg.DeleteMaxInterval, "compactor.delete-max-interval", 24*time.Hour, "Constrain the size of any single delete request with line filters. When a delete request > delete_max_interval is input, the request is sharded into smaller requests of no more than delete_max_interval") f.DurationVar(&cfg.RetentionTableTimeout, "compactor.retention-table-timeout", 0, "The maximum amount of time to spend running retention and deletion on any given table in the index.") f.IntVar(&cfg.MaxCompactionParallelism, "compactor.max-compaction-parallelism", 1, "Maximum number of tables to compact in parallel. While increasing this value, please make sure compactor has enough disk space allocated to be able to store and compact as many tables.") f.IntVar(&cfg.UploadParallelism, "compactor.upload-parallelism", 10, "Number of upload/remove operations to execute in parallel when finalizing a compaction. NOTE: This setting is per compaction operation, which can be executed in parallel. The upper bound on the number of concurrent uploads is upload_parallelism * max_compaction_parallelism.") diff --git a/pkg/compactor/deletion/delete_requests_manager.go b/pkg/compactor/deletion/delete_requests_manager.go index 0e22439dd2a6..c18d1b032ba7 100644 --- a/pkg/compactor/deletion/delete_requests_manager.go +++ b/pkg/compactor/deletion/delete_requests_manager.go @@ -126,10 +126,25 @@ func (d *DeleteRequestsManager) loadDeleteRequestsToProcess() error { return err } + reqCount := 0 for i := range deleteRequests { deleteRequest := deleteRequests[i] - if i >= d.batchSize { - logBatchTruncation(i, len(deleteRequests)) + maxRetentionInterval := getMaxRetentionInterval(deleteRequest.UserID, d.limits) + // retention interval 0 means retain the data forever + if maxRetentionInterval != 0 { + oldestRetainedLogTimestamp := model.Now().Add(-maxRetentionInterval) + if deleteRequest.StartTime.Before(oldestRetainedLogTimestamp) && deleteRequest.EndTime.Before(oldestRetainedLogTimestamp) { + level.Info(util_log.Logger).Log( + "msg", "Marking delete request with interval beyond retention period as processed", + "delete_request_id", deleteRequest.RequestID, + "user", deleteRequest.UserID, + ) + d.markRequestAsProcessed(deleteRequest) + continue + } + } + if reqCount >= d.batchSize { + logBatchTruncation(reqCount, len(deleteRequests)) break } @@ -149,6 +164,7 @@ func (d *DeleteRequestsManager) loadDeleteRequestsToProcess() error { if deleteRequest.EndTime > ur.requestsInterval.End { ur.requestsInterval.End = deleteRequest.EndTime } + reqCount++ } return nil @@ -305,6 +321,28 @@ func (d *DeleteRequestsManager) MarkPhaseTimedOut() { d.deleteRequestsToProcess = map[string]*userDeleteRequests{} } +func (d *DeleteRequestsManager) markRequestAsProcessed(deleteRequest DeleteRequest) { + if err := d.deleteRequestsStore.UpdateStatus(context.Background(), deleteRequest, StatusProcessed); err != nil { + level.Error(util_log.Logger).Log( + "msg", "failed to mark delete request for user as processed", + "delete_request_id", deleteRequest.RequestID, + "sequence_num", deleteRequest.SequenceNum, + "user", deleteRequest.UserID, + "err", err, + "deleted_lines", deleteRequest.DeletedLines, + ) + } else { + level.Info(util_log.Logger).Log( + "msg", "delete request for user marked as processed", + "delete_request_id", deleteRequest.RequestID, + "sequence_num", deleteRequest.SequenceNum, + "user", deleteRequest.UserID, + "deleted_lines", deleteRequest.DeletedLines, + ) + d.metrics.deleteRequestsProcessedTotal.WithLabelValues(deleteRequest.UserID).Inc() + } +} + func (d *DeleteRequestsManager) MarkPhaseFinished() { d.deleteRequestsToProcessMtx.Lock() defer d.deleteRequestsToProcessMtx.Unlock() @@ -315,25 +353,7 @@ func (d *DeleteRequestsManager) MarkPhaseFinished() { } for _, deleteRequest := range userDeleteRequests.requests { - if err := d.deleteRequestsStore.UpdateStatus(context.Background(), *deleteRequest, StatusProcessed); err != nil { - level.Error(util_log.Logger).Log( - "msg", "failed to mark delete request for user as processed", - "delete_request_id", deleteRequest.RequestID, - "sequence_num", deleteRequest.SequenceNum, - "user", deleteRequest.UserID, - "err", err, - "deleted_lines", deleteRequest.DeletedLines, - ) - } else { - level.Info(util_log.Logger).Log( - "msg", "delete request for user marked as processed", - "delete_request_id", deleteRequest.RequestID, - "sequence_num", deleteRequest.SequenceNum, - "user", deleteRequest.UserID, - "deleted_lines", deleteRequest.DeletedLines, - ) - } - d.metrics.deleteRequestsProcessedTotal.WithLabelValues(deleteRequest.UserID).Inc() + d.markRequestAsProcessed(*deleteRequest) } } } @@ -355,3 +375,21 @@ func (d *DeleteRequestsManager) IntervalMayHaveExpiredChunks(_ model.Interval, u func (d *DeleteRequestsManager) DropFromIndex(_ retention.ChunkEntry, _ model.Time, _ model.Time) bool { return false } + +func getMaxRetentionInterval(userID string, limits Limits) time.Duration { + maxRetention := model.Duration(limits.RetentionPeriod(userID)) + if maxRetention == 0 { + return 0 + } + + for _, streamRetention := range limits.StreamRetention(userID) { + if streamRetention.Period == 0 { + return 0 + } + if streamRetention.Period > maxRetention { + maxRetention = streamRetention.Period + } + } + + return time.Duration(maxRetention) +} diff --git a/pkg/compactor/deletion/delete_requests_manager_test.go b/pkg/compactor/deletion/delete_requests_manager_test.go index c2777c9801b7..44285bb890b4 100644 --- a/pkg/compactor/deletion/delete_requests_manager_test.go +++ b/pkg/compactor/deletion/delete_requests_manager_test.go @@ -41,12 +41,13 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { } for _, tc := range []struct { - name string - deletionMode deletionmode.Mode - deleteRequestsFromStore []DeleteRequest - batchSize int - expectedResp resp - expectedDeletionRangeByUser map[string]model.Interval + name string + deletionMode deletionmode.Mode + deleteRequestsFromStore []DeleteRequest + batchSize int + expectedResp resp + expectedDeletionRangeByUser map[string]model.Interval + expectedRequestsMarkedAsProcessed []int }{ { name: "no delete requests", @@ -66,6 +67,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { Query: lblFoo.String(), StartTime: now.Add(-24 * time.Hour), EndTime: now, + Status: StatusReceived, }, }, expectedResp: resp{ @@ -77,6 +79,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { End: now, }, }, + expectedRequestsMarkedAsProcessed: []int{0}, }, { name: "no relevant delete requests", @@ -88,6 +91,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { Query: lblFoo.String(), StartTime: now.Add(-24 * time.Hour), EndTime: now, + Status: StatusReceived, }, }, expectedResp: resp{ @@ -99,6 +103,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { End: now, }, }, + expectedRequestsMarkedAsProcessed: []int{0}, }, { name: "delete request not matching labels", @@ -110,6 +115,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { Query: `{fizz="buzz"}`, StartTime: now.Add(-24 * time.Hour), EndTime: now, + Status: StatusReceived, }, }, expectedResp: resp{ @@ -121,6 +127,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { End: now, }, }, + expectedRequestsMarkedAsProcessed: []int{0}, }, { name: "whole chunk deleted by single request", @@ -132,6 +139,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { Query: lblFoo.String(), StartTime: now.Add(-24 * time.Hour), EndTime: now, + Status: StatusReceived, }, }, expectedResp: resp{ @@ -143,6 +151,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { End: now, }, }, + expectedRequestsMarkedAsProcessed: []int{0}, }, { name: "whole chunk deleted by single request with line filters", @@ -154,6 +163,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { Query: streamSelectorWithLineFilters, StartTime: now.Add(-24 * time.Hour), EndTime: now, + Status: StatusReceived, }, }, expectedResp: resp{ @@ -168,6 +178,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { End: now, }, }, + expectedRequestsMarkedAsProcessed: []int{0}, }, { name: "whole chunk deleted by single request with structured metadata filters", @@ -179,6 +190,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { Query: streamSelectorWithStructuredMetadataFilters, StartTime: now.Add(-24 * time.Hour), EndTime: now, + Status: StatusReceived, }, }, expectedResp: resp{ @@ -193,6 +205,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { End: now, }, }, + expectedRequestsMarkedAsProcessed: []int{0}, }, { name: "whole chunk deleted by single request with line and structured metadata filters", @@ -204,6 +217,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { Query: streamSelectorWithLineAndStructuredMetadataFilters, StartTime: now.Add(-24 * time.Hour), EndTime: now, + Status: StatusReceived, }, }, expectedResp: resp{ @@ -218,6 +232,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { End: now, }, }, + expectedRequestsMarkedAsProcessed: []int{0}, }, { name: "deleted interval out of range", @@ -229,6 +244,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { Query: lblFoo.String(), StartTime: now.Add(-48 * time.Hour), EndTime: now.Add(-24 * time.Hour), + Status: StatusReceived, }, }, expectedResp: resp{ @@ -240,6 +256,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { End: now.Add(-24 * time.Hour), }, }, + expectedRequestsMarkedAsProcessed: []int{0}, }, { name: "deleted interval out of range(with multiple user requests)", @@ -251,12 +268,14 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { Query: lblFoo.String(), StartTime: now.Add(-48 * time.Hour), EndTime: now.Add(-24 * time.Hour), + Status: StatusReceived, }, { UserID: "different-user", Query: lblFoo.String(), StartTime: now.Add(-24 * time.Hour), EndTime: now, + Status: StatusReceived, }, }, expectedResp: resp{ @@ -272,6 +291,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { End: now, }, }, + expectedRequestsMarkedAsProcessed: []int{0, 1}, }, { name: "multiple delete requests with one deleting the whole chunk", @@ -283,12 +303,14 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { Query: lblFoo.String(), StartTime: now.Add(-48 * time.Hour), EndTime: now.Add(-24 * time.Hour), + Status: StatusReceived, }, { UserID: testUserID, Query: lblFoo.String(), StartTime: now.Add(-12 * time.Hour), EndTime: now, + Status: StatusReceived, }, }, expectedResp: resp{ @@ -300,6 +322,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { End: now, }, }, + expectedRequestsMarkedAsProcessed: []int{0, 1}, }, { name: "multiple delete requests with line filters and one deleting the whole chunk", @@ -311,12 +334,14 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { Query: streamSelectorWithLineFilters, StartTime: now.Add(-48 * time.Hour), EndTime: now.Add(-24 * time.Hour), + Status: StatusReceived, }, { UserID: testUserID, Query: streamSelectorWithLineFilters, StartTime: now.Add(-12 * time.Hour), EndTime: now, + Status: StatusReceived, }, }, expectedResp: resp{ @@ -331,6 +356,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { End: now, }, }, + expectedRequestsMarkedAsProcessed: []int{0, 1}, }, { name: "multiple delete requests with structured metadata filters and one deleting the whole chunk", @@ -342,12 +368,14 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { Query: streamSelectorWithStructuredMetadataFilters, StartTime: now.Add(-48 * time.Hour), EndTime: now.Add(-24 * time.Hour), + Status: StatusReceived, }, { UserID: testUserID, Query: streamSelectorWithStructuredMetadataFilters, StartTime: now.Add(-12 * time.Hour), EndTime: now, + Status: StatusReceived, }, }, expectedResp: resp{ @@ -362,6 +390,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { End: now, }, }, + expectedRequestsMarkedAsProcessed: []int{0, 1}, }, { name: "multiple delete requests causing multiple holes", @@ -373,24 +402,28 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { Query: lblFoo.String(), StartTime: now.Add(-13 * time.Hour), EndTime: now.Add(-11 * time.Hour), + Status: StatusReceived, }, { UserID: testUserID, Query: lblFoo.String(), StartTime: now.Add(-10 * time.Hour), EndTime: now.Add(-8 * time.Hour), + Status: StatusReceived, }, { UserID: testUserID, Query: lblFoo.String(), StartTime: now.Add(-6 * time.Hour), EndTime: now.Add(-5 * time.Hour), + Status: StatusReceived, }, { UserID: testUserID, Query: lblFoo.String(), StartTime: now.Add(-2 * time.Hour), EndTime: now, + Status: StatusReceived, }, }, expectedResp: resp{ @@ -412,6 +445,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { End: now, }, }, + expectedRequestsMarkedAsProcessed: []int{0, 1, 2, 3}, }, { name: "multiple overlapping requests deleting the whole chunk", @@ -423,12 +457,14 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { Query: lblFoo.String(), StartTime: now.Add(-13 * time.Hour), EndTime: now.Add(-6 * time.Hour), + Status: StatusReceived, }, { UserID: testUserID, Query: lblFoo.String(), StartTime: now.Add(-8 * time.Hour), EndTime: now, + Status: StatusReceived, }, }, expectedResp: resp{ @@ -443,6 +479,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { End: now, }, }, + expectedRequestsMarkedAsProcessed: []int{0, 1}, }, { name: "multiple overlapping requests with line filters deleting the whole chunk", @@ -454,12 +491,14 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { Query: streamSelectorWithLineFilters, StartTime: now.Add(-13 * time.Hour), EndTime: now.Add(-6 * time.Hour), + Status: StatusReceived, }, { UserID: testUserID, Query: streamSelectorWithLineFilters, StartTime: now.Add(-8 * time.Hour), EndTime: now, + Status: StatusReceived, }, }, expectedResp: resp{ @@ -474,6 +513,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { End: now, }, }, + expectedRequestsMarkedAsProcessed: []int{0, 1}, }, { name: "multiple overlapping requests with structured metadata filters deleting the whole chunk", @@ -485,12 +525,14 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { Query: streamSelectorWithStructuredMetadataFilters, StartTime: now.Add(-13 * time.Hour), EndTime: now.Add(-6 * time.Hour), + Status: StatusReceived, }, { UserID: testUserID, Query: streamSelectorWithStructuredMetadataFilters, StartTime: now.Add(-8 * time.Hour), EndTime: now, + Status: StatusReceived, }, }, expectedResp: resp{ @@ -505,6 +547,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { End: now, }, }, + expectedRequestsMarkedAsProcessed: []int{0, 1}, }, { name: "multiple non-overlapping requests deleting the whole chunk", @@ -516,18 +559,21 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { Query: lblFoo.String(), StartTime: now.Add(-12 * time.Hour), EndTime: now.Add(-6*time.Hour) - 1, + Status: StatusReceived, }, { UserID: testUserID, Query: lblFoo.String(), StartTime: now.Add(-6 * time.Hour), EndTime: now.Add(-4*time.Hour) - 1, + Status: StatusReceived, }, { UserID: testUserID, Query: lblFoo.String(), StartTime: now.Add(-4 * time.Hour), EndTime: now, + Status: StatusReceived, }, }, expectedResp: resp{ @@ -542,6 +588,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { End: now, }, }, + expectedRequestsMarkedAsProcessed: []int{0, 1, 2}, }, { name: "multiple non-overlapping requests with line filter deleting the whole chunk", @@ -553,18 +600,21 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { Query: streamSelectorWithLineFilters, StartTime: now.Add(-12 * time.Hour), EndTime: now.Add(-6*time.Hour) - 1, + Status: StatusReceived, }, { UserID: testUserID, Query: streamSelectorWithLineFilters, StartTime: now.Add(-6 * time.Hour), EndTime: now.Add(-4*time.Hour) - 1, + Status: StatusReceived, }, { UserID: testUserID, Query: streamSelectorWithLineFilters, StartTime: now.Add(-4 * time.Hour), EndTime: now, + Status: StatusReceived, }, }, expectedResp: resp{ @@ -579,6 +629,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { End: now, }, }, + expectedRequestsMarkedAsProcessed: []int{0, 1, 2}, }, { name: "multiple non-overlapping requests with structured metadata filter deleting the whole chunk", @@ -590,18 +641,21 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { Query: streamSelectorWithStructuredMetadataFilters, StartTime: now.Add(-12 * time.Hour), EndTime: now.Add(-6*time.Hour) - 1, + Status: StatusReceived, }, { UserID: testUserID, Query: streamSelectorWithStructuredMetadataFilters, StartTime: now.Add(-6 * time.Hour), EndTime: now.Add(-4*time.Hour) - 1, + Status: StatusReceived, }, { UserID: testUserID, Query: streamSelectorWithStructuredMetadataFilters, StartTime: now.Add(-4 * time.Hour), EndTime: now, + Status: StatusReceived, }, }, expectedResp: resp{ @@ -616,6 +670,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { End: now, }, }, + expectedRequestsMarkedAsProcessed: []int{0, 1, 2}, }, { name: "deletes are disabled", @@ -627,24 +682,28 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { Query: lblFoo.String(), StartTime: now.Add(-13 * time.Hour), EndTime: now.Add(-11 * time.Hour), + Status: StatusReceived, }, { UserID: testUserID, Query: lblFoo.String(), StartTime: now.Add(-10 * time.Hour), EndTime: now.Add(-8 * time.Hour), + Status: StatusReceived, }, { UserID: testUserID, Query: lblFoo.String(), StartTime: now.Add(-6 * time.Hour), EndTime: now.Add(-5 * time.Hour), + Status: StatusReceived, }, { UserID: testUserID, Query: lblFoo.String(), StartTime: now.Add(-2 * time.Hour), EndTime: now, + Status: StatusReceived, }, }, expectedResp: resp{ @@ -661,24 +720,28 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { Query: lblFoo.String(), StartTime: now.Add(-13 * time.Hour), EndTime: now.Add(-11 * time.Hour), + Status: StatusReceived, }, { UserID: testUserID, Query: lblFoo.String(), StartTime: now.Add(-10 * time.Hour), EndTime: now.Add(-8 * time.Hour), + Status: StatusReceived, }, { UserID: testUserID, Query: lblFoo.String(), StartTime: now.Add(-6 * time.Hour), EndTime: now.Add(-5 * time.Hour), + Status: StatusReceived, }, { UserID: testUserID, Query: lblFoo.String(), StartTime: now.Add(-2 * time.Hour), EndTime: now, + Status: StatusReceived, }, }, expectedResp: resp{ @@ -695,24 +758,28 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { Query: lblFoo.String(), StartTime: now.Add(-2 * time.Hour), EndTime: now, + Status: StatusReceived, }, { UserID: testUserID, Query: lblFoo.String(), StartTime: now.Add(-6 * time.Hour), EndTime: now.Add(-5 * time.Hour), + Status: StatusReceived, }, { UserID: testUserID, Query: lblFoo.String(), StartTime: now.Add(-10 * time.Hour), EndTime: now.Add(-8 * time.Hour), + Status: StatusReceived, }, { UserID: testUserID, Query: lblFoo.String(), StartTime: now.Add(-13 * time.Hour), EndTime: now.Add(-11 * time.Hour), + Status: StatusReceived, }, }, expectedResp: resp{ @@ -733,10 +800,108 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { End: now.Add(-8 * time.Hour), }, }, + expectedRequestsMarkedAsProcessed: []int{2, 3}, + }, + { + name: "Deletes beyond retention are marked as processed straight away without being batched for processing", + deletionMode: deletionmode.FilterAndDelete, + batchSize: 2, + deleteRequestsFromStore: []DeleteRequest{ + { + UserID: "different-user", + Query: lblFoo.String(), + StartTime: now.Add(-14 * 24 * time.Hour), + EndTime: now.Add(-10 * 24 * time.Hour), + Status: StatusReceived, + }, + { + UserID: testUserID, + Query: lblFoo.String(), + StartTime: now.Add(-14 * 24 * time.Hour), + EndTime: now.Add(-10 * 24 * time.Hour), + Status: StatusReceived, + }, + { + UserID: testUserID, + Query: lblFoo.String(), + StartTime: now.Add(-2 * time.Hour), + EndTime: now, + Status: StatusReceived, + }, + { + UserID: testUserID, + Query: lblFoo.String(), + StartTime: now.Add(-6 * time.Hour), + EndTime: now.Add(-5 * time.Hour), + Status: StatusReceived, + }, + { + UserID: testUserID, + Query: lblFoo.String(), + StartTime: now.Add(-10 * time.Hour), + EndTime: now.Add(-8 * time.Hour), + Status: StatusReceived, + }, + { + UserID: testUserID, + Query: lblFoo.String(), + StartTime: now.Add(-13 * time.Hour), + EndTime: now.Add(-11 * time.Hour), + Status: StatusReceived, + }, + }, + expectedResp: resp{ + isExpired: true, + expectedFilter: func(ts time.Time, s string, _ ...labels.Label) bool { + tsUnixNano := ts.UnixNano() + if (now.Add(-13*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= now.Add(-11*time.Hour).UnixNano()) || + (now.Add(-10*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= now.Add(-8*time.Hour).UnixNano()) { + return true + } + + return false + }, + }, + expectedDeletionRangeByUser: map[string]model.Interval{ + testUserID: { + Start: now.Add(-13 * time.Hour), + End: now.Add(-8 * time.Hour), + }, + }, + expectedRequestsMarkedAsProcessed: []int{0, 1, 4, 5}, + }, + { + name: "All deletes beyond retention", + deletionMode: deletionmode.FilterAndDelete, + batchSize: 2, + deleteRequestsFromStore: []DeleteRequest{ + { + UserID: "different-user", + Query: lblFoo.String(), + StartTime: now.Add(-14 * 24 * time.Hour), + EndTime: now.Add(-10 * 24 * time.Hour), + Status: StatusReceived, + }, + { + UserID: testUserID, + Query: lblFoo.String(), + StartTime: now.Add(-14 * 24 * time.Hour), + EndTime: now.Add(-10 * 24 * time.Hour), + Status: StatusReceived, + }, + }, + expectedResp: resp{ + isExpired: false, + }, + expectedRequestsMarkedAsProcessed: []int{0, 1}, }, } { t.Run(tc.name, func(t *testing.T) { - mgr := NewDeleteRequestsManager(&mockDeleteRequestsStore{deleteRequests: tc.deleteRequestsFromStore}, time.Hour, tc.batchSize, &fakeLimits{mode: tc.deletionMode.String()}, nil) + mockDeleteRequestsStore := &mockDeleteRequestsStore{deleteRequests: tc.deleteRequestsFromStore} + mgr := NewDeleteRequestsManager(mockDeleteRequestsStore, time.Hour, tc.batchSize, &fakeLimits{defaultLimit: limit{ + retentionPeriod: 7 * 24 * time.Hour, + deletionMode: tc.deletionMode.String(), + }}, nil) require.NoError(t, mgr.loadDeleteRequestsToProcess()) for _, deleteRequests := range mgr.deleteRequestsToProcess { @@ -749,28 +914,38 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { require.Equal(t, tc.expectedResp.isExpired, isExpired) if tc.expectedResp.expectedFilter == nil { require.Nil(t, filterFunc) - return - } - require.NotNil(t, filterFunc) + } else { + require.NotNil(t, filterFunc) - for start := chunkEntry.From; start <= chunkEntry.Through; start = start.Add(time.Minute) { - line := "foo bar" - if start.Time().Minute()%2 == 1 { - line = "fizz buzz" + for start := chunkEntry.From; start <= chunkEntry.Through; start = start.Add(time.Minute) { + line := "foo bar" + if start.Time().Minute()%2 == 1 { + line = "fizz buzz" + } + // mix of empty, ding=dong and ping=pong as structured metadata + var structuredMetadata []labels.Label + if start.Time().Minute()%3 == 0 { + structuredMetadata = []labels.Label{{Name: lblPing, Value: lblPong}} + } else if start.Time().Minute()%2 == 0 { + structuredMetadata = []labels.Label{{Name: "ting", Value: "tong"}} + } + require.Equal(t, tc.expectedResp.expectedFilter(start.Time(), line, structuredMetadata...), filterFunc(start.Time(), line, structuredMetadata...), "line", line, "time", start.Time(), "now", now.Time()) } - // mix of empty, ding=dong and ping=pong as structured metadata - var structuredMetadata []labels.Label - if start.Time().Minute()%3 == 0 { - structuredMetadata = []labels.Label{{Name: lblPing, Value: lblPong}} - } else if start.Time().Minute()%2 == 0 { - structuredMetadata = []labels.Label{{Name: "ting", Value: "tong"}} + + require.Equal(t, len(tc.expectedDeletionRangeByUser), len(mgr.deleteRequestsToProcess)) + for userID, dr := range tc.expectedDeletionRangeByUser { + require.Equal(t, dr, mgr.deleteRequestsToProcess[userID].requestsInterval) } - require.Equal(t, tc.expectedResp.expectedFilter(start.Time(), line, structuredMetadata...), filterFunc(start.Time(), line, structuredMetadata...), "line", line, "time", start.Time(), "now", now.Time()) } - require.Equal(t, len(tc.expectedDeletionRangeByUser), len(mgr.deleteRequestsToProcess)) - for userID, dr := range tc.expectedDeletionRangeByUser { - require.Equal(t, dr, mgr.deleteRequestsToProcess[userID].requestsInterval) + mgr.MarkPhaseFinished() + + processedRequests, err := mockDeleteRequestsStore.GetDeleteRequestsByStatus(context.Background(), StatusProcessed) + require.NoError(t, err) + require.Len(t, processedRequests, len(tc.expectedRequestsMarkedAsProcessed)) + + for i, reqIdx := range tc.expectedRequestsMarkedAsProcessed { + require.True(t, requestsAreEqual(tc.deleteRequestsFromStore[reqIdx], processedRequests[i])) } }) } @@ -782,18 +957,18 @@ func TestDeleteRequestsManager_IntervalMayHaveExpiredChunks(t *testing.T) { hasChunks bool user string }{ - {[]DeleteRequest{{Query: `0`, UserID: "test-user", StartTime: 0, EndTime: 100}}, true, "test-user"}, - {[]DeleteRequest{{Query: `1`, UserID: "test-user", StartTime: 200, EndTime: 400}}, true, "test-user"}, - {[]DeleteRequest{{Query: `2`, UserID: "test-user", StartTime: 400, EndTime: 500}}, true, "test-user"}, - {[]DeleteRequest{{Query: `3`, UserID: "test-user", StartTime: 500, EndTime: 700}}, true, "test-user"}, - {[]DeleteRequest{{Query: `3`, UserID: "other-user", StartTime: 500, EndTime: 700}}, false, "test-user"}, - {[]DeleteRequest{{Query: `4`, UserID: "test-user", StartTime: 700, EndTime: 900}}, true, "test-user"}, - {[]DeleteRequest{{Query: `4`, UserID: "", StartTime: 700, EndTime: 900}}, true, ""}, + {[]DeleteRequest{{Query: `0`, UserID: "test-user", StartTime: 0, EndTime: 100, Status: StatusReceived}}, true, "test-user"}, + {[]DeleteRequest{{Query: `1`, UserID: "test-user", StartTime: 200, EndTime: 400, Status: StatusReceived}}, true, "test-user"}, + {[]DeleteRequest{{Query: `2`, UserID: "test-user", StartTime: 400, EndTime: 500, Status: StatusReceived}}, true, "test-user"}, + {[]DeleteRequest{{Query: `3`, UserID: "test-user", StartTime: 500, EndTime: 700, Status: StatusReceived}}, true, "test-user"}, + {[]DeleteRequest{{Query: `3`, UserID: "other-user", StartTime: 500, EndTime: 700, Status: StatusReceived}}, false, "test-user"}, + {[]DeleteRequest{{Query: `4`, UserID: "test-user", StartTime: 700, EndTime: 900, Status: StatusReceived}}, true, "test-user"}, + {[]DeleteRequest{{Query: `4`, UserID: "", StartTime: 700, EndTime: 900, Status: StatusReceived}}, true, ""}, {[]DeleteRequest{}, false, ""}, } for _, tc := range tt { - mgr := NewDeleteRequestsManager(&mockDeleteRequestsStore{deleteRequests: tc.deleteRequestsFromStore}, time.Hour, 70, &fakeLimits{mode: deletionmode.FilterAndDelete.String()}, nil) + mgr := NewDeleteRequestsManager(&mockDeleteRequestsStore{deleteRequests: tc.deleteRequestsFromStore}, time.Hour, 70, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}, nil) require.NoError(t, mgr.loadDeleteRequestsToProcess()) interval := model.Interval{Start: 300, End: 600} @@ -823,8 +998,14 @@ type mockDeleteRequestsStore struct { genNumber string } -func (m *mockDeleteRequestsStore) GetDeleteRequestsByStatus(_ context.Context, _ DeleteRequestStatus) ([]DeleteRequest, error) { - return m.deleteRequests, nil +func (m *mockDeleteRequestsStore) GetDeleteRequestsByStatus(_ context.Context, status DeleteRequestStatus) ([]DeleteRequest, error) { + reqs := make([]DeleteRequest, 0, len(m.deleteRequests)) + for i := range m.deleteRequests { + if m.deleteRequests[i].Status == status { + reqs = append(reqs, m.deleteRequests[i]) + } + } + return reqs, nil } func (m *mockDeleteRequestsStore) AddDeleteRequestGroup(_ context.Context, reqs []DeleteRequest) ([]DeleteRequest, error) { @@ -854,3 +1035,24 @@ func (m *mockDeleteRequestsStore) GetAllDeleteRequestsForUser(_ context.Context, func (m *mockDeleteRequestsStore) GetCacheGenerationNumber(_ context.Context, _ string) (string, error) { return m.genNumber, m.getErr } + +func (m *mockDeleteRequestsStore) UpdateStatus(_ context.Context, req DeleteRequest, newStatus DeleteRequestStatus) error { + for i := range m.deleteRequests { + if requestsAreEqual(m.deleteRequests[i], req) { + m.deleteRequests[i].Status = newStatus + } + } + + return nil +} + +func requestsAreEqual(req1, req2 DeleteRequest) bool { + if req1.UserID == req2.UserID && + req1.Query == req2.Query && + req1.StartTime == req2.StartTime && + req1.EndTime == req2.EndTime { + return true + } + + return false +} diff --git a/pkg/compactor/deletion/grpc_request_handler_test.go b/pkg/compactor/deletion/grpc_request_handler_test.go index f0b2002e8d59..c7171e6ac340 100644 --- a/pkg/compactor/deletion/grpc_request_handler_test.go +++ b/pkg/compactor/deletion/grpc_request_handler_test.go @@ -74,7 +74,7 @@ func TestGRPCGetDeleteRequests(t *testing.T) { t.Run("it gets all the delete requests for the user", func(t *testing.T) { store := &mockDeleteRequestsStore{} store.getAllResult = []DeleteRequest{{RequestID: "test-request-1", Status: StatusReceived}, {RequestID: "test-request-2", Status: StatusReceived}} - h := NewGRPCRequestHandler(store, &fakeLimits{mode: deletionmode.FilterAndDelete.String()}) + h := NewGRPCRequestHandler(store, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}) grpcClient, closer := server(t, h) t.Cleanup(closer) @@ -96,7 +96,7 @@ func TestGRPCGetDeleteRequests(t *testing.T) { {RequestID: "test-request-2", CreatedAt: now.Add(time.Minute), StartTime: now.Add(30 * time.Minute), EndTime: now.Add(90 * time.Minute)}, {RequestID: "test-request-1", CreatedAt: now, StartTime: now.Add(time.Hour), EndTime: now.Add(2 * time.Hour)}, } - h := NewGRPCRequestHandler(store, &fakeLimits{mode: deletionmode.FilterAndDelete.String()}) + h := NewGRPCRequestHandler(store, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}) grpcClient, closer := server(t, h) t.Cleanup(closer) @@ -124,7 +124,7 @@ func TestGRPCGetDeleteRequests(t *testing.T) { {RequestID: "test-request-2", CreatedAt: now.Add(time.Minute), Status: StatusProcessed}, {RequestID: "test-request-3", CreatedAt: now.Add(2 * time.Minute), Status: StatusReceived}, } - h := NewGRPCRequestHandler(store, &fakeLimits{mode: deletionmode.FilterAndDelete.String()}) + h := NewGRPCRequestHandler(store, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}) grpcClient, closer := server(t, h) t.Cleanup(closer) @@ -145,7 +145,7 @@ func TestGRPCGetDeleteRequests(t *testing.T) { t.Run("error getting from store", func(t *testing.T) { store := &mockDeleteRequestsStore{} store.getAllErr = errors.New("something bad") - h := NewGRPCRequestHandler(store, &fakeLimits{mode: deletionmode.FilterAndDelete.String()}) + h := NewGRPCRequestHandler(store, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}) grpcClient, closer := server(t, h) t.Cleanup(closer) @@ -162,7 +162,7 @@ func TestGRPCGetDeleteRequests(t *testing.T) { t.Run("validation", func(t *testing.T) { t.Run("no org id", func(t *testing.T) { - h := NewGRPCRequestHandler(&mockDeleteRequestsStore{}, &fakeLimits{mode: deletionmode.FilterAndDelete.String()}) + h := NewGRPCRequestHandler(&mockDeleteRequestsStore{}, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}) grpcClient, closer := server(t, h) t.Cleanup(closer) @@ -178,7 +178,7 @@ func TestGRPCGetCacheGenNumbers(t *testing.T) { t.Run("get gen number", func(t *testing.T) { store := &mockDeleteRequestsStore{} store.genNumber = "123" - h := NewGRPCRequestHandler(store, &fakeLimits{mode: deletionmode.FilterAndDelete.String()}) + h := NewGRPCRequestHandler(store, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}) grpcClient, closer := server(t, h) t.Cleanup(closer) @@ -195,7 +195,7 @@ func TestGRPCGetCacheGenNumbers(t *testing.T) { t.Run("error getting from store", func(t *testing.T) { store := &mockDeleteRequestsStore{} store.getErr = errors.New("something bad") - h := NewGRPCRequestHandler(store, &fakeLimits{mode: deletionmode.FilterAndDelete.String()}) + h := NewGRPCRequestHandler(store, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}) grpcClient, closer := server(t, h) t.Cleanup(closer) @@ -212,7 +212,7 @@ func TestGRPCGetCacheGenNumbers(t *testing.T) { t.Run("validation", func(t *testing.T) { t.Run("no org id", func(t *testing.T) { - h := NewGRPCRequestHandler(&mockDeleteRequestsStore{}, &fakeLimits{mode: deletionmode.FilterAndDelete.String()}) + h := NewGRPCRequestHandler(&mockDeleteRequestsStore{}, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}) grpcClient, closer := server(t, h) t.Cleanup(closer) diff --git a/pkg/compactor/deletion/request_handler.go b/pkg/compactor/deletion/request_handler.go index db5a22a83d54..458279d3b852 100644 --- a/pkg/compactor/deletion/request_handler.go +++ b/pkg/compactor/deletion/request_handler.go @@ -10,14 +10,13 @@ import ( "sort" "time" - "github.com/grafana/loki/pkg/util" - "github.com/go-kit/log/level" + "github.com/grafana/dskit/tenant" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" - "github.com/grafana/dskit/tenant" - + "github.com/grafana/loki/pkg/logql/syntax" + "github.com/grafana/loki/pkg/util" util_log "github.com/grafana/loki/pkg/util/log" ) @@ -49,7 +48,7 @@ func (dm *DeleteRequestHandler) AddDeleteRequestHandler(w http.ResponseWriter, r } params := r.URL.Query() - query, err := query(params) + query, parsedExpr, err := query(params) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return @@ -67,13 +66,19 @@ func (dm *DeleteRequestHandler) AddDeleteRequestHandler(w http.ResponseWriter, r return } - interval, err := dm.interval(params, startTime, endTime) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return + var shardByInterval time.Duration + if parsedExpr.HasFilter() { + var err error + shardByInterval, err = dm.interval(params, startTime, endTime) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + } else { + shardByInterval = endTime.Sub(startTime) + time.Minute } - deleteRequests := shardDeleteRequestsByInterval(startTime, endTime, query, userID, interval) + deleteRequests := shardDeleteRequestsByInterval(startTime, endTime, query, userID, shardByInterval) createdDeleteRequests, err := dm.deleteRequestsStore.AddDeleteRequestGroup(ctx, deleteRequests) if err != nil { level.Error(util_log.Logger).Log("msg", "error adding delete request to the store", "err", err) @@ -92,7 +97,7 @@ func (dm *DeleteRequestHandler) AddDeleteRequestHandler(w http.ResponseWriter, r "delete_request_id", createdDeleteRequests[0].RequestID, "user", userID, "query", query, - "interval", interval.String(), + "interval", shardByInterval.String(), ) dm.metrics.deleteRequestsReceivedTotal.WithLabelValues(userID).Inc() @@ -315,17 +320,18 @@ func (dm *DeleteRequestHandler) GetCacheGenerationNumberHandler(w http.ResponseW } } -func query(params url.Values) (string, error) { +func query(params url.Values) (string, syntax.LogSelectorExpr, error) { query := params.Get("query") if len(query) == 0 { - return "", errors.New("query not set") + return "", nil, errors.New("query not set") } - if _, err := parseDeletionQuery(query); err != nil { - return "", err + parsedExpr, err := parseDeletionQuery(query) + if err != nil { + return "", nil, err } - return query, nil + return query, parsedExpr, nil } func startTime(params url.Values) (model.Time, error) { diff --git a/pkg/compactor/deletion/request_handler_test.go b/pkg/compactor/deletion/request_handler_test.go index 1aaf0b582b36..58e2ffd13c32 100644 --- a/pkg/compactor/deletion/request_handler_test.go +++ b/pkg/compactor/deletion/request_handler_test.go @@ -10,14 +10,12 @@ import ( "testing" "time" + "github.com/grafana/dskit/user" "github.com/pkg/errors" - "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/util" - - "github.com/grafana/dskit/user" ) func TestAddDeleteRequestHandler(t *testing.T) { @@ -50,14 +48,14 @@ func TestAddDeleteRequestHandler(t *testing.T) { require.Equal(t, w.Code, http.StatusInternalServerError) }) - t.Run("it shards deletes based on a query param", func(t *testing.T) { + t.Run("it only shards deletes with line filter based on a query param", func(t *testing.T) { store := &mockDeleteRequestsStore{} h := NewDeleteRequestHandler(store, 0, nil) from := model.TimeFromUnix(model.Now().Add(-3 * time.Hour).Unix()) to := model.TimeFromUnix(from.Add(3 * time.Hour).Unix()) - req := buildRequest("org-id", `{foo="bar"}`, unixString(from), unixString(to)) + req := buildRequest("org-id", `{foo="bar"} |= "foo"`, unixString(from), unixString(to)) params := req.URL.Query() params.Set("max_interval", "1h") req.URL.RawQuery = params.Encode() @@ -87,7 +85,7 @@ func TestAddDeleteRequestHandler(t *testing.T) { from := model.TimeFromUnix(model.Now().Add(-3 * time.Hour).Unix()) to := model.TimeFromUnix(from.Add(3 * time.Hour).Unix()) - req := buildRequest("org-id", `{foo="bar"}`, unixString(from), unixString(to)) + req := buildRequest("org-id", `{foo="bar"} |= "foo"`, unixString(from), unixString(to)) w := httptest.NewRecorder() h.AddDeleteRequestHandler(w, req) @@ -107,6 +105,27 @@ func TestAddDeleteRequestHandler(t *testing.T) { } }) + t.Run("it does not shard deletes without line filter", func(t *testing.T) { + store := &mockDeleteRequestsStore{} + h := NewDeleteRequestHandler(store, 0, nil) + + from := model.TimeFromUnix(model.Now().Add(-3 * time.Hour).Unix()) + to := model.TimeFromUnix(from.Add(3 * time.Hour).Unix()) + + req := buildRequest("org-id", `{foo="bar"}`, unixString(from), unixString(to)) + params := req.URL.Query() + params.Set("max_interval", "1h") + req.URL.RawQuery = params.Encode() + + w := httptest.NewRecorder() + h.AddDeleteRequestHandler(w, req) + + require.Equal(t, w.Code, http.StatusNoContent) + require.Len(t, store.addReqs, 1) + require.Equal(t, from, store.addReqs[0].StartTime) + require.Equal(t, to, store.addReqs[0].EndTime) + }) + t.Run("it works with RFC3339", func(t *testing.T) { store := &mockDeleteRequestsStore{} h := NewDeleteRequestHandler(store, 0, nil) @@ -166,11 +185,11 @@ func TestAddDeleteRequestHandler(t *testing.T) { {"org-id", `{foo="bar"}`, "0000000000", "0000000000001", "", "invalid end time: require unix seconds or RFC3339 format\n"}, {"org-id", `{foo="bar"}`, "0000000000", fmt.Sprint(time.Now().Add(time.Hour).Unix())[:10], "", "deletes in the future are not allowed\n"}, {"org-id", `{foo="bar"}`, "0000000001", "0000000000", "", "start time can't be greater than end time\n"}, - {"org-id", `{foo="bar"}`, "0000000000", "0000000001", "not-a-duration", "invalid max_interval: valid time units are 's', 'm', 'h'\n"}, - {"org-id", `{foo="bar"}`, "0000000000", "0000000001", "1ms", "invalid max_interval: valid time units are 's', 'm', 'h'\n"}, - {"org-id", `{foo="bar"}`, "0000000000", "0000000001", "1h", "max_interval can't be greater than 1m0s\n"}, - {"org-id", `{foo="bar"}`, "0000000000", "0000000001", "30s", "max_interval can't be greater than the interval to be deleted (1s)\n"}, - {"org-id", `{foo="bar"}`, "0000000000", "0000000000", "", "difference between start time and end time must be at least one second\n"}, + {"org-id", `{foo="bar"} |= "foo"`, "0000000000", "0000000001", "not-a-duration", "invalid max_interval: valid time units are 's', 'm', 'h'\n"}, + {"org-id", `{foo="bar"} |= "foo"`, "0000000000", "0000000001", "1ms", "invalid max_interval: valid time units are 's', 'm', 'h'\n"}, + {"org-id", `{foo="bar"} |= "foo"`, "0000000000", "0000000001", "1h", "max_interval can't be greater than 1m0s\n"}, + {"org-id", `{foo="bar"} |= "foo"`, "0000000000", "0000000001", "30s", "max_interval can't be greater than the interval to be deleted (1s)\n"}, + {"org-id", `{foo="bar"} |= "foo"`, "0000000000", "0000000000", "", "difference between start time and end time must be at least one second\n"}, } { t.Run(strings.TrimSpace(tc.error), func(t *testing.T) { req := buildRequest(tc.orgID, tc.query, tc.startTime, tc.endTime) diff --git a/pkg/compactor/deletion/tenant_delete_requests_client.go b/pkg/compactor/deletion/tenant_delete_requests_client.go index 29b6a5692286..d3ba3a9905a3 100644 --- a/pkg/compactor/deletion/tenant_delete_requests_client.go +++ b/pkg/compactor/deletion/tenant_delete_requests_client.go @@ -2,12 +2,17 @@ package deletion import ( "context" + "time" + + "github.com/grafana/loki/pkg/validation" ) const deletionNotAvailableMsg = "deletion is not available for this tenant" type Limits interface { DeletionMode(userID string) string + RetentionPeriod(userID string) time.Duration + StreamRetention(userID string) []validation.StreamRetention } type perTenantDeleteRequestsClient struct { diff --git a/pkg/compactor/deletion/tenant_delete_requests_client_test.go b/pkg/compactor/deletion/tenant_delete_requests_client_test.go index ba063f713b84..20e97d463f4f 100644 --- a/pkg/compactor/deletion/tenant_delete_requests_client_test.go +++ b/pkg/compactor/deletion/tenant_delete_requests_client_test.go @@ -3,6 +3,7 @@ package deletion import ( "context" "testing" + "time" "github.com/stretchr/testify/require" ) @@ -13,7 +14,7 @@ func TestTenantDeleteRequestsClient(t *testing.T) { RequestID: "test-request", }}, } - perTenantClient := NewPerTenantDeleteRequestsClient(fakeClient, limits) + perTenantClient := NewPerTenantDeleteRequestsClient(fakeClient, defaultLimits) t.Run("tenant enabled", func(t *testing.T) { reqs, err := perTenantClient.GetAllDeleteRequestsForUser(context.Background(), "1") @@ -39,10 +40,11 @@ func (c *fakeRequestsClient) GetAllDeleteRequestsForUser(_ context.Context, _ st } var ( - limits = &fakeLimits{ - limits: map[string]string{ - "1": "filter-only", - "2": "disabled", + defaultLimits = &fakeLimits{ + tenantLimits: map[string]limit{ + "1": {deletionMode: "filter-only"}, + "2": {deletionMode: "disabled"}, + "3": {retentionPeriod: time.Hour}, }, } ) diff --git a/pkg/compactor/deletion/tenant_request_handler_test.go b/pkg/compactor/deletion/tenant_request_handler_test.go index e979a5c8c4d0..c4a18543ccef 100644 --- a/pkg/compactor/deletion/tenant_request_handler_test.go +++ b/pkg/compactor/deletion/tenant_request_handler_test.go @@ -4,16 +4,19 @@ import ( "net/http" "net/http/httptest" "testing" + "time" "github.com/grafana/dskit/user" "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/validation" ) func TestDeleteRequestHandlerDeletionMiddleware(t *testing.T) { fl := &fakeLimits{ - limits: map[string]string{ - "1": "filter-only", - "2": "disabled", + tenantLimits: map[string]limit{ + "1": {deletionMode: "filter-only"}, + "2": {deletionMode: "disabled"}, }, } @@ -47,15 +50,34 @@ func TestDeleteRequestHandlerDeletionMiddleware(t *testing.T) { require.Equal(t, http.StatusBadRequest, res.Result().StatusCode) } +type limit struct { + deletionMode string + retentionPeriod time.Duration + streamRetention []validation.StreamRetention +} + type fakeLimits struct { - limits map[string]string - mode string + tenantLimits map[string]limit + defaultLimit limit } -func (f *fakeLimits) DeletionMode(userID string) string { - if f.mode != "" { - return f.mode +func (f *fakeLimits) getLimitForUser(userID string) limit { + limit := f.defaultLimit + if override, ok := f.tenantLimits[userID]; ok { + limit = override } - return f.limits[userID] + return limit +} + +func (f *fakeLimits) DeletionMode(userID string) string { + return f.getLimitForUser(userID).deletionMode +} + +func (f *fakeLimits) RetentionPeriod(userID string) time.Duration { + return f.getLimitForUser(userID).retentionPeriod +} + +func (f *fakeLimits) StreamRetention(userID string) []validation.StreamRetention { + return f.getLimitForUser(userID).streamRetention }