Introduce a known_splits set to discard redundant NewSplits messages.
This set is used as a filter.
We add the split IDs carried by `NewSplits` messages to it, and never remove any.

When it grows too large, we rebuild it entirely.

Closes #3847
fulmicoton committed Oct 2, 2023
1 parent 9cc02a9 commit 520eec8
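Conceptually, the commit introduces the following pattern, shown here as a minimal standalone sketch (hypothetical names; the actual implementation lives in `merge_planner.rs`, diffed below):

```rust
use std::collections::HashSet;

/// An append-only "already seen" filter that is rebuilt from an
/// authoritative source once it grows past a threshold.
struct SeenFilter {
    seen: HashSet<String>,
}

impl SeenFilter {
    /// Returns true if `id` was not seen before.
    ///
    /// `live_ids` recomputes the ids that must be retained (in the merge
    /// planner: young splits and splits currently being merged).
    fn acknowledge(
        &mut self,
        id: &str,
        threshold: usize,
        live_ids: impl FnOnce() -> HashSet<String>,
    ) -> bool {
        if self.seen.contains(id) {
            return false;
        }
        if self.seen.len() >= threshold {
            // Amortized rebuild: drop the ids that no longer matter.
            self.seen = live_ids();
        }
        self.seen.insert(id.to_string());
        true
    }
}
```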
Showing 1 changed file with 225 additions and 12 deletions.
237 changes: 225 additions & 12 deletions quickwit/quickwit-indexing/src/actors/merge_planner.rs
@@ -18,7 +18,7 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::cmp::Reverse;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Instant;

@@ -31,7 +31,7 @@ use quickwit_proto::indexing::IndexingPipelineId;
use serde::Serialize;
use tantivy::Inventory;
use time::OffsetDateTime;
use tracing::info;
use tracing::{info, warn};

use crate::actors::MergeSplitDownloader;
use crate::merge_policy::MergeOperation;
@@ -42,15 +42,37 @@ use crate::MergePolicy;
/// The merge planner decides when to start a merge task.
pub struct MergePlanner {
pipeline_id: IndexingPipelineId,

/// A young split is a split that has not reached maturity
/// yet and can be candidate to merge operations.
partitioned_young_splits: HashMap<u64, Vec<SplitMetadata>>,
num_young_splits: usize,

/// This set contains all of the split ids that we have "acknowledged".
/// The point of this set is to rapidly dismiss redundant `NewSplits` messages.
///
/// Complex scenarios that can result in the re-emission of
/// such messages are described in #3627.
///
/// At any given point in time, the set must contain at least:
/// - young splits (non-mature)
/// - splits that are currently being merged.
///
/// It may also contain other splits, for instance splits that have gone
/// through a successful merge and have been deleted.
///
/// We build this set incrementally, by adding new splits to it.
/// When it becomes too large, we rebuild it entirely.
known_split_ids: HashSet<String>,

merge_policy: Arc<dyn MergePolicy>,
merge_split_downloader_mailbox: Mailbox<MergeSplitDownloader>,

/// Inventory of ongoing merge operations. If everything goes well,
/// a merge operation is dropped after the publish of the merged split.
/// Used for observability.
ongoing_merge_operations_inventory: Inventory<MergeOperation>,

/// We use the actor start_time as a way to identify incarnations.
/// Incarnations are useful to avoid a nasty bug:
/// Since we recycle mailbox between MergePlanner instantiation,
@@ -75,6 +97,10 @@ impl Actor for MergePlanner {
.iter()
.map(|tracked_operation| tracked_operation.as_ref().clone())
.collect_vec();
#[cfg(test)]
{
self.check_invariants();
}
MergePlannerState {
ongoing_merge_operations,
}
@@ -126,6 +152,10 @@ impl Handler<PlanMerge> for MergePlanner {
// (See comment on `Self::incarnation_start_at`.)
self.send_merge_ops(ctx).await?;
}
#[cfg(test)]
{
self.check_invariants();
}
Ok(())
}
}
@@ -141,6 +171,10 @@ impl Handler<NewSplits> for MergePlanner {
) -> Result<(), ActorExitStatus> {
self.record_splits(new_splits.new_splits);
self.send_merge_ops(ctx).await?;
#[cfg(test)]
{
self.check_invariants();
}
Ok(())
}
}
@@ -173,6 +207,8 @@ impl MergePlanner {
.collect();
let mut merge_planner = MergePlanner {
pipeline_id,
num_young_splits: 0,
known_split_ids: Default::default(),
partitioned_young_splits: Default::default(),
merge_policy,
merge_split_downloader_mailbox,
@@ -183,23 +219,110 @@
merge_planner
}

fn rebuild_known_split_ids(&self) -> HashSet<String> {
let mut known_split_ids: HashSet<String> =
HashSet::with_capacity(self.num_known_splits_rebuild_threshold());
// Add the splits that are in `partitioned_young_splits`.
for young_split_partition in self.partitioned_young_splits.values() {
for split in young_split_partition {
known_split_ids.insert(split.split_id().to_string());
}
}
let ongoing_merge_operations = self.ongoing_merge_operations_inventory.list();
// Add the splits that are currently being merged.
for merge_op in ongoing_merge_operations {
for split in &merge_op.splits {
known_split_ids.insert(split.split_id().to_string());
}
}
if known_split_ids.len() * 2 >= self.known_split_ids.len() {
warn!(
known_split_ids_len_after = known_split_ids.len(),
known_split_ids_len_before = self.known_split_ids.len(),
"Rebuilding the known split ids set ended up not halving its size. Please report. \
This is likely a bug, please report."
);
}
known_split_ids
}

/// Updates `known_split_ids` and returns true if the split was not
/// previously known and should be recorded.
fn acknowledge_split(&mut self, split_id: &str) -> bool {
if self.known_split_ids.contains(split_id) {
return false;
}
// We only rebuild `known_split_ids` when it seems too large.
if self.known_split_ids.len() >= self.num_known_splits_rebuild_threshold() {
self.known_split_ids = self.rebuild_known_split_ids();
}
self.known_split_ids.insert(split_id.to_string());
true
}
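// Illustrative cost argument (hypothetical numbers): if the threshold is 200
// and a rebuild shrinks the set down to 90 live ids, at least 110 cheap
// lookups/inserts must happen before the next O(n) rebuild, so the rebuild
// cost is amortized over the insertions between rebuilds.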

#[cfg(test)]
fn check_invariants(&self) {
assert_eq!(
self.partitioned_young_splits
.values()
.map(Vec::len)
.sum::<usize>(),
self.num_young_splits,
);
let known_split_ids = self.rebuild_known_split_ids();
for split_id in known_split_ids {
assert!(self.known_split_ids.contains(&split_id));
}
let merge_operations = self.ongoing_merge_operations_inventory.list();
let mut young_splits = HashSet::new();
for (&partition_id, young_splits_in_partition) in &self.partitioned_young_splits {
for split_metadata in young_splits_in_partition {
assert_eq!(split_metadata.partition_id, partition_id);
young_splits.insert(split_metadata.split_id());
}
}
for merge_op in merge_operations {
assert!(!self.known_split_ids.contains(&merge_op.merge_split_id));
for split_in_merge in merge_op.splits_as_slice() {
assert!(self.known_split_ids.contains(split_in_merge.split_id()));
}
}
assert!(self.known_split_ids.len() <= self.num_known_splits_rebuild_threshold() + 1);
}

/// Whenever the number of known splits exceeds this threshold, we rebuild the `known_split_ids`
/// set.
///
/// This function returns a number that is higher than twice the length of
/// `known_split_ids` right after a rebuild, so that the rebuild cost is amortized.
fn num_known_splits_rebuild_threshold(&self) -> usize {
// The idea behind this formula is that we expect the maximum legitimate number
// of splits after a rebuild to be `num_young_splits` + the number of splits
// that are currently being merged.
//
// The capacity of `partitioned_young_splits` is a good upper bound for the
// number of partitions.
//
// We budget 20 in-merge splits per partition. (This is deliberately oversized,
// as the true number depends on the merge factor.)
1 + self.num_young_splits + (1 + self.partitioned_young_splits.capacity()) * 20
}
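// Worked example with hypothetical numbers: with `num_young_splits == 150`
// and a `partitioned_young_splits` capacity of 4, the threshold is
// 1 + 150 + (1 + 4) * 20 == 251, comfortably above the expected size of the
// set right after a rebuild.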

fn record_split(&mut self, new_split: SplitMetadata) {
if new_split.is_mature(OffsetDateTime::now_utc()) {
return;
}
// Due to the recycling of the mailbox of the merge planner, it is possible for
// a split already in store to be received.
//
// See `known_split_ids`.
if !self.acknowledge_split(new_split.split_id()) {
return;
}
let splits_for_partition: &mut Vec<SplitMetadata> = self
.partitioned_young_splits
.entry(new_split.partition_id)
.or_default();
splits_for_partition.push(new_split);
self.num_young_splits += 1;
}

// Records a list of splits.
@@ -214,7 +337,6 @@ impl MergePlanner {
self.record_split(new_split);
}
}

async fn compute_merge_ops(
&mut self,
ctx: &ActorContext<Self>,
@@ -229,6 +351,12 @@
}
self.partitioned_young_splits
.retain(|_, splits| !splits.is_empty());
// We recompute the number of young splits.
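// (The merge policy moves the splits it selects for merging out of
// `partitioned_young_splits`, hence the recount rather than a decrement.)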
self.num_young_splits = self
.partitioned_young_splits
.values()
.map(|splits| splits.len())
.sum();
Ok(merge_operations)
}

@@ -677,7 +805,24 @@ mod tests {
.recv_typed_message::<TrackedObject<MergeOperation>>()
.await;
assert!(merge_op.is_some());

// We make sure that the known splits filtering set filters out splits that are
// currently being merged.
merge_planner_mailbox
.ask(NewSplits {
new_splits: pre_existing_splits,
})
.await?;

let _ = merge_planner_handle.process_pending_and_observe().await;

let merge_ops =
merge_split_downloader_inbox.drain_for_test_typed::<TrackedObject<MergeOperation>>();

assert!(merge_ops.is_empty());

merge_planner_mailbox.send_message(Command::Quit).await?;

let (exit_status, _last_state) = merge_planner_handle.join().await;
assert!(matches!(exit_status, ActorExitStatus::Quit));
let merge_ops =
Expand Down Expand Up @@ -812,7 +957,7 @@ mod tests {

// The low capacity of the queue of the merge planner prevents us from sending a Command in
// the low priority queue. It would take the single slot and prevent the message
// sent in the initialize method.

// Instead, we wait for the first merge ops.
let merge_ops = merge_split_downloader_inbox
@@ -832,4 +977,72 @@
universe.assert_quit().await;
Ok(())
}

#[tokio::test]
async fn test_merge_planner_known_splits_set_size_stays_bounded() -> anyhow::Result<()> {
let universe = Universe::with_accelerated_time();
let (merge_split_downloader_mailbox, merge_split_downloader_inbox) = universe
.spawn_ctx()
.create_mailbox("MergeSplitDownloader", QueueCapacity::Unbounded);
let index_uid = IndexUid::new("test-index");
let pipeline_id = IndexingPipelineId {
index_uid: index_uid.clone(),
source_id: "test-source".to_string(),
node_id: "test-node".to_string(),
pipeline_ord: 0,
};
let merge_policy_config = ConstWriteAmplificationMergePolicyConfig {
merge_factor: 2,
max_merge_factor: 2,
max_merge_ops: 3,
..Default::default()
};
let indexing_settings = IndexingSettings {
merge_policy: MergePolicyConfig::ConstWriteAmplification(merge_policy_config),
..Default::default()
};
let merge_policy: Arc<dyn MergePolicy> = merge_policy_from_settings(&indexing_settings);
let merge_planner = MergePlanner::new(
pipeline_id,
Vec::new(),
merge_policy,
merge_split_downloader_mailbox,
);

// We spawn our merge planner with this recycled mailbox.
let (merge_planner_mailbox, merge_planner_handle) =
universe.spawn_builder().spawn(merge_planner);

for j in 0..100 {
for i in 0..10 {
merge_planner_mailbox
.ask(NewSplits {
new_splits: vec![split_metadata_for_test(
&index_uid,
&format!("split_{}", j * 10 + i),
0,
1_000_000,
1,
)],
})
.await
.unwrap();
}
// We drain the merge ops to make sure they are dropped (as if the merges were
// successful) and that we are properly testing that the known split ids set
// stays bounded.
let merge_ops = merge_split_downloader_inbox
.drain_for_test_typed::<TrackedObject<MergeOperation>>();
assert_eq!(merge_ops.len(), 5);
}

// All `NewSplits` messages have been processed; we can now quit the merge planner.
merge_planner_mailbox.send_message(Command::Quit).await?;
let (exit_status, _last_state) = merge_planner_handle.join().await;
assert!(matches!(exit_status, ActorExitStatus::Quit));

universe.assert_quit().await;
Ok(())
}
}
