Skip to content

Commit

Permalink
Integrating the finalize merge policy into merge policies themselves.
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Oct 8, 2024
1 parent bed2ae7 commit 03220c6
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 37 deletions.
3 changes: 0 additions & 3 deletions quickwit/quickwit-config/src/index_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,6 @@ pub struct IndexingSettings {
pub merge_policy: MergePolicyConfig,
#[serde(default)]
pub resources: IndexingResources,
#[serde(default = "MergePolicyConfig::noop")]
pub finalize_merge_policy: MergePolicyConfig,
}

impl IndexingSettings {
Expand Down Expand Up @@ -163,7 +161,6 @@ impl Default for IndexingSettings {
docstore_compression_level: Self::default_docstore_compression_level(),
split_num_docs_target: Self::default_split_num_docs_target(),
merge_policy: MergePolicyConfig::default(),
finalize_merge_policy: MergePolicyConfig::noop(),
resources: IndexingResources::default(),
}
}
Expand Down
23 changes: 23 additions & 0 deletions quickwit/quickwit-config/src/merge_policy_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,29 @@ use std::time::Duration;

use serde::{de, Deserialize, Deserializer, Serialize, Serializer};

#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash, utoipa::ToSchema)]
#[serde(deny_unknown_fields)]
pub struct EndOfDayMergePolicyConfig {
/// Number of splits to merge together in a single merge operation.
#[serde(default = "default_merge_factor")]
pub merge_factor: usize,
/// Maximum number of splits that can be merged together in a single merge operation.
#[serde(default = "default_max_merge_factor")]
pub max_merge_factor: usize,
/// Maximum number of merges that a given split should undergo.
#[serde(default = "default_max_merge_ops")]
pub max_merge_ops: usize,
/// Duration relative to `split.created_timestamp` after which a split
/// becomes mature.
/// If `now() >= split.created_timestamp + maturation_period` then
/// the split is considered mature.
#[schema(value_type = String)]
#[serde(default = "default_maturation_period")]
#[serde(deserialize_with = "parse_human_duration")]
#[serde(serialize_with = "serialize_duration")]
pub maturation_period: Duration,
}

#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash, utoipa::ToSchema)]
#[serde(deny_unknown_fields)]
pub struct ConstWriteAmplificationMergePolicyConfig {
Expand Down
3 changes: 1 addition & 2 deletions quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ mod tests {

use super::{IndexingPipeline, *};
use crate::actors::merge_pipeline::{MergePipeline, MergePipelineParams};
use crate::merge_policy::{default_merge_policy, NopMergePolicy};
use crate::merge_policy::default_merge_policy;

#[test]
fn test_wait_duration() {
Expand Down Expand Up @@ -908,7 +908,6 @@ mod tests {
metastore: metastore.clone(),
split_store: split_store.clone(),
merge_policy: default_merge_policy(),
finalize_merge_policy: Arc::new(NopMergePolicy),
max_concurrent_split_uploads: 2,
merge_io_throughput_limiter_opt: None,
merge_scheduler_service: universe.get_or_spawn_one(),
Expand Down
7 changes: 1 addition & 6 deletions quickwit/quickwit-indexing/src/actors/indexing_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,10 +288,6 @@ impl IndexingService {
index_config.indexing_settings.merge_policy.clone(),
&index_config.indexing_settings,
);
let finalize_merge_policy = crate::merge_policy::merge_policy_from_settings(
index_config.indexing_settings.finalize_merge_policy.clone(),
&index_config.indexing_settings,
);
let split_store = IndexingSplitStore::new(storage.clone(), self.local_split_store.clone());

let doc_mapper = build_doc_mapper(&index_config.doc_mapping, &index_config.search_settings)
Expand All @@ -306,7 +302,6 @@ impl IndexingService {
split_store: split_store.clone(),
merge_scheduler_service: self.merge_scheduler_service.clone(),
merge_policy: merge_policy.clone(),
finalize_merge_policy,
merge_io_throughput_limiter_opt: self.merge_io_throughput_limiter_opt.clone(),
max_concurrent_split_uploads: self.max_concurrent_split_uploads,
event_broker: self.event_broker.clone(),
Expand Down Expand Up @@ -1203,7 +1198,7 @@ mod tests {

#[tokio::test]
async fn test_indexing_service_apply_plan() {
const PARAMS_FINGERPRINT: u64 = 3865067856550546352u64;
const PARAMS_FINGERPRINT: u64 = 16367456605507375863u64;

quickwit_common::setup_logging_for_tests();
let transport = ChannelTransport::default();
Expand Down
5 changes: 1 addition & 4 deletions quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,6 @@ impl MergePipeline {
&self.params.pipeline_id,
immature_splits,
self.params.merge_policy.clone(),
self.params.finalize_merge_policy.clone(),
merge_split_downloader_mailbox,
self.params.merge_scheduler_service.clone(),
);
Expand Down Expand Up @@ -561,7 +560,6 @@ pub struct MergePipelineParams {
pub merge_scheduler_service: Mailbox<MergeSchedulerService>,
pub split_store: IndexingSplitStore,
pub merge_policy: Arc<dyn MergePolicy>,
pub finalize_merge_policy: Arc<dyn MergePolicy>,
pub max_concurrent_split_uploads: usize, //< TODO share with the indexing pipeline.
pub merge_io_throughput_limiter_opt: Option<Limiter>,
pub event_broker: EventBroker,
Expand All @@ -583,7 +581,7 @@ mod tests {
use quickwit_storage::RamStorage;

use crate::actors::merge_pipeline::{MergePipeline, MergePipelineParams};
use crate::merge_policy::{default_merge_policy, nop_merge_policy};
use crate::merge_policy::default_merge_policy;
use crate::IndexingSplitStore;

#[tokio::test]
Expand Down Expand Up @@ -624,7 +622,6 @@ mod tests {
merge_scheduler_service: universe.get_or_spawn_one(),
split_store,
merge_policy: default_merge_policy(),
finalize_merge_policy: nop_merge_policy(),
max_concurrent_split_uploads: 2,
merge_io_throughput_limiter_opt: None,
event_broker: Default::default(),
Expand Down
25 changes: 5 additions & 20 deletions quickwit/quickwit-indexing/src/actors/merge_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ pub struct MergePlanner {
known_split_ids_recompute_attempt_id: usize,

merge_policy: Arc<dyn MergePolicy>,
finalize_merge_policy: Arc<dyn MergePolicy>,

merge_split_downloader_mailbox: Mailbox<MergeSplitDownloader>,
merge_scheduler_service: Mailbox<MergeSchedulerService>,
Expand Down Expand Up @@ -193,7 +192,6 @@ impl MergePlanner {
pipeline_id: &MergePipelineId,
immature_splits: Vec<SplitMetadata>,
merge_policy: Arc<dyn MergePolicy>,
finalize_merge_policy: Arc<dyn MergePolicy>,
merge_split_downloader_mailbox: Mailbox<MergeSplitDownloader>,
merge_scheduler_service: Mailbox<MergeSchedulerService>,
) -> MergePlanner {
Expand All @@ -206,7 +204,6 @@ impl MergePlanner {
known_split_ids_recompute_attempt_id: 0,
partitioned_young_splits: Default::default(),
merge_policy,
finalize_merge_policy,
merge_split_downloader_mailbox,
merge_scheduler_service,
ongoing_merge_operations_inventory: Inventory::default(),
Expand Down Expand Up @@ -302,12 +299,12 @@ impl MergePlanner {
let mut merge_operations = Vec::new();
for young_splits in self.partitioned_young_splits.values_mut() {
if !young_splits.is_empty() {
let merge_policy = if is_finalize {
&self.finalize_merge_policy
let operations = if is_finalize {
self.merge_policy.finalize_operations(young_splits)
} else {
&self.merge_policy
self.merge_policy.operations(young_splits)
};
merge_operations.extend(merge_policy.operations(young_splits));
merge_operations.extend(operations);
}
ctx.record_progress();
ctx.yield_now().await;
Expand Down Expand Up @@ -380,7 +377,7 @@ mod tests {

use crate::actors::MergePlanner;
use crate::merge_policy::{
merge_policy_from_settings, MergePolicy, MergeTask, NopMergePolicy, StableLogMergePolicy,
merge_policy_from_settings, MergePolicy, MergeTask, StableLogMergePolicy,
};
use crate::models::NewSplits;

Expand Down Expand Up @@ -441,7 +438,6 @@ mod tests {
&pipeline_id,
Vec::new(),
merge_policy,
Arc::new(NopMergePolicy),
merge_split_downloader_mailbox,
universe.get_or_spawn_one(),
);
Expand Down Expand Up @@ -564,13 +560,10 @@ mod tests {
];
let merge_policy: Arc<dyn MergePolicy> =
merge_policy_from_settings(indexing_settings.merge_policy.clone(), &indexing_settings);
let finalize_merge_policy: Arc<dyn MergePolicy> =
merge_policy_from_settings(indexing_settings.merge_policy.clone(), &indexing_settings);
let merge_planner = MergePlanner::new(
&pipeline_id,
immature_splits.clone(),
merge_policy,
finalize_merge_policy,
merge_split_downloader_mailbox,
universe.get_or_spawn_one(),
);
Expand Down Expand Up @@ -660,15 +653,10 @@ mod tests {
];
let merge_policy: Arc<dyn MergePolicy> =
merge_policy_from_settings(indexing_settings.merge_policy.clone(), &indexing_settings);
let finalize_merge_policy: Arc<dyn MergePolicy> = merge_policy_from_settings(
indexing_settings.finalize_merge_policy.clone(),
&indexing_settings,
);
let merge_planner = MergePlanner::new(
&pipeline_id,
immature_splits.clone(),
merge_policy,
finalize_merge_policy,
merge_split_downloader_mailbox,
universe.get_or_spawn_one(),
);
Expand Down Expand Up @@ -731,13 +719,10 @@ mod tests {
];
let merge_policy: Arc<dyn MergePolicy> =
merge_policy_from_settings(indexing_settings.merge_policy.clone(), &indexing_settings);
let finalize_merge_policy: Arc<dyn MergePolicy> =
merge_policy_from_settings(indexing_settings.merge_policy.clone(), &indexing_settings);
let merge_planner = MergePlanner::new(
&pipeline_id,
immature_splits.clone(),
merge_policy,
finalize_merge_policy,
merge_split_downloader_mailbox,
universe.get_or_spawn_one(),
);
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/actors/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ impl Handler<DisconnectMergePlanner> for Publisher {
_: DisconnectMergePlanner,
_ctx: &ActorContext<Self>,
) -> Result<(), quickwit_actors::ActorExitStatus> {
info!("disconnecting merge planner mailbox.");
info!("disconnecting merge planner mailbox");
self.merge_planner_mailbox_opt.take();
Ok(())
}
Expand Down
12 changes: 11 additions & 1 deletion quickwit/quickwit-indexing/src/merge_policy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,17 @@ pub trait MergePolicy: Send + Sync + fmt::Debug {
/// Returns the list of merge operations that should be performed.
fn operations(&self, splits: &mut Vec<SplitMetadata>) -> Vec<MergeOperation>;

/// After the last indexing pipeline has been shutdown, quickwit
/// finishes the ongoing merge operations, and eventually needs to shut it down.
///
/// This method makes it possible to offer a last list of merge operations before
/// really shutting down the merge policy.
///
/// This is especially useful for users relying on a one-index-per-day scheme.
fn finalize_operations(&self, _splits: &mut Vec<SplitMetadata>) -> Vec<MergeOperation> {
Vec::new()
}

/// Returns split maturity.
/// A split is either:
/// - `Mature` if it does not undergo new merge operations.
Expand Down Expand Up @@ -421,7 +432,6 @@ pub mod tests {
&pipeline_id.merge_pipeline_id(),
Vec::new(),
merge_policy.clone(),
Arc::new(NopMergePolicy),
merge_task_mailbox,
universe.get_or_spawn_one::<MergeSchedulerService>(),
);
Expand Down

0 comments on commit 03220c6

Please sign in to comment.