diff --git a/quickwit/quickwit-indexing/src/actors/merge_planner.rs b/quickwit/quickwit-indexing/src/actors/merge_planner.rs
index 6375186d19d..c00fb1a71a4 100644
--- a/quickwit/quickwit-indexing/src/actors/merge_planner.rs
+++ b/quickwit/quickwit-indexing/src/actors/merge_planner.rs
@@ -18,7 +18,7 @@
// along with this program. If not, see .
use std::cmp::Reverse;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Instant;
@@ -31,7 +31,7 @@ use quickwit_proto::indexing::IndexingPipelineId;
use serde::Serialize;
use tantivy::Inventory;
use time::OffsetDateTime;
-use tracing::info;
+use tracing::{info, warn};
use crate::actors::MergeSplitDownloader;
use crate::merge_policy::MergeOperation;
@@ -42,15 +42,37 @@ use crate::MergePolicy;
/// The merge planner decides when to start a merge task.
pub struct MergePlanner {
pipeline_id: IndexingPipelineId,
+
/// A young split is a split that has not reached maturity
/// yet and can be candidate to merge operations.
partitioned_young_splits: HashMap>,
+ num_young_splits: usize,
+
+ /// This set contains all of the split ids that we "acknowledged".
+ /// The point of this set is to rapidly dismiss redundant `NewSplit` messages.
+ ///
+ /// Complex scenarios that can result in the reemission of
+ /// such messages are described in #3627.
+ ///
+ /// At any given point in time, the set must contain at least:
+ /// - young splits (non-mature)
+ /// - splits that are currently in merge.
+ ///
+ /// It also contains other splits that have gone through a successful
+ /// merge and have been deleted, for instance.
+ ///
+ /// We incrementally build this set, by adding new splits to it.
+ /// When it becomes too large, we entirely rebuild it.
+ known_split_ids: HashSet,
+
merge_policy: Arc,
merge_split_downloader_mailbox: Mailbox,
+
/// Inventory of ongoing merge operations. If everything goes well,
/// a merge operation is dropped after the publish of the merged split.
/// Used for observability.
ongoing_merge_operations_inventory: Inventory,
+
/// We use the actor start_time as a way to identify incarnations.
/// Incarnations are useful to avoid a nasty bug:
/// Since we recycle mailbox between MergePlanner instantiation,
@@ -75,6 +97,10 @@ impl Actor for MergePlanner {
.iter()
.map(|tracked_operation| tracked_operation.as_ref().clone())
.collect_vec();
+ #[cfg(test)]
+ {
+ self.check_invariants();
+ }
MergePlannerState {
ongoing_merge_operations,
}
@@ -126,6 +152,10 @@ impl Handler for MergePlanner {
// (See comment on `Self::incarnation_start_at`.)
self.send_merge_ops(ctx).await?;
}
+ #[cfg(test)]
+ {
+ self.check_invariants();
+ }
Ok(())
}
}
@@ -141,6 +171,10 @@ impl Handler for MergePlanner {
) -> Result<(), ActorExitStatus> {
self.record_splits(new_splits.new_splits);
self.send_merge_ops(ctx).await?;
+ #[cfg(test)]
+ {
+ self.check_invariants();
+ }
Ok(())
}
}
@@ -173,6 +207,8 @@ impl MergePlanner {
.collect();
let mut merge_planner = MergePlanner {
pipeline_id,
+ num_young_splits: 0,
+ known_split_ids: Default::default(),
partitioned_young_splits: Default::default(),
merge_policy,
merge_split_downloader_mailbox,
@@ -183,23 +219,110 @@ impl MergePlanner {
merge_planner
}
+ fn rebuild_known_split_ids(&self) -> HashSet {
+ let mut known_split_ids: HashSet =
+ HashSet::with_capacity(self.num_known_splits_rebuild_threshold());
+ // Add splits that are in `partitioned_young_splits`.
+ for young_split_partition in self.partitioned_young_splits.values() {
+ for split in young_split_partition {
+ known_split_ids.insert(split.split_id().to_string());
+ }
+ }
+ let ongoing_merge_operations = self.ongoing_merge_operations_inventory.list();
+ // Add splits that are known to be in merge.
+ for merge_op in ongoing_merge_operations {
+ for split in &merge_op.splits {
+ known_split_ids.insert(split.split_id().to_string());
+ }
+ }
+ if known_split_ids.len() * 2 >= self.known_split_ids.len() {
+ warn!(
+ known_split_ids_len_after = known_split_ids.len(),
+ known_split_ids_len_before = self.known_split_ids.len(),
+ "Rebuilding the known split ids set ended up not halving its size. Please report. \
+ This is likely a bug, please report."
+ );
+ }
+ known_split_ids
+ }
+
+ /// Updates `known_split_ids` and returns true if the split was not
+ /// previously known and should be recorded.
+ fn acknownledge_split(&mut self, split_id: &str) -> bool {
+ if self.known_split_ids.contains(split_id) {
+ return false;
+ }
+ // We only rebuild `known_split_ids` when it seems too large.
+ if self.known_split_ids.len() >= self.num_known_splits_rebuild_threshold() {
+ self.known_split_ids = self.rebuild_known_split_ids();
+ }
+ self.known_split_ids.insert(split_id.to_string());
+ true
+ }
+
+ #[cfg(test)]
+ fn check_invariants(&self) {
+ assert_eq!(
+ self.partitioned_young_splits
+ .values()
+ .map(Vec::len)
+ .sum::(),
+ self.num_young_splits,
+ );
+ let known_split_ids = self.rebuild_known_split_ids();
+ for split_id in known_split_ids {
+ self.known_split_ids.contains(&split_id);
+ }
+ let merge_operation = self.ongoing_merge_operations_inventory.list();
+ let mut young_splits = HashSet::new();
+ for (&partition_id, young_splits_in_partition) in &self.partitioned_young_splits {
+ for split_metadata in young_splits_in_partition {
+ assert_eq!(split_metadata.partition_id, partition_id);
+ young_splits.insert(split_metadata.split_id());
+ }
+ }
+ for merge_op in merge_operation {
+ assert!(!self.known_split_ids.contains(&merge_op.merge_split_id));
+ for split_in_merge in merge_op.splits_as_slice() {
+ assert!(self.known_split_ids.contains(split_in_merge.split_id()));
+ }
+ }
+ assert!(self.known_split_ids.len() <= self.num_known_splits_rebuild_threshold() + 1);
+ }
+
+ /// Whenever the number of known splits exceeds this threshold, we rebuild the `known_split_ids`
+ /// set.
+ ///
+ /// This function returns a number that is at least twice the length of
+ /// `known_split_ids` right after a rebuild, in order to amortize the rebuild cost.
+ fn num_known_splits_rebuild_threshold(&self) -> usize {
+ // The idea behind this formula is that we expect the maximum legitimate number of
+ // splits after a rebuild to be `num_young_splits` + `num_splits_merge`.
+ // The capacity of `partitioned_young_splits` is a good upper bound for the number of
+ // partitions.
+ //
+ // We allow for up to 20 ongoing splits in merge per partition. (We oversize this
+ // because the true bound actually depends on the merge factor.)
+ 1 + self.num_young_splits + (1 + self.partitioned_young_splits.capacity()) * 20
+ }
+
fn record_split(&mut self, new_split: SplitMetadata) {
if new_split.is_mature(OffsetDateTime::now_utc()) {
return;
}
- let splits_for_partition: &mut Vec = self
- .partitioned_young_splits
- .entry(new_split.partition_id)
- .or_default();
// Due to the recycling of the mailbox of the merge planner, it is possible for
// a split already in store to be received.
- if splits_for_partition
- .iter()
- .any(|split| split.split_id() == new_split.split_id())
- {
+ //
+ // See `known_split_ids`.
+ if !self.acknownledge_split(new_split.split_id()) {
return;
}
+ let splits_for_partition: &mut Vec = self
+ .partitioned_young_splits
+ .entry(new_split.partition_id)
+ .or_default();
splits_for_partition.push(new_split);
+ self.num_young_splits += 1;
}
// Records a list of splits.
@@ -214,7 +337,6 @@ impl MergePlanner {
self.record_split(new_split);
}
}
-
async fn compute_merge_ops(
&mut self,
ctx: &ActorContext,
@@ -229,6 +351,12 @@ impl MergePlanner {
}
self.partitioned_young_splits
.retain(|_, splits| !splits.is_empty());
+ // We recompute the number of young splits.
+ self.num_young_splits = self
+ .partitioned_young_splits
+ .values()
+ .map(|splits| splits.len())
+ .sum();
Ok(merge_operations)
}
@@ -677,7 +805,24 @@ mod tests {
.recv_typed_message::>()
.await;
assert!(merge_op.is_some());
+
+ // We make sure that the known-splits filtering set filters out splits that are
+ // currently in merge.
+ merge_planner_mailbox
+ .ask(NewSplits {
+ new_splits: pre_existing_splits,
+ })
+ .await?;
+
+ let _ = merge_planner_handle.process_pending_and_observe().await;
+
+ let merge_ops =
+ merge_split_downloader_inbox.drain_for_test_typed::>();
+
+ assert!(merge_ops.is_empty());
+
merge_planner_mailbox.send_message(Command::Quit).await?;
+
let (exit_status, _last_state) = merge_planner_handle.join().await;
assert!(matches!(exit_status, ActorExitStatus::Quit));
let merge_ops =
@@ -812,7 +957,7 @@ mod tests {
// The low capacity of the queue of the merge planner prevents us from sending a Command in
// the low priority queue. It would take the single slot and prevent the message
- // sent in the initiliaze method.
+ // sent in the initialize method.
// Instead, we wait for the first merge ops.
let merge_ops = merge_split_downloader_inbox
@@ -832,4 +977,72 @@ mod tests {
universe.assert_quit().await;
Ok(())
}
+
+ #[tokio::test]
+ async fn test_merge_planner_known_splits_set_size_stays_bounded() -> anyhow::Result<()> {
+ let universe = Universe::with_accelerated_time();
+ let (merge_split_downloader_mailbox, merge_split_downloader_inbox) = universe
+ .spawn_ctx()
+ .create_mailbox("MergeSplitDownloader", QueueCapacity::Unbounded);
+ let index_uid = IndexUid::new("test-index");
+ let pipeline_id = IndexingPipelineId {
+ index_uid: index_uid.clone(),
+ source_id: "test-source".to_string(),
+ node_id: "test-node".to_string(),
+ pipeline_ord: 0,
+ };
+ let merge_policy_config = ConstWriteAmplificationMergePolicyConfig {
+ merge_factor: 2,
+ max_merge_factor: 2,
+ max_merge_ops: 3,
+ ..Default::default()
+ };
+ let indexing_settings = IndexingSettings {
+ merge_policy: MergePolicyConfig::ConstWriteAmplification(merge_policy_config),
+ ..Default::default()
+ };
+ let merge_policy: Arc = merge_policy_from_settings(&indexing_settings);
+ let merge_planner = MergePlanner::new(
+ pipeline_id,
+ Vec::new(),
+ merge_policy,
+ merge_split_downloader_mailbox,
+ );
+ let universe = Universe::with_accelerated_time();
+
+ // We spawn our merge planner with this recycled mailbox.
+ let (merge_planner_mailbox, merge_planner_handle) =
+ universe.spawn_builder().spawn(merge_planner);
+
+ for j in 0..100 {
+ for i in 0..10 {
+ merge_planner_mailbox
+ .ask(NewSplits {
+ new_splits: vec![split_metadata_for_test(
+ &index_uid,
+ &format!("split_{}", j * 10 + i),
+ 0,
+ 1_000_000,
+ 1,
+ )],
+ })
+ .await
+ .unwrap();
+ }
+ // We drain the merge_ops to make sure merge ops are dropped (as if merges were
+ // successful) and that we are properly testing that the known_splits_set is
+ // bounded.
+ let merge_ops = merge_split_downloader_inbox
+ .drain_for_test_typed::>();
+ assert_eq!(merge_ops.len(), 5);
+ }
+
+ // Shut the merge planner down and verify it exits cleanly.
+ merge_planner_mailbox.send_message(Command::Quit).await?;
+ let (exit_status, _last_state) = merge_planner_handle.join().await;
+ assert!(matches!(exit_status, ActorExitStatus::Quit));
+
+ universe.assert_quit().await;
+ Ok(())
+ }
}