Skip to content

Commit

Permalink
improve gc resilience and add metrics (#5420)
Browse files Browse the repository at this point in the history
* add search after to listing splits

* don't stop gcing after first error

* add metrics to garbage collector

* add test for sort

and fix a bug in the file metastore when multiple index_uids are queried

* add test for search after split
  • Loading branch information
trinity-1686a authored Oct 1, 2024
1 parent 8758287 commit 2dcc696
Show file tree
Hide file tree
Showing 12 changed files with 670 additions and 91 deletions.
73 changes: 62 additions & 11 deletions quickwit/quickwit-index-management/src/garbage_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::time::Duration;
use anyhow::Context;
use futures::{Future, StreamExt};
use itertools::Itertools;
use quickwit_common::metrics::IntCounter;
use quickwit_common::pretty::PrettySample;
use quickwit_common::Progress;
use quickwit_metastore::{
Expand All @@ -44,6 +45,26 @@ use tracing::{error, instrument};
/// The maximum number of splits that the GC should delete per attempt.
const DELETE_SPLITS_BATCH_SIZE: usize = 10_000;

/// Counters reporting the outcome of garbage-collection runs.
pub struct GcMetrics {
    // number of splits successfully deleted from storage and metastore
    pub deleted_splits: IntCounter,
    // total size in bytes of the successfully deleted splits
    // (sum of each split's `file_size_bytes`)
    pub deleted_bytes: IntCounter,
    // number of splits whose deletion failed (storage or metastore failures)
    pub failed_splits: IntCounter,
}

/// Internal helper trait so GC metrics can be recorded through an
/// `Option<GcMetrics>` without the caller checking for `None` at every site.
trait RecordGcMetrics {
    /// Records the outcome of one split-deletion batch.
    ///
    /// `num_deleted_splits`/`num_deleted_bytes` count the successful
    /// deletions; `num_failed_splits` counts deletions that failed.
    // renamed `num_delete_splits` -> `num_deleted_splits` to match the
    // implementation on `Option<GcMetrics>` (trait parameter names do not
    // affect callers in Rust, so this is purely a consistency fix)
    fn record(&self, num_deleted_splits: usize, num_deleted_bytes: u64, num_failed_splits: usize);
}

/// Recording is a no-op when no `GcMetrics` instance was supplied.
impl RecordGcMetrics for Option<GcMetrics> {
    fn record(&self, num_deleted_splits: usize, num_deleted_bytes: u64, num_failed_splits: usize) {
        // Bail out early when metrics collection is disabled.
        let Some(metrics) = self else {
            return;
        };
        metrics.deleted_splits.inc_by(num_deleted_splits as u64);
        metrics.deleted_bytes.inc_by(num_deleted_bytes);
        metrics.failed_splits.inc_by(num_failed_splits as u64);
    }
}

/// [`DeleteSplitsError`] describes the errors that occurred during the deletion of splits from
/// storage and metastore.
#[derive(Error, Debug)]
Expand Down Expand Up @@ -94,6 +115,7 @@ pub async fn run_garbage_collect(
deletion_grace_period: Duration,
dry_run: bool,
progress_opt: Option<&Progress>,
metrics: Option<GcMetrics>,
) -> anyhow::Result<SplitRemovalInfo> {
let grace_period_timestamp =
OffsetDateTime::now_utc().unix_timestamp() - staged_grace_period.as_secs() as i64;
Expand Down Expand Up @@ -170,6 +192,7 @@ pub async fn run_garbage_collect(
metastore,
indexes,
progress_opt,
metrics,
)
.await)
}
Expand All @@ -179,6 +202,7 @@ async fn delete_splits(
storages: &HashMap<IndexUid, Arc<dyn Storage>>,
metastore: MetastoreServiceClient,
progress_opt: Option<&Progress>,
metrics: &Option<GcMetrics>,
split_removal_info: &mut SplitRemovalInfo,
) -> Result<(), ()> {
let mut delete_split_from_index_res_stream =
Expand Down Expand Up @@ -219,9 +243,26 @@ async fn delete_splits(
while let Some(delete_split_result) = delete_split_from_index_res_stream.next().await {
match delete_split_result {
Ok(entries) => {
let deleted_bytes = entries
.iter()
.map(|entry| entry.file_size_bytes.as_u64())
.sum::<u64>();
let deleted_splits_count = entries.len();

metrics.record(deleted_splits_count, deleted_bytes, 0);
split_removal_info.removed_split_entries.extend(entries);
}
Err(delete_split_error) => {
let deleted_bytes = delete_split_error
.successes
.iter()
.map(|entry| entry.file_size_bytes.as_u64())
.sum::<u64>();
let deleted_splits_count = delete_split_error.successes.len();
let failed_splits_count = delete_split_error.storage_failures.len()
+ delete_split_error.metastore_failures.len();

metrics.record(deleted_splits_count, deleted_bytes, failed_splits_count);
split_removal_info
.removed_split_entries
.extend(delete_split_error.successes);
Expand Down Expand Up @@ -265,13 +306,14 @@ async fn list_splits_metadata(
///
/// The aim of this is to spread the load out across a longer period
/// rather than short, heavy bursts on the metastore and storage system itself.
#[instrument(skip(index_uids, storages, metastore, progress_opt), fields(num_indexes=%index_uids.len()))]
#[instrument(skip(index_uids, storages, metastore, progress_opt, metrics), fields(num_indexes=%index_uids.len()))]
async fn delete_splits_marked_for_deletion_several_indexes(
index_uids: Vec<IndexUid>,
updated_before_timestamp: i64,
metastore: MetastoreServiceClient,
storages: HashMap<IndexUid, Arc<dyn Storage>>,
progress_opt: Option<&Progress>,
metrics: Option<GcMetrics>,
) -> SplitRemovalInfo {
let mut split_removal_info = SplitRemovalInfo::default();

Expand All @@ -280,7 +322,7 @@ async fn delete_splits_marked_for_deletion_several_indexes(
return split_removal_info;
};

let list_splits_query = list_splits_query
let mut list_splits_query = list_splits_query
.with_split_state(SplitState::MarkedForDeletion)
.with_update_timestamp_lte(updated_before_timestamp)
.with_limit(DELETE_SPLITS_BATCH_SIZE)
Expand All @@ -300,30 +342,34 @@ async fn delete_splits_marked_for_deletion_several_indexes(
}
};

let num_splits_to_delete = splits_metadata_to_delete.len();

if num_splits_to_delete == 0 {
// set split after which to search for the next loop
let Some(last_split_metadata) = splits_metadata_to_delete.last() else {
break;
}
};
list_splits_query = list_splits_query.after_split(last_split_metadata);

let num_splits_to_delete = splits_metadata_to_delete.len();

let splits_metadata_to_delete_per_index: HashMap<IndexUid, Vec<SplitMetadata>> =
splits_metadata_to_delete
.into_iter()
.map(|meta| (meta.index_uid.clone(), meta))
.into_group_map();

let delete_split_res = delete_splits(
// ignore return we continue either way
let _: Result<(), ()> = delete_splits(
splits_metadata_to_delete_per_index,
&storages,
metastore.clone(),
progress_opt,
&metrics,
&mut split_removal_info,
)
.await;

if num_splits_to_delete < DELETE_SPLITS_BATCH_SIZE || delete_split_res.is_err() {
// stop the gc if this was the last batch or we encountered an error
// (otherwise we might try deleting the same splits in an endless loop)
if num_splits_to_delete < DELETE_SPLITS_BATCH_SIZE {
// stop the gc if this was the last batch
// we are guaranteed to make progress due to .after_split()
break;
}
}
Expand All @@ -345,7 +391,7 @@ pub async fn delete_splits_from_storage_and_metastore(
metastore: MetastoreServiceClient,
splits: Vec<SplitMetadata>,
progress_opt: Option<&Progress>,
) -> anyhow::Result<Vec<SplitInfo>, DeleteSplitsError> {
) -> Result<Vec<SplitInfo>, DeleteSplitsError> {
let mut split_infos: HashMap<PathBuf, SplitInfo> = HashMap::with_capacity(splits.len());

for split in splits {
Expand Down Expand Up @@ -511,6 +557,7 @@ mod tests {
Duration::from_secs(30),
false,
None,
None,
)
.await
.unwrap();
Expand Down Expand Up @@ -538,6 +585,7 @@ mod tests {
Duration::from_secs(30),
false,
None,
None,
)
.await
.unwrap();
Expand Down Expand Up @@ -615,6 +663,7 @@ mod tests {
Duration::from_secs(30),
false,
None,
None,
)
.await
.unwrap();
Expand Down Expand Up @@ -642,6 +691,7 @@ mod tests {
Duration::from_secs(0),
false,
None,
None,
)
.await
.unwrap();
Expand Down Expand Up @@ -680,6 +730,7 @@ mod tests {
Duration::from_secs(30),
false,
None,
None,
)
.await
.unwrap();
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-index-management/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ impl IndexService {
Duration::ZERO,
dry_run,
None,
None,
)
.await?;

Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-index-management/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@
mod garbage_collection;
mod index;

pub use garbage_collection::run_garbage_collect;
pub use garbage_collection::{run_garbage_collect, GcMetrics};
pub use index::{clear_cache_directory, validate_storage_uri, IndexService, IndexServiceError};
Loading

0 comments on commit 2dcc696

Please sign in to comment.