From 03220c61a3cbabe16b50edc5c5762b215c204782 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Tue, 8 Oct 2024 16:44:53 +0900 Subject: [PATCH] Integrating the finalize merge policy into merge policies themselves. --- .../quickwit-config/src/index_config/mod.rs | 3 --- .../src/merge_policy_config.rs | 23 +++++++++++++++++ .../src/actors/indexing_pipeline.rs | 3 +-- .../src/actors/indexing_service.rs | 7 +----- .../src/actors/merge_pipeline.rs | 5 +--- .../src/actors/merge_planner.rs | 25 ++++--------------- .../quickwit-indexing/src/actors/publisher.rs | 2 +- .../quickwit-indexing/src/merge_policy/mod.rs | 12 ++++++++- 8 files changed, 43 insertions(+), 37 deletions(-) diff --git a/quickwit/quickwit-config/src/index_config/mod.rs b/quickwit/quickwit-config/src/index_config/mod.rs index 8ef52c80656..60ea2a9429e 100644 --- a/quickwit/quickwit-config/src/index_config/mod.rs +++ b/quickwit/quickwit-config/src/index_config/mod.rs @@ -121,8 +121,6 @@ pub struct IndexingSettings { pub merge_policy: MergePolicyConfig, #[serde(default)] pub resources: IndexingResources, - #[serde(default = "MergePolicyConfig::noop")] - pub finalize_merge_policy: MergePolicyConfig, } impl IndexingSettings { @@ -163,7 +161,6 @@ impl Default for IndexingSettings { docstore_compression_level: Self::default_docstore_compression_level(), split_num_docs_target: Self::default_split_num_docs_target(), merge_policy: MergePolicyConfig::default(), - finalize_merge_policy: MergePolicyConfig::noop(), resources: IndexingResources::default(), } } diff --git a/quickwit/quickwit-config/src/merge_policy_config.rs b/quickwit/quickwit-config/src/merge_policy_config.rs index 757af7571b5..f71fa241c0d 100644 --- a/quickwit/quickwit-config/src/merge_policy_config.rs +++ b/quickwit/quickwit-config/src/merge_policy_config.rs @@ -21,6 +21,29 @@ use std::time::Duration; use serde::{de, Deserialize, Deserializer, Serialize, Serializer}; +#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash, utoipa::ToSchema)] +#[serde(deny_unknown_fields)] +pub struct EndOfDayMergePolicyConfig { + /// Number of splits to merge together in a single merge operation. + #[serde(default = "default_merge_factor")] + pub merge_factor: usize, + /// Maximum number of splits that can be merged together in a single merge operation. + #[serde(default = "default_max_merge_factor")] + pub max_merge_factor: usize, + /// Maximum number of merges that a given split should undergo. + #[serde(default = "default_max_merge_ops")] + pub max_merge_ops: usize, + /// Duration relative to `split.created_timestamp` after which a split + /// becomes mature. + /// If `now() >= split.created_timestamp + maturation_period` then + /// the split is considered mature. + #[schema(value_type = String)] + #[serde(default = "default_maturation_period")] + #[serde(deserialize_with = "parse_human_duration")] + #[serde(serialize_with = "serialize_duration")] + pub maturation_period: Duration, +} + #[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash, utoipa::ToSchema)] #[serde(deny_unknown_fields)] pub struct ConstWriteAmplificationMergePolicyConfig { diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs index 643ed0498cb..7ab58bb873f 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -619,7 +619,7 @@ mod tests { use super::{IndexingPipeline, *}; use crate::actors::merge_pipeline::{MergePipeline, MergePipelineParams}; - use crate::merge_policy::{default_merge_policy, NopMergePolicy}; + use crate::merge_policy::default_merge_policy; #[test] fn test_wait_duration() { @@ -908,7 +908,6 @@ mod tests { metastore: metastore.clone(), split_store: split_store.clone(), merge_policy: default_merge_policy(), - finalize_merge_policy: Arc::new(NopMergePolicy), max_concurrent_split_uploads: 2, merge_io_throughput_limiter_opt: None, merge_scheduler_service: universe.get_or_spawn_one(), diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs index 7654c9e55e2..1f75b97990f 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs @@ -288,10 +288,6 @@ impl IndexingService { index_config.indexing_settings.merge_policy.clone(), &index_config.indexing_settings, ); - let finalize_merge_policy = crate::merge_policy::merge_policy_from_settings( - index_config.indexing_settings.finalize_merge_policy.clone(), - &index_config.indexing_settings, - ); let split_store = IndexingSplitStore::new(storage.clone(), self.local_split_store.clone()); let doc_mapper = build_doc_mapper(&index_config.doc_mapping, &index_config.search_settings) @@ -306,7 +302,6 @@ impl IndexingService { split_store: split_store.clone(), merge_scheduler_service: self.merge_scheduler_service.clone(), merge_policy: merge_policy.clone(), - finalize_merge_policy, merge_io_throughput_limiter_opt: self.merge_io_throughput_limiter_opt.clone(), max_concurrent_split_uploads: self.max_concurrent_split_uploads, event_broker: self.event_broker.clone(), @@ -1203,7 +1198,7 @@ mod tests { #[tokio::test] async fn test_indexing_service_apply_plan() { - const PARAMS_FINGERPRINT: u64 = 3865067856550546352u64; + const PARAMS_FINGERPRINT: u64 = 16367456605507375863u64; quickwit_common::setup_logging_for_tests(); let transport = ChannelTransport::default(); diff --git a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs index 61f8fa194c7..46e64b15b2c 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs @@ -352,7 +352,6 @@ impl MergePipeline { &self.params.pipeline_id, immature_splits, self.params.merge_policy.clone(), - self.params.finalize_merge_policy.clone(), merge_split_downloader_mailbox, self.params.merge_scheduler_service.clone(), ); @@ -561,7 +560,6 @@ pub struct MergePipelineParams { pub merge_scheduler_service: Mailbox, pub split_store: IndexingSplitStore, pub merge_policy: Arc, - pub finalize_merge_policy: Arc, pub max_concurrent_split_uploads: usize, //< TODO share with the indexing pipeline. pub merge_io_throughput_limiter_opt: Option, pub event_broker: EventBroker, @@ -583,7 +581,7 @@ mod tests { use quickwit_storage::RamStorage; use crate::actors::merge_pipeline::{MergePipeline, MergePipelineParams}; - use crate::merge_policy::{default_merge_policy, nop_merge_policy}; + use crate::merge_policy::default_merge_policy; use crate::IndexingSplitStore; #[tokio::test] @@ -624,7 +622,6 @@ mod tests { merge_scheduler_service: universe.get_or_spawn_one(), split_store, merge_policy: default_merge_policy(), - finalize_merge_policy: nop_merge_policy(), max_concurrent_split_uploads: 2, merge_io_throughput_limiter_opt: None, event_broker: Default::default(), diff --git a/quickwit/quickwit-indexing/src/actors/merge_planner.rs b/quickwit/quickwit-indexing/src/actors/merge_planner.rs index 7cb692affc6..b6baeae977f 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_planner.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_planner.rs @@ -81,7 +81,6 @@ pub struct MergePlanner { known_split_ids_recompute_attempt_id: usize, merge_policy: Arc, - finalize_merge_policy: Arc, merge_split_downloader_mailbox: Mailbox, merge_scheduler_service: Mailbox, @@ -193,7 +192,6 @@ impl MergePlanner { pipeline_id: &MergePipelineId, immature_splits: Vec, merge_policy: Arc, - finalize_merge_policy: Arc, merge_split_downloader_mailbox: Mailbox, merge_scheduler_service: Mailbox, ) -> MergePlanner { @@ -206,7 +204,6 @@ impl MergePlanner { known_split_ids_recompute_attempt_id: 0, partitioned_young_splits: Default::default(), merge_policy, - finalize_merge_policy, merge_split_downloader_mailbox, merge_scheduler_service, ongoing_merge_operations_inventory: Inventory::default(), @@ -302,12 +299,12 @@ impl MergePlanner { let mut merge_operations = Vec::new(); for young_splits in self.partitioned_young_splits.values_mut() { if !young_splits.is_empty() { - let merge_policy = if is_finalize { - &self.finalize_merge_policy + let operations = if is_finalize { + self.merge_policy.finalize_operations(young_splits) } else { - &self.merge_policy + self.merge_policy.operations(young_splits) }; - merge_operations.extend(merge_policy.operations(young_splits)); + merge_operations.extend(operations); } ctx.record_progress(); ctx.yield_now().await; @@ -380,7 +377,7 @@ mod tests { use crate::actors::MergePlanner; use crate::merge_policy::{ - merge_policy_from_settings, MergePolicy, MergeTask, NopMergePolicy, StableLogMergePolicy, + merge_policy_from_settings, MergePolicy, MergeTask, StableLogMergePolicy, }; use crate::models::NewSplits; @@ -441,7 +438,6 @@ mod tests { &pipeline_id, Vec::new(), merge_policy, - Arc::new(NopMergePolicy), merge_split_downloader_mailbox, universe.get_or_spawn_one(), ); @@ -564,13 +560,10 @@ mod tests { ]; let merge_policy: Arc = merge_policy_from_settings(indexing_settings.merge_policy.clone(), &indexing_settings); - let finalize_merge_policy: Arc = - merge_policy_from_settings(indexing_settings.merge_policy.clone(), &indexing_settings); let merge_planner = MergePlanner::new( &pipeline_id, immature_splits.clone(), merge_policy, - finalize_merge_policy, merge_split_downloader_mailbox, universe.get_or_spawn_one(), ); @@ -660,15 +653,10 @@ mod tests { ]; let merge_policy: Arc = merge_policy_from_settings(indexing_settings.merge_policy.clone(), &indexing_settings); - let finalize_merge_policy: Arc = merge_policy_from_settings( - indexing_settings.finalize_merge_policy.clone(), - &indexing_settings, - ); let merge_planner = MergePlanner::new( &pipeline_id, immature_splits.clone(), merge_policy, - finalize_merge_policy, merge_split_downloader_mailbox, universe.get_or_spawn_one(), ); @@ -731,13 +719,10 @@ mod tests { ]; let merge_policy: Arc = merge_policy_from_settings(indexing_settings.merge_policy.clone(), &indexing_settings); - let finalize_merge_policy: Arc = - merge_policy_from_settings(indexing_settings.merge_policy.clone(), &indexing_settings); let merge_planner = MergePlanner::new( &pipeline_id, immature_splits.clone(), merge_policy, - finalize_merge_policy, merge_split_downloader_mailbox, universe.get_or_spawn_one(), ); diff --git a/quickwit/quickwit-indexing/src/actors/publisher.rs b/quickwit/quickwit-indexing/src/actors/publisher.rs index 481c4971b71..f63c67e2bf6 100644 --- a/quickwit/quickwit-indexing/src/actors/publisher.rs +++ b/quickwit/quickwit-indexing/src/actors/publisher.rs @@ -111,7 +111,7 @@ impl Handler for Publisher { _: DisconnectMergePlanner, _ctx: &ActorContext, ) -> Result<(), quickwit_actors::ActorExitStatus> { - info!("disconnecting merge planner mailbox."); + info!("disconnecting merge planner mailbox"); self.merge_planner_mailbox_opt.take(); Ok(()) } diff --git a/quickwit/quickwit-indexing/src/merge_policy/mod.rs b/quickwit/quickwit-indexing/src/merge_policy/mod.rs index 95622a11460..491fe6e14f0 100644 --- a/quickwit/quickwit-indexing/src/merge_policy/mod.rs +++ b/quickwit/quickwit-indexing/src/merge_policy/mod.rs @@ -151,6 +151,17 @@ pub trait MergePolicy: Send + Sync + fmt::Debug { /// Returns the list of merge operations that should be performed. fn operations(&self, splits: &mut Vec) -> Vec; + /// After the last indexing pipeline has been shutdown, quickwit + /// finishes the ongoing merge operations, and eventually needs to shut it down. + /// + /// This method makes it possible to offer a last list of merge operations before + /// really shutting down the merge policy. + /// + /// This is especially useful for users relying on a one-index-per-day scheme. + fn finalize_operations(&self, _splits: &mut Vec) -> Vec { + Vec::new() + } + /// Returns split maturity. /// A split is either: /// - `Mature` if it does not undergo new merge operations. @@ -421,7 +432,6 @@ pub mod tests { &pipeline_id.merge_pipeline_id(), Vec::new(), merge_policy.clone(), - Arc::new(NopMergePolicy), merge_task_mailbox, universe.get_or_spawn_one::(), );