gc multiple indexes at once (#5380)
* run gc collectively instead of per index

* update tests
trinity-1686a authored Sep 5, 2024
1 parent b41c3b7 commit e905929
Showing 3 changed files with 176 additions and 212 deletions.
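At a glance, this commit changes the signature of run_garbage_collect so that a single call covers every index: the per-index `index_uid` and `storage` arguments become one map from index UID to its storage. The following is a rough before/after sketch of the signature, reconstructed from the hunks below rather than copied verbatim (a sketch, not a compilable excerpt):

// Before: one index per call.
pub async fn run_garbage_collect(
    index_uid: IndexUid,
    storage: Arc<dyn Storage>,
    metastore: MetastoreServiceClient,
    staged_grace_period: Duration,
    deletion_grace_period: Duration,
    dry_run: bool,
    progress_opt: Option<&Progress>,
) -> anyhow::Result<SplitRemovalInfo>

// After: all indexes at once, each paired with its own storage.
pub async fn run_garbage_collect(
    indexes: HashMap<IndexUid, Arc<dyn Storage>>,
    metastore: MetastoreServiceClient,
    staged_grace_period: Duration,
    deletion_grace_period: Duration,
    dry_run: bool,
    progress_opt: Option<&Progress>,
) -> anyhow::Result<SplitRemovalInfo>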
120 changes: 70 additions & 50 deletions quickwit/quickwit-index-management/src/garbage_collection.rs
@@ -23,6 +23,7 @@ use std::sync::Arc;
use std::time::Duration;

use futures::Future;
use itertools::Itertools;
use quickwit_common::pretty::PrettySample;
use quickwit_common::{Progress, ServiceStream};
use quickwit_metastore::{
@@ -66,6 +67,7 @@ where Fut: Future<Output = T> {
}

/// Information on what splits have and have not been cleaned up by the GC.
#[derive(Debug)]
pub struct SplitRemovalInfo {
/// The set of splits that have been removed.
pub removed_split_entries: Vec<SplitInfo>,
Expand All @@ -75,7 +77,7 @@ pub struct SplitRemovalInfo {

/// Detects all dangling splits and associated files from the index and removes them.
///
/// * `index_id` - The target index id.
/// * `indexes` - The target index uids and storages.
/// * `storage` - The storage managing the target index.
/// * `metastore` - The metastore managing the target index.
/// * `staged_grace_period` - Threshold period after which a staged split can be safely garbage
@@ -85,8 +87,7 @@ pub struct SplitRemovalInfo {
/// * `dry_run` - Should this only return a list of affected files without performing deletion.
/// * `progress` - For reporting progress (useful when called from within a quickwit actor).
pub async fn run_garbage_collect(
index_uid: IndexUid,
storage: Arc<dyn Storage>,
indexes: HashMap<IndexUid, Arc<dyn Storage>>,
metastore: MetastoreServiceClient,
staged_grace_period: Duration,
deletion_grace_period: Duration,
@@ -97,7 +98,9 @@ pub async fn run_garbage_collect(
let grace_period_timestamp =
OffsetDateTime::now_utc().unix_timestamp() - staged_grace_period.as_secs() as i64;

let query = ListSplitsQuery::for_index(index_uid.clone())
let index_uids: Vec<IndexUid> = indexes.keys().cloned().collect();

let query = ListSplitsQuery::try_from_index_uids(index_uids.clone())?
.with_split_state(SplitState::Staged)
.with_update_timestamp_lte(grace_period_timestamp);

@@ -111,7 +114,7 @@ pub async fn run_garbage_collect(
.await?;

if dry_run {
let marked_for_deletion_query = ListSplitsQuery::for_index(index_uid.clone())
let marked_for_deletion_query = ListSplitsQuery::try_from_index_uids(index_uids.clone())?
.with_split_state(SplitState::MarkedForDeletion);
let marked_for_deletion_request =
ListSplitsRequest::try_from_list_splits_query(&marked_for_deletion_query)?;
@@ -135,13 +138,13 @@ pub async fn run_garbage_collect(
}

// Schedule all eligible staged splits for deletion
let split_ids: Vec<SplitId> = deletable_staged_splits
.iter()
.map(|split| split.split_id.to_string())
.collect();
if !split_ids.is_empty() {
let split_ids: HashMap<IndexUid, Vec<SplitId>> = deletable_staged_splits
.into_iter()
.map(|split| (split.index_uid, split.split_id))
.into_group_map();
for (index_uid, split_ids) in split_ids {
let mark_splits_for_deletion_request =
MarkSplitsForDeletionRequest::new(index_uid.clone(), split_ids);
MarkSplitsForDeletionRequest::new(index_uid, split_ids);
protect_future(
progress_opt,
metastore.mark_splits_for_deletion(mark_splits_for_deletion_request),
@@ -154,35 +157,33 @@ pub async fn run_garbage_collect(
let updated_before_timestamp =
OffsetDateTime::now_utc().unix_timestamp() - deletion_grace_period.as_secs() as i64;

let deleted_splits = delete_splits_marked_for_deletion(
index_uid,
delete_splits_marked_for_deletion(
index_uids,
updated_before_timestamp,
storage,
metastore,
indexes,
progress_opt,
)
.await;

Ok(deleted_splits)
.await
}
#[instrument(skip(storage, metastore, progress_opt))]
#[instrument(skip(storages, metastore, progress_opt))]
/// Removes any splits marked for deletion which haven't been
/// updated after `updated_before_timestamp` in batches of 1000 splits.
///
/// The aim of this is to spread the load out across a longer period
/// rather than short, heavy bursts on the metastore and storage system itself.
async fn delete_splits_marked_for_deletion(
index_uid: IndexUid,
index_uids: Vec<IndexUid>,
updated_before_timestamp: i64,
storage: Arc<dyn Storage>,
metastore: MetastoreServiceClient,
storages: HashMap<IndexUid, Arc<dyn Storage>>,
progress_opt: Option<&Progress>,
) -> SplitRemovalInfo {
) -> anyhow::Result<SplitRemovalInfo> {
let mut removed_splits = Vec::new();
let mut failed_splits = Vec::new();

loop {
let query = ListSplitsQuery::for_index(index_uid.clone())
'outer: loop {
let query = ListSplitsQuery::try_from_index_uids(index_uids.clone())?
.with_split_state(SplitState::MarkedForDeletion)
.with_update_timestamp_lte(updated_before_timestamp)
.with_limit(DELETE_SPLITS_BATCH_SIZE);
@@ -219,31 +220,46 @@ async fn delete_splits_marked_for_deletion(
if num_splits_to_delete == 0 {
break;
}
let delete_splits_result = delete_splits_from_storage_and_metastore(
index_uid.clone(),
storage.clone(),
metastore.clone(),
splits_metadata_to_delete,
progress_opt,
)
.await;

match delete_splits_result {
Ok(entries) => removed_splits.extend(entries),
Err(delete_splits_error) => {
failed_splits.extend(delete_splits_error.storage_failures);
failed_splits.extend(delete_splits_error.metastore_failures);
break;
let splits_metadata_to_delete_per_index: HashMap<IndexUid, Vec<SplitMetadata>> =
splits_metadata_to_delete
.into_iter()
.map(|meta| (meta.index_uid.clone(), meta))
.into_group_map();

for (index_uid, splits_metadata_to_delete) in splits_metadata_to_delete_per_index {
let Some(storage) = storages.get(&index_uid).cloned() else {
error!("we are trying to GC without knowing the storage, this shouldn't happen");
// we stop here, otherwise we could easily end up looping indefinitely if there are more
// than DELETE_SPLITS_BATCH_SIZE to delete in this index
break 'outer;
};
let delete_splits_result = delete_splits_from_storage_and_metastore(
index_uid,
storage,
metastore.clone(),
splits_metadata_to_delete,
progress_opt,
)
.await;

match delete_splits_result {
Ok(entries) => removed_splits.extend(entries),
Err(delete_splits_error) => {
failed_splits.extend(delete_splits_error.storage_failures);
failed_splits.extend(delete_splits_error.metastore_failures);
break;
}
}
}
if num_splits_to_delete < DELETE_SPLITS_BATCH_SIZE {
break;
}
}
SplitRemovalInfo {
Ok(SplitRemovalInfo {
removed_split_entries: removed_splits,
failed_splits,
}
})
}

/// Delete a list of splits from the storage and the metastore.
@@ -369,6 +385,12 @@ mod tests {
use super::*;
use crate::run_garbage_collect;

fn hashmap<K: Eq + std::hash::Hash, V>(key: K, value: V) -> HashMap<K, V> {
let mut map = HashMap::new();
map.insert(key, value);
map
}

#[tokio::test]
async fn test_run_gc_marks_stale_staged_splits_for_deletion_after_grace_period() {
let storage = storage_for_test();
@@ -414,8 +436,7 @@ mod tests {

// The staging grace period hasn't passed yet so the split remains staged.
run_garbage_collect(
index_uid.clone(),
storage.clone(),
hashmap(index_uid.clone(), storage.clone()),
metastore.clone(),
Duration::from_secs(30),
Duration::from_secs(30),
@@ -442,8 +463,7 @@ mod tests {

// The staging grace period has passed so the split is marked for deletion.
run_garbage_collect(
index_uid.clone(),
storage.clone(),
hashmap(index_uid.clone(), storage.clone()),
metastore.clone(),
Duration::from_secs(0),
Duration::from_secs(30),
@@ -489,7 +509,7 @@ mod tests {
let split_id = "test-run-gc--split";
let split_metadata = SplitMetadata {
split_id: split_id.to_string(),
index_uid: IndexUid::new_with_random_ulid(index_id),
index_uid: index_uid.clone(),
..Default::default()
};
let stage_splits_request =
@@ -520,8 +540,7 @@ mod tests {

// The delete grace period hasn't passed yet so the split remains marked for deletion.
run_garbage_collect(
index_uid.clone(),
storage.clone(),
hashmap(index_uid.clone(), storage.clone()),
metastore.clone(),
Duration::from_secs(30),
Duration::from_secs(30),
@@ -548,8 +567,7 @@ mod tests {

// The delete grace period has passed so the split is deleted.
run_garbage_collect(
index_uid.clone(),
storage.clone(),
hashmap(index_uid.clone(), storage.clone()),
metastore.clone(),
Duration::from_secs(30),
Duration::from_secs(0),
@@ -584,8 +602,10 @@ mod tests {
.times(2)
.returning(|_| Ok(ServiceStream::empty()));
run_garbage_collect(
IndexUid::new_with_random_ulid("index-test-gc-deletes"),
storage.clone(),
hashmap(
IndexUid::new_with_random_ulid("index-test-gc-deletes"),
storage.clone(),
),
MetastoreServiceClient::from_mock(mock_metastore),
Duration::from_secs(30),
Duration::from_secs(30),
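The per-index grouping introduced above leans on itertools' `into_group_map`, which collects an iterator of `(key, value)` pairs into a `HashMap<K, Vec<V>>`. Below is a self-contained illustration using plain `String` stand-ins for `IndexUid` and `SplitId`; the real code uses the quickwit types shown in the hunks, and itertools is already imported per the added `use itertools::Itertools;` line.

use std::collections::HashMap;

use itertools::Itertools;

fn main() {
    // Stand-ins for the (IndexUid, SplitId) pairs produced by the metastore listing.
    let splits = vec![
        ("index-a".to_string(), "split-1".to_string()),
        ("index-b".to_string(), "split-2".to_string()),
        ("index-a".to_string(), "split-3".to_string()),
    ];

    // Group split ids by index: one bucket per index, i.e. one
    // MarkSplitsForDeletionRequest (or one storage delete batch) per index.
    let per_index: HashMap<String, Vec<String>> = splits.into_iter().into_group_map();

    assert_eq!(per_index["index-a"], vec!["split-1", "split-3"]);
    assert_eq!(per_index["index-b"], vec!["split-2"]);
}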
3 changes: 1 addition & 2 deletions quickwit/quickwit-index-management/src/index.rs
@@ -365,8 +365,7 @@ impl IndexService {
.await?;

let deleted_entries = run_garbage_collect(
index_uid,
storage,
[(index_uid, storage)].into_iter().collect(),
self.metastore.clone(),
grace_period,
// deletion_grace_period of zero, so that a cli call directly deletes splits after
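For call sites that still collect a single index, like the IndexService method above, the new map argument can be built from one pair, which is what the `[(index_uid, storage)].into_iter().collect()` line does. A tiny stand-alone illustration of that pattern, again with `String` stand-ins for the quickwit types:

use std::collections::HashMap;

fn main() {
    // Stand-ins for IndexUid and Arc<dyn Storage>.
    let index_uid = "my-index".to_string();
    let storage = "ram:///indexes/my-index".to_string();

    // A one-entry map matching the shape of run_garbage_collect's `indexes` argument.
    let indexes: HashMap<String, String> = [(index_uid, storage)].into_iter().collect();

    assert_eq!(indexes.len(), 1);
    assert!(indexes.contains_key("my-index"));
}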