batch delete from gc (#5404)
* better batching of delete operations

* run deletion from gc concurrently

* refactoring attempt

* rustfmt and clippy

---------

Co-authored-by: Paul Masurel <[email protected]>
trinity-1686a and fulmicoton authored Sep 10, 2024
1 parent ec951aa commit 20b4956
Showing 8 changed files with 275 additions and 123 deletions.
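The "run deletion from gc concurrently" bullet above refers to driving the per-index deletions through a bounded concurrent stream. Below is a minimal sketch of that pattern, assuming a hypothetical delete_splits_for_index helper (the commit itself calls delete_splits_from_storage_and_metastore) and the concurrency limit of 10 used in the diff further down:

use futures::StreamExt;

// Hypothetical per-index deletion helper; stands in for the commit's
// `delete_splits_from_storage_and_metastore`.
async fn delete_splits_for_index(index_id: String, split_ids: Vec<String>) -> Result<usize, String> {
    let _ = index_id;
    Ok(split_ids.len())
}

async fn delete_batches_concurrently(batches: Vec<(String, Vec<String>)>) -> usize {
    // Turn the per-index batches into a stream of futures and keep at most
    // 10 deletions in flight, consuming results as they complete.
    let mut results = futures::stream::iter(batches)
        .map(|(index_id, split_ids)| delete_splits_for_index(index_id, split_ids))
        .buffer_unordered(10);

    let mut num_deleted = 0;
    while let Some(result) = results.next().await {
        if let Ok(count) = result {
            num_deleted += count;
        }
    }
    num_deleted
}

The buffer_unordered(10) call bounds how many indexes are deleted from at once, so one slow index cannot stall the whole pass, while results are still consumed in completion order.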
224 changes: 145 additions & 79 deletions quickwit/quickwit-index-management/src/garbage_collection.rs
@@ -22,17 +22,18 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;

use futures::Future;
use anyhow::Context;
use futures::{Future, StreamExt};
use itertools::Itertools;
use quickwit_common::pretty::PrettySample;
use quickwit_common::{Progress, ServiceStream};
use quickwit_common::Progress;
use quickwit_metastore::{
ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceStreamSplitsExt, SplitInfo,
SplitMetadata, SplitState,
};
use quickwit_proto::metastore::{
DeleteSplitsRequest, ListSplitsRequest, ListSplitsResponse, MarkSplitsForDeletionRequest,
MetastoreError, MetastoreResult, MetastoreService, MetastoreServiceClient,
DeleteSplitsRequest, ListSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError,
MetastoreService, MetastoreServiceClient,
};
use quickwit_proto::types::{IndexUid, SplitId};
use quickwit_storage::{BulkDeleteError, Storage};
@@ -41,7 +42,7 @@ use time::OffsetDateTime;
use tracing::{error, instrument};

/// The maximum number of splits that the GC should delete per attempt.
const DELETE_SPLITS_BATCH_SIZE: usize = 1000;
const DELETE_SPLITS_BATCH_SIZE: usize = 10_000;

/// [`DeleteSplitsError`] describes the errors that occurred during the deletion of splits from
/// storage and metastore.
@@ -67,7 +68,7 @@ where Fut: Future<Output = T> {
}

/// Information on what splits have and have not been cleaned up by the GC.
#[derive(Debug)]
#[derive(Debug, Default)]
pub struct SplitRemovalInfo {
/// The set of splits that have been removed.
pub removed_split_entries: Vec<SplitInfo>,
@@ -94,17 +95,23 @@ pub async fn run_garbage_collect(
dry_run: bool,
progress_opt: Option<&Progress>,
) -> anyhow::Result<SplitRemovalInfo> {
// Select staged splits with staging timestamp older than grace period timestamp.
let grace_period_timestamp =
OffsetDateTime::now_utc().unix_timestamp() - staged_grace_period.as_secs() as i64;

let index_uids: Vec<IndexUid> = indexes.keys().cloned().collect();

let query = ListSplitsQuery::try_from_index_uids(index_uids.clone())?
let Some(list_splits_query_for_index_uids) =
ListSplitsQuery::try_from_index_uids(index_uids.clone())
else {
return Ok(SplitRemovalInfo::default());
};
let list_splits_query = list_splits_query_for_index_uids
.clone()
.with_split_state(SplitState::Staged)
.with_update_timestamp_lte(grace_period_timestamp);

let list_deletable_staged_request = ListSplitsRequest::try_from_list_splits_query(&query)?;
let list_deletable_staged_request =
ListSplitsRequest::try_from_list_splits_query(&list_splits_query)?;
let deletable_staged_splits: Vec<SplitMetadata> = protect_future(
progress_opt,
metastore.list_splits(list_deletable_staged_request),
@@ -114,8 +121,8 @@ .await?;
.await?;

if dry_run {
let marked_for_deletion_query = ListSplitsQuery::try_from_index_uids(index_uids.clone())?
.with_split_state(SplitState::MarkedForDeletion);
let marked_for_deletion_query =
list_splits_query_for_index_uids.with_split_state(SplitState::MarkedForDeletion);
let marked_for_deletion_request =
ListSplitsRequest::try_from_list_splits_query(&marked_for_deletion_query)?;
let mut splits_marked_for_deletion: Vec<SplitMetadata> = protect_future(
@@ -157,64 +164,141 @@ pub async fn run_garbage_collect(
let updated_before_timestamp =
OffsetDateTime::now_utc().unix_timestamp() - deletion_grace_period.as_secs() as i64;

delete_splits_marked_for_deletion(
Ok(delete_splits_marked_for_deletion_several_indexes(
index_uids,
updated_before_timestamp,
metastore,
indexes,
progress_opt,
)
.await
.await)
}
#[instrument(skip(index_uids, storages, metastore, progress_opt), fields(num_indexes=%index_uids.len()))]

async fn delete_splits(
splits_metadata_to_delete_per_index: HashMap<IndexUid, Vec<SplitMetadata>>,
storages: &HashMap<IndexUid, Arc<dyn Storage>>,
metastore: MetastoreServiceClient,
progress_opt: Option<&Progress>,
split_removal_info: &mut SplitRemovalInfo,
) -> Result<(), ()> {
let mut delete_split_from_index_res_stream =
futures::stream::iter(splits_metadata_to_delete_per_index)
.map(|(index_uid, splits_metadata_to_delete)| {
let storage = storages.get(&index_uid).cloned();
let metastore = metastore.clone();
async move {
if let Some(storage) = storage {
delete_splits_from_storage_and_metastore(
index_uid,
storage,
metastore,
splits_metadata_to_delete,
progress_opt,
)
.await
} else {
error!(
"we are trying to GC without knowing the storage, this shouldn't \
happen"
);
Err(DeleteSplitsError {
successes: Vec::new(),
storage_error: None,
storage_failures: splits_metadata_to_delete
.into_iter()
.map(|split| split.as_split_info())
.collect(),
metastore_error: None,
metastore_failures: Vec::new(),
})
}
}
})
.buffer_unordered(10);
let mut error_encountered = false;
while let Some(delete_split_result) = delete_split_from_index_res_stream.next().await {
match delete_split_result {
Ok(entries) => {
split_removal_info.removed_split_entries.extend(entries);
}
Err(delete_split_error) => {
split_removal_info
.removed_split_entries
.extend(delete_split_error.successes);
split_removal_info
.failed_splits
.extend(delete_split_error.storage_failures);
split_removal_info
.failed_splits
.extend(delete_split_error.metastore_failures);
error_encountered = true;
}
}
}
if error_encountered {
Err(())
} else {
Ok(())
}
}

/// Fetches the list of split metadata from the metastore and returns it as a Vec.
async fn list_splits_metadata(
metastore: &MetastoreServiceClient,
query: &ListSplitsQuery,
) -> anyhow::Result<Vec<SplitMetadata>> {
let list_splits_request = ListSplitsRequest::try_from_list_splits_query(query)
.context("failed to build list splits request")?;
let splits_to_delete_stream = metastore
.list_splits(list_splits_request)
.await
.context("failed to fetch stream splits")?;
let splits = splits_to_delete_stream
.collect_splits_metadata()
.await
.context("failed to collect splits")?;
Ok(splits)
}

/// Removes any splits marked for deletion which haven't been
/// updated after `updated_before_timestamp` in batches of 1000 splits.
///
/// The aim of this is to spread the load out across a longer period
/// rather than short, heavy bursts on the metastore and storage system itself.
async fn delete_splits_marked_for_deletion(
#[instrument(skip(index_uids, storages, metastore, progress_opt), fields(num_indexes=%index_uids.len()))]
async fn delete_splits_marked_for_deletion_several_indexes(
index_uids: Vec<IndexUid>,
updated_before_timestamp: i64,
metastore: MetastoreServiceClient,
storages: HashMap<IndexUid, Arc<dyn Storage>>,
progress_opt: Option<&Progress>,
) -> anyhow::Result<SplitRemovalInfo> {
let mut removed_splits = Vec::new();
let mut failed_splits = Vec::new();

'outer: loop {
let mut exit = false;
let query = ListSplitsQuery::try_from_index_uids(index_uids.clone())?
.with_split_state(SplitState::MarkedForDeletion)
.with_update_timestamp_lte(updated_before_timestamp)
.with_limit(DELETE_SPLITS_BATCH_SIZE);

let list_splits_request = match ListSplitsRequest::try_from_list_splits_query(&query) {
Ok(request) => request,
Err(error) => {
error!(error = ?error, "failed to build list splits request");
) -> SplitRemovalInfo {
let mut split_removal_info = SplitRemovalInfo::default();

let Some(list_splits_query) = ListSplitsQuery::try_from_index_uids(index_uids) else {
error!("failed to create list splits query. this should never happen");
return split_removal_info;
};

let list_splits_query = list_splits_query
.with_split_state(SplitState::MarkedForDeletion)
.with_update_timestamp_lte(updated_before_timestamp)
.with_limit(DELETE_SPLITS_BATCH_SIZE)
.sort_by_index_uid();

loop {
let splits_metadata_to_delete: Vec<SplitMetadata> = match protect_future(
progress_opt,
list_splits_metadata(&metastore, &list_splits_query),
)
.await
{
Ok(splits) => splits,
Err(list_splits_err) => {
error!(error=?list_splits_err, "failed to list splits");
break;
}
};
let splits_stream_result =
protect_future(progress_opt, metastore.list_splits(list_splits_request)).await;
let splits_to_delete_stream: ServiceStream<MetastoreResult<ListSplitsResponse>> =
match splits_stream_result {
Ok(splits_stream) => splits_stream,
Err(error) => {
error!(error = ?error, "failed to fetch stream splits");
break;
}
};

let splits_metadata_to_delete: Vec<SplitMetadata> =
match splits_to_delete_stream.collect_splits_metadata().await {
Ok(splits) => splits,
Err(error) => {
error!(error = ?error, "failed to collect splits");
break;
}
};

let num_splits_to_delete = splits_metadata_to_delete.len();

@@ -228,41 +312,23 @@ async fn delete_splits_marked_for_deletion(
.map(|meta| (meta.index_uid.clone(), meta))
.into_group_map();

for (index_uid, splits_metadata_to_delete) in splits_metadata_to_delete_per_index {
let Some(storage) = storages.get(&index_uid).cloned() else {
error!("we are trying to GC without knowing the storage, this shouldn't happen");
// we stop there, or we could easily end up looping indefinitely if there are more
// than DELETE_SPLITS_BATCH_SIZE to delete in this index
break 'outer;
};
let delete_splits_result = delete_splits_from_storage_and_metastore(
index_uid,
storage,
metastore.clone(),
splits_metadata_to_delete,
progress_opt,
)
.await;

match delete_splits_result {
Ok(entries) => removed_splits.extend(entries),
Err(delete_splits_error) => {
failed_splits.extend(delete_splits_error.storage_failures);
failed_splits.extend(delete_splits_error.metastore_failures);
exit = true;
}
}
}
if num_splits_to_delete < DELETE_SPLITS_BATCH_SIZE || exit {
let delete_split_res = delete_splits(
splits_metadata_to_delete_per_index,
&storages,
metastore.clone(),
progress_opt,
&mut split_removal_info,
)
.await;

if num_splits_to_delete < DELETE_SPLITS_BATCH_SIZE || delete_split_res.is_err() {
// stop the gc if this was the last batch or we encountered an error
// (otherwise we might try deleting the same splits in an endless loop)
break;
}
}
Ok(SplitRemovalInfo {
removed_split_entries: removed_splits,
failed_splits,
})

split_removal_info
}

/// Delete a list of splits from the storage and the metastore.
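The body of delete_splits_from_storage_and_metastore is not expanded in this view. A rough sketch of its general shape — bulk-delete the split files from storage first, then drop the corresponding records from the metastore — with hypothetical closure parameters standing in for Quickwit's Storage and metastore client:

use std::path::PathBuf;

// Hypothetical stand-in for a split scheduled for deletion.
struct SplitToDelete {
    split_id: String,
    file_path: PathBuf,
}

// Sketch only: `delete_files` and `delete_records` stand in for the storage
// bulk delete and the metastore's DeleteSplitsRequest, respectively.
fn delete_splits_sketch<E>(
    delete_files: impl FnOnce(Vec<PathBuf>) -> Result<(), E>,
    delete_records: impl FnOnce(Vec<String>) -> Result<(), E>,
    splits: Vec<SplitToDelete>,
) -> Result<(), E> {
    // Remove the split files from storage first: if this fails, the metastore
    // still tracks the splits and a later GC pass can retry.
    let paths: Vec<PathBuf> = splits.iter().map(|split| split.file_path.clone()).collect();
    delete_files(paths)?;
    // Only then delete the split records from the metastore.
    let split_ids: Vec<String> = splits.into_iter().map(|split| split.split_id).collect();
    delete_records(split_ids)
}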
@@ -48,7 +48,7 @@ use tracing::{info, warn};

use super::MutationOccurred;
use crate::checkpoint::IndexCheckpointDelta;
use crate::metastore::use_shard_api;
use crate::metastore::{use_shard_api, SortBy};
use crate::{split_tag_filter, IndexMetadata, ListSplitsQuery, Split, SplitMetadata, SplitState};

/// A `FileBackedIndex` object carries an index metadata and its split metadata.
@@ -428,8 +428,9 @@ impl FileBackedIndex {
let limit = query.limit.unwrap_or(usize::MAX);
let offset = query.offset.unwrap_or_default();

let splits: Vec<Split> = if query.sort_by_staleness {
self.splits
let splits: Vec<Split> = match query.sort_by {
SortBy::Staleness => self
.splits
.values()
.filter(|split| split_query_predicate(split, query))
.sorted_unstable_by(|left_split, right_split| {
@@ -446,15 +447,24 @@
.skip(offset)
.take(limit)
.cloned()
.collect()
} else {
self.splits
.collect(),
SortBy::IndexUid => self
.splits
.values()
.filter(|split| split_query_predicate(split, query))
.sorted_unstable_by_key(|split| &split.split_metadata.index_uid)
.skip(offset)
.take(limit)
.cloned()
.collect(),
SortBy::None => self
.splits
.values()
.filter(|split| split_query_predicate(split, query))
.skip(offset)
.take(limit)
.cloned()
.collect()
.collect(),
};
Ok(splits)
}
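The new SortBy::IndexUid branch above backs the .sort_by_index_uid() call on the GC's list query: with the batch limit in place, sorting by index UID likely keeps splits of the same index together within a batch, so each batch collapses into as few per-index deletions as possible. A small sketch of that grouping step, using Itertools::into_group_map as the diff does, with simplified stand-in types:

use std::collections::HashMap;

use itertools::Itertools;

// Simplified stand-ins for Quickwit's IndexUid and SplitMetadata.
#[derive(Clone, PartialEq, Eq, Hash)]
struct IndexUid(String);

struct SplitMetadata {
    index_uid: IndexUid,
    split_id: String,
}

// Groups one listed batch of splits into per-index deletion batches.
fn group_batch_per_index(batch: Vec<SplitMetadata>) -> HashMap<IndexUid, Vec<SplitMetadata>> {
    batch
        .into_iter()
        .map(|split| (split.index_uid.clone(), split))
        .into_group_map()
}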