Skip to content

Commit

Permalink
prevent merging splits with different doc mapping (#5218)
Browse files Browse the repository at this point in the history
* add doc mapping uid to splits

* partition merges by doc mapper version
  • Loading branch information
trinity-1686a authored Jul 24, 2024
1 parent f9ccc31 commit d89287f
Show file tree
Hide file tree
Showing 18 changed files with 658 additions and 553 deletions.
5 changes: 4 additions & 1 deletion quickwit/quickwit-indexing/src/actors/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use quickwit_proto::indexing::{IndexingPipelineId, PipelineMetrics};
use quickwit_proto::metastore::{
LastDeleteOpstampRequest, MetastoreService, MetastoreServiceClient,
};
use quickwit_proto::types::PublishToken;
use quickwit_proto::types::{DocMappingUid, PublishToken};
use quickwit_query::get_quickwit_fastfield_normalizer_manager;
use serde::Serialize;
use tantivy::schema::Schema;
Expand Down Expand Up @@ -98,6 +98,7 @@ struct IndexerState {
publish_lock: PublishLock,
publish_token_opt: Option<PublishToken>,
schema: Schema,
doc_mapping_uid: DocMappingUid,
tokenizer_manager: TokenizerManager,
max_num_partitions: NonZeroU32,
index_settings: IndexSettings,
Expand Down Expand Up @@ -130,6 +131,7 @@ impl IndexerState {
self.pipeline_id.clone(),
partition_id,
last_delete_opstamp,
self.doc_mapping_uid,
self.indexing_directory.clone(),
index_builder,
io_controls,
Expand Down Expand Up @@ -572,6 +574,7 @@ impl Indexer {
publish_lock: PublishLock::default(),
publish_token_opt: None,
schema,
doc_mapping_uid: doc_mapper.doc_mapping_uid(),
tokenizer_manager: tokenizer_manager.tantivy_manager().clone(),
index_settings,
max_num_partitions: doc_mapper.max_num_partitions(),
Expand Down
20 changes: 16 additions & 4 deletions quickwit/quickwit-indexing/src/actors/merge_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ pub fn merge_split_attrs(
pipeline_id: MergePipelineId,
merge_split_id: SplitId,
splits: &[SplitMetadata],
) -> SplitAttrs {
) -> anyhow::Result<SplitAttrs> {
let partition_id = combine_partition_ids_aux(splits.iter().map(|split| split.partition_id));
let time_range: Option<RangeInclusive<DateTime>> = merge_time_range(splits);
let uncompressed_docs_size_in_bytes = sum_doc_sizes_in_bytes(splits);
Expand All @@ -250,10 +250,21 @@ pub fn merge_split_attrs(
.map(|split| split.delete_opstamp)
.min()
.unwrap_or(0);
SplitAttrs {
let doc_mapping_uid = splits
.first()
.ok_or_else(|| anyhow::anyhow!("attempted to merge zero splits"))?
.doc_mapping_uid;
if splits
.iter()
.any(|split| split.doc_mapping_uid != doc_mapping_uid)
{
anyhow::bail!("attempted to merge splits with different doc mapping uid");
}
Ok(SplitAttrs {
node_id: pipeline_id.node_id.clone(),
index_uid: pipeline_id.index_uid.clone(),
source_id: pipeline_id.source_id.clone(),
doc_mapping_uid,
split_id: merge_split_id,
partition_id,
replaced_split_ids,
Expand All @@ -262,7 +273,7 @@ pub fn merge_split_attrs(
uncompressed_docs_size_in_bytes,
delete_opstamp,
num_merge_ops: max_merge_ops(splits) + 1,
}
})
}

fn max_merge_ops(splits: &[SplitMetadata]) -> usize {
Expand Down Expand Up @@ -324,7 +335,7 @@ impl MergeExecutor {
)?;
ctx.record_progress();

let split_attrs = merge_split_attrs(self.pipeline_id.clone(), merge_split_id, &splits);
let split_attrs = merge_split_attrs(self.pipeline_id.clone(), merge_split_id, &splits)?;
Ok(IndexedSplit {
split_attrs,
index: merged_index,
Expand Down Expand Up @@ -436,6 +447,7 @@ impl MergeExecutor {
node_id: NodeId::new(split.node_id),
index_uid: split.index_uid,
source_id: split.source_id,
doc_mapping_uid: split.doc_mapping_uid,
split_id: merge_split_id,
partition_id: split.partition_id,
replaced_split_ids: vec![split.split_id.clone()],
Expand Down
107 changes: 82 additions & 25 deletions quickwit/quickwit-indexing/src/actors/merge_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use async_trait::async_trait;
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity};
use quickwit_metastore::SplitMetadata;
use quickwit_proto::indexing::MergePipelineId;
use quickwit_proto::types::DocMappingUid;
use serde::Serialize;
use tantivy::Inventory;
use time::OffsetDateTime;
Expand All @@ -37,11 +38,26 @@ use crate::merge_policy::MergeOperation;
use crate::models::NewSplits;
use crate::MergePolicy;

/// Key used to group young splits into merge buckets: a partition id paired
/// with a doc mapping uid. Two keys are equal only when both fields are equal.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct MergePartition {
/// Partition id, taken from `SplitMetadata::partition_id`.
partition_id: u64,
/// Doc mapping uid, taken from `SplitMetadata::doc_mapping_uid`.
doc_mapping_uid: DocMappingUid,
}

impl MergePartition {
fn from_split_meta(split_meta: &SplitMetadata) -> MergePartition {
MergePartition {
partition_id: split_meta.partition_id,
doc_mapping_uid: split_meta.doc_mapping_uid,
}
}
}

/// The merge planner decides when to start a merge task.
pub struct MergePlanner {
/// A young split is a split that has not reached maturity
/// yet and can be a candidate for merge operations.
partitioned_young_splits: HashMap<u64, Vec<SplitMetadata>>,
partitioned_young_splits: HashMap<MergePartition, Vec<SplitMetadata>>,

/// This set contains all of the split ids that we "acknowledged".
/// The point of this set is to rapidly dismiss redundant `NewSplits` messages.
Expand Down Expand Up @@ -228,7 +244,7 @@ impl MergePlanner {
/// Appends `new_split` to the bucket of young splits it belongs to, creating
/// an empty bucket first when none exists yet for its key.
// NOTE(review): this text is a rendered diff, so two `.entry(...)` lines appear
// back to back; presumably the `partition_id` key is the removed line and the
// `MergePartition` key is its replacement — confirm against the real file.
fn record_split(&mut self, new_split: SplitMetadata) {
let splits_for_partition: &mut Vec<SplitMetadata> = self
.partitioned_young_splits
.entry(new_split.partition_id)
.entry(MergePartition::from_split_meta(&new_split))
.or_default();
splits_for_partition.push(new_split);
}
Expand Down Expand Up @@ -326,7 +342,7 @@ mod tests {
use quickwit_config::IndexingSettings;
use quickwit_metastore::{SplitMaturity, SplitMetadata};
use quickwit_proto::indexing::MergePipelineId;
use quickwit_proto::types::{IndexUid, NodeId};
use quickwit_proto::types::{DocMappingUid, IndexUid, NodeId};
use time::OffsetDateTime;

use crate::actors::MergePlanner;
Expand All @@ -339,6 +355,7 @@ mod tests {
index_uid: &IndexUid,
split_id: &str,
partition_id: u64,
doc_mapping_uid: DocMappingUid,
num_docs: usize,
num_merge_ops: usize,
) -> SplitMetadata {
Expand All @@ -354,6 +371,7 @@ mod tests {
maturity: SplitMaturity::Immature {
maturation_period: Duration::from_secs(3600),
},
doc_mapping_uid,
..Default::default()
}
}
Expand All @@ -363,6 +381,8 @@ mod tests {
let node_id = NodeId::from("test-node");
let index_uid = IndexUid::new_with_random_ulid("test-index");
let source_id = "test-source".to_string();
let doc_mapping_uid1 = DocMappingUid::random();
let doc_mapping_uid2 = DocMappingUid::random();
let pipeline_id = MergePipelineId {
node_id,
index_uid: index_uid.clone(),
Expand Down Expand Up @@ -394,8 +414,9 @@ mod tests {
// send a first batch of splits
let message = NewSplits {
new_splits: vec![
split_metadata_for_test(&index_uid, "1_1", 1, 2500, 0),
split_metadata_for_test(&index_uid, "1_2", 2, 3000, 0),
split_metadata_for_test(&index_uid, "1_1", 1, doc_mapping_uid1, 2500, 0),
split_metadata_for_test(&index_uid, "1v2_1", 1, doc_mapping_uid2, 2500, 0),
split_metadata_for_test(&index_uid, "1_2", 2, doc_mapping_uid1, 3000, 0),
],
};
merge_planner_mailbox.send_message(message).await?;
Expand All @@ -406,8 +427,9 @@ mod tests {
// send new splits together with a duplicate of an already-sent split
let message = NewSplits {
new_splits: vec![
split_metadata_for_test(&index_uid, "2_1", 1, 2000, 0),
split_metadata_for_test(&index_uid, "1_2", 2, 3000, 0),
split_metadata_for_test(&index_uid, "2_1", 1, doc_mapping_uid1, 2000, 0),
split_metadata_for_test(&index_uid, "2v2_1", 1, doc_mapping_uid2, 2500, 0),
split_metadata_for_test(&index_uid, "1_2", 2, doc_mapping_uid1, 3000, 0),
],
};
merge_planner_mailbox.send_message(message).await?;
Expand All @@ -418,27 +440,41 @@ mod tests {
// send more splits to generate merges
let message = NewSplits {
new_splits: vec![
split_metadata_for_test(&index_uid, "3_1", 1, 1500, 0),
split_metadata_for_test(&index_uid, "4_1", 1, 1000, 0),
split_metadata_for_test(&index_uid, "2_2", 2, 2000, 0),
split_metadata_for_test(&index_uid, "3_2", 2, 4000, 0),
split_metadata_for_test(&index_uid, "3_1", 1, doc_mapping_uid1, 1500, 0),
split_metadata_for_test(&index_uid, "4_1", 1, doc_mapping_uid1, 1000, 0),
split_metadata_for_test(&index_uid, "3v2_1", 1, doc_mapping_uid2, 1500, 0),
split_metadata_for_test(&index_uid, "2_2", 2, doc_mapping_uid1, 2000, 0),
split_metadata_for_test(&index_uid, "3_2", 2, doc_mapping_uid1, 4000, 0),
],
};
merge_planner_mailbox.send_message(message).await?;
merge_planner_handle.process_pending_and_observe().await;
let operations = merge_split_downloader_inbox.drain_for_test_typed::<MergeTask>();
assert_eq!(operations.len(), 2);
let mut merge_operations = operations.into_iter().sorted_by(|left_op, right_op| {
left_op.splits[0]
.partition_id
.cmp(&right_op.splits[0].partition_id)
});
assert_eq!(operations.len(), 3);
let mut merge_operations = operations
.into_iter()
.sorted_by_key(|op| (op.splits[0].partition_id, op.splits[0].doc_mapping_uid));

let first_merge_operation = merge_operations.next().unwrap();
assert_eq!(first_merge_operation.splits.len(), 4);
assert!(first_merge_operation
.splits
.iter()
.all(|split| split.partition_id == 1 && split.doc_mapping_uid == doc_mapping_uid1));

let second_merge_operation = merge_operations.next().unwrap();
assert_eq!(second_merge_operation.splits.len(), 3);
assert!(second_merge_operation
.splits
.iter()
.all(|split| split.partition_id == 1 && split.doc_mapping_uid == doc_mapping_uid2));

let third_merge_operation = merge_operations.next().unwrap();
assert_eq!(third_merge_operation.splits.len(), 3);
assert!(third_merge_operation
.splits
.iter()
.all(|split| split.partition_id == 2 && split.doc_mapping_uid == doc_mapping_uid1));
}
universe.assert_quit().await;

Expand All @@ -451,6 +487,7 @@ mod tests {
let node_id = NodeId::from("test-node");
let index_uid = IndexUid::new_with_random_ulid("test-index");
let source_id = "test-source".to_string();
let doc_mapping_uid = DocMappingUid::random();
let pipeline_id = MergePipelineId {
node_id,
index_uid: index_uid.clone(),
Expand All @@ -472,12 +509,20 @@ mod tests {
};
let immature_splits = vec![
split_metadata_for_test(
&index_uid, "a_small", 0, // partition_id
1_000_000, 2,
&index_uid,
"a_small",
0, // partition_id
doc_mapping_uid,
1_000_000,
2,
),
split_metadata_for_test(
&index_uid, "b_small", 0, // partition_id
1_000_000, 2,
&index_uid,
"b_small",
0, // partition_id
doc_mapping_uid,
1_000_000,
2,
),
];
let merge_policy: Arc<dyn MergePolicy> = merge_policy_from_settings(&indexing_settings);
Expand Down Expand Up @@ -527,6 +572,7 @@ mod tests {
let node_id = NodeId::from("test-node");
let index_uid = IndexUid::new_with_random_ulid("test-index");
let source_id = "test-source".to_string();
let doc_mapping_uid = DocMappingUid::random();
let pipeline_id = MergePipelineId {
node_id,
index_uid,
Expand Down Expand Up @@ -558,13 +604,15 @@ mod tests {
&other_index_uid,
"a_small",
0, // partition_id
doc_mapping_uid,
1_000_000,
2,
),
split_metadata_for_test(
&other_index_uid,
"b_small",
0, // partition_id
doc_mapping_uid,
1_000_000,
2,
),
Expand Down Expand Up @@ -595,6 +643,7 @@ mod tests {
let node_id = NodeId::from("test-node");
let index_uid = IndexUid::new_with_random_ulid("test-index");
let source_id = "test-source".to_string();
let doc_mapping_uid = DocMappingUid::random();
let pipeline_id = MergePipelineId {
node_id,
index_uid: index_uid.clone(),
Expand All @@ -617,12 +666,20 @@ mod tests {
};
let immature_splits = vec![
split_metadata_for_test(
&index_uid, "a_small", 0, // partition_id
1_000_000, 2,
&index_uid,
"a_small",
0, // partition_id
doc_mapping_uid,
1_000_000,
2,
),
split_metadata_for_test(
&index_uid, "b_small", 0, // partition_id
1_000_000, 2,
&index_uid,
"b_small",
0, // partition_id
doc_mapping_uid,
1_000_000,
2,
),
];
let merge_policy: Arc<dyn MergePolicy> = merge_policy_from_settings(&indexing_settings);
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-indexing/src/actors/packager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ mod tests {
use quickwit_actors::{ObservationType, Universe};
use quickwit_metastore::checkpoint::IndexCheckpointDelta;
use quickwit_proto::search::{deserialize_split_fields, ListFieldsEntryResponse};
use quickwit_proto::types::{IndexUid, NodeId};
use quickwit_proto::types::{DocMappingUid, IndexUid, NodeId};
use tantivy::directory::MmapDirectory;
use tantivy::schema::{NumericOptions, Schema, Type, FAST, STRING, TEXT};
use tantivy::{doc, DateTime, IndexBuilder, IndexSettings};
Expand Down Expand Up @@ -519,6 +519,7 @@ mod tests {
node_id,
index_uid,
source_id,
doc_mapping_uid: DocMappingUid::default(),
split_id: "test-split".to_string(),
partition_id: 17u64,
num_docs,
Expand Down
Loading

0 comments on commit d89287f

Please sign in to comment.