diff --git a/quickwit/quickwit-index-management/src/garbage_collection.rs b/quickwit/quickwit-index-management/src/garbage_collection.rs index 03a6b7e88c6..72b7c00b66d 100644 --- a/quickwit/quickwit-index-management/src/garbage_collection.rs +++ b/quickwit/quickwit-index-management/src/garbage_collection.rs @@ -22,7 +22,7 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; -use futures::Future; +use futures::{Future, StreamExt}; use itertools::Itertools; use quickwit_common::pretty::PrettySample; use quickwit_common::{Progress, ServiceStream}; @@ -183,7 +183,6 @@ async fn delete_splits_marked_for_deletion( let mut failed_splits = Vec::new(); loop { - let mut exit = false; let query = ListSplitsQuery::try_from_index_uids(index_uids.clone())? .with_split_state(SplitState::MarkedForDeletion) .with_update_timestamp_lte(updated_before_timestamp) @@ -229,32 +228,55 @@ async fn delete_splits_marked_for_deletion( .map(|meta| (meta.index_uid.clone(), meta)) .into_group_map(); - for (index_uid, splits_metadata_to_delete) in splits_metadata_to_delete_per_index { - if let Some(storage) = storages.get(&index_uid).cloned() { - let delete_splits_result = delete_splits_from_storage_and_metastore( - index_uid, - storage, - metastore.clone(), - splits_metadata_to_delete, - progress_opt, - ) - .await; - - match delete_splits_result { - Ok(entries) => removed_splits.extend(entries), + let exit = futures::stream::iter(splits_metadata_to_delete_per_index) + .map(|(index_uid, splits_metadata_to_delete)| { + let storage = storages.get(&index_uid).cloned(); + let metastore = metastore.clone(); + async move { + if let Some(storage) = storage { + delete_splits_from_storage_and_metastore( + index_uid, + storage, + metastore, + splits_metadata_to_delete, + progress_opt, + ) + .await + } else { + error!( + "we are trying to GC without knowing the storage, this shouldn't \ + happen" + ); + Err(DeleteSplitsError { + successes: Vec::new(), + storage_error: None, + storage_failures: splits_metadata_to_delete + .into_iter() + .map(|split| split.as_split_info()) + .collect(), + metastore_error: None, + metastore_failures: Vec::new(), + }) + } + } + }) + .buffer_unordered(10) + .fold(false, |previous_err, result| { + let err_here = match result { + Ok(entries) => { + removed_splits.extend(entries); + false + } Err(delete_splits_error) => { + removed_splits.extend(delete_splits_error.successes); failed_splits.extend(delete_splits_error.storage_failures); failed_splits.extend(delete_splits_error.metastore_failures); - exit = true; + true } - } - } else { - error!("we are trying to GC without knowing the storage, this shouldn't happen"); - // we stop there, or we could easily end up looping indefinitely if there are more - // than DELETE_SPLITS_BATCH_SIZE to delete in this index - exit = true; - }; - } + }; + std::future::ready(previous_err || err_here) + }) + .await; if num_splits_to_delete < DELETE_SPLITS_BATCH_SIZE || exit { // stop the gc if this was the last batch or we encountered an error // (otherwise we might try deleting the same splits in an endless loop)