diff --git a/quickwit/quickwit-index-management/src/garbage_collection.rs b/quickwit/quickwit-index-management/src/garbage_collection.rs index 76a4afa0790..4a24aedc012 100644 --- a/quickwit/quickwit-index-management/src/garbage_collection.rs +++ b/quickwit/quickwit-index-management/src/garbage_collection.rs @@ -22,17 +22,18 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; -use futures::Future; +use anyhow::Context; +use futures::{Future, StreamExt}; use itertools::Itertools; use quickwit_common::pretty::PrettySample; -use quickwit_common::{Progress, ServiceStream}; +use quickwit_common::Progress; use quickwit_metastore::{ ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceStreamSplitsExt, SplitInfo, SplitMetadata, SplitState, }; use quickwit_proto::metastore::{ - DeleteSplitsRequest, ListSplitsRequest, ListSplitsResponse, MarkSplitsForDeletionRequest, - MetastoreError, MetastoreResult, MetastoreService, MetastoreServiceClient, + DeleteSplitsRequest, ListSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError, + MetastoreService, MetastoreServiceClient, }; use quickwit_proto::types::{IndexUid, SplitId}; use quickwit_storage::{BulkDeleteError, Storage}; @@ -41,7 +42,7 @@ use time::OffsetDateTime; use tracing::{error, instrument}; /// The maximum number of splits that the GC should delete per attempt. -const DELETE_SPLITS_BATCH_SIZE: usize = 1000; +const DELETE_SPLITS_BATCH_SIZE: usize = 10_000; /// [`DeleteSplitsError`] describes the errors that occurred during the deletion of splits from /// storage and metastore. @@ -67,7 +68,7 @@ where Fut: Future { } /// Information on what splits have and have not been cleaned up by the GC. -#[derive(Debug)] +#[derive(Debug, Default)] pub struct SplitRemovalInfo { /// The set of splits that have been removed. pub removed_split_entries: Vec, @@ -94,17 +95,23 @@ pub async fn run_garbage_collect( dry_run: bool, progress_opt: Option<&Progress>, ) -> anyhow::Result { - // Select staged splits with staging timestamp older than grace period timestamp. let grace_period_timestamp = OffsetDateTime::now_utc().unix_timestamp() - staged_grace_period.as_secs() as i64; let index_uids: Vec = indexes.keys().cloned().collect(); - let query = ListSplitsQuery::try_from_index_uids(index_uids.clone())? + let Some(list_splits_query_for_index_uids) = + ListSplitsQuery::try_from_index_uids(index_uids.clone()) + else { + return Ok(SplitRemovalInfo::default()); + }; + let list_splits_query = list_splits_query_for_index_uids + .clone() .with_split_state(SplitState::Staged) .with_update_timestamp_lte(grace_period_timestamp); - let list_deletable_staged_request = ListSplitsRequest::try_from_list_splits_query(&query)?; + let list_deletable_staged_request = + ListSplitsRequest::try_from_list_splits_query(&list_splits_query)?; let deletable_staged_splits: Vec = protect_future( progress_opt, metastore.list_splits(list_deletable_staged_request), @@ -114,8 +121,8 @@ pub async fn run_garbage_collect( .await?; if dry_run { - let marked_for_deletion_query = ListSplitsQuery::try_from_index_uids(index_uids.clone())? - .with_split_state(SplitState::MarkedForDeletion); + let marked_for_deletion_query = + list_splits_query_for_index_uids.with_split_state(SplitState::MarkedForDeletion); let marked_for_deletion_request = ListSplitsRequest::try_from_list_splits_query(&marked_for_deletion_query)?; let mut splits_marked_for_deletion: Vec = protect_future( @@ -157,64 +164,141 @@ pub async fn run_garbage_collect( let updated_before_timestamp = OffsetDateTime::now_utc().unix_timestamp() - deletion_grace_period.as_secs() as i64; - delete_splits_marked_for_deletion( + Ok(delete_splits_marked_for_deletion_several_indexes( index_uids, updated_before_timestamp, metastore, indexes, progress_opt, ) - .await + .await) } -#[instrument(skip(index_uids, storages, metastore, progress_opt), fields(num_indexes=%index_uids.len()))] + +async fn delete_splits( + splits_metadata_to_delete_per_index: HashMap>, + storages: &HashMap>, + metastore: MetastoreServiceClient, + progress_opt: Option<&Progress>, + split_removal_info: &mut SplitRemovalInfo, +) -> Result<(), ()> { + let mut delete_split_from_index_res_stream = + futures::stream::iter(splits_metadata_to_delete_per_index) + .map(|(index_uid, splits_metadata_to_delete)| { + let storage = storages.get(&index_uid).cloned(); + let metastore = metastore.clone(); + async move { + if let Some(storage) = storage { + delete_splits_from_storage_and_metastore( + index_uid, + storage, + metastore, + splits_metadata_to_delete, + progress_opt, + ) + .await + } else { + error!( + "we are trying to GC without knowing the storage, this shouldn't \ + happen" + ); + Err(DeleteSplitsError { + successes: Vec::new(), + storage_error: None, + storage_failures: splits_metadata_to_delete + .into_iter() + .map(|split| split.as_split_info()) + .collect(), + metastore_error: None, + metastore_failures: Vec::new(), + }) + } + } + }) + .buffer_unordered(10); + let mut error_encountered = false; + while let Some(delete_split_result) = delete_split_from_index_res_stream.next().await { + match delete_split_result { + Ok(entries) => { + split_removal_info.removed_split_entries.extend(entries); + } + Err(delete_split_error) => { + split_removal_info + .removed_split_entries + .extend(delete_split_error.successes); + split_removal_info + .failed_splits + .extend(delete_split_error.storage_failures); + split_removal_info + .failed_splits + .extend(delete_split_error.metastore_failures); + error_encountered = true; + } + } + } + if error_encountered { + Err(()) + } else { + Ok(()) + } +} + +/// Fetch the list metadata from the metastore and returns them as a Vec. +async fn list_splits_metadata( + metastore: &MetastoreServiceClient, + query: &ListSplitsQuery, +) -> anyhow::Result> { + let list_splits_request = ListSplitsRequest::try_from_list_splits_query(query) + .context("failed to build list splits request")?; + let splits_to_delete_stream = metastore + .list_splits(list_splits_request) + .await + .context("failed to fetch stream splits")?; + let splits = splits_to_delete_stream + .collect_splits_metadata() + .await + .context("failed to collect splits")?; + Ok(splits) +} + /// Removes any splits marked for deletion which haven't been /// updated after `updated_before_timestamp` in batches of 1000 splits. /// /// The aim of this is to spread the load out across a longer period /// rather than short, heavy bursts on the metastore and storage system itself. -async fn delete_splits_marked_for_deletion( +#[instrument(skip(index_uids, storages, metastore, progress_opt), fields(num_indexes=%index_uids.len()))] +async fn delete_splits_marked_for_deletion_several_indexes( index_uids: Vec, updated_before_timestamp: i64, metastore: MetastoreServiceClient, storages: HashMap>, progress_opt: Option<&Progress>, -) -> anyhow::Result { - let mut removed_splits = Vec::new(); - let mut failed_splits = Vec::new(); - - 'outer: loop { - let mut exit = false; - let query = ListSplitsQuery::try_from_index_uids(index_uids.clone())? - .with_split_state(SplitState::MarkedForDeletion) - .with_update_timestamp_lte(updated_before_timestamp) - .with_limit(DELETE_SPLITS_BATCH_SIZE); - - let list_splits_request = match ListSplitsRequest::try_from_list_splits_query(&query) { - Ok(request) => request, - Err(error) => { - error!(error = ?error, "failed to build list splits request"); +) -> SplitRemovalInfo { + let mut split_removal_info = SplitRemovalInfo::default(); + + let Some(list_splits_query) = ListSplitsQuery::try_from_index_uids(index_uids) else { + error!("failed to create list splits query. this should never happen"); + return split_removal_info; + }; + + let list_splits_query = list_splits_query + .with_split_state(SplitState::MarkedForDeletion) + .with_update_timestamp_lte(updated_before_timestamp) + .with_limit(DELETE_SPLITS_BATCH_SIZE) + .sort_by_index_uid(); + + loop { + let splits_metadata_to_delete: Vec = match protect_future( + progress_opt, + list_splits_metadata(&metastore, &list_splits_query), + ) + .await + { + Ok(splits) => splits, + Err(list_splits_err) => { + error!(error=?list_splits_err, "failed to list splits"); break; } }; - let splits_stream_result = - protect_future(progress_opt, metastore.list_splits(list_splits_request)).await; - let splits_to_delete_stream: ServiceStream> = - match splits_stream_result { - Ok(splits_stream) => splits_stream, - Err(error) => { - error!(error = ?error, "failed to fetch stream splits"); - break; - } - }; - - let splits_metadata_to_delete: Vec = - match splits_to_delete_stream.collect_splits_metadata().await { - Ok(splits) => splits, - Err(error) => { - error!(error = ?error, "failed to collect splits"); - break; - } - }; let num_splits_to_delete = splits_metadata_to_delete.len(); @@ -228,41 +312,23 @@ async fn delete_splits_marked_for_deletion( .map(|meta| (meta.index_uid.clone(), meta)) .into_group_map(); - for (index_uid, splits_metadata_to_delete) in splits_metadata_to_delete_per_index { - let Some(storage) = storages.get(&index_uid).cloned() else { - error!("we are trying to GC without knowing the storage, this shouldn't happen"); - // we stop there, or we could easily end up looping indefinitely if there are more - // than DELETE_SPLITS_BATCH_SIZE to delete in this index - break 'outer; - }; - let delete_splits_result = delete_splits_from_storage_and_metastore( - index_uid, - storage, - metastore.clone(), - splits_metadata_to_delete, - progress_opt, - ) - .await; - - match delete_splits_result { - Ok(entries) => removed_splits.extend(entries), - Err(delete_splits_error) => { - failed_splits.extend(delete_splits_error.storage_failures); - failed_splits.extend(delete_splits_error.metastore_failures); - exit = true; - } - } - } - if num_splits_to_delete < DELETE_SPLITS_BATCH_SIZE || exit { + let delete_split_res = delete_splits( + splits_metadata_to_delete_per_index, + &storages, + metastore.clone(), + progress_opt, + &mut split_removal_info, + ) + .await; + + if num_splits_to_delete < DELETE_SPLITS_BATCH_SIZE || delete_split_res.is_err() { // stop the gc if this was the last batch or we encountered an error // (otherwise we might try deleting the same splits in an endless loop) break; } } - Ok(SplitRemovalInfo { - removed_split_entries: removed_splits, - failed_splits, - }) + + split_removal_info } /// Delete a list of splits from the storage and the metastore. diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs index b1bc18d3f71..426a91530b5 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs @@ -48,7 +48,7 @@ use tracing::{info, warn}; use super::MutationOccurred; use crate::checkpoint::IndexCheckpointDelta; -use crate::metastore::use_shard_api; +use crate::metastore::{use_shard_api, SortBy}; use crate::{split_tag_filter, IndexMetadata, ListSplitsQuery, Split, SplitMetadata, SplitState}; /// A `FileBackedIndex` object carries an index metadata and its split metadata. @@ -428,8 +428,9 @@ impl FileBackedIndex { let limit = query.limit.unwrap_or(usize::MAX); let offset = query.offset.unwrap_or_default(); - let splits: Vec = if query.sort_by_staleness { - self.splits + let splits: Vec = match query.sort_by { + SortBy::Staleness => self + .splits .values() .filter(|split| split_query_predicate(split, query)) .sorted_unstable_by(|left_split, right_split| { @@ -446,15 +447,24 @@ impl FileBackedIndex { .skip(offset) .take(limit) .cloned() - .collect() - } else { - self.splits + .collect(), + SortBy::IndexUid => self + .splits + .values() + .filter(|split| split_query_predicate(split, query)) + .sorted_unstable_by_key(|split| &split.split_metadata.index_uid) + .skip(offset) + .take(limit) + .cloned() + .collect(), + SortBy::None => self + .splits .values() .filter(|split| split_query_predicate(split, query)) .skip(offset) .take(limit) .cloned() - .collect() + .collect(), }; Ok(splits) } diff --git a/quickwit/quickwit-metastore/src/metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/mod.rs index 148d8de342e..06211e1f63a 100644 --- a/quickwit/quickwit-metastore/src/metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/mod.rs @@ -631,7 +631,14 @@ pub struct ListSplitsQuery { /// Sorts the splits by staleness, i.e. by delete opstamp and publish timestamp in ascending /// order. - pub sort_by_staleness: bool, + pub sort_by: SortBy, +} + +#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)] +pub enum SortBy { + None, + Staleness, + IndexUid, } #[allow(unused_attributes)] @@ -650,20 +657,17 @@ impl ListSplitsQuery { update_timestamp: Default::default(), create_timestamp: Default::default(), mature: Bound::Unbounded, - sort_by_staleness: false, + sort_by: SortBy::None, } } /// Creates a new [`ListSplitsQuery`] from a non-empty list of index UIDs. - /// Returns an error if the list is empty. - pub fn try_from_index_uids(index_uids: Vec) -> MetastoreResult { + /// Returns None if the list is empty. + pub fn try_from_index_uids(index_uids: Vec) -> Option { if index_uids.is_empty() { - return Err(MetastoreError::Internal { - message: "ListSplitQuery should define at least one index uid".to_string(), - cause: "".to_string(), - }); + return None; } - Ok(Self { + Some(Self { index_uids, node_id: None, limit: None, @@ -675,7 +679,7 @@ impl ListSplitsQuery { update_timestamp: Default::default(), create_timestamp: Default::default(), mature: Bound::Unbounded, - sort_by_staleness: false, + sort_by: SortBy::None, }) } @@ -842,7 +846,13 @@ impl ListSplitsQuery { /// Sorts the splits by staleness, i.e. by delete opstamp and publish timestamp in ascending /// order. pub fn sort_by_staleness(mut self) -> Self { - self.sort_by_staleness = true; + self.sort_by = SortBy::Staleness; + self + } + + /// Sorts the splits by index_uid. + pub fn sort_by_index_uid(mut self) -> Self { + self.sort_by = SortBy::IndexUid; self } } diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index 20064cae706..c9f554875ba 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -861,7 +861,9 @@ impl MetastoreService for PostgresqlMetastore { ) -> MetastoreResult> { let list_splits_query = request.deserialize_list_splits_query()?; let mut sql_query_builder = Query::select(); - sql_query_builder.column(Asterisk).from(Splits::Table); + sql_query_builder + .column((Splits::Table, Asterisk)) + .from(Splits::Table); append_query_filters(&mut sql_query_builder, &list_splits_query); let (sql_query, values) = sql_query_builder.build_sqlx(PostgresQueryBuilder); @@ -1918,7 +1920,7 @@ mod tests { assert_eq!( sql.to_string(PostgresQueryBuilder), format!( - r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND "split_state" IN ('Staged')"# + r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND "split_state" IN ('Staged')"# ) ); @@ -1932,7 +1934,7 @@ mod tests { assert_eq!( sql.to_string(PostgresQueryBuilder), format!( - r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND "split_state" IN ('Published')"# + r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND "split_state" IN ('Published')"# ) ); @@ -1945,7 +1947,7 @@ mod tests { assert_eq!( sql.to_string(PostgresQueryBuilder), format!( - r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND "split_state" IN ('Published', 'MarkedForDeletion')"# + r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND "split_state" IN ('Published', 'MarkedForDeletion')"# ) ); @@ -1957,7 +1959,7 @@ mod tests { assert_eq!( sql.to_string(PostgresQueryBuilder), format!( - r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND "update_timestamp" < TO_TIMESTAMP(51)"# + r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND "update_timestamp" < TO_TIMESTAMP(51)"# ) ); @@ -1969,7 +1971,7 @@ mod tests { assert_eq!( sql.to_string(PostgresQueryBuilder), format!( - r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND "create_timestamp" <= TO_TIMESTAMP(55)"# + r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND "create_timestamp" <= TO_TIMESTAMP(55)"# ) ); @@ -1984,7 +1986,7 @@ mod tests { assert_eq!( sql.to_string(PostgresQueryBuilder), format!( - r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND ("maturity_timestamp" = TO_TIMESTAMP(0) OR "maturity_timestamp" <= TO_TIMESTAMP(55))"# + r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND ("maturity_timestamp" = TO_TIMESTAMP(0) OR "maturity_timestamp" <= TO_TIMESTAMP(55))"# ) ); @@ -1997,7 +1999,7 @@ mod tests { assert_eq!( sql.to_string(PostgresQueryBuilder), format!( - r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND "maturity_timestamp" > TO_TIMESTAMP(55)"# + r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND "maturity_timestamp" > TO_TIMESTAMP(55)"# ) ); @@ -2009,7 +2011,7 @@ mod tests { assert_eq!( sql.to_string(PostgresQueryBuilder), format!( - r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND "delete_opstamp" >= 4"# + r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND "delete_opstamp" >= 4"# ) ); @@ -2021,7 +2023,7 @@ mod tests { assert_eq!( sql.to_string(PostgresQueryBuilder), format!( - r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND ("time_range_end" > 45 OR "time_range_end" IS NULL)"# + r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND ("time_range_end" > 45 OR "time_range_end" IS NULL)"# ) ); @@ -2033,7 +2035,7 @@ mod tests { assert_eq!( sql.to_string(PostgresQueryBuilder), format!( - r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND ("time_range_start" < 45 OR "time_range_start" IS NULL)"# + r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND ("time_range_start" < 45 OR "time_range_start" IS NULL)"# ) ); @@ -2050,7 +2052,7 @@ mod tests { assert_eq!( sql.to_string(PostgresQueryBuilder), format!( - r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND (NOT ($$tag-2$$ = ANY(tags)))"# + r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND (NOT ($$tag-2$$ = ANY(tags)))"# ) ); @@ -2063,7 +2065,22 @@ mod tests { assert_eq!( sql.to_string(PostgresQueryBuilder), format!( - r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') ORDER BY "split_id" ASC OFFSET 4"# + r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') ORDER BY "split_id" ASC OFFSET 4"# + ) + ); + + let mut select_statement = Query::select(); + let sql = select_statement + .column((Splits::Table, Asterisk)) + .from(Splits::Table); + + let query = ListSplitsQuery::for_index(index_uid.clone()).sort_by_index_uid(); + append_query_filters(sql, &query); + + assert_eq!( + sql.to_string(PostgresQueryBuilder), + format!( + r#"SELECT "splits".* FROM "splits" JOIN "indexes" ON "splits"."index_uid" = "indexes"."index_uid" WHERE "splits"."index_uid" IN ('{index_uid}') ORDER BY "index_uid" ASC"# ) ); } @@ -2081,7 +2098,7 @@ mod tests { assert_eq!( sql.to_string(PostgresQueryBuilder), format!( - r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND ("time_range_end" > 0 OR "time_range_end" IS NULL) AND ("time_range_start" < 40 OR "time_range_start" IS NULL)"# + r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND ("time_range_end" > 0 OR "time_range_end" IS NULL) AND ("time_range_start" < 40 OR "time_range_start" IS NULL)"# ) ); @@ -2095,7 +2112,7 @@ mod tests { assert_eq!( sql.to_string(PostgresQueryBuilder), format!( - r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND ("time_range_end" > 45 OR "time_range_end" IS NULL) AND "delete_opstamp" > 0"# + r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND ("time_range_end" > 45 OR "time_range_end" IS NULL) AND "delete_opstamp" > 0"# ) ); @@ -2109,7 +2126,7 @@ mod tests { assert_eq!( sql.to_string(PostgresQueryBuilder), format!( - r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND "update_timestamp" < TO_TIMESTAMP(51) AND "create_timestamp" <= TO_TIMESTAMP(63)"# + r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND "update_timestamp" < TO_TIMESTAMP(51) AND "create_timestamp" <= TO_TIMESTAMP(63)"# ) ); @@ -2126,7 +2143,7 @@ mod tests { assert_eq!( sql.to_string(PostgresQueryBuilder), format!( - r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND ($$tag-1$$ = ANY(tags)) AND ("time_range_end" > 90 OR "time_range_end" IS NULL)"# + r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND ($$tag-1$$ = ANY(tags)) AND ("time_range_end" > 90 OR "time_range_end" IS NULL)"# ) ); @@ -2141,7 +2158,7 @@ mod tests { assert_eq!( sql.to_string(PostgresQueryBuilder), format!( - r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}', '{index_uid_2}')"# + r#"SELECT * FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}', '{index_uid_2}')"# ) ); } diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/model.rs b/quickwit/quickwit-metastore/src/metastore/postgres/model.rs index c745b364643..e7baac2e6f2 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/model.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/model.rs @@ -30,6 +30,16 @@ use tracing::error; use crate::{IndexMetadata, Split, SplitMetadata, SplitState}; +#[derive(Iden, Clone, Copy)] +#[allow(dead_code)] +pub enum Indexes { + Table, + IndexUid, + IndexId, + IndexMetadataJson, + CreateTimestamp, +} + /// A model structure for handling index metadata in a database. #[derive(sqlx::FromRow)] pub(super) struct PgIndex { diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs b/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs index 63d1f6722f7..c76a0cac673 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs @@ -24,16 +24,16 @@ use std::time::Duration; use quickwit_common::uri::Uri; use quickwit_proto::metastore::{MetastoreError, MetastoreResult}; -use sea_query::{any, Expr, Func, Order, SelectStatement}; +use sea_query::{any, Expr, Func, JoinType, Order, SelectStatement}; use sqlx::postgres::{PgConnectOptions, PgPoolOptions}; use sqlx::{ConnectOptions, Postgres}; use tracing::error; use tracing::log::LevelFilter; -use super::model::{Splits, ToTimestampFunc}; +use super::model::{Indexes, Splits, ToTimestampFunc}; use super::pool::TrackedPool; use super::tags::generate_sql_condition; -use crate::metastore::FilterRange; +use crate::metastore::{FilterRange, SortBy}; use crate::{ListSplitsQuery, SplitMaturity, SplitMetadata}; /// Establishes a connection to the given database URI. @@ -96,7 +96,7 @@ pub(super) fn append_range_filters( pub(super) fn append_query_filters(sql: &mut SelectStatement, query: &ListSplitsQuery) { // Note: `ListSplitsQuery` builder enforces a non empty `index_uids` list. - sql.cond_where(Expr::col(Splits::IndexUid).is_in(&query.index_uids)); + sql.cond_where(Expr::col((Splits::Table, Splits::IndexUid)).is_in(&query.index_uids)); if let Some(node_id) = &query.node_id { sql.cond_where(Expr::col(Splits::NodeId).eq(node_id)); @@ -179,6 +179,39 @@ pub(super) fn append_query_filters(sql: &mut SelectStatement, query: &ListSplits Expr::expr(val) }); + match query.sort_by { + SortBy::Staleness => { + sql.order_by( + (Splits::DeleteOpstamp, Splits::PublishTimestamp), + Order::Asc, + ); + } + SortBy::IndexUid => { + // this order by can be fairly costly, + // from testing, adding a join here was way faster, because we do an index-only scan on + // indexes.index_uid, nested-loop merged with a bitmap index scan on splits.index_uid, + // filter for our conditions, and just take the first N results. This is guaranteed to + // return correct result because indexes.index_uid is a non-null foreign key + // + // We also need to do .column((Splits::Table, Asterisk)) from the caller side, to not + // return unexpected columns + // + // On the other hand, without join, we do a seq scan on splits, sort everything, and + // truncate. + // + // Or we could just add a btree index to splits.index_uid. That might be the better + // long term solution. + sql.join( + JoinType::Join, + Indexes::Table, + Expr::col((Splits::Table, Splits::IndexUid)) + .equals((Indexes::Table, Indexes::IndexUid)), + ) + .order_by(Splits::IndexUid, Order::Asc); + } + SortBy::None => (), + } + if let Some(limit) = query.limit { sql.limit(limit as u64); } diff --git a/quickwit/quickwit-search/src/lib.rs b/quickwit/quickwit-search/src/lib.rs index b5e7a881c08..a09545d92b8 100644 --- a/quickwit/quickwit-search/src/lib.rs +++ b/quickwit/quickwit-search/src/lib.rs @@ -193,8 +193,10 @@ pub async fn list_relevant_splits( tags_filter_opt: Option, metastore: &mut MetastoreServiceClient, ) -> crate::Result> { - let mut query = - ListSplitsQuery::try_from_index_uids(index_uids)?.with_split_state(SplitState::Published); + let Some(mut query) = ListSplitsQuery::try_from_index_uids(index_uids) else { + return Ok(Vec::new()); + }; + query = query.with_split_state(SplitState::Published); if let Some(start_ts) = start_timestamp { query = query.with_time_range_start_gte(start_ts); diff --git a/quickwit/quickwit-search/src/list_terms.rs b/quickwit/quickwit-search/src/list_terms.rs index 5bd6d5f32b4..0a781355162 100644 --- a/quickwit/quickwit-search/src/list_terms.rs +++ b/quickwit/quickwit-search/src/list_terms.rs @@ -90,8 +90,12 @@ pub async fn root_list_terms( .iter() .map(|index_metadata| index_metadata.index_uid.clone()) .collect(); - let mut query = quickwit_metastore::ListSplitsQuery::try_from_index_uids(index_uids)? - .with_split_state(quickwit_metastore::SplitState::Published); + + let Some(mut query) = quickwit_metastore::ListSplitsQuery::try_from_index_uids(index_uids) + else { + return Ok(ListTermsResponse::default()); + }; + query = query.with_split_state(quickwit_metastore::SplitState::Published); if let Some(start_ts) = list_terms_request.start_timestamp { query = query.with_time_range_start_gte(start_ts);