Skip to content

Commit

Permalink
run deletion from gc concurrently
Browse files (browse the repository at this point in the history)
  • Loading branch information
trinity-1686a committed Sep 9, 2024
1 parent ffbeb5b commit 2937cbb
Showing 1 changed file with 46 additions and 24 deletions.
70 changes: 46 additions & 24 deletions quickwit/quickwit-index-management/src/garbage_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;

use futures::Future;
use futures::{Future, StreamExt};
use itertools::Itertools;
use quickwit_common::pretty::PrettySample;
use quickwit_common::{Progress, ServiceStream};
Expand Down Expand Up @@ -183,7 +183,6 @@ async fn delete_splits_marked_for_deletion(
let mut failed_splits = Vec::new();

loop {
let mut exit = false;
let query = ListSplitsQuery::try_from_index_uids(index_uids.clone())?
.with_split_state(SplitState::MarkedForDeletion)
.with_update_timestamp_lte(updated_before_timestamp)
Expand Down Expand Up @@ -229,32 +228,55 @@ async fn delete_splits_marked_for_deletion(
.map(|meta| (meta.index_uid.clone(), meta))
.into_group_map();

for (index_uid, splits_metadata_to_delete) in splits_metadata_to_delete_per_index {
if let Some(storage) = storages.get(&index_uid).cloned() {
let delete_splits_result = delete_splits_from_storage_and_metastore(
index_uid,
storage,
metastore.clone(),
splits_metadata_to_delete,
progress_opt,
)
.await;

match delete_splits_result {
Ok(entries) => removed_splits.extend(entries),
let exit = futures::stream::iter(splits_metadata_to_delete_per_index)
.map(|(index_uid, splits_metadata_to_delete)| {
let storage = storages.get(&index_uid).cloned();
let metastore = metastore.clone();
async move {
if let Some(storage) = storage {
delete_splits_from_storage_and_metastore(
index_uid,
storage,
metastore,
splits_metadata_to_delete,
progress_opt,
)
.await
} else {
error!(
"we are trying to GC without knowing the storage, this shouldn't \
happen"
);
Err(DeleteSplitsError {
successes: Vec::new(),
storage_error: None,
storage_failures: splits_metadata_to_delete
.into_iter()
.map(|split| split.as_split_info())
.collect(),
metastore_error: None,
metastore_failures: Vec::new(),
})
}
}
})
.buffer_unordered(10)
.fold(false, |previous_err, result| {
let err_here = match result {
Ok(entries) => {
removed_splits.extend(entries);
false
}
Err(delete_splits_error) => {
removed_splits.extend(delete_splits_error.successes);
failed_splits.extend(delete_splits_error.storage_failures);
failed_splits.extend(delete_splits_error.metastore_failures);
exit = true;
true
}
}
} else {
error!("we are trying to GC without knowing the storage, this shouldn't happen");
// we stop there, or we could easily end up looping indefinitely if there are more
// than DELETE_SPLITS_BATCH_SIZE to delete in this index
exit = true;
};
}
};
std::future::ready(previous_err || err_here)
})
.await;
if num_splits_to_delete < DELETE_SPLITS_BATCH_SIZE || exit {
// stop the gc if this was the last batch or we encountered an error
// (otherwise we might try deleting the same splits in an endless loop)
Expand Down

0 comments on commit 2937cbb

Please sign in to comment.