Skip to content

Commit

Permalink
Integrating the finalize merge policy into merge policies themselves.
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Oct 8, 2024
1 parent bed2ae7 commit 132afb3
Show file tree
Hide file tree
Showing 11 changed files with 134 additions and 59 deletions.
3 changes: 0 additions & 3 deletions quickwit/quickwit-config/src/index_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,6 @@ pub struct IndexingSettings {
pub merge_policy: MergePolicyConfig,
#[serde(default)]
pub resources: IndexingResources,
#[serde(default = "MergePolicyConfig::noop")]
pub finalize_merge_policy: MergePolicyConfig,
}

impl IndexingSettings {
Expand Down Expand Up @@ -163,7 +161,6 @@ impl Default for IndexingSettings {
docstore_compression_level: Self::default_docstore_compression_level(),
split_num_docs_target: Self::default_split_num_docs_target(),
merge_policy: MergePolicyConfig::default(),
finalize_merge_policy: MergePolicyConfig::noop(),
resources: IndexingResources::default(),
}
}
Expand Down
31 changes: 31 additions & 0 deletions quickwit/quickwit-config/src/merge_policy_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,33 @@ use std::time::Duration;

use serde::{de, Deserialize, Deserializer, Serialize, Serializer};

#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash, utoipa::ToSchema)]
#[serde(deny_unknown_fields)]
pub struct EndOfDayMergePolicyConfig {
/// Number of splits to merge together in a single merge operation.
#[serde(default = "default_merge_factor")]
pub merge_factor: usize,
/// Maximum number of splits that can be merged together in a single merge operation.
#[serde(default = "default_max_merge_factor")]
pub max_merge_factor: usize,
/// Maximum number of merges that a given split should undergo.
#[serde(default = "default_max_merge_ops")]
pub max_merge_ops: usize,
/// Duration relative to `split.created_timestamp` after which a split
/// becomes mature.
/// If `now() >= split.created_timestamp + maturation_period` then
/// the split is considered mature.
#[schema(value_type = String)]
#[serde(default = "default_maturation_period")]
#[serde(deserialize_with = "parse_human_duration")]
#[serde(serialize_with = "serialize_duration")]
pub maturation_period: Duration,
}

fn is_zero(n: &usize) -> bool {
*n == 0
}

#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash, utoipa::ToSchema)]
#[serde(deny_unknown_fields)]
pub struct ConstWriteAmplificationMergePolicyConfig {
Expand All @@ -42,6 +69,9 @@ pub struct ConstWriteAmplificationMergePolicyConfig {
#[serde(deserialize_with = "parse_human_duration")]
#[serde(serialize_with = "serialize_duration")]
pub maturation_period: Duration,
#[serde(default)]
#[serde(skip_serializing_if = "is_zero")]
pub max_finalize_merge_operations: usize,
}

impl Default for ConstWriteAmplificationMergePolicyConfig {
Expand All @@ -51,6 +81,7 @@ impl Default for ConstWriteAmplificationMergePolicyConfig {
merge_factor: default_merge_factor(),
max_merge_factor: default_max_merge_factor(),
maturation_period: default_maturation_period(),
max_finalize_merge_operations: 0,
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ mod tests {

use super::{IndexingPipeline, *};
use crate::actors::merge_pipeline::{MergePipeline, MergePipelineParams};
use crate::merge_policy::{default_merge_policy, NopMergePolicy};
use crate::merge_policy::default_merge_policy;

#[test]
fn test_wait_duration() {
Expand Down Expand Up @@ -908,7 +908,6 @@ mod tests {
metastore: metastore.clone(),
split_store: split_store.clone(),
merge_policy: default_merge_policy(),
finalize_merge_policy: Arc::new(NopMergePolicy),
max_concurrent_split_uploads: 2,
merge_io_throughput_limiter_opt: None,
merge_scheduler_service: universe.get_or_spawn_one(),
Expand Down
7 changes: 1 addition & 6 deletions quickwit/quickwit-indexing/src/actors/indexing_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,10 +288,6 @@ impl IndexingService {
index_config.indexing_settings.merge_policy.clone(),
&index_config.indexing_settings,
);
let finalize_merge_policy = crate::merge_policy::merge_policy_from_settings(
index_config.indexing_settings.finalize_merge_policy.clone(),
&index_config.indexing_settings,
);
let split_store = IndexingSplitStore::new(storage.clone(), self.local_split_store.clone());

let doc_mapper = build_doc_mapper(&index_config.doc_mapping, &index_config.search_settings)
Expand All @@ -306,7 +302,6 @@ impl IndexingService {
split_store: split_store.clone(),
merge_scheduler_service: self.merge_scheduler_service.clone(),
merge_policy: merge_policy.clone(),
finalize_merge_policy,
merge_io_throughput_limiter_opt: self.merge_io_throughput_limiter_opt.clone(),
max_concurrent_split_uploads: self.max_concurrent_split_uploads,
event_broker: self.event_broker.clone(),
Expand Down Expand Up @@ -1203,7 +1198,7 @@ mod tests {

#[tokio::test]
async fn test_indexing_service_apply_plan() {
const PARAMS_FINGERPRINT: u64 = 3865067856550546352u64;
const PARAMS_FINGERPRINT: u64 = 3865067856550546352;

quickwit_common::setup_logging_for_tests();
let transport = ChannelTransport::default();
Expand Down
10 changes: 3 additions & 7 deletions quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,8 @@ use time::OffsetDateTime;
use tokio::sync::Semaphore;
use tracing::{debug, error, info, instrument};

use super::merge_planner::RunFinalizeMergePolicy;
use super::publisher::DisconnectMergePlanner;
use super::MergeSchedulerService;
use super::{MergeSchedulerService, RunFinalizeMergePolicyAndQuit};
use crate::actors::indexing_pipeline::wait_duration_before_retry;
use crate::actors::merge_split_downloader::MergeSplitDownloader;
use crate::actors::publisher::PublisherType;
Expand Down Expand Up @@ -352,7 +351,6 @@ impl MergePipeline {
&self.params.pipeline_id,
immature_splits,
self.params.merge_policy.clone(),
self.params.finalize_merge_policy.clone(),
merge_split_downloader_mailbox,
self.params.merge_scheduler_service.clone(),
);
Expand Down Expand Up @@ -507,7 +505,7 @@ impl Handler<FinishPendingMergesAndShutdownPipeline> for MergePipeline {
let _ = handles
.merge_planner
.mailbox()
.send_message(RunFinalizeMergePolicy);
.send_message(RunFinalizeMergePolicyAndQuit);
} else {
// we won't respawn the pipeline in the future, so there is nothing
// to do here.
Expand Down Expand Up @@ -561,7 +559,6 @@ pub struct MergePipelineParams {
pub merge_scheduler_service: Mailbox<MergeSchedulerService>,
pub split_store: IndexingSplitStore,
pub merge_policy: Arc<dyn MergePolicy>,
pub finalize_merge_policy: Arc<dyn MergePolicy>,
pub max_concurrent_split_uploads: usize, //< TODO share with the indexing pipeline.
pub merge_io_throughput_limiter_opt: Option<Limiter>,
pub event_broker: EventBroker,
Expand All @@ -583,7 +580,7 @@ mod tests {
use quickwit_storage::RamStorage;

use crate::actors::merge_pipeline::{MergePipeline, MergePipelineParams};
use crate::merge_policy::{default_merge_policy, nop_merge_policy};
use crate::merge_policy::default_merge_policy;
use crate::IndexingSplitStore;

#[tokio::test]
Expand Down Expand Up @@ -624,7 +621,6 @@ mod tests {
merge_scheduler_service: universe.get_or_spawn_one(),
split_store,
merge_policy: default_merge_policy(),
finalize_merge_policy: nop_merge_policy(),
max_concurrent_split_uploads: 2,
merge_io_throughput_limiter_opt: None,
event_broker: Default::default(),
Expand Down
31 changes: 8 additions & 23 deletions quickwit/quickwit-indexing/src/actors/merge_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use crate::models::NewSplits;
use crate::MergePolicy;

#[derive(Debug)]
pub(crate) struct RunFinalizeMergePolicy;
pub(crate) struct RunFinalizeMergePolicyAndQuit;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct MergePartition {
Expand Down Expand Up @@ -81,7 +81,6 @@ pub struct MergePlanner {
known_split_ids_recompute_attempt_id: usize,

merge_policy: Arc<dyn MergePolicy>,
finalize_merge_policy: Arc<dyn MergePolicy>,

merge_split_downloader_mailbox: Mailbox<MergeSplitDownloader>,
merge_scheduler_service: Mailbox<MergeSchedulerService>,
Expand Down Expand Up @@ -132,12 +131,12 @@ impl Actor for MergePlanner {
}

#[async_trait]
impl Handler<RunFinalizeMergePolicy> for MergePlanner {
impl Handler<RunFinalizeMergePolicyAndQuit> for MergePlanner {
type Reply = ();

async fn handle(
&mut self,
_plan_merge: RunFinalizeMergePolicy,
_plan_merge: RunFinalizeMergePolicyAndQuit,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
// Note we ignore messages that could be coming from a different incarnation.
Expand Down Expand Up @@ -193,7 +192,6 @@ impl MergePlanner {
pipeline_id: &MergePipelineId,
immature_splits: Vec<SplitMetadata>,
merge_policy: Arc<dyn MergePolicy>,
finalize_merge_policy: Arc<dyn MergePolicy>,
merge_split_downloader_mailbox: Mailbox<MergeSplitDownloader>,
merge_scheduler_service: Mailbox<MergeSchedulerService>,
) -> MergePlanner {
Expand All @@ -206,7 +204,6 @@ impl MergePlanner {
known_split_ids_recompute_attempt_id: 0,
partitioned_young_splits: Default::default(),
merge_policy,
finalize_merge_policy,
merge_split_downloader_mailbox,
merge_scheduler_service,
ongoing_merge_operations_inventory: Inventory::default(),
Expand Down Expand Up @@ -302,12 +299,12 @@ impl MergePlanner {
let mut merge_operations = Vec::new();
for young_splits in self.partitioned_young_splits.values_mut() {
if !young_splits.is_empty() {
let merge_policy = if is_finalize {
&self.finalize_merge_policy
let operations = if is_finalize {
self.merge_policy.finalize_operations(young_splits)
} else {
&self.merge_policy
self.merge_policy.operations(young_splits)
};
merge_operations.extend(merge_policy.operations(young_splits));
merge_operations.extend(operations);
}
ctx.record_progress();
ctx.yield_now().await;
Expand Down Expand Up @@ -380,7 +377,7 @@ mod tests {

use crate::actors::MergePlanner;
use crate::merge_policy::{
merge_policy_from_settings, MergePolicy, MergeTask, NopMergePolicy, StableLogMergePolicy,
merge_policy_from_settings, MergePolicy, MergeTask, StableLogMergePolicy,
};
use crate::models::NewSplits;

Expand Down Expand Up @@ -441,7 +438,6 @@ mod tests {
&pipeline_id,
Vec::new(),
merge_policy,
Arc::new(NopMergePolicy),
merge_split_downloader_mailbox,
universe.get_or_spawn_one(),
);
Expand Down Expand Up @@ -564,13 +560,10 @@ mod tests {
];
let merge_policy: Arc<dyn MergePolicy> =
merge_policy_from_settings(indexing_settings.merge_policy.clone(), &indexing_settings);
let finalize_merge_policy: Arc<dyn MergePolicy> =
merge_policy_from_settings(indexing_settings.merge_policy.clone(), &indexing_settings);
let merge_planner = MergePlanner::new(
&pipeline_id,
immature_splits.clone(),
merge_policy,
finalize_merge_policy,
merge_split_downloader_mailbox,
universe.get_or_spawn_one(),
);
Expand Down Expand Up @@ -660,15 +653,10 @@ mod tests {
];
let merge_policy: Arc<dyn MergePolicy> =
merge_policy_from_settings(indexing_settings.merge_policy.clone(), &indexing_settings);
let finalize_merge_policy: Arc<dyn MergePolicy> = merge_policy_from_settings(
indexing_settings.finalize_merge_policy.clone(),
&indexing_settings,
);
let merge_planner = MergePlanner::new(
&pipeline_id,
immature_splits.clone(),
merge_policy,
finalize_merge_policy,
merge_split_downloader_mailbox,
universe.get_or_spawn_one(),
);
Expand Down Expand Up @@ -731,13 +719,10 @@ mod tests {
];
let merge_policy: Arc<dyn MergePolicy> =
merge_policy_from_settings(indexing_settings.merge_policy.clone(), &indexing_settings);
let finalize_merge_policy: Arc<dyn MergePolicy> =
merge_policy_from_settings(indexing_settings.merge_policy.clone(), &indexing_settings);
let merge_planner = MergePlanner::new(
&pipeline_id,
immature_splits.clone(),
merge_policy,
finalize_merge_policy,
merge_split_downloader_mailbox,
universe.get_or_spawn_one(),
);
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/actors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub use indexing_pipeline::{IndexingPipeline, IndexingPipelineParams};
pub use indexing_service::{IndexingService, IndexingServiceCounters, INDEXING_DIR_NAME};
pub use merge_executor::{combine_partition_ids, merge_split_attrs, MergeExecutor};
pub use merge_pipeline::MergePipeline;
pub use merge_planner::MergePlanner;
pub(crate) use merge_planner::{MergePlanner, RunFinalizeMergePolicyAndQuit};
pub use merge_scheduler_service::{schedule_merge, MergePermit, MergeSchedulerService};
pub use merge_split_downloader::MergeSplitDownloader;
pub use packager::Packager;
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/actors/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ impl Handler<DisconnectMergePlanner> for Publisher {
_: DisconnectMergePlanner,
_ctx: &ActorContext<Self>,
) -> Result<(), quickwit_actors::ActorExitStatus> {
info!("disconnecting merge planner mailbox.");
info!("disconnecting merge planner mailbox");
self.merge_planner_mailbox_opt.take();
Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ impl ConstWriteAmplificationMergePolicy {
merge_factor: 3,
max_merge_factor: 5,
maturation_period: Duration::from_secs(3600),
max_finalize_merge_operations: 0,
};
Self::new(config, 10_000_000)
}
Expand Down Expand Up @@ -134,8 +135,9 @@ impl MergePolicy for ConstWriteAmplificationMergePolicy {
fn operations(&self, splits: &mut Vec<SplitMetadata>) -> Vec<MergeOperation> {
let mut group_by_num_merge_ops: HashMap<usize, Vec<SplitMetadata>> = HashMap::default();
let mut mature_splits = Vec::new();
let now = OffsetDateTime::now_utc();
for split in splits.drain(..) {
if split.is_mature(OffsetDateTime::now_utc()) {
if split.is_mature(now) {
mature_splits.push(split);
} else {
group_by_num_merge_ops
Expand All @@ -149,11 +151,52 @@ impl MergePolicy for ConstWriteAmplificationMergePolicy {
for splits_in_group in group_by_num_merge_ops.values_mut() {
let merge_ops = self.merge_operations_within_num_merge_op_level(splits_in_group);
merge_operations.extend(merge_ops);
// we readd the splits that are not used in a merge operation into the splits vector.
splits.append(splits_in_group);
}
merge_operations
}

fn finalize_operations(&self, splits: &mut Vec<SplitMetadata>) -> Vec<MergeOperation> {
if self.config.max_finalize_merge_operations == 0 {
return Vec::new();
}

let now = OffsetDateTime::now_utc();

// We first isolate mature splits. Let's not touch them.
let (mature_splits, mut young_splits): (Vec<SplitMetadata>, Vec<SplitMetadata>) =
splits.drain(..).partition(|split| split.is_mature(now));
splits.extend(mature_splits);

// We then sort the split by reverse creation date and split id.
// You may notice that reverse is the opposite of the rest of the policy.
//
// This is because these are the youngest splits. If we limit ourselves in the number of
// merge we will operate, we might as well focus on the young == smaller ones for that
// last merge.
young_splits.sort_by(|left, right| {
left.create_timestamp
.cmp(&right.create_timestamp)
.then_with(|| left.split_id().cmp(right.split_id()))
});
let mut merge_operations = Vec::new();
while merge_operations.len() < self.config.max_finalize_merge_operations {
if let Some(merge_op) =
self.single_merge_operation_within_num_merge_op_level(&mut young_splits)
{
merge_operations.push(merge_op);
} else {
break;
}
}

// We readd the young splits that are not used in any merge operation.
splits.extend(young_splits);

merge_operations
}

fn split_maturity(&self, split_num_docs: usize, split_num_merge_ops: usize) -> SplitMaturity {
if split_num_merge_ops >= self.config.max_merge_ops {
return SplitMaturity::Mature;
Expand Down Expand Up @@ -372,7 +415,7 @@ mod tests {
let final_splits = crate::merge_policy::tests::aux_test_simulate_merge_planner_num_docs(
Arc::new(merge_policy.clone()),
&vals[..],
|splits| {
& |splits| {
let mut num_merge_ops_counts: HashMap<usize, usize> = HashMap::default();
for split in splits {
*num_merge_ops_counts.entry(split.num_merge_ops).or_default() += 1;
Expand All @@ -392,4 +435,7 @@ mod tests {
assert_eq!(final_splits.len(), 49);
Ok(())
}

#[tokio::test]
async fn test_const_write_amplification_merge_policy_with_finalize() {}
}
Loading

0 comments on commit 132afb3

Please sign in to comment.