Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test gc metrics #5432

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 51 additions & 11 deletions quickwit/quickwit-index-management/src/garbage_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ use tracing::{error, instrument};
/// The maximum number of splits that the GC should delete per attempt.
const DELETE_SPLITS_BATCH_SIZE: usize = 10_000;

/// Callback used by the garbage collector to report the outcome of each
/// split-deletion batch.
///
/// Implementations must be `Sync` because the GC may report results from
/// concurrently processed batches.
pub trait RecordGcMetrics: Sync {
    /// Records the result of one deletion batch.
    ///
    /// - `num_deleted_splits`: number of splits successfully deleted,
    /// - `num_deleted_bytes`: total size in bytes of the successfully deleted splits,
    /// - `num_failed_splits`: number of splits that could not be deleted
    ///   (storage or metastore failures).
    fn record(&self, num_deleted_splits: usize, num_deleted_bytes: u64, num_failed_splits: usize);
}

/// No-op metrics recorder, used where GC metrics are not collected
/// (e.g. one-shot index management commands and tests).
pub(crate) struct DoNotRecordGcMetrics;

impl RecordGcMetrics for DoNotRecordGcMetrics {
    fn record(
        &self,
        _num_deleted_splits: usize,
        _num_deleted_bytes: u64,
        _num_failed_splits: usize,
    ) {
    }
}

/// [`DeleteSplitsError`] describes the errors that occurred during the deletion of splits from
/// storage and metastore.
#[derive(Error, Debug)]
Expand Down Expand Up @@ -94,6 +104,7 @@ pub async fn run_garbage_collect(
deletion_grace_period: Duration,
dry_run: bool,
progress_opt: Option<&Progress>,
metrics: &dyn RecordGcMetrics,
) -> anyhow::Result<SplitRemovalInfo> {
let grace_period_timestamp =
OffsetDateTime::now_utc().unix_timestamp() - staged_grace_period.as_secs() as i64;
Expand Down Expand Up @@ -170,6 +181,7 @@ pub async fn run_garbage_collect(
metastore,
indexes,
progress_opt,
metrics,
)
.await)
}
Expand All @@ -179,6 +191,7 @@ async fn delete_splits(
storages: &HashMap<IndexUid, Arc<dyn Storage>>,
metastore: MetastoreServiceClient,
progress_opt: Option<&Progress>,
metrics: &dyn RecordGcMetrics,
split_removal_info: &mut SplitRemovalInfo,
) -> Result<(), ()> {
let mut delete_split_from_index_res_stream =
Expand Down Expand Up @@ -219,9 +232,26 @@ async fn delete_splits(
while let Some(delete_split_result) = delete_split_from_index_res_stream.next().await {
match delete_split_result {
Ok(entries) => {
let deleted_bytes = entries
.iter()
.map(|entry| entry.file_size_bytes.as_u64())
.sum::<u64>();
let deleted_splits_count = entries.len();

metrics.record(deleted_splits_count, deleted_bytes, 0);
split_removal_info.removed_split_entries.extend(entries);
}
Err(delete_split_error) => {
let deleted_bytes = delete_split_error
.successes
.iter()
.map(|entry| entry.file_size_bytes.as_u64())
.sum::<u64>();
let deleted_splits_count = delete_split_error.successes.len();
let failed_splits_count = delete_split_error.storage_failures.len()
+ delete_split_error.metastore_failures.len();

metrics.record(deleted_splits_count, deleted_bytes, failed_splits_count);
split_removal_info
.removed_split_entries
.extend(delete_split_error.successes);
Expand Down Expand Up @@ -265,13 +295,14 @@ async fn list_splits_metadata(
///
/// The aim of this is to spread the load out across a longer period
/// rather than short, heavy bursts on the metastore and storage system itself.
#[instrument(skip(index_uids, storages, metastore, progress_opt), fields(num_indexes=%index_uids.len()))]
#[instrument(skip(index_uids, storages, metastore, progress_opt, metrics), fields(num_indexes=%index_uids.len()))]
async fn delete_splits_marked_for_deletion_several_indexes(
index_uids: Vec<IndexUid>,
updated_before_timestamp: i64,
metastore: MetastoreServiceClient,
storages: HashMap<IndexUid, Arc<dyn Storage>>,
progress_opt: Option<&Progress>,
metrics: &dyn RecordGcMetrics,
) -> SplitRemovalInfo {
let mut split_removal_info = SplitRemovalInfo::default();

Expand All @@ -280,7 +311,7 @@ async fn delete_splits_marked_for_deletion_several_indexes(
return split_removal_info;
};

let list_splits_query = list_splits_query
let mut list_splits_query = list_splits_query
.with_split_state(SplitState::MarkedForDeletion)
.with_update_timestamp_lte(updated_before_timestamp)
.with_limit(DELETE_SPLITS_BATCH_SIZE)
Expand All @@ -300,30 +331,34 @@ async fn delete_splits_marked_for_deletion_several_indexes(
}
};

let num_splits_to_delete = splits_metadata_to_delete.len();

if num_splits_to_delete == 0 {
// set split after which to search for the next loop
let Some(last_split_metadata) = splits_metadata_to_delete.last() else {
break;
}
};
list_splits_query = list_splits_query.after_split(last_split_metadata);

let num_splits_to_delete = splits_metadata_to_delete.len();

let splits_metadata_to_delete_per_index: HashMap<IndexUid, Vec<SplitMetadata>> =
splits_metadata_to_delete
.into_iter()
.map(|meta| (meta.index_uid.clone(), meta))
.into_group_map();

let delete_split_res = delete_splits(
// ignore return we continue either way
let _: Result<(), ()> = delete_splits(
splits_metadata_to_delete_per_index,
&storages,
metastore.clone(),
progress_opt,
metrics,
&mut split_removal_info,
)
.await;

if num_splits_to_delete < DELETE_SPLITS_BATCH_SIZE || delete_split_res.is_err() {
// stop the gc if this was the last batch or we encountered an error
// (otherwise we might try deleting the same splits in an endless loop)
if num_splits_to_delete < DELETE_SPLITS_BATCH_SIZE {
// stop the gc if this was the last batch
// we are guaranteed to make progress due to .after_split()
break;
}
}
Expand All @@ -345,7 +380,7 @@ pub async fn delete_splits_from_storage_and_metastore(
metastore: MetastoreServiceClient,
splits: Vec<SplitMetadata>,
progress_opt: Option<&Progress>,
) -> anyhow::Result<Vec<SplitInfo>, DeleteSplitsError> {
) -> Result<Vec<SplitInfo>, DeleteSplitsError> {
let mut split_infos: HashMap<PathBuf, SplitInfo> = HashMap::with_capacity(splits.len());

for split in splits {
Expand Down Expand Up @@ -511,6 +546,7 @@ mod tests {
Duration::from_secs(30),
false,
None,
&DoNotRecordGcMetrics,
)
.await
.unwrap();
Expand Down Expand Up @@ -538,6 +574,7 @@ mod tests {
Duration::from_secs(30),
false,
None,
&DoNotRecordGcMetrics,
)
.await
.unwrap();
Expand Down Expand Up @@ -615,6 +652,7 @@ mod tests {
Duration::from_secs(30),
false,
None,
&DoNotRecordGcMetrics,
)
.await
.unwrap();
Expand Down Expand Up @@ -642,6 +680,7 @@ mod tests {
Duration::from_secs(0),
false,
None,
&DoNotRecordGcMetrics,
)
.await
.unwrap();
Expand Down Expand Up @@ -680,6 +719,7 @@ mod tests {
Duration::from_secs(30),
false,
None,
&DoNotRecordGcMetrics,
)
.await
.unwrap();
Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-index-management/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use crate::garbage_collection::{
delete_splits_from_storage_and_metastore, run_garbage_collect, DeleteSplitsError,
SplitRemovalInfo,
};
use crate::DoNotRecordGcMetrics;

#[derive(Error, Debug)]
pub enum IndexServiceError {
Expand Down Expand Up @@ -373,6 +374,7 @@ impl IndexService {
Duration::ZERO,
dry_run,
None,
&DoNotRecordGcMetrics,
)
.await?;

Expand Down
3 changes: 3 additions & 0 deletions quickwit/quickwit-index-management/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,7 @@ mod garbage_collection;
mod index;

pub use garbage_collection::run_garbage_collect;
pub use garbage_collection::RecordGcMetrics;
pub use index::{clear_cache_directory, validate_storage_uri, IndexService, IndexServiceError};

use garbage_collection::DoNotRecordGcMetrics;
Loading
Loading