Skip to content

Commit

Permalink
Move garbage collector logic to index-management (#3708)
Browse files Browse the repository at this point in the history
  • Loading branch information
smiklos authored Aug 3, 2023
1 parent 06ac446 commit dcbe67a
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 51 deletions.
4 changes: 3 additions & 1 deletion quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion quickwit/quickwit-index-management/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ async-trait = { workspace = true }
byte-unit = { workspace = true }
futures = { workspace = true }
futures-util = { workspace = true }
itertools = { workspace = true }
rand = { workspace = true }
serde = { workspace = true }
serde_json = "1.0"
tantivy = { workspace = true }
tempfile = { workspace = true }
time = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
Expand All @@ -31,7 +33,6 @@ quickwit-config = { workspace = true }
quickwit-directories = { workspace = true }
quickwit-doc-mapper = { workspace = true }
quickwit-indexing = { workspace = true }
quickwit-janitor = { workspace = true }
quickwit-metastore = { workspace = true }
quickwit-proto = { workspace = true }
quickwit-storage = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ use std::sync::Arc;
use std::time::Duration;

use futures::Future;
use quickwit_actors::ActorContext;
use quickwit_common::PrettySample;
use quickwit_common::{PrettySample, Progress};
use quickwit_metastore::{
ListSplitsQuery, Metastore, MetastoreError, SplitInfo, SplitMetadata, SplitState,
};
Expand All @@ -34,8 +33,6 @@ use thiserror::Error;
use time::OffsetDateTime;
use tracing::{error, instrument};

use crate::actors::GarbageCollector;

/// The maximum number of splits that the GC should delete per attempt.
const DELETE_SPLITS_BATCH_SIZE: usize = 1000;

Expand All @@ -51,17 +48,14 @@ pub struct DeleteSplitsError {
metastore_failures: Vec<SplitInfo>,
}

async fn protect_future<Fut, T>(
ctx_opt: Option<&ActorContext<GarbageCollector>>,
future: Fut,
) -> T
where
Fut: Future<Output = T>,
{
if let Some(ctx) = ctx_opt {
ctx.protect_future(future).await
} else {
future.await
/// Drives `future` to completion and returns its output.
///
/// When `progress` is `Some`, a guard obtained from `Progress::protect_zone` is created before
/// the future is awaited and dropped only once it has resolved. When `progress` is `None`, the
/// future is simply awaited.
// NOTE(review): the guard presumably keeps the calling actor from being flagged as stalled while
// the future is pending; confirm against `quickwit_common::Progress`.
async fn protect_future<Fut, T>(progress: Option<&Progress>, future: Fut) -> T
where Fut: Future<Output = T> {
    // `None` yields no guard at all; `Some` keeps the guard alive until the function returns.
    let _zone_guard = progress.map(|progress| progress.protect_zone());
    future.await
}

Expand All @@ -83,15 +77,15 @@ pub struct SplitRemovalInfo {
/// * `deletion_grace_period` - Threshold period after which a split marked for deletion can be
/// safely deleted.
/// * `dry_run` - Should this only return a list of affected files without performing deletion.
/// * `ctx_opt` - A context for reporting progress (only useful within quickwit actor).
/// * `progress_opt` - For reporting progress (useful when called from within a quickwit actor).
pub async fn run_garbage_collect(
index_uid: IndexUid,
storage: Arc<dyn Storage>,
metastore: Arc<dyn Metastore>,
staged_grace_period: Duration,
deletion_grace_period: Duration,
dry_run: bool,
ctx_opt: Option<&ActorContext<GarbageCollector>>,
progress_opt: Option<&Progress>,
) -> anyhow::Result<SplitRemovalInfo> {
// Select staged splits with staging timestamp older than grace period timestamp.
let grace_period_timestamp =
Expand All @@ -102,7 +96,7 @@ pub async fn run_garbage_collect(
.with_update_timestamp_lte(grace_period_timestamp);

let deletable_staged_splits: Vec<SplitMetadata> =
protect_future(ctx_opt, metastore.list_splits(query))
protect_future(progress_opt, metastore.list_splits(query))
.await?
.into_iter()
.map(|meta| meta.split_metadata)
Expand All @@ -112,11 +106,12 @@ pub async fn run_garbage_collect(
let query = ListSplitsQuery::for_index(index_uid.clone())
.with_split_state(SplitState::MarkedForDeletion);

let mut splits_marked_for_deletion = protect_future(ctx_opt, metastore.list_splits(query))
.await?
.into_iter()
.map(|split| split.split_metadata)
.collect::<Vec<_>>();
let mut splits_marked_for_deletion =
protect_future(progress_opt, metastore.list_splits(query))
.await?
.into_iter()
.map(|split| split.split_metadata)
.collect::<Vec<_>>();
splits_marked_for_deletion.extend(deletable_staged_splits);

let candidate_entries: Vec<SplitInfo> = splits_marked_for_deletion
Expand All @@ -136,7 +131,7 @@ pub async fn run_garbage_collect(
.collect();
if !split_ids.is_empty() {
protect_future(
ctx_opt,
progress_opt,
metastore.mark_splits_for_deletion(index_uid.clone(), &split_ids),
)
.await?;
Expand All @@ -152,14 +147,13 @@ pub async fn run_garbage_collect(
updated_before_timestamp,
storage,
metastore,
ctx_opt,
progress_opt,
)
.await;

Ok(deleted_splits)
}

#[instrument(skip(storage, metastore, ctx_opt))]
#[instrument(skip(storage, metastore, progress_opt))]
/// Removes any splits marked for deletion which haven't been
/// updated after `updated_before_timestamp` in batches of 1000 splits.
///
Expand All @@ -170,7 +164,7 @@ async fn delete_splits_marked_for_deletion(
updated_before_timestamp: i64,
storage: Arc<dyn Storage>,
metastore: Arc<dyn Metastore>,
ctx_opt: Option<&ActorContext<GarbageCollector>>,
progress_opt: Option<&Progress>,
) -> SplitRemovalInfo {
let mut removed_splits = Vec::new();
let mut failed_splits = Vec::new();
Expand All @@ -181,7 +175,7 @@ async fn delete_splits_marked_for_deletion(
.with_update_timestamp_lte(updated_before_timestamp)
.with_limit(DELETE_SPLITS_BATCH_SIZE);

let list_splits_result = protect_future(ctx_opt, metastore.list_splits(query)).await;
let list_splits_result = protect_future(progress_opt, metastore.list_splits(query)).await;

let splits_to_delete = match list_splits_result {
Ok(splits) => splits,
Expand All @@ -205,7 +199,7 @@ async fn delete_splits_marked_for_deletion(
storage.clone(),
metastore.clone(),
splits_to_delete,
ctx_opt,
progress_opt,
)
.await;

Expand Down Expand Up @@ -234,13 +228,13 @@ async fn delete_splits_marked_for_deletion(
/// * `storage` - The storage managing the target index.
/// * `metastore` - The metastore managing the target index.
/// * `splits` - The list of splits to delete.
/// * `ctx_opt` - A context for reporting progress (only useful within quickwit actor).
/// * `progress_opt` - For reporting progress (useful when called from within a quickwit actor).
pub async fn delete_splits_from_storage_and_metastore(
index_uid: IndexUid,
storage: Arc<dyn Storage>,
metastore: Arc<dyn Metastore>,
splits: Vec<SplitMetadata>,
ctx_opt: Option<&ActorContext<GarbageCollector>>,
progress_opt: Option<&Progress>,
) -> anyhow::Result<Vec<SplitInfo>, DeleteSplitsError> {
let mut split_infos: HashMap<PathBuf, SplitInfo> = HashMap::with_capacity(splits.len());

Expand All @@ -252,10 +246,10 @@ pub async fn delete_splits_from_storage_and_metastore(
.keys()
.map(|split_path_buf| split_path_buf.as_path())
.collect::<Vec<&Path>>();
let delete_result = protect_future(ctx_opt, storage.bulk_delete(&split_paths)).await;
let delete_result = protect_future(progress_opt, storage.bulk_delete(&split_paths)).await;

if let Some(ctx) = ctx_opt {
ctx.record_progress();
if let Some(progress) = progress_opt {
progress.record_progress();
}
let mut successes = Vec::with_capacity(split_infos.len());
let mut storage_error: Option<BulkDeleteError> = None;
Expand Down Expand Up @@ -292,7 +286,7 @@ pub async fn delete_splits_from_storage_and_metastore(
.map(|split_info| split_info.split_id.as_str())
.collect();
let metastore_result = protect_future(
ctx_opt,
progress_opt,
metastore.delete_splits(index_uid.clone(), &split_ids),
)
.await;
Expand Down
13 changes: 8 additions & 5 deletions quickwit/quickwit-index-management/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@ use std::time::Duration;
use quickwit_common::fs::{empty_dir, get_cache_directory_path};
use quickwit_config::{validate_identifier, IndexConfig, SourceConfig};
use quickwit_indexing::check_source_connectivity;
use quickwit_janitor::{
delete_splits_from_storage_and_metastore, run_garbage_collect, DeleteSplitsError,
SplitRemovalInfo,
};
use quickwit_metastore::{
IndexMetadata, ListSplitsQuery, Metastore, MetastoreError, SplitInfo, SplitMetadata, SplitState,
};
Expand All @@ -36,6 +32,11 @@ use quickwit_storage::{StorageResolver, StorageResolverError};
use thiserror::Error;
use tracing::{error, info};

use crate::garbage_collection::{
delete_splits_from_storage_and_metastore, run_garbage_collect, DeleteSplitsError,
SplitRemovalInfo,
};

#[derive(Error, Debug)]
pub enum IndexServiceError {
#[error("Failed to resolve the storage `{0}`.")]
Expand Down Expand Up @@ -351,8 +352,10 @@ pub async fn validate_storage_uri(

#[cfg(test)]
mod tests {

use quickwit_common::uri::Uri;
use quickwit_metastore::metastore_for_test;
use quickwit_config::IndexConfig;
use quickwit_metastore::{metastore_for_test, SplitMetadata};
use quickwit_storage::PutPayload;

use super::*;
Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-index-management/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

mod garbage_collection;
mod index;

pub use garbage_collection::run_garbage_collect;
pub use index::{clear_cache_directory, validate_storage_uri, IndexService, IndexServiceError};
1 change: 1 addition & 0 deletions quickwit/quickwit-janitor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ utoipa = { workspace = true }
quickwit-actors = { workspace = true }
quickwit-cluster = { workspace = true }
quickwit-common = { workspace = true }
quickwit-index-management = { workspace = true }
quickwit-config = { workspace = true }
quickwit-directories = { workspace = true }
quickwit-doc-mapper = { workspace = true }
Expand Down
5 changes: 2 additions & 3 deletions quickwit/quickwit-janitor/src/actors/garbage_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,12 @@ use async_trait::async_trait;
use futures::{stream, StreamExt};
use itertools::Itertools;
use quickwit_actors::{Actor, ActorContext, Handler};
use quickwit_index_management::run_garbage_collect;
use quickwit_metastore::Metastore;
use quickwit_storage::StorageResolver;
use serde::Serialize;
use tracing::{error, info};

use crate::garbage_collection::run_garbage_collect;

const RUN_INTERVAL: Duration = Duration::from_secs(10 * 60); // 10 minutes

/// Staged files need to be deleted if there was a failure.
Expand Down Expand Up @@ -122,7 +121,7 @@ impl GarbageCollector {
STAGED_GRACE_PERIOD,
DELETION_GRACE_PERIOD,
false,
Some(ctx),
Some(ctx.progress()),
).await;
Some((index_uid, gc_res))
}}).buffer_unordered(MAX_CONCURRENT_GC_TASKS);
Expand Down
5 changes: 0 additions & 5 deletions quickwit/quickwit-janitor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,12 @@ use tracing::info;

pub mod actors;
pub mod error;
mod garbage_collection;
mod janitor_service;
mod metrics;
mod retention_policy_execution;

pub use janitor_service::JanitorService;

pub use self::garbage_collection::{
delete_splits_from_storage_and_metastore, run_garbage_collect, DeleteSplitsError,
SplitRemovalInfo,
};
use crate::actors::{DeleteTaskService, GarbageCollector, RetentionPolicyExecutor};

#[derive(utoipa::OpenApi)]
Expand Down

0 comments on commit dcbe67a

Please sign in to comment.