Skip to content

Commit

Permalink
Refactor split maturity to hide (postgresql) implementation details …
Browse files Browse the repository at this point in the history
…and avoid confusing the reader.
  • Loading branch information
fmassot committed Jul 4, 2023
1 parent 964fb4f commit 9604a84
Show file tree
Hide file tree
Showing 12 changed files with 124 additions and 98 deletions.
20 changes: 5 additions & 15 deletions quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ impl MergePipeline {
);
let query = ListSplitsQuery::for_index(self.params.pipeline_id.index_uid.clone())
.with_split_state(SplitState::Published)
.with_maturity_timestamp_lte(OffsetDateTime::now_utc().unix_timestamp());
.is_mature(false, OffsetDateTime::now_utc());
let published_splits = ctx
.protect_future(self.params.metastore.list_splits(query))
.await?
Expand Down Expand Up @@ -441,7 +441,6 @@ pub struct MergePipelineParams {

#[cfg(test)]
mod tests {
use std::ops::Bound;
use std::sync::Arc;

use quickwit_actors::{ActorExitStatus, Universe};
Expand All @@ -450,7 +449,6 @@ mod tests {
use quickwit_metastore::MockMetastore;
use quickwit_proto::IndexUid;
use quickwit_storage::RamStorage;
use time::OffsetDateTime;

use crate::actors::merge_pipeline::{MergePipeline, MergePipelineParams};
use crate::merge_policy::default_merge_policy;
Expand All @@ -476,18 +474,10 @@ mod tests {
list_split_query.split_states,
vec![quickwit_metastore::SplitState::Published]
);
match list_split_query.maturity_timestamp.end {
Bound::Included(maturity_timestamp_end) => {
assert!(
maturity_timestamp_end
< OffsetDateTime::now_utc().unix_timestamp() + 3600
);
assert!(
maturity_timestamp_end
> OffsetDateTime::now_utc().unix_timestamp() - 3600
)
}
_ => panic!("Expected unbounded maturity timestamp."),
if let Some(maturity_filter) = list_split_query.maturity {
assert!(!maturity_filter.mature);
} else {
panic!("Expected maturity filter.");
}
Ok(Vec::new())
});
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-indexing/src/actors/merge_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, Qu
use quickwit_metastore::SplitMetadata;
use serde::Serialize;
use tantivy::Inventory;
use time::OffsetDateTime;
use tracing::info;

use crate::actors::MergeSplitDownloader;
Expand Down Expand Up @@ -148,7 +149,7 @@ impl MergePlanner {
}

fn record_split(&mut self, new_split: SplitMetadata) {
if new_split.is_mature() {
if new_split.is_mature(OffsetDateTime::now_utc()) {
return;
}
let splits_for_partition: &mut Vec<SplitMetadata> = self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::collections::HashMap;
use quickwit_config::merge_policy_config::ConstWriteAmplificationMergePolicyConfig;
use quickwit_config::IndexingSettings;
use quickwit_metastore::{SplitMaturity, SplitMetadata};
use time::OffsetDateTime;

use super::MergeOperation;
use crate::merge_policy::MergePolicy;
Expand Down Expand Up @@ -134,7 +135,7 @@ impl MergePolicy for ConstWriteAmplificationMergePolicy {
let mut group_by_num_merge_ops: HashMap<usize, Vec<SplitMetadata>> = HashMap::default();
let mut mature_splits = Vec::new();
for split in splits.drain(..) {
if split.is_mature() {
if split.is_mature(OffsetDateTime::now_utc()) {
mature_splits.push(split);
} else {
group_by_num_merge_ops
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-indexing/src/merge_policy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,15 +303,15 @@ pub mod tests {
// merge_policy.operations(&mut splits).is_empty(),
// "Merge policy are expected to return all available merge operations."
// );

let now_utc = OffsetDateTime::now_utc();
for merge_op in &mut operations {
assert_eq!(merge_op.operation_type, MergeOperationType::Merge,
"A merge policy should only emit Merge operations."
);
assert!(merge_op.splits_as_slice().len() >= 2,
"Merge policies should not suggest merging a single split.");
for split in merge_op.splits_as_slice() {
assert!(!split.is_mature(), "Merges should not contain mature splits.");
assert!(!split.is_mature(now_utc), "Merges should not contain mature splits.");
}
merge_policy.check_is_valid(merge_op, &splits[..]);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::ops::Range;
use quickwit_config::merge_policy_config::StableLogMergePolicyConfig;
use quickwit_config::IndexingSettings;
use quickwit_metastore::{SplitMaturity, SplitMetadata};
use time::OffsetDateTime;
use tracing::debug;

use crate::merge_policy::{splits_short_debug, MergeOperation, MergePolicy};
Expand Down Expand Up @@ -187,7 +188,8 @@ impl StableLogMergePolicy {
return Vec::new();
}
// First we isolate splits that are mature.
let splits_not_for_merge = remove_matching_items(splits, |split| split.is_mature());
let splits_not_for_merge =
remove_matching_items(splits, |split| split.is_mature(OffsetDateTime::now_utc()));

let mut merge_operations: Vec<MergeOperation> = Vec::new();
splits.sort_unstable_by(cmp_splits_by_reverse_time_end);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use quickwit_metastore::SplitMetadata;
use quickwit_storage::{PutPayload, Storage, StorageResult};
use tantivy::directory::MmapDirectory;
use tantivy::{Advice, Directory};
use time::OffsetDateTime;
use tracing::{info, info_span, instrument, Instrument};

use super::LocalSplitStore;
Expand Down Expand Up @@ -125,7 +126,7 @@ impl IndexingSplitStore {
let split_num_bytes = put_payload.len();

let key = PathBuf::from(quickwit_common::split_file(split.split_id()));
let is_mature = split.is_mature();
let is_mature = split.is_mature(OffsetDateTime::now_utc());
self.inner
.remote_storage
.put(&key, put_payload)
Expand Down
3 changes: 0 additions & 3 deletions quickwit/quickwit-janitor/src/actors/delete_task_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,12 +335,9 @@ impl DeleteTaskPlanner {
last_delete_opstamp = last_delete_opstamp,
num_stale_splits_from_metastore = stale_splits.len()
);
// Keep only mature splits and splits that are not already part of ongoing delete
// operations.
let ongoing_delete_operations = self.ongoing_delete_operations_inventory.list();
let filtered_splits = stale_splits
.into_iter()
.filter(|stale_split| stale_split.split_metadata.is_mature())
.filter(|stale_split| {
!ongoing_delete_operations.iter().any(|operation| {
operation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -500,11 +500,11 @@ fn split_query_predicate(split: &&Split, query: &ListSplitsQuery) -> bool {
return false;
}

if !query
.maturity_timestamp
.contains(&split.split_metadata.maturity_timestamp())
{
return false;
if let Some(maturity_filter) = &query.maturity {
let is_mature = split
.split_metadata
.is_mature(maturity_filter.evaluation_datetime);
return is_mature == maturity_filter.mature;
}

if let Some(range) = split.split_metadata.time_range.as_ref() {
Expand Down
48 changes: 19 additions & 29 deletions quickwit/quickwit-metastore/src/metastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use quickwit_config::{IndexConfig, SourceConfig};
use quickwit_doc_mapper::tag_pruning::TagFilterAst;
use quickwit_proto::metastore_api::{DeleteQuery, DeleteTask};
use quickwit_proto::IndexUid;
use time::OffsetDateTime;

use crate::checkpoint::IndexCheckpointDelta;
use crate::{MetastoreError, MetastoreResult, Split, SplitMetadata, SplitState};
Expand Down Expand Up @@ -209,7 +210,8 @@ pub trait Metastore: Send + Sync + 'static {
) -> MetastoreResult<Vec<Split>> {
let query = ListSplitsQuery::for_index(index_uid)
.with_delete_opstamp_lt(delete_opstamp)
.with_split_state(SplitState::Published);
.with_split_state(SplitState::Published)
.is_mature(true, OffsetDateTime::now_utc());

let mut splits = self.list_splits(query).await?;
splits.sort_by(|split_left, split_right| {
Expand Down Expand Up @@ -337,8 +339,14 @@ pub struct ListSplitsQuery {
/// The create timestamp range to filter by.
pub create_timestamp: FilterRange<i64>,

/// The maturity timestamp range to filter by.
pub maturity_timestamp: FilterRange<i64>,
/// Optional filter on whether splits are mature or immature at a given datetime.
pub maturity: Option<SplitMaturityFilter>,
}

/// Filter selecting splits by their maturity, as evaluated at a given datetime.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SplitMaturityFilter {
/// `true` to match mature splits, `false` to match immature splits.
pub mature: bool,
/// Datetime at which the maturity of each split is evaluated.
pub evaluation_datetime: OffsetDateTime,
}

#[allow(unused_attributes)]
Expand All @@ -355,7 +363,7 @@ impl ListSplitsQuery {
delete_opstamp: Default::default(),
update_timestamp: Default::default(),
create_timestamp: Default::default(),
maturity_timestamp: Default::default(),
maturity: None,
}
}

Expand Down Expand Up @@ -501,31 +509,13 @@ impl ListSplitsQuery {
self
}

/// Set the field's upper bound to match values that are
/// *less than or equal to* the provided value.
pub fn with_maturity_timestamp_lte(mut self, v: i64) -> Self {
self.maturity_timestamp.end = Bound::Included(v);
self
}

/// Set the field's upper bound to match values that are
/// *less than* the provided value.
pub fn with_maturity_timestamp_lt(mut self, v: i64) -> Self {
self.maturity_timestamp.end = Bound::Excluded(v);
self
}

/// Set the field's lower bound to match values that are
/// *greater than or equal to* the provided value.
pub fn with_maturity_timestamp_gte(mut self, v: i64) -> Self {
self.maturity_timestamp.start = Bound::Included(v);
self
}

/// Set the field's lower bound to match values that are
/// *greater than* the provided value.
pub fn with_maturity_timestamp_gt(mut self, v: i64) -> Self {
self.maturity_timestamp.start = Bound::Excluded(v);
/// Set the maturity filter to match splits that are mature or immature
/// at a given datetime.
pub fn is_mature(mut self, mature: bool, evaluation_datetime: OffsetDateTime) -> Self {
self.maturity = Some(SplitMaturityFilter {
mature,
evaluation_datetime,
});
self
}
}
Expand Down
56 changes: 46 additions & 10 deletions quickwit/quickwit-metastore/src/metastore/postgresql_metastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use crate::metastore::postgresql_model::{
use crate::metastore::FilterRange;
use crate::{
IndexMetadata, ListSplitsQuery, Metastore, MetastoreError, MetastoreFactory,
MetastoreResolverError, MetastoreResult, Split, SplitMetadata, SplitState,
MetastoreResolverError, MetastoreResult, Split, SplitMaturity, SplitMetadata, SplitState,
};

static MIGRATOR: Migrator = sqlx::migrate!("migrations/postgresql");
Expand Down Expand Up @@ -269,6 +269,22 @@ fn build_query_filter(mut sql: String, query: &ListSplitsQuery) -> String {
Bound::Unbounded => {}
};

if let Some(maturity_filter) = &query.maturity {
let evaluation_timestamp = maturity_filter.evaluation_datetime.unix_timestamp();
if maturity_filter.mature {
let _ = write!(
sql,
" AND (maturity_timestamp = to_timestamp(0) OR \
to_timestamp({evaluation_timestamp}) >= maturity_timestamp)"
);
} else {
let _ = write!(
sql,
" AND to_timestamp({evaluation_timestamp}) < maturity_timestamp"
);
}
}

// WARNING: Not SQL injection proof
write_sql_filter(
&mut sql,
Expand All @@ -282,12 +298,6 @@ fn build_query_filter(mut sql: String, query: &ListSplitsQuery) -> String {
&query.create_timestamp,
|val| format!("to_timestamp({val})"),
);
write_sql_filter(
&mut sql,
"maturity_timestamp",
&query.maturity_timestamp,
|val| format!("to_timestamp({val})"),
);
write_sql_filter(&mut sql, "delete_opstamp", &query.delete_opstamp, |val| {
val.to_string()
});
Expand All @@ -303,6 +313,19 @@ fn build_query_filter(mut sql: String, query: &ListSplitsQuery) -> String {
sql
}

/// Returns the unix timestamp (in seconds) at which the split becomes mature.
///
/// If the split is already mature (`SplitMaturity::Mature`), we return 0
/// as we don't want to depend on the current time when we know for sure
/// that a split is mature.
///
/// For an immature split the result is `create_timestamp + maturation_period`
/// (whole seconds). The addition saturates at `i64::MAX` so that an extremely
/// large maturation period means "never matures" instead of wrapping around to
/// a negative or past timestamp.
fn split_maturity_timestamp(split_metadata: &SplitMetadata) -> i64 {
    match split_metadata.maturity {
        SplitMaturity::Mature => 0,
        SplitMaturity::Immature { maturation_period } => {
            // `as_secs()` is a `u64`: an `as i64` cast would silently wrap for
            // values above `i64::MAX`, and a plain `+` could overflow.
            split_metadata
                .create_timestamp
                .saturating_add_unsigned(maturation_period.as_secs())
        }
    }
}

fn convert_sqlx_err(index_id: &str, sqlx_err: sqlx::Error) -> MetastoreError {
match &sqlx_err {
sqlx::Error::Database(boxed_db_err) => {
Expand Down Expand Up @@ -493,7 +516,7 @@ impl Metastore for PostgresqlMetastore {
.as_ref()
.map(|range| *range.start());
time_range_start_list.push(time_range_start);
maturity_timestamps.push(split_metadata.maturity_timestamp());
maturity_timestamps.push(split_maturity_timestamp(&split_metadata));

let time_range_end = split_metadata.time_range.map(|range| *range.end());
time_range_end_list.push(time_range_end);
Expand Down Expand Up @@ -1096,6 +1119,7 @@ impl Metastore for PostgresqlMetastore {
index_uid = $1
AND delete_opstamp < $2
AND split_state = $3
AND (maturity_timestamp = to_timestamp(0) OR (CURRENT_TIMESTAMP AT TIME ZONE 'UTC') >= maturity_timestamp)
ORDER BY delete_opstamp ASC, publish_timestamp ASC
LIMIT $4
"#,
Expand Down Expand Up @@ -1279,6 +1303,7 @@ metastore_test_suite!(crate::PostgresqlMetastore);
mod tests {
use quickwit_doc_mapper::tag_pruning::{no_tag, tag, TagFilterAst};
use quickwit_proto::IndexUid;
use time::OffsetDateTime;

use super::{build_query_filter, tags_filter_expression_helper};
use crate::{ListSplitsQuery, SplitState};
Expand Down Expand Up @@ -1380,11 +1405,22 @@ mod tests {
" WHERE index_uid = $1 AND create_timestamp <= to_timestamp(55)"
);

let query = ListSplitsQuery::for_index(index_uid.clone()).with_maturity_timestamp_lte(55);
let maturity_evaluation_datetime = OffsetDateTime::from_unix_timestamp(55).unwrap();
let query = ListSplitsQuery::for_index(index_uid.clone())
.is_mature(true, maturity_evaluation_datetime);
let sql = build_query_filter(String::new(), &query);
assert_eq!(
sql,
" WHERE index_uid = $1 AND (maturity_timestamp = to_timestamp(0) OR to_timestamp(55) \
>= maturity_timestamp)"
);

let query = ListSplitsQuery::for_index(index_uid.clone())
.is_mature(false, maturity_evaluation_datetime);
let sql = build_query_filter(String::new(), &query);
assert_eq!(
sql,
" WHERE index_uid = $1 AND maturity_timestamp <= to_timestamp(55)"
" WHERE index_uid = $1 AND to_timestamp(55) < maturity_timestamp"
);

let query = ListSplitsQuery::for_index(index_uid.clone()).with_delete_opstamp_gte(4);
Expand Down
Loading

0 comments on commit 9604a84

Please sign in to comment.