chore: delete request processing improvements (#12259)
sandeepsukhani authored Apr 1, 2024
1 parent a509871 commit 0b7ff48
Showing 10 changed files with 402 additions and 108 deletions.
6 changes: 3 additions & 3 deletions docs/sources/configure/_index.md
@@ -2487,9 +2487,9 @@ The `compactor` block configures the compactor component, which compacts index s
 # CLI flag: -compactor.delete-request-cancel-period
 [delete_request_cancel_period: <duration> | default = 24h]
-# Constrain the size of any single delete request. When a delete request >
-# delete_max_interval is input, the request is sharded into smaller requests of
-# no more than delete_max_interval
+# Constrain the size of any single delete request with line filters. When a
+# delete request > delete_max_interval is input, the request is sharded into
+# smaller requests of no more than delete_max_interval
 # CLI flag: -compactor.delete-max-interval
 [delete_max_interval: <duration> | default = 24h]
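
To illustrate the sharding behavior this option describes: a delete request longer than `delete_max_interval` is broken into consecutive sub-requests, each covering at most that interval. A minimal standalone sketch (the helper name is hypothetical; this is not Loki's actual sharding code):

```go
package main

import (
	"fmt"
	"time"
)

// shardByMaxInterval splits [start, end) into consecutive sub-intervals of at
// most maxInterval each, mirroring the described sharding of an over-long
// delete request.
func shardByMaxInterval(start, end time.Time, maxInterval time.Duration) [][2]time.Time {
	var shards [][2]time.Time
	for cur := start; cur.Before(end); {
		next := cur.Add(maxInterval)
		if next.After(end) {
			next = end
		}
		shards = append(shards, [2]time.Time{cur, next})
		cur = next
	}
	return shards
}

func main() {
	start := time.Date(2024, 4, 1, 0, 0, 0, 0, time.UTC)
	end := start.Add(60 * time.Hour) // a 2.5-day delete request
	for _, s := range shardByMaxInterval(start, end, 24*time.Hour) {
		fmt.Println(s[0], "->", s[1]) // three shards: 24h, 24h, and 12h
	}
}
```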
2 changes: 1 addition & 1 deletion pkg/compactor/compactor.go
@@ -102,7 +102,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.StringVar(&cfg.DeleteRequestStoreKeyPrefix, "compactor.delete-request-store.key-prefix", "index/", "Path prefix for storing delete requests.")
 	f.IntVar(&cfg.DeleteBatchSize, "compactor.delete-batch-size", 70, "The max number of delete requests to run per compaction cycle.")
 	f.DurationVar(&cfg.DeleteRequestCancelPeriod, "compactor.delete-request-cancel-period", 24*time.Hour, "Allow cancellation of delete request until duration after they are created. Data would be deleted only after delete requests have been older than this duration. Ideally this should be set to at least 24h.")
-	f.DurationVar(&cfg.DeleteMaxInterval, "compactor.delete-max-interval", 24*time.Hour, "Constrain the size of any single delete request. When a delete request > delete_max_interval is input, the request is sharded into smaller requests of no more than delete_max_interval")
+	f.DurationVar(&cfg.DeleteMaxInterval, "compactor.delete-max-interval", 24*time.Hour, "Constrain the size of any single delete request with line filters. When a delete request > delete_max_interval is input, the request is sharded into smaller requests of no more than delete_max_interval")
 	f.DurationVar(&cfg.RetentionTableTimeout, "compactor.retention-table-timeout", 0, "The maximum amount of time to spend running retention and deletion on any given table in the index.")
 	f.IntVar(&cfg.MaxCompactionParallelism, "compactor.max-compaction-parallelism", 1, "Maximum number of tables to compact in parallel. While increasing this value, please make sure compactor has enough disk space allocated to be able to store and compact as many tables.")
 	f.IntVar(&cfg.UploadParallelism, "compactor.upload-parallelism", 10, "Number of upload/remove operations to execute in parallel when finalizing a compaction. NOTE: This setting is per compaction operation, which can be executed in parallel. The upper bound on the number of concurrent uploads is upload_parallelism * max_compaction_parallelism.")
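
A related setting registered above, `-compactor.delete-request-cancel-period`, keeps a delete request cancellable for that long after creation, and the compactor only acts on it once the window has passed. A minimal sketch of that gating under those stated semantics (hypothetical helper, not the compactor's real scheduling code):

```go
package main

import (
	"fmt"
	"time"
)

// isReadyToProcess reports whether a delete request is old enough to act on:
// while it is younger than cancelPeriod the user may still cancel it, so the
// compactor should leave it alone until the window has passed.
func isReadyToProcess(createdAt, now time.Time, cancelPeriod time.Duration) bool {
	return now.Sub(createdAt) > cancelPeriod
}

func main() {
	now := time.Now()
	cancelPeriod := 24 * time.Hour
	fmt.Println(isReadyToProcess(now.Add(-6*time.Hour), now, cancelPeriod))  // false: still cancellable
	fmt.Println(isReadyToProcess(now.Add(-30*time.Hour), now, cancelPeriod)) // true: past the cancel window
}
```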
80 changes: 59 additions & 21 deletions pkg/compactor/deletion/delete_requests_manager.go
@@ -126,10 +126,25 @@ func (d *DeleteRequestsManager) loadDeleteRequestsToProcess() error {
 		return err
 	}
 
+	reqCount := 0
 	for i := range deleteRequests {
 		deleteRequest := deleteRequests[i]
-		if i >= d.batchSize {
-			logBatchTruncation(i, len(deleteRequests))
+		maxRetentionInterval := getMaxRetentionInterval(deleteRequest.UserID, d.limits)
+		// retention interval 0 means retain the data forever
+		if maxRetentionInterval != 0 {
+			oldestRetainedLogTimestamp := model.Now().Add(-maxRetentionInterval)
+			if deleteRequest.StartTime.Before(oldestRetainedLogTimestamp) && deleteRequest.EndTime.Before(oldestRetainedLogTimestamp) {
+				level.Info(util_log.Logger).Log(
+					"msg", "Marking delete request with interval beyond retention period as processed",
+					"delete_request_id", deleteRequest.RequestID,
+					"user", deleteRequest.UserID,
+				)
+				d.markRequestAsProcessed(deleteRequest)
+				continue
+			}
+		}
+		if reqCount >= d.batchSize {
+			logBatchTruncation(reqCount, len(deleteRequests))
 			break
 		}
 
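The added check above marks a request as processed without running it when both its start and end fall before the oldest timestamp retention still keeps for that tenant, since retention will remove (or has already removed) that data anyway. A self-contained sketch of the same decision with concrete numbers (hypothetical helper, not the code in this diff):

```go
package main

import (
	"fmt"
	"time"
)

// entirelyBeyondRetention reports whether a delete request's whole interval
// lies before the oldest log line the tenant still retains; if so, there is
// nothing left to delete and the request can simply be marked processed.
func entirelyBeyondRetention(reqStart, reqEnd, now time.Time, retention time.Duration) bool {
	if retention == 0 { // a zero retention period means retain forever, so never skip
		return false
	}
	oldestRetained := now.Add(-retention)
	return reqStart.Before(oldestRetained) && reqEnd.Before(oldestRetained)
}

func main() {
	now := time.Now()
	retention := 30 * 24 * time.Hour
	// A request covering data from 45 to 40 days ago targets logs that
	// retention has already removed, so it would be skipped.
	fmt.Println(entirelyBeyondRetention(now.AddDate(0, 0, -45), now.AddDate(0, 0, -40), now, retention)) // true
	// A request that still overlaps retained data must be processed normally.
	fmt.Println(entirelyBeyondRetention(now.AddDate(0, 0, -45), now.AddDate(0, 0, -10), now, retention)) // false
}
```
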
@@ -149,6 +164,7 @@ func (d *DeleteRequestsManager) loadDeleteRequestsToProcess() error {
 		if deleteRequest.EndTime > ur.requestsInterval.End {
 			ur.requestsInterval.End = deleteRequest.EndTime
 		}
+		reqCount++
 	}
 
 	return nil
@@ -305,6 +321,28 @@ func (d *DeleteRequestsManager) MarkPhaseTimedOut() {
 	d.deleteRequestsToProcess = map[string]*userDeleteRequests{}
 }
 
+func (d *DeleteRequestsManager) markRequestAsProcessed(deleteRequest DeleteRequest) {
+	if err := d.deleteRequestsStore.UpdateStatus(context.Background(), deleteRequest, StatusProcessed); err != nil {
+		level.Error(util_log.Logger).Log(
+			"msg", "failed to mark delete request for user as processed",
+			"delete_request_id", deleteRequest.RequestID,
+			"sequence_num", deleteRequest.SequenceNum,
+			"user", deleteRequest.UserID,
+			"err", err,
+			"deleted_lines", deleteRequest.DeletedLines,
+		)
+	} else {
+		level.Info(util_log.Logger).Log(
+			"msg", "delete request for user marked as processed",
+			"delete_request_id", deleteRequest.RequestID,
+			"sequence_num", deleteRequest.SequenceNum,
+			"user", deleteRequest.UserID,
+			"deleted_lines", deleteRequest.DeletedLines,
+		)
+		d.metrics.deleteRequestsProcessedTotal.WithLabelValues(deleteRequest.UserID).Inc()
+	}
+}
+
 func (d *DeleteRequestsManager) MarkPhaseFinished() {
 	d.deleteRequestsToProcessMtx.Lock()
 	defer d.deleteRequestsToProcessMtx.Unlock()
@@ -315,25 +353,7 @@ func (d *DeleteRequestsManager) MarkPhaseFinished() {
 		}
 
 		for _, deleteRequest := range userDeleteRequests.requests {
-			if err := d.deleteRequestsStore.UpdateStatus(context.Background(), *deleteRequest, StatusProcessed); err != nil {
-				level.Error(util_log.Logger).Log(
-					"msg", "failed to mark delete request for user as processed",
-					"delete_request_id", deleteRequest.RequestID,
-					"sequence_num", deleteRequest.SequenceNum,
-					"user", deleteRequest.UserID,
-					"err", err,
-					"deleted_lines", deleteRequest.DeletedLines,
-				)
-			} else {
-				level.Info(util_log.Logger).Log(
-					"msg", "delete request for user marked as processed",
-					"delete_request_id", deleteRequest.RequestID,
-					"sequence_num", deleteRequest.SequenceNum,
-					"user", deleteRequest.UserID,
-					"deleted_lines", deleteRequest.DeletedLines,
-				)
-			}
-			d.metrics.deleteRequestsProcessedTotal.WithLabelValues(deleteRequest.UserID).Inc()
+			d.markRequestAsProcessed(*deleteRequest)
 		}
 	}
 }
@@ -355,3 +375,21 @@ func (d *DeleteRequestsManager) IntervalMayHaveExpiredChunks(_ model.Interval, u
 func (d *DeleteRequestsManager) DropFromIndex(_ retention.ChunkEntry, _ model.Time, _ model.Time) bool {
 	return false
 }
+
+func getMaxRetentionInterval(userID string, limits Limits) time.Duration {
+	maxRetention := model.Duration(limits.RetentionPeriod(userID))
+	if maxRetention == 0 {
+		return 0
+	}
+
+	for _, streamRetention := range limits.StreamRetention(userID) {
+		if streamRetention.Period == 0 {
+			return 0
+		}
+		if streamRetention.Period > maxRetention {
+			maxRetention = streamRetention.Period
+		}
+	}
+
+	return time.Duration(maxRetention)
+}
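
To make the semantics of `getMaxRetentionInterval` concrete: a zero retention period anywhere (global or per-stream) means the tenant retains data forever, so the function returns 0 and the skip logic above is disabled; otherwise the longest configured period wins. A simplified standalone version using plain durations instead of Loki's `Limits` interface (an illustrative sketch, not the function added in this diff):

```go
package main

import (
	"fmt"
	"time"
)

// maxRetentionInterval mirrors the semantics of getMaxRetentionInterval with
// plain durations instead of Loki's Limits interface: a zero period anywhere
// means retain forever (return 0); otherwise the longest configured period wins.
func maxRetentionInterval(globalRetention time.Duration, streamRetentions []time.Duration) time.Duration {
	if globalRetention == 0 {
		return 0
	}
	maxRet := globalRetention
	for _, p := range streamRetentions {
		if p == 0 {
			return 0
		}
		if p > maxRet {
			maxRet = p
		}
	}
	return maxRet
}

func main() {
	day := 24 * time.Hour
	fmt.Println(maxRetentionInterval(30*day, []time.Duration{7 * day, 90 * day})) // 2160h0m0s (90 days)
	fmt.Println(maxRetentionInterval(30*day, []time.Duration{0}))                 // 0s: one stream keeps data forever
}
```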