diff --git a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs index f733db8776c..604f5a918f9 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs @@ -201,7 +201,7 @@ impl MergePipeline { ); let query = ListSplitsQuery::for_index(self.params.pipeline_id.index_uid.clone()) .with_split_state(SplitState::Published) - .with_maturity_timestamp_lte(OffsetDateTime::now_utc().unix_timestamp()); + .is_mature(false, OffsetDateTime::now_utc()); let published_splits = ctx .protect_future(self.params.metastore.list_splits(query)) .await? @@ -441,7 +441,6 @@ pub struct MergePipelineParams { #[cfg(test)] mod tests { - use std::ops::Bound; use std::sync::Arc; use quickwit_actors::{ActorExitStatus, Universe}; @@ -450,7 +449,6 @@ mod tests { use quickwit_metastore::MockMetastore; use quickwit_proto::IndexUid; use quickwit_storage::RamStorage; - use time::OffsetDateTime; use crate::actors::merge_pipeline::{MergePipeline, MergePipelineParams}; use crate::merge_policy::default_merge_policy; @@ -476,18 +474,10 @@ mod tests { list_split_query.split_states, vec![quickwit_metastore::SplitState::Published] ); - match list_split_query.maturity_timestamp.end { - Bound::Included(maturity_timestamp_end) => { - assert!( - maturity_timestamp_end - < OffsetDateTime::now_utc().unix_timestamp() + 3600 - ); - assert!( - maturity_timestamp_end - > OffsetDateTime::now_utc().unix_timestamp() - 3600 - ) - } - _ => panic!("Expected unbounded maturity timestamp."), + if let Some(maturity_filter) = list_split_query.maturity { + assert!(!maturity_filter.mature); + } else { + panic!("Expected maturity filter."); } Ok(Vec::new()) }); diff --git a/quickwit/quickwit-indexing/src/actors/merge_planner.rs b/quickwit/quickwit-indexing/src/actors/merge_planner.rs index 970fc158033..74c4198c7fc 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_planner.rs +++ 
b/quickwit/quickwit-indexing/src/actors/merge_planner.rs @@ -28,6 +28,7 @@ use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, Qu use quickwit_metastore::SplitMetadata; use serde::Serialize; use tantivy::Inventory; +use time::OffsetDateTime; use tracing::info; use crate::actors::MergeSplitDownloader; @@ -148,7 +149,7 @@ impl MergePlanner { } fn record_split(&mut self, new_split: SplitMetadata) { - if new_split.is_mature() { + if new_split.is_mature(OffsetDateTime::now_utc()) { return; } let splits_for_partition: &mut Vec = self diff --git a/quickwit/quickwit-indexing/src/merge_policy/const_write_amplification.rs b/quickwit/quickwit-indexing/src/merge_policy/const_write_amplification.rs index 53073f6bb7a..f266dde59ac 100644 --- a/quickwit/quickwit-indexing/src/merge_policy/const_write_amplification.rs +++ b/quickwit/quickwit-indexing/src/merge_policy/const_write_amplification.rs @@ -22,6 +22,7 @@ use std::collections::HashMap; use quickwit_config::merge_policy_config::ConstWriteAmplificationMergePolicyConfig; use quickwit_config::IndexingSettings; use quickwit_metastore::{SplitMaturity, SplitMetadata}; +use time::OffsetDateTime; use super::MergeOperation; use crate::merge_policy::MergePolicy; @@ -134,7 +135,7 @@ impl MergePolicy for ConstWriteAmplificationMergePolicy { let mut group_by_num_merge_ops: HashMap> = HashMap::default(); let mut mature_splits = Vec::new(); for split in splits.drain(..) 
{ - if split.is_mature() { + if split.is_mature(OffsetDateTime::now_utc()) { mature_splits.push(split); } else { group_by_num_merge_ops diff --git a/quickwit/quickwit-indexing/src/merge_policy/mod.rs b/quickwit/quickwit-indexing/src/merge_policy/mod.rs index 8589fd2a25a..78793c6f305 100644 --- a/quickwit/quickwit-indexing/src/merge_policy/mod.rs +++ b/quickwit/quickwit-indexing/src/merge_policy/mod.rs @@ -303,7 +303,7 @@ pub mod tests { // merge_policy.operations(&mut splits).is_empty(), // "Merge policy are expected to return all available merge operations." // ); - + let now_utc = OffsetDateTime::now_utc(); for merge_op in &mut operations { assert_eq!(merge_op.operation_type, MergeOperationType::Merge, "A merge policy should only emit Merge operations." @@ -311,7 +311,7 @@ pub mod tests { assert!(merge_op.splits_as_slice().len() >= 2, "Merge policies should not suggest merging a single split."); for split in merge_op.splits_as_slice() { - assert!(!split.is_mature(), "Merges should not contain mature splits."); + assert!(!split.is_mature(now_utc), "Merges should not contain mature splits."); } merge_policy.check_is_valid(merge_op, &splits[..]); } diff --git a/quickwit/quickwit-indexing/src/merge_policy/stable_log_merge_policy.rs b/quickwit/quickwit-indexing/src/merge_policy/stable_log_merge_policy.rs index 8666a2f7cd2..6e3c8bc29c2 100644 --- a/quickwit/quickwit-indexing/src/merge_policy/stable_log_merge_policy.rs +++ b/quickwit/quickwit-indexing/src/merge_policy/stable_log_merge_policy.rs @@ -23,6 +23,7 @@ use std::ops::Range; use quickwit_config::merge_policy_config::StableLogMergePolicyConfig; use quickwit_config::IndexingSettings; use quickwit_metastore::{SplitMaturity, SplitMetadata}; +use time::OffsetDateTime; use tracing::debug; use crate::merge_policy::{splits_short_debug, MergeOperation, MergePolicy}; @@ -187,7 +188,8 @@ impl StableLogMergePolicy { return Vec::new(); } // First we isolate splits that are mature. 
- let splits_not_for_merge = remove_matching_items(splits, |split| split.is_mature()); + let splits_not_for_merge = + remove_matching_items(splits, |split| split.is_mature(OffsetDateTime::now_utc())); let mut merge_operations: Vec = Vec::new(); splits.sort_unstable_by(cmp_splits_by_reverse_time_end); diff --git a/quickwit/quickwit-indexing/src/split_store/indexing_split_store.rs b/quickwit/quickwit-indexing/src/split_store/indexing_split_store.rs index 6aed2149553..aed43f73b0b 100644 --- a/quickwit/quickwit-indexing/src/split_store/indexing_split_store.rs +++ b/quickwit/quickwit-indexing/src/split_store/indexing_split_store.rs @@ -31,6 +31,7 @@ use quickwit_metastore::SplitMetadata; use quickwit_storage::{PutPayload, Storage, StorageResult}; use tantivy::directory::MmapDirectory; use tantivy::{Advice, Directory}; +use time::OffsetDateTime; use tracing::{info, info_span, instrument, Instrument}; use super::LocalSplitStore; @@ -125,7 +126,7 @@ impl IndexingSplitStore { let split_num_bytes = put_payload.len(); let key = PathBuf::from(quickwit_common::split_file(split.split_id())); - let is_mature = split.is_mature(); + let is_mature = split.is_mature(OffsetDateTime::now_utc()); self.inner .remote_storage .put(&key, put_payload) diff --git a/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs b/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs index 3e96935f249..bc1e60fd7a0 100644 --- a/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs +++ b/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs @@ -335,12 +335,9 @@ impl DeleteTaskPlanner { last_delete_opstamp = last_delete_opstamp, num_stale_splits_from_metastore = stale_splits.len() ); - // Keep only mature splits and splits that are not already part of ongoing delete - // operations. 
let ongoing_delete_operations = self.ongoing_delete_operations_inventory.list(); let filtered_splits = stale_splits .into_iter() - .filter(|stale_split| stale_split.split_metadata.is_mature()) .filter(|stale_split| { !ongoing_delete_operations.iter().any(|operation| { operation diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/file_backed_index/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/file_backed_index/mod.rs index 89674510516..d135835f4a2 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/file_backed_index/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/file_backed_index/mod.rs @@ -500,11 +500,14 @@ fn split_query_predicate(split: &&Split, query: &ListSplitsQuery) -> bool { return false; } - if !query - .maturity_timestamp - .contains(&split.split_metadata.maturity_timestamp()) - { - return false; + // Only reject on a maturity mismatch; a match must still go through the filters below. + if let Some(maturity_filter) = &query.maturity { + let is_mature = split + .split_metadata + .is_mature(maturity_filter.evaluation_datetime); + if is_mature != maturity_filter.mature { + return false; + } } if let Some(range) = split.split_metadata.time_range.as_ref() { diff --git a/quickwit/quickwit-metastore/src/metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/mod.rs index efa28769c2f..c7248960a75 100644 --- a/quickwit/quickwit-metastore/src/metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/mod.rs @@ -37,6 +37,7 @@ use quickwit_config::{IndexConfig, SourceConfig}; use quickwit_doc_mapper::tag_pruning::TagFilterAst; use quickwit_proto::metastore_api::{DeleteQuery, DeleteTask}; use quickwit_proto::IndexUid; +use time::OffsetDateTime; use crate::checkpoint::IndexCheckpointDelta; use crate::{MetastoreError, MetastoreResult, Split, SplitMetadata, SplitState}; @@ -209,7 +210,8 @@ pub trait Metastore: Send + Sync + 'static { ) -> MetastoreResult> { let query = ListSplitsQuery::for_index(index_uid) .with_delete_opstamp_lt(delete_opstamp) -
.with_split_state(SplitState::Published); + .with_split_state(SplitState::Published) + .is_mature(true, OffsetDateTime::now_utc()); let mut splits = self.list_splits(query).await?; splits.sort_by(|split_left, split_right| { @@ -337,8 +339,14 @@ pub struct ListSplitsQuery { /// The create timestamp range to filter by. pub create_timestamp: FilterRange, - /// The maturity timestamp range to filter by. - pub maturity_timestamp: FilterRange, + /// Is the split mature or immature. + pub maturity: Option, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct SplitMaturityFilter { + pub mature: bool, + pub evaluation_datetime: OffsetDateTime, } #[allow(unused_attributes)] @@ -355,7 +363,7 @@ impl ListSplitsQuery { delete_opstamp: Default::default(), update_timestamp: Default::default(), create_timestamp: Default::default(), - maturity_timestamp: Default::default(), + maturity: None, } } @@ -501,31 +509,13 @@ impl ListSplitsQuery { self } - /// Set the field's lower bound to match values that are - /// *less than or equal to* the provided value. - pub fn with_maturity_timestamp_lte(mut self, v: i64) -> Self { - self.maturity_timestamp.end = Bound::Included(v); - self - } - - /// Set the field's lower bound to match values that are - /// *less than* the provided value. - pub fn with_maturity_timestamp_lt(mut self, v: i64) -> Self { - self.maturity_timestamp.end = Bound::Excluded(v); - self - } - - /// Set the field's upper bound to match values that are - /// *greater than or equal to* the provided value. - pub fn with_maturity_timestamp_gte(mut self, v: i64) -> Self { - self.maturity_timestamp.start = Bound::Included(v); - self - } - - /// Set the field's upper bound to match values that are - /// *greater than* the provided value. - pub fn with_maturity_timestamp_gt(mut self, v: i64) -> Self { - self.maturity_timestamp.start = Bound::Excluded(v); + /// Set the maturity filter to match splits that are mature or immature + /// at a given datetime. 
+ pub fn is_mature(mut self, mature: bool, evaluation_datetime: OffsetDateTime) -> Self { + self.maturity = Some(SplitMaturityFilter { + mature, + evaluation_datetime, + }); self } } diff --git a/quickwit/quickwit-metastore/src/metastore/postgresql_metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgresql_metastore.rs index 6b784d7ec6c..dd61593af5f 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgresql_metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgresql_metastore.rs @@ -48,7 +48,7 @@ use crate::metastore::postgresql_model::{ use crate::metastore::FilterRange; use crate::{ IndexMetadata, ListSplitsQuery, Metastore, MetastoreError, MetastoreFactory, - MetastoreResolverError, MetastoreResult, Split, SplitMetadata, SplitState, + MetastoreResolverError, MetastoreResult, Split, SplitMaturity, SplitMetadata, SplitState, }; static MIGRATOR: Migrator = sqlx::migrate!("migrations/postgresql"); @@ -269,6 +269,22 @@ fn build_query_filter(mut sql: String, query: &ListSplitsQuery) -> String { Bound::Unbounded => {} }; + if let Some(maturity_filter) = &query.maturity { + let evaluation_timestamp = maturity_filter.evaluation_datetime.unix_timestamp(); + if maturity_filter.mature { + let _ = write!( + sql, + " AND (maturity_timestamp = to_timestamp(0) OR \ + to_timestamp({evaluation_timestamp}) >= maturity_timestamp)" + ); + } else { + let _ = write!( + sql, + " AND to_timestamp({evaluation_timestamp}) < maturity_timestamp" + ); + } + } + // WARNING: Not SQL injection proof write_sql_filter( &mut sql, @@ -282,12 +298,6 @@ fn build_query_filter(mut sql: String, query: &ListSplitsQuery) -> String { &query.create_timestamp, |val| format!("to_timestamp({val})"), ); - write_sql_filter( - &mut sql, - "maturity_timestamp", - &query.maturity_timestamp, - |val| format!("to_timestamp({val})"), - ); write_sql_filter(&mut sql, "delete_opstamp", &query.delete_opstamp, |val| { val.to_string() }); @@ -303,6 +313,19 @@ fn build_query_filter(mut sql: String, 
query: &ListSplitsQuery) -> String { sql } +/// Returns the unix timestamp at which the split becomes mature. +/// If the split is mature (`SplitMaturity::Mature`), we return 0 +/// as we don't want to depend on the current time when we know for sure +/// that a split is mature. +fn split_maturity_timestamp(split_metadata: &SplitMetadata) -> i64 { + match split_metadata.maturity { + SplitMaturity::Mature => 0, + SplitMaturity::Immature { maturation_period } => { + split_metadata.create_timestamp + maturation_period.as_secs() as i64 + } + } +} + fn convert_sqlx_err(index_id: &str, sqlx_err: sqlx::Error) -> MetastoreError { match &sqlx_err { sqlx::Error::Database(boxed_db_err) => { @@ -493,7 +516,7 @@ impl Metastore for PostgresqlMetastore { .as_ref() .map(|range| *range.start()); time_range_start_list.push(time_range_start); - maturity_timestamps.push(split_metadata.maturity_timestamp()); + maturity_timestamps.push(split_maturity_timestamp(&split_metadata)); let time_range_end = split_metadata.time_range.map(|range| *range.end()); time_range_end_list.push(time_range_end); @@ -1096,6 +1119,7 @@ impl Metastore for PostgresqlMetastore { index_uid = $1 AND delete_opstamp < $2 AND split_state = $3 + AND (maturity_timestamp = to_timestamp(0) OR (CURRENT_TIMESTAMP AT TIME ZONE 'UTC') >= maturity_timestamp) ORDER BY delete_opstamp ASC, publish_timestamp ASC LIMIT $4 "#, @@ -1279,6 +1303,7 @@ metastore_test_suite!(crate::PostgresqlMetastore); mod tests { use quickwit_doc_mapper::tag_pruning::{no_tag, tag, TagFilterAst}; use quickwit_proto::IndexUid; + use time::OffsetDateTime; use super::{build_query_filter, tags_filter_expression_helper}; use crate::{ListSplitsQuery, SplitState}; @@ -1380,11 +1405,22 @@ mod tests { " WHERE index_uid = $1 AND create_timestamp <= to_timestamp(55)" ); - let query = ListSplitsQuery::for_index(index_uid.clone()).with_maturity_timestamp_lte(55); + let maturity_evaluation_datetime = OffsetDateTime::from_unix_timestamp(55).unwrap(); + let query = 
ListSplitsQuery::for_index(index_uid.clone()) + .is_mature(true, maturity_evaluation_datetime); + let sql = build_query_filter(String::new(), &query); + assert_eq!( + sql, + " WHERE index_uid = $1 AND (maturity_timestamp = to_timestamp(0) OR to_timestamp(55) \ + >= maturity_timestamp)" + ); + + let query = ListSplitsQuery::for_index(index_uid.clone()) + .is_mature(false, maturity_evaluation_datetime); let sql = build_query_filter(String::new(), &query); assert_eq!( sql, - " WHERE index_uid = $1 AND maturity_timestamp <= to_timestamp(55)" + " WHERE index_uid = $1 AND to_timestamp(55) < maturity_timestamp" ); let query = ListSplitsQuery::for_index(index_uid.clone()).with_delete_opstamp_gte(4); diff --git a/quickwit/quickwit-metastore/src/split_metadata.rs b/quickwit/quickwit-metastore/src/split_metadata.rs index d71c6bb5a50..be618044cee 100644 --- a/quickwit/quickwit-metastore/src/split_metadata.rs +++ b/quickwit/quickwit-metastore/src/split_metadata.rs @@ -157,27 +157,16 @@ impl SplitMetadata { &self.split_id } - /// Returns true if the split is mature at `utc_now_timetamp()`. - pub fn is_mature(&self) -> bool { + /// Returns true if the split is mature at the given `datetime`. + pub fn is_mature(&self, datetime: OffsetDateTime) -> bool { match self.maturity { SplitMaturity::Mature => true, SplitMaturity::Immature { maturation_period: time_to_maturity, - } => self.create_timestamp + time_to_maturity.as_secs() as i64 <= utc_now_timestamp(), - } - } - - /// Returns the unix timestamp at which the split becomes mature. - /// If the split is mature (`SplitMaturity::Mature`), we return 0. - /// This is handy only for metastore implementations that need to filter - /// on split maturity and/or for persisting the maturity timestamp - /// in a database.
- pub(crate) fn maturity_timestamp(&self) -> i64 { - match self.maturity { - SplitMaturity::Mature => 0, - SplitMaturity::Immature { - maturation_period: time_to_maturity, - } => self.create_timestamp + time_to_maturity.as_secs() as i64, + } => { + self.create_timestamp + time_to_maturity.as_secs() as i64 + <= datetime.unix_timestamp() + } } } @@ -275,19 +264,17 @@ impl FromStr for SplitState { } /// `SplitMaturity` defines the maturity of a split, is is either `Mature` -/// or becomes mature after a given period. -/// A mature split does not undergo new merge operations. +/// or `Immature` with a given maturation period. /// The maturity is determined by the `MergePolicy`. #[derive(Clone, Copy, Debug, Default, Eq, Serialize, Deserialize, PartialEq)] #[serde(tag = "type")] #[serde(rename_all = "snake_case")] pub enum SplitMaturity { - /// Split is mature. + /// The split is mature and no longer a candidate for merges. #[default] Mature, - /// Period after which the split is mature. - /// Period is truncated to seconds - /// on serialization. + /// The split is immature and can undergo merges until `maturation_period` passes, + /// measured relative to the split's creation timestamp. Immature { /// Time to maturity.
#[serde(with = "serde_duration_millisecs")] diff --git a/quickwit/quickwit-metastore/src/tests.rs b/quickwit/quickwit-metastore/src/tests.rs index 0ab8c15db5c..1edc1348b5a 100644 --- a/quickwit/quickwit-metastore/src/tests.rs +++ b/quickwit/quickwit-metastore/src/tests.rs @@ -39,8 +39,9 @@ pub mod test_suite { use crate::checkpoint::{ IndexCheckpointDelta, PartitionId, Position, SourceCheckpoint, SourceCheckpointDelta, }; - use crate::split_metadata::SplitMaturity; - use crate::{ListSplitsQuery, Metastore, MetastoreError, Split, SplitMetadata, SplitState}; + use crate::{ + ListSplitsQuery, Metastore, MetastoreError, Split, SplitMaturity, SplitMetadata, SplitState, + }; #[async_trait] pub trait DefaultForTest { @@ -2079,9 +2080,11 @@ pub mod test_suite { &[&split_id_1, &split_id_2, &split_id_3, &split_id_6,] ); - // Test maturity_timestamp filter + // Test maturity filter + let maturity_evaluation_timestamp = + OffsetDateTime::from_unix_timestamp(current_timestamp).unwrap(); let query = ListSplitsQuery::for_index(index_uid.clone()) - .with_maturity_timestamp_lte(current_timestamp); + .is_mature(true, maturity_evaluation_timestamp); let splits = metastore.list_splits(query.clone()).await.unwrap(); let split_ids = collect_split_ids(&splits); assert_eq!( @@ -2090,10 +2093,10 @@ pub mod test_suite { ); let query = ListSplitsQuery::for_index(index_uid.clone()) - .with_maturity_timestamp_gte(current_timestamp); + .is_mature(false, maturity_evaluation_timestamp); let splits = metastore.list_splits(query.clone()).await.unwrap(); let split_ids = collect_split_ids(&splits); - assert_eq!(split_ids, &[&split_id_1, &split_id_2, &split_id_3]); + assert_eq!(split_ids, &[&split_id_2, &split_id_3]); cleanup_index(&metastore, index_uid).await; } @@ -2418,6 +2421,18 @@ pub mod test_suite { delete_opstamp: 20, ..Default::default() }; + // immature split + let split_id_5 = format!("{index_id}--split-5"); + let split_metadata_5 = SplitMetadata { + split_id: split_id_5.clone(), + 
index_uid: index_uid.clone(), + create_timestamp: current_timestamp, + maturity: SplitMaturity::Immature { + maturation_period: Duration::from_secs(100), + }, + delete_opstamp: 0, + ..Default::default() + }; let error = metastore .list_stale_splits(IndexUid::new("index-not-found"), 0, 10) @@ -2436,6 +2451,7 @@ pub mod test_suite { split_metadata_1.clone(), split_metadata_2.clone(), split_metadata_3.clone(), + split_metadata_5.clone(), ], ) .await @@ -2455,7 +2471,12 @@ pub mod test_suite { // Sleep for 1 second to have different publish timestamps. tokio::time::sleep(Duration::from_secs(1)).await; metastore - .publish_splits(index_uid.clone(), &[&split_id_1, &split_id_2], &[], None) + .publish_splits( + index_uid.clone(), + &[&split_id_1, &split_id_2, &split_id_5], + &[], + None, + ) .await .unwrap(); let splits = metastore