Skip to content

Commit

Permalink
better batching of delete operations
Browse files Browse the repository at this point in the history
  • Loading branch information
trinity-1686a committed Sep 9, 2024
1 parent 79acfe4 commit ffbeb5b
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 55 deletions.
46 changes: 24 additions & 22 deletions quickwit/quickwit-index-management/src/garbage_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use time::OffsetDateTime;
use tracing::{error, instrument};

/// The maximum number of splits that the GC should delete per attempt.
const DELETE_SPLITS_BATCH_SIZE: usize = 1000;
const DELETE_SPLITS_BATCH_SIZE: usize = 10_000;

/// [`DeleteSplitsError`] describes the errors that occurred during the deletion of splits from
/// storage and metastore.
Expand Down Expand Up @@ -182,12 +182,13 @@ async fn delete_splits_marked_for_deletion(
let mut removed_splits = Vec::new();
let mut failed_splits = Vec::new();

'outer: loop {
loop {
let mut exit = false;
let query = ListSplitsQuery::try_from_index_uids(index_uids.clone())?
.with_split_state(SplitState::MarkedForDeletion)
.with_update_timestamp_lte(updated_before_timestamp)
.with_limit(DELETE_SPLITS_BATCH_SIZE);
.with_limit(DELETE_SPLITS_BATCH_SIZE)
.sort_by_index_uid();

let list_splits_request = match ListSplitsRequest::try_from_list_splits_query(&query) {
Ok(request) => request,
Expand Down Expand Up @@ -229,29 +230,30 @@ async fn delete_splits_marked_for_deletion(
.into_group_map();

for (index_uid, splits_metadata_to_delete) in splits_metadata_to_delete_per_index {
let Some(storage) = storages.get(&index_uid).cloned() else {
if let Some(storage) = storages.get(&index_uid).cloned() {
let delete_splits_result = delete_splits_from_storage_and_metastore(
index_uid,
storage,
metastore.clone(),
splits_metadata_to_delete,
progress_opt,
)
.await;

match delete_splits_result {
Ok(entries) => removed_splits.extend(entries),
Err(delete_splits_error) => {
failed_splits.extend(delete_splits_error.storage_failures);
failed_splits.extend(delete_splits_error.metastore_failures);
exit = true;
}
}
} else {
error!("we are trying to GC without knowing the storage, this shouldn't happen");
// we stop there, or we could easily end up looping indefinitely if there are more
// than DELETE_SPLITS_BATCH_SIZE to delete in this index
break 'outer;
exit = true;
};
let delete_splits_result = delete_splits_from_storage_and_metastore(
index_uid,
storage,
metastore.clone(),
splits_metadata_to_delete,
progress_opt,
)
.await;

match delete_splits_result {
Ok(entries) => removed_splits.extend(entries),
Err(delete_splits_error) => {
failed_splits.extend(delete_splits_error.storage_failures);
failed_splits.extend(delete_splits_error.metastore_failures);
exit = true;
}
}
}
if num_splits_to_delete < DELETE_SPLITS_BATCH_SIZE || exit {
// stop the gc if this was the last batch or we encountered an error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use tracing::{info, warn};

use super::MutationOccurred;
use crate::checkpoint::IndexCheckpointDelta;
use crate::metastore::use_shard_api;
use crate::metastore::{use_shard_api, SortBy};
use crate::{split_tag_filter, IndexMetadata, ListSplitsQuery, Split, SplitMetadata, SplitState};

/// A `FileBackedIndex` object carries an index metadata and its split metadata.
Expand Down Expand Up @@ -428,8 +428,9 @@ impl FileBackedIndex {
let limit = query.limit.unwrap_or(usize::MAX);
let offset = query.offset.unwrap_or_default();

let splits: Vec<Split> = if query.sort_by_staleness {
self.splits
let splits: Vec<Split> = match query.sort_by {
SortBy::Staleness => self
.splits
.values()
.filter(|split| split_query_predicate(split, query))
.sorted_unstable_by(|left_split, right_split| {
Expand All @@ -446,15 +447,24 @@ impl FileBackedIndex {
.skip(offset)
.take(limit)
.cloned()
.collect()
} else {
self.splits
.collect(),
SortBy::IndexUid => self
.splits
.values()
.filter(|split| split_query_predicate(split, query))
.sorted_unstable_by_key(|split| &split.split_metadata.index_uid)
.skip(offset)
.take(limit)
.cloned()
.collect(),
SortBy::None => self
.splits
.values()
.filter(|split| split_query_predicate(split, query))
.skip(offset)
.take(limit)
.cloned()
.collect()
.collect(),
};
Ok(splits)
}
Expand Down
21 changes: 17 additions & 4 deletions quickwit/quickwit-metastore/src/metastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,14 @@ pub struct ListSplitsQuery {

/// Ordering applied to the listed splits (see [`SortBy`]). The constructors initialize it to
/// `SortBy::None`; use the `sort_by_*` builder methods to request a specific ordering.
pub sort_by_staleness: bool,
pub sort_by: SortBy,
}

/// Ordering requested for the splits returned by a [`ListSplitsQuery`].
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub enum SortBy {
/// No ordering is requested.
None,
/// Sort by staleness, i.e. by delete opstamp and publish timestamp in ascending order.
Staleness,
/// Sort by index UID.
IndexUid,
}

#[allow(unused_attributes)]
Expand All @@ -650,7 +657,7 @@ impl ListSplitsQuery {
update_timestamp: Default::default(),
create_timestamp: Default::default(),
mature: Bound::Unbounded,
sort_by_staleness: false,
sort_by: SortBy::None,
}
}

Expand All @@ -675,7 +682,7 @@ impl ListSplitsQuery {
update_timestamp: Default::default(),
create_timestamp: Default::default(),
mature: Bound::Unbounded,
sort_by_staleness: false,
sort_by: SortBy::None,
})
}

Expand Down Expand Up @@ -842,7 +849,13 @@ impl ListSplitsQuery {
/// Sorts the splits by staleness, i.e. by delete opstamp and publish timestamp in ascending
/// order.
pub fn sort_by_staleness(mut self) -> Self {
self.sort_by_staleness = true;
self.sort_by = SortBy::Staleness;
self
}

/// Sorts the splits by index_uid.
///
/// The query holds a single `sort_by` value, so calling this replaces any ordering that was
/// previously requested on the query (e.g. via `sort_by_staleness`).
pub fn sort_by_index_uid(mut self) -> Self {
self.sort_by = SortBy::IndexUid;
self
}
}
Expand Down
53 changes: 35 additions & 18 deletions quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,9 @@ impl MetastoreService for PostgresqlMetastore {
) -> MetastoreResult<MetastoreServiceStream<ListSplitsResponse>> {
let list_splits_query = request.deserialize_list_splits_query()?;
let mut sql_query_builder = Query::select();
sql_query_builder.column(Asterisk).from(Splits::Table);
sql_query_builder
.column((Splits::Table, Asterisk))
.from(Splits::Table);
append_query_filters(&mut sql_query_builder, &list_splits_query);

let (sql_query, values) = sql_query_builder.build_sqlx(PostgresQueryBuilder);
Expand Down Expand Up @@ -1918,7 +1920,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND "split_state" IN ('Staged')"#
r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND "split_state" IN ('Staged')"#
)
);

Expand All @@ -1932,7 +1934,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND "split_state" IN ('Published')"#
r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND "split_state" IN ('Published')"#
)
);

Expand All @@ -1945,7 +1947,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND "split_state" IN ('Published', 'MarkedForDeletion')"#
r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND "split_state" IN ('Published', 'MarkedForDeletion')"#
)
);

Expand All @@ -1957,7 +1959,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND "update_timestamp" < TO_TIMESTAMP(51)"#
r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND "update_timestamp" < TO_TIMESTAMP(51)"#
)
);

Expand All @@ -1969,7 +1971,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND "create_timestamp" <= TO_TIMESTAMP(55)"#
r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND "create_timestamp" <= TO_TIMESTAMP(55)"#
)
);

Expand All @@ -1984,7 +1986,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND ("maturity_timestamp" = TO_TIMESTAMP(0) OR "maturity_timestamp" <= TO_TIMESTAMP(55))"#
r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND ("maturity_timestamp" = TO_TIMESTAMP(0) OR "maturity_timestamp" <= TO_TIMESTAMP(55))"#
)
);

Expand All @@ -1997,7 +1999,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND "maturity_timestamp" > TO_TIMESTAMP(55)"#
r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND "maturity_timestamp" > TO_TIMESTAMP(55)"#
)
);

Expand All @@ -2009,7 +2011,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND "delete_opstamp" >= 4"#
r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND "delete_opstamp" >= 4"#
)
);

Expand All @@ -2021,7 +2023,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND ("time_range_end" > 45 OR "time_range_end" IS NULL)"#
r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND ("time_range_end" > 45 OR "time_range_end" IS NULL)"#
)
);

Expand All @@ -2033,7 +2035,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND ("time_range_start" < 45 OR "time_range_start" IS NULL)"#
r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND ("time_range_start" < 45 OR "time_range_start" IS NULL)"#
)
);

Expand All @@ -2050,7 +2052,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND (NOT ($$tag-2$$ = ANY(tags)))"#
r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND (NOT ($$tag-2$$ = ANY(tags)))"#
)
);

Expand All @@ -2063,7 +2065,22 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') ORDER BY "split_id" ASC OFFSET 4"#
r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') ORDER BY "split_id" ASC OFFSET 4"#
)
);

let mut select_statement = Query::select();
let sql = select_statement
.column((Splits::Table, Asterisk))
.from(Splits::Table);

let query = ListSplitsQuery::for_index(index_uid.clone()).sort_by_index_uid();
append_query_filters(sql, &query);

assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT "splits".* FROM "splits" JOIN "indexes" ON "splits"."index_uid" = "indexes"."index_uid" WHERE "splits"."index_uid" IN ('{index_uid}') ORDER BY "index_uid" ASC"#
)
);
}
Expand All @@ -2081,7 +2098,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND ("time_range_end" > 0 OR "time_range_end" IS NULL) AND ("time_range_start" < 40 OR "time_range_start" IS NULL)"#
r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND ("time_range_end" > 0 OR "time_range_end" IS NULL) AND ("time_range_start" < 40 OR "time_range_start" IS NULL)"#
)
);

Expand All @@ -2095,7 +2112,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND ("time_range_end" > 45 OR "time_range_end" IS NULL) AND "delete_opstamp" > 0"#
r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND ("time_range_end" > 45 OR "time_range_end" IS NULL) AND "delete_opstamp" > 0"#
)
);

Expand All @@ -2109,7 +2126,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND "update_timestamp" < TO_TIMESTAMP(51) AND "create_timestamp" <= TO_TIMESTAMP(63)"#
r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND "update_timestamp" < TO_TIMESTAMP(51) AND "create_timestamp" <= TO_TIMESTAMP(63)"#
)
);

Expand All @@ -2126,7 +2143,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND ($$tag-1$$ = ANY(tags)) AND ("time_range_end" > 90 OR "time_range_end" IS NULL)"#
r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND ($$tag-1$$ = ANY(tags)) AND ("time_range_end" > 90 OR "time_range_end" IS NULL)"#
)
);

Expand All @@ -2141,7 +2158,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}', '{index_uid_2}')"#
r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}', '{index_uid_2}')"#
)
);
}
Expand Down
10 changes: 10 additions & 0 deletions quickwit/quickwit-metastore/src/metastore/postgres/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,16 @@ use tracing::error;

use crate::{IndexMetadata, Split, SplitMetadata, SplitState};

/// Identifiers for the `indexes` table and its columns, for use when building SQL queries
/// (for instance to join `splits` with `indexes` on `index_uid`).
///
/// NOTE(review): the `Iden` derive presumably maps each variant to its snake_case SQL name
/// (`IndexUid` -> `index_uid`) and `Table` to the table name `indexes` — confirm against the
/// derive's documentation.
// `dead_code` is allowed presumably because not every column identifier is referenced by a
// query yet — TODO confirm which variants are actually unused.
#[derive(Iden, Clone, Copy)]
#[allow(dead_code)]
pub enum Indexes {
Table,
IndexUid,
IndexId,
IndexMetadataJson,
CreateTimestamp,
}

/// A model structure for handling index metadata in a database.
#[derive(sqlx::FromRow)]
pub(super) struct PgIndex {
Expand Down
Loading

0 comments on commit ffbeb5b

Please sign in to comment.