From ffbeb5b80111c64633c80be0954a9c585e14a320 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Mon, 9 Sep 2024 10:46:48 +0200 Subject: [PATCH] better batching of delete operations --- .../src/garbage_collection.rs | 46 ++++++++-------- .../file_backed/file_backed_index/mod.rs | 24 ++++++--- .../quickwit-metastore/src/metastore/mod.rs | 21 ++++++-- .../src/metastore/postgres/metastore.rs | 53 ++++++++++++------- .../src/metastore/postgres/model.rs | 10 ++++ .../src/metastore/postgres/utils.rs | 41 ++++++++++++-- 6 files changed, 140 insertions(+), 55 deletions(-) diff --git a/quickwit/quickwit-index-management/src/garbage_collection.rs b/quickwit/quickwit-index-management/src/garbage_collection.rs index 76a4afa0790..03a6b7e88c6 100644 --- a/quickwit/quickwit-index-management/src/garbage_collection.rs +++ b/quickwit/quickwit-index-management/src/garbage_collection.rs @@ -41,7 +41,7 @@ use time::OffsetDateTime; use tracing::{error, instrument}; /// The maximum number of splits that the GC should delete per attempt. -const DELETE_SPLITS_BATCH_SIZE: usize = 1000; +const DELETE_SPLITS_BATCH_SIZE: usize = 10_000; /// [`DeleteSplitsError`] describes the errors that occurred during the deletion of splits from /// storage and metastore. @@ -182,12 +182,13 @@ async fn delete_splits_marked_for_deletion( let mut removed_splits = Vec::new(); let mut failed_splits = Vec::new(); - 'outer: loop { + loop { let mut exit = false; let query = ListSplitsQuery::try_from_index_uids(index_uids.clone())? 
.with_split_state(SplitState::MarkedForDeletion) .with_update_timestamp_lte(updated_before_timestamp) - .with_limit(DELETE_SPLITS_BATCH_SIZE); + .with_limit(DELETE_SPLITS_BATCH_SIZE) + .sort_by_index_uid(); let list_splits_request = match ListSplitsRequest::try_from_list_splits_query(&query) { Ok(request) => request, @@ -229,29 +230,30 @@ async fn delete_splits_marked_for_deletion( .into_group_map(); for (index_uid, splits_metadata_to_delete) in splits_metadata_to_delete_per_index { - let Some(storage) = storages.get(&index_uid).cloned() else { + if let Some(storage) = storages.get(&index_uid).cloned() { + let delete_splits_result = delete_splits_from_storage_and_metastore( + index_uid, + storage, + metastore.clone(), + splits_metadata_to_delete, + progress_opt, + ) + .await; + + match delete_splits_result { + Ok(entries) => removed_splits.extend(entries), + Err(delete_splits_error) => { + failed_splits.extend(delete_splits_error.storage_failures); + failed_splits.extend(delete_splits_error.metastore_failures); + exit = true; + } + } + } else { error!("we are trying to GC without knowing the storage, this shouldn't happen"); // we stop there, or we could easily end up looping indefinitely if there are more // than DELETE_SPLITS_BATCH_SIZE to delete in this index - break 'outer; + exit = true; }; - let delete_splits_result = delete_splits_from_storage_and_metastore( - index_uid, - storage, - metastore.clone(), - splits_metadata_to_delete, - progress_opt, - ) - .await; - - match delete_splits_result { - Ok(entries) => removed_splits.extend(entries), - Err(delete_splits_error) => { - failed_splits.extend(delete_splits_error.storage_failures); - failed_splits.extend(delete_splits_error.metastore_failures); - exit = true; - } - } } if num_splits_to_delete < DELETE_SPLITS_BATCH_SIZE || exit { // stop the gc if this was the last batch or we encountered an error diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs 
b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs index b1bc18d3f71..426a91530b5 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs @@ -48,7 +48,7 @@ use tracing::{info, warn}; use super::MutationOccurred; use crate::checkpoint::IndexCheckpointDelta; -use crate::metastore::use_shard_api; +use crate::metastore::{use_shard_api, SortBy}; use crate::{split_tag_filter, IndexMetadata, ListSplitsQuery, Split, SplitMetadata, SplitState}; /// A `FileBackedIndex` object carries an index metadata and its split metadata. @@ -428,8 +428,9 @@ impl FileBackedIndex { let limit = query.limit.unwrap_or(usize::MAX); let offset = query.offset.unwrap_or_default(); - let splits: Vec = if query.sort_by_staleness { - self.splits + let splits: Vec = match query.sort_by { + SortBy::Staleness => self + .splits .values() .filter(|split| split_query_predicate(split, query)) .sorted_unstable_by(|left_split, right_split| { @@ -446,15 +447,24 @@ impl FileBackedIndex { .skip(offset) .take(limit) .cloned() - .collect() - } else { - self.splits + .collect(), + SortBy::IndexUid => self + .splits + .values() + .filter(|split| split_query_predicate(split, query)) + .sorted_unstable_by_key(|split| &split.split_metadata.index_uid) + .skip(offset) + .take(limit) + .cloned() + .collect(), + SortBy::None => self + .splits .values() .filter(|split| split_query_predicate(split, query)) .skip(offset) .take(limit) .cloned() - .collect() + .collect(), }; Ok(splits) } diff --git a/quickwit/quickwit-metastore/src/metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/mod.rs index 148d8de342e..758e5b45cac 100644 --- a/quickwit/quickwit-metastore/src/metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/mod.rs @@ -631,7 +631,14 @@ pub struct ListSplitsQuery { /// Sorts the splits by staleness, i.e. 
by delete opstamp and publish timestamp in ascending /// order. - pub sort_by_staleness: bool, + pub sort_by: SortBy, +} + +#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)] +pub enum SortBy { + None, + Staleness, + IndexUid, } #[allow(unused_attributes)] @@ -650,7 +657,7 @@ impl ListSplitsQuery { update_timestamp: Default::default(), create_timestamp: Default::default(), mature: Bound::Unbounded, - sort_by_staleness: false, + sort_by: SortBy::None, } } @@ -675,7 +682,7 @@ impl ListSplitsQuery { update_timestamp: Default::default(), create_timestamp: Default::default(), mature: Bound::Unbounded, - sort_by_staleness: false, + sort_by: SortBy::None, }) } @@ -842,7 +849,13 @@ impl ListSplitsQuery { /// Sorts the splits by staleness, i.e. by delete opstamp and publish timestamp in ascending /// order. pub fn sort_by_staleness(mut self) -> Self { - self.sort_by_staleness = true; + self.sort_by = SortBy::Staleness; + self + } + + /// Sorts the splits by index_uid. + pub fn sort_by_index_uid(mut self) -> Self { + self.sort_by = SortBy::IndexUid; self } } diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index 20064cae706..c9f554875ba 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -861,7 +861,9 @@ impl MetastoreService for PostgresqlMetastore { ) -> MetastoreResult> { let list_splits_query = request.deserialize_list_splits_query()?; let mut sql_query_builder = Query::select(); - sql_query_builder.column(Asterisk).from(Splits::Table); + sql_query_builder + .column((Splits::Table, Asterisk)) + .from(Splits::Table); append_query_filters(&mut sql_query_builder, &list_splits_query); let (sql_query, values) = sql_query_builder.build_sqlx(PostgresQueryBuilder); @@ -1918,7 +1920,7 @@ mod tests { assert_eq!( sql.to_string(PostgresQueryBuilder), format!( - r#"SELECT * 
FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND "split_state" IN ('Staged')"# + r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND "split_state" IN ('Staged')"# ) ); @@ -1932,7 +1934,7 @@ mod tests { assert_eq!( sql.to_string(PostgresQueryBuilder), format!( - r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND "split_state" IN ('Published')"# + r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND "split_state" IN ('Published')"# ) ); @@ -1945,7 +1947,7 @@ mod tests { assert_eq!( sql.to_string(PostgresQueryBuilder), format!( - r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND "split_state" IN ('Published', 'MarkedForDeletion')"# + r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND "split_state" IN ('Published', 'MarkedForDeletion')"# ) ); @@ -1957,7 +1959,7 @@ mod tests { assert_eq!( sql.to_string(PostgresQueryBuilder), format!( - r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND "update_timestamp" < TO_TIMESTAMP(51)"# + r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND "update_timestamp" < TO_TIMESTAMP(51)"# ) ); @@ -1969,7 +1971,7 @@ mod tests { assert_eq!( sql.to_string(PostgresQueryBuilder), format!( - r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND "create_timestamp" <= TO_TIMESTAMP(55)"# + r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND "create_timestamp" <= TO_TIMESTAMP(55)"# ) ); @@ -1984,7 +1986,7 @@ mod tests { assert_eq!( sql.to_string(PostgresQueryBuilder), format!( - r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND ("maturity_timestamp" = TO_TIMESTAMP(0) OR "maturity_timestamp" <= TO_TIMESTAMP(55))"# + r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND ("maturity_timestamp" = TO_TIMESTAMP(0) OR "maturity_timestamp" <= TO_TIMESTAMP(55))"# ) ); @@ -1997,7 +1999,7 @@ mod tests { assert_eq!( 
sql.to_string(PostgresQueryBuilder), format!( - r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND "maturity_timestamp" > TO_TIMESTAMP(55)"# + r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND "maturity_timestamp" > TO_TIMESTAMP(55)"# ) ); @@ -2009,7 +2011,7 @@ mod tests { assert_eq!( sql.to_string(PostgresQueryBuilder), format!( - r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND "delete_opstamp" >= 4"# + r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND "delete_opstamp" >= 4"# ) ); @@ -2021,7 +2023,7 @@ mod tests { assert_eq!( sql.to_string(PostgresQueryBuilder), format!( - r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND ("time_range_end" > 45 OR "time_range_end" IS NULL)"# + r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND ("time_range_end" > 45 OR "time_range_end" IS NULL)"# ) ); @@ -2033,7 +2035,7 @@ mod tests { assert_eq!( sql.to_string(PostgresQueryBuilder), format!( - r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND ("time_range_start" < 45 OR "time_range_start" IS NULL)"# + r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND ("time_range_start" < 45 OR "time_range_start" IS NULL)"# ) ); @@ -2050,7 +2052,7 @@ mod tests { assert_eq!( sql.to_string(PostgresQueryBuilder), format!( - r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND (NOT ($$tag-2$$ = ANY(tags)))"# + r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND (NOT ($$tag-2$$ = ANY(tags)))"# ) ); @@ -2063,7 +2065,22 @@ mod tests { assert_eq!( sql.to_string(PostgresQueryBuilder), format!( - r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') ORDER BY "split_id" ASC OFFSET 4"# + r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') ORDER BY "split_id" ASC OFFSET 4"# + ) + ); + + let mut select_statement = Query::select(); + let sql = select_statement + 
.column((Splits::Table, Asterisk)) + .from(Splits::Table); + + let query = ListSplitsQuery::for_index(index_uid.clone()).sort_by_index_uid(); + append_query_filters(sql, &query); + + assert_eq!( + sql.to_string(PostgresQueryBuilder), + format!( + r#"SELECT "splits".* FROM "splits" JOIN "indexes" ON "splits"."index_uid" = "indexes"."index_uid" WHERE "splits"."index_uid" IN ('{index_uid}') ORDER BY "index_uid" ASC"# ) ); } @@ -2081,7 +2098,7 @@ mod tests { assert_eq!( sql.to_string(PostgresQueryBuilder), format!( - r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND ("time_range_end" > 0 OR "time_range_end" IS NULL) AND ("time_range_start" < 40 OR "time_range_start" IS NULL)"# + r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND ("time_range_end" > 0 OR "time_range_end" IS NULL) AND ("time_range_start" < 40 OR "time_range_start" IS NULL)"# ) ); @@ -2095,7 +2112,7 @@ mod tests { assert_eq!( sql.to_string(PostgresQueryBuilder), format!( - r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND ("time_range_end" > 45 OR "time_range_end" IS NULL) AND "delete_opstamp" > 0"# + r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND ("time_range_end" > 45 OR "time_range_end" IS NULL) AND "delete_opstamp" > 0"# ) ); @@ -2109,7 +2126,7 @@ mod tests { assert_eq!( sql.to_string(PostgresQueryBuilder), format!( - r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND "update_timestamp" < TO_TIMESTAMP(51) AND "create_timestamp" <= TO_TIMESTAMP(63)"# + r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND "update_timestamp" < TO_TIMESTAMP(51) AND "create_timestamp" <= TO_TIMESTAMP(63)"# ) ); @@ -2126,7 +2143,7 @@ mod tests { assert_eq!( sql.to_string(PostgresQueryBuilder), format!( - r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND ($$tag-1$$ = ANY(tags)) AND ("time_range_end" > 90 OR "time_range_end" IS NULL)"# + r#"SELECT * FROM "splits" WHERE 
"splits"."index_uid" IN ('{index_uid}') AND ($$tag-1$$ = ANY(tags)) AND ("time_range_end" > 90 OR "time_range_end" IS NULL)"# ) ); @@ -2141,7 +2158,7 @@ mod tests { assert_eq!( sql.to_string(PostgresQueryBuilder), format!( - r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}', '{index_uid_2}')"# + r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}', '{index_uid_2}')"# ) ); } diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/model.rs b/quickwit/quickwit-metastore/src/metastore/postgres/model.rs index c745b364643..e7baac2e6f2 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/model.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/model.rs @@ -30,6 +30,16 @@ use tracing::error; use crate::{IndexMetadata, Split, SplitMetadata, SplitState}; +#[derive(Iden, Clone, Copy)] +#[allow(dead_code)] +pub enum Indexes { + Table, + IndexUid, + IndexId, + IndexMetadataJson, + CreateTimestamp, +} + /// A model structure for handling index metadata in a database. 
#[derive(sqlx::FromRow)] pub(super) struct PgIndex { diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs b/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs index 63d1f6722f7..c76a0cac673 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs @@ -24,16 +24,16 @@ use std::time::Duration; use quickwit_common::uri::Uri; use quickwit_proto::metastore::{MetastoreError, MetastoreResult}; -use sea_query::{any, Expr, Func, Order, SelectStatement}; +use sea_query::{any, Expr, Func, JoinType, Order, SelectStatement}; use sqlx::postgres::{PgConnectOptions, PgPoolOptions}; use sqlx::{ConnectOptions, Postgres}; use tracing::error; use tracing::log::LevelFilter; -use super::model::{Splits, ToTimestampFunc}; +use super::model::{Indexes, Splits, ToTimestampFunc}; use super::pool::TrackedPool; use super::tags::generate_sql_condition; -use crate::metastore::FilterRange; +use crate::metastore::{FilterRange, SortBy}; use crate::{ListSplitsQuery, SplitMaturity, SplitMetadata}; /// Establishes a connection to the given database URI. @@ -96,7 +96,7 @@ pub(super) fn append_range_filters( pub(super) fn append_query_filters(sql: &mut SelectStatement, query: &ListSplitsQuery) { // Note: `ListSplitsQuery` builder enforces a non empty `index_uids` list. 
- sql.cond_where(Expr::col(Splits::IndexUid).is_in(&query.index_uids)); + sql.cond_where(Expr::col((Splits::Table, Splits::IndexUid)).is_in(&query.index_uids)); if let Some(node_id) = &query.node_id { sql.cond_where(Expr::col(Splits::NodeId).eq(node_id)); @@ -179,6 +179,39 @@ pub(super) fn append_query_filters(sql: &mut SelectStatement, query: &ListSplits Expr::expr(val) }); + match query.sort_by { + SortBy::Staleness => { + // sea-query reads a 2-tuple as `(table, column)`, so passing both sort keys as one + // tuple would order by "delete_opstamp"."publish_timestamp"; chain one call per key. + sql.order_by(Splits::DeleteOpstamp, Order::Asc) + .order_by(Splits::PublishTimestamp, Order::Asc); + } + SortBy::IndexUid => { + // this order by can be fairly costly, + // from testing, adding a join here was way faster, because we do an index-only scan on + // indexes.index_uid, nested-loop merged with a bitmap index scan on splits.index_uid, + // filter for our conditions, and just take the first N results. This is guaranteed to + // return correct result because indexes.index_uid is a non-null foreign key + // + // We also need to do .column((Splits::Table, Asterisk)) from the caller side, to not + // return unexpected columns + // + // On the other hand, without join, we do a seq scan on splits, sort everything, and + // truncate. + // + // Or we could just add a btree index to splits.index_uid. That might be the better + // long term solution. + sql.join( + JoinType::Join, + Indexes::Table, + Expr::col((Splits::Table, Splits::IndexUid)) + .equals((Indexes::Table, Indexes::IndexUid)), + ) + .order_by(Splits::IndexUid, Order::Asc); + } + SortBy::None => (), + } + if let Some(limit) = query.limit { sql.limit(limit as u64); }