From dcbe67a89f6a188c11d115b20fcc5e72607b5fce Mon Sep 17 00:00:00 2001 From: Miklos Szots Date: Thu, 3 Aug 2023 19:12:03 +0200 Subject: [PATCH] Move garbage collector logic to index-management (#3708) --- quickwit/Cargo.lock | 4 +- quickwit/quickwit-index-management/Cargo.toml | 3 +- .../src/garbage_collection.rs | 66 +++++++++---------- .../quickwit-index-management/src/index.rs | 13 ++-- quickwit/quickwit-index-management/src/lib.rs | 2 + quickwit/quickwit-janitor/Cargo.toml | 1 + .../src/actors/garbage_collector.rs | 5 +- quickwit/quickwit-janitor/src/lib.rs | 5 -- 8 files changed, 48 insertions(+), 51 deletions(-) rename quickwit/{quickwit-janitor => quickwit-index-management}/src/garbage_collection.rs (94%) diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 6317057ef36..1fad4d9f25c 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -5090,13 +5090,13 @@ dependencies = [ "byte-unit", "futures", "futures-util", + "itertools 0.11.0", "mockall", "quickwit-common", "quickwit-config", "quickwit-directories", "quickwit-doc-mapper", "quickwit-indexing", - "quickwit-janitor", "quickwit-metastore", "quickwit-proto", "quickwit-storage", @@ -5107,6 +5107,7 @@ dependencies = [ "tantivy", "tempfile", "thiserror", + "time 0.3.23", "tokio", "tokio-stream", "tokio-util", @@ -5290,6 +5291,7 @@ dependencies = [ "quickwit-config", "quickwit-directories", "quickwit-doc-mapper", + "quickwit-index-management", "quickwit-indexing", "quickwit-metastore", "quickwit-proto", diff --git a/quickwit/quickwit-index-management/Cargo.toml b/quickwit/quickwit-index-management/Cargo.toml index c0d9cf6cd23..7a311d27212 100644 --- a/quickwit/quickwit-index-management/Cargo.toml +++ b/quickwit/quickwit-index-management/Cargo.toml @@ -15,11 +15,13 @@ async-trait = { workspace = true } byte-unit = { workspace = true } futures = { workspace = true } futures-util = { workspace = true } +itertools = { workspace = true } rand = { workspace = true } serde = { workspace = true } serde_json = "1.0" tantivy = { workspace = true } tempfile = { workspace = true } +time = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } tokio-stream = { workspace = true } @@ -31,7 +33,6 @@ quickwit-config = { workspace = true } quickwit-directories = { workspace = true } quickwit-doc-mapper = { workspace = true } quickwit-indexing = { workspace = true } -quickwit-janitor = { workspace = true } quickwit-metastore = { workspace = true } quickwit-proto = { workspace = true } quickwit-storage = { workspace = true } diff --git a/quickwit/quickwit-janitor/src/garbage_collection.rs b/quickwit/quickwit-index-management/src/garbage_collection.rs similarity index 94% rename from quickwit/quickwit-janitor/src/garbage_collection.rs rename to quickwit/quickwit-index-management/src/garbage_collection.rs index 3372de1d5fe..d82af7905fa 100644 --- a/quickwit/quickwit-janitor/src/garbage_collection.rs +++ b/quickwit/quickwit-index-management/src/garbage_collection.rs @@ -23,8 +23,7 @@ use std::sync::Arc; use std::time::Duration; use futures::Future; -use quickwit_actors::ActorContext; -use quickwit_common::PrettySample; +use quickwit_common::{PrettySample, Progress}; use quickwit_metastore::{ ListSplitsQuery, Metastore, MetastoreError, SplitInfo, SplitMetadata, SplitState, }; @@ -34,8 +33,6 @@ use thiserror::Error; use time::OffsetDateTime; use tracing::{error, instrument}; -use crate::actors::GarbageCollector; - /// The maximum number of splits that the GC should delete per attempt. const DELETE_SPLITS_BATCH_SIZE: usize = 1000; @@ -51,17 +48,14 @@ pub struct DeleteSplitsError { metastore_failures: Vec, } -async fn protect_future( - ctx_opt: Option<&ActorContext>, - future: Fut, -) -> T -where - Fut: Future, -{ - if let Some(ctx) = ctx_opt { - ctx.protect_future(future).await - } else { - future.await +async fn protect_future(progress: Option<&Progress>, future: Fut) -> T +where Fut: Future { + match progress { + None => future.await, + Some(progress) => { + let _guard = progress.protect_zone(); + future.await + } } } @@ -83,7 +77,7 @@ pub struct SplitRemovalInfo { /// * `deletion_grace_period` - Threshold period after which a marked as deleted split can be /// safely deleted. /// * `dry_run` - Should this only return a list of affected files without performing deletion. -/// * `ctx_opt` - A context for reporting progress (only useful within quickwit actor). +/// * `progress` - For reporting progress (useful when called from within a quickwit actor). pub async fn run_garbage_collect( index_uid: IndexUid, storage: Arc, @@ -91,7 +85,7 @@ pub async fn run_garbage_collect( staged_grace_period: Duration, deletion_grace_period: Duration, dry_run: bool, - ctx_opt: Option<&ActorContext>, + progress_opt: Option<&Progress>, ) -> anyhow::Result { // Select staged splits with staging timestamp older than grace period timestamp. let grace_period_timestamp = @@ -102,7 +96,7 @@ pub async fn run_garbage_collect( .with_update_timestamp_lte(grace_period_timestamp); let deletable_staged_splits: Vec = - protect_future(ctx_opt, metastore.list_splits(query)) + protect_future(progress_opt, metastore.list_splits(query)) .await? .into_iter() .map(|meta| meta.split_metadata) @@ -112,11 +106,12 @@ pub async fn run_garbage_collect( let query = ListSplitsQuery::for_index(index_uid.clone()) .with_split_state(SplitState::MarkedForDeletion); - let mut splits_marked_for_deletion = protect_future(ctx_opt, metastore.list_splits(query)) - .await? - .into_iter() - .map(|split| split.split_metadata) - .collect::>(); + let mut splits_marked_for_deletion = + protect_future(progress_opt, metastore.list_splits(query)) + .await? + .into_iter() + .map(|split| split.split_metadata) + .collect::>(); splits_marked_for_deletion.extend(deletable_staged_splits); let candidate_entries: Vec = splits_marked_for_deletion @@ -136,7 +131,7 @@ pub async fn run_garbage_collect( .collect(); if !split_ids.is_empty() { protect_future( - ctx_opt, + progress_opt, metastore.mark_splits_for_deletion(index_uid.clone(), &split_ids), ) .await?; @@ -152,14 +147,13 @@ pub async fn run_garbage_collect( updated_before_timestamp, storage, metastore, - ctx_opt, + progress_opt, ) .await; Ok(deleted_splits) } - -#[instrument(skip(storage, metastore, ctx_opt))] +#[instrument(skip(storage, metastore, progress_opt))] /// Removes any splits marked for deletion which haven't been /// updated after `updated_before_timestamp` in batches of 1000 splits. /// @@ -170,7 +164,7 @@ async fn delete_splits_marked_for_deletion( updated_before_timestamp: i64, storage: Arc, metastore: Arc, - ctx_opt: Option<&ActorContext>, + progress_opt: Option<&Progress>, ) -> SplitRemovalInfo { let mut removed_splits = Vec::new(); let mut failed_splits = Vec::new(); @@ -181,7 +175,7 @@ async fn delete_splits_marked_for_deletion( .with_update_timestamp_lte(updated_before_timestamp) .with_limit(DELETE_SPLITS_BATCH_SIZE); - let list_splits_result = protect_future(ctx_opt, metastore.list_splits(query)).await; + let list_splits_result = protect_future(progress_opt, metastore.list_splits(query)).await; let splits_to_delete = match list_splits_result { Ok(splits) => splits, @@ -205,7 +199,7 @@ async fn delete_splits_marked_for_deletion( storage.clone(), metastore.clone(), splits_to_delete, - ctx_opt, + progress_opt, ) .await; @@ -234,13 +228,13 @@ async fn delete_splits_marked_for_deletion( /// * `storage - The storage managing the target index. /// * `metastore` - The metastore managing the target index. /// * `splits` - The list of splits to delete. -/// * `ctx_opt` - A context for reporting progress (only useful within quickwit actor). +/// * `progress` - For reporting progress (useful when called from within a quickwit actor). pub async fn delete_splits_from_storage_and_metastore( index_uid: IndexUid, storage: Arc, metastore: Arc, splits: Vec, - ctx_opt: Option<&ActorContext>, + progress_opt: Option<&Progress>, ) -> anyhow::Result, DeleteSplitsError> { let mut split_infos: HashMap = HashMap::with_capacity(splits.len()); @@ -252,10 +246,10 @@ pub async fn delete_splits_from_storage_and_metastore( .keys() .map(|split_path_buf| split_path_buf.as_path()) .collect::>(); - let delete_result = protect_future(ctx_opt, storage.bulk_delete(&split_paths)).await; + let delete_result = protect_future(progress_opt, storage.bulk_delete(&split_paths)).await; - if let Some(ctx) = ctx_opt { - ctx.record_progress(); + if let Some(progress) = progress_opt { + progress.record_progress(); } let mut successes = Vec::with_capacity(split_infos.len()); let mut storage_error: Option = None; @@ -292,7 +286,7 @@ pub async fn delete_splits_from_storage_and_metastore( .map(|split_info| split_info.split_id.as_str()) .collect(); let metastore_result = protect_future( - ctx_opt, + progress_opt, metastore.delete_splits(index_uid.clone(), &split_ids), ) .await; diff --git a/quickwit/quickwit-index-management/src/index.rs b/quickwit/quickwit-index-management/src/index.rs index 8047ef40981..4aaf81ed759 100644 --- a/quickwit/quickwit-index-management/src/index.rs +++ b/quickwit/quickwit-index-management/src/index.rs @@ -24,10 +24,6 @@ use std::time::Duration; use quickwit_common::fs::{empty_dir, get_cache_directory_path}; use quickwit_config::{validate_identifier, IndexConfig, SourceConfig}; use quickwit_indexing::check_source_connectivity; -use quickwit_janitor::{ - delete_splits_from_storage_and_metastore, run_garbage_collect, DeleteSplitsError, - SplitRemovalInfo, -}; use quickwit_metastore::{ IndexMetadata, ListSplitsQuery, Metastore, MetastoreError, SplitInfo, SplitMetadata, SplitState, }; @@ -36,6 +32,11 @@ use quickwit_storage::{StorageResolver, StorageResolverError}; use thiserror::Error; use tracing::{error, info}; +use crate::garbage_collection::{ + delete_splits_from_storage_and_metastore, run_garbage_collect, DeleteSplitsError, + SplitRemovalInfo, +}; + #[derive(Error, Debug)] pub enum IndexServiceError { #[error("Failed to resolve the storage `{0}`.")] @@ -351,8 +352,10 @@ pub async fn validate_storage_uri( #[cfg(test)] mod tests { + use quickwit_common::uri::Uri; - use quickwit_metastore::metastore_for_test; + use quickwit_config::IndexConfig; + use quickwit_metastore::{metastore_for_test, SplitMetadata}; use quickwit_storage::PutPayload; use super::*; diff --git a/quickwit/quickwit-index-management/src/lib.rs b/quickwit/quickwit-index-management/src/lib.rs index b22e7abecfb..3177017b8a7 100644 --- a/quickwit/quickwit-index-management/src/lib.rs +++ b/quickwit/quickwit-index-management/src/lib.rs @@ -17,6 +17,8 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +mod garbage_collection; mod index; +pub use garbage_collection::run_garbage_collect; pub use index::{clear_cache_directory, validate_storage_uri, IndexService, IndexServiceError}; diff --git a/quickwit/quickwit-janitor/Cargo.toml b/quickwit/quickwit-janitor/Cargo.toml index 42e7bcf9f69..435820583ef 100644 --- a/quickwit/quickwit-janitor/Cargo.toml +++ b/quickwit/quickwit-janitor/Cargo.toml @@ -30,6 +30,7 @@ utoipa = { workspace = true } quickwit-actors = { workspace = true } quickwit-cluster = { workspace = true } quickwit-common = { workspace = true } +quickwit-index-management = { workspace = true } quickwit-config = { workspace = true } quickwit-directories = { workspace = true } quickwit-doc-mapper = { workspace = true } diff --git a/quickwit/quickwit-janitor/src/actors/garbage_collector.rs b/quickwit/quickwit-janitor/src/actors/garbage_collector.rs index ddfe2b79504..f67468abaf1 100644 --- a/quickwit/quickwit-janitor/src/actors/garbage_collector.rs +++ b/quickwit/quickwit-janitor/src/actors/garbage_collector.rs @@ -26,13 +26,12 @@ use async_trait::async_trait; use futures::{stream, StreamExt}; use itertools::Itertools; use quickwit_actors::{Actor, ActorContext, Handler}; +use quickwit_index_management::run_garbage_collect; use quickwit_metastore::Metastore; use quickwit_storage::StorageResolver; use serde::Serialize; use tracing::{error, info}; -use crate::garbage_collection::run_garbage_collect; - const RUN_INTERVAL: Duration = Duration::from_secs(10 * 60); // 10 minutes /// Staged files needs to be deleted if there was a failure. @@ -122,7 +121,7 @@ impl GarbageCollector { STAGED_GRACE_PERIOD, DELETION_GRACE_PERIOD, false, - Some(ctx), + Some(ctx.progress()), ).await; Some((index_uid, gc_res)) }}).buffer_unordered(MAX_CONCURRENT_GC_TASKS); diff --git a/quickwit/quickwit-janitor/src/lib.rs b/quickwit/quickwit-janitor/src/lib.rs index ed62101a01b..ad0d0c3d43c 100644 --- a/quickwit/quickwit-janitor/src/lib.rs +++ b/quickwit/quickwit-janitor/src/lib.rs @@ -30,17 +30,12 @@ use tracing::info; pub mod actors; pub mod error; -mod garbage_collection; mod janitor_service; mod metrics; mod retention_policy_execution; pub use janitor_service::JanitorService; -pub use self::garbage_collection::{ - delete_splits_from_storage_and_metastore, run_garbage_collect, DeleteSplitsError, - SplitRemovalInfo, -}; use crate::actors::{DeleteTaskService, GarbageCollector, RetentionPolicyExecutor}; #[derive(utoipa::OpenApi)]