Skip to content

Commit

Permalink
Implement metastore shard API (#3773)
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload authored Sep 7, 2023
1 parent 202348d commit 4f5d268
Show file tree
Hide file tree
Showing 48 changed files with 1,648 additions and 688 deletions.
469 changes: 233 additions & 236 deletions quickwit/Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion quickwit/quickwit-config/src/index_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use quickwit_doc_mapper::{
DefaultDocMapper, DefaultDocMapperBuilder, DocMapper, FieldMappingEntry, Mode, ModeType,
QuickwitJsonOptions, TokenizerEntry,
};
use quickwit_proto::IndexId;
use serde::{Deserialize, Serialize};
pub use serialize::load_index_config_from_user_config;

Expand Down Expand Up @@ -296,7 +297,7 @@ fn prepend_at_char(schedule: &str) -> String {
#[serde(into = "VersionedIndexConfig")]
#[serde(try_from = "VersionedIndexConfig")]
pub struct IndexConfig {
pub index_id: String,
pub index_id: IndexId,
pub index_uri: Uri,
pub doc_mapping: DocMapping,
pub indexing_settings: IndexingSettings,
Expand Down
7 changes: 4 additions & 3 deletions quickwit/quickwit-indexing/src/actors/index_serializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,12 @@ impl Handler<IndexedSplitBatchBuilder> for IndexSerializer {
splits.push(split);
}
let indexed_split_batch = IndexedSplitBatch {
batch_parent_span: batch_builder.batch_parent_span,
splits,
checkpoint_delta: batch_builder.checkpoint_delta,
checkpoint_delta_opt: batch_builder.checkpoint_delta_opt,
publish_lock: batch_builder.publish_lock,
merge_operation: None,
publish_token_opt: batch_builder.publish_token_opt,
merge_operation_opt: None,
batch_parent_span: batch_builder.batch_parent_span,
};
ctx.send_message(&self.packager_mailbox, indexed_split_batch)
.await?;
Expand Down
16 changes: 11 additions & 5 deletions quickwit/quickwit-indexing/src/actors/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use quickwit_doc_mapper::DocMapper;
use quickwit_metastore::checkpoint::{IndexCheckpointDelta, SourceCheckpointDelta};
use quickwit_metastore::Metastore;
use quickwit_proto::indexing::IndexingPipelineId;
use quickwit_proto::PublishToken;
use quickwit_query::get_quickwit_fastfield_normalizer_manager;
use serde::Serialize;
use tantivy::schema::Schema;
Expand Down Expand Up @@ -205,6 +206,7 @@ impl IndexerState {
},
indexing_permit,
publish_lock: self.publish_lock.clone(),
publish_token_opt: None, // TODO: Get publish token from source (next PR)
last_delete_opstamp,
memory_usage: Byte::from_bytes(0),
};
Expand Down Expand Up @@ -318,6 +320,7 @@ struct IndexingWorkbench {
checkpoint_delta: IndexCheckpointDelta,
indexing_permit: Option<OwnedSemaphorePermit>,
publish_lock: PublishLock,
publish_token_opt: Option<PublishToken>,
// On workbench creation, we fetch from the metastore the last delete task opstamp.
// We use this value to set the `delete_opstamp` of the workbench splits.
last_delete_opstamp: u64,
Expand Down Expand Up @@ -550,6 +553,7 @@ impl Indexer {
other_indexed_split_opt,
checkpoint_delta,
publish_lock,
publish_token_opt,
batch_parent_span,
indexing_permit,
..
Expand All @@ -574,9 +578,10 @@ impl Indexer {
&self.index_serializer_mailbox,
EmptySplit {
index_uid: self.indexer_state.pipeline_id.index_uid.clone(),
batch_parent_span,
checkpoint_delta,
publish_lock,
publish_token_opt,
batch_parent_span,
},
)
.await?;
Expand All @@ -589,11 +594,12 @@ impl Indexer {
ctx.send_message(
&self.index_serializer_mailbox,
IndexedSplitBatchBuilder {
batch_parent_span,
splits,
checkpoint_delta: Some(checkpoint_delta),
checkpoint_delta_opt: Some(checkpoint_delta),
publish_lock,
publish_token_opt,
commit_trigger,
batch_parent_span,
},
)
.await?;
Expand Down Expand Up @@ -771,7 +777,7 @@ mod tests {
for split in batch.splits.iter() {
assert_eq!(split.split_attrs.delete_opstamp, last_delete_opstamp);
}
let index_checkpoint = batch.checkpoint_delta.unwrap();
let index_checkpoint = batch.checkpoint_delta_opt.unwrap();
assert_eq!(index_checkpoint.source_id, "test-source");
assert_eq!(
index_checkpoint.source_delta,
Expand Down Expand Up @@ -1497,7 +1503,7 @@ mod tests {
let mut metastore = MockMetastore::default();
metastore
.expect_publish_splits()
.returning(move |_, splits, _, _| {
.returning(move |_, splits, _, _, _| {
assert!(splits.is_empty());
Ok(())
});
Expand Down
27 changes: 21 additions & 6 deletions quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,12 @@ mod tests {
metastore
.expect_publish_splits()
.withf(
|index_uid, splits, replaced_splits, checkpoint_delta_opt| -> bool {
|index_uid,
splits,
replaced_splits,
checkpoint_delta_opt,
_publish_token_opt|
-> bool {
let checkpoint_delta = checkpoint_delta_opt.as_ref().unwrap();
index_uid.to_string() == "test-index:11111111111111111111111111"
&& checkpoint_delta.source_id == "test-source"
Expand All @@ -614,7 +619,7 @@ mod tests {
.ends_with(":(00000000000000000000..00000000000000001030])")
},
)
.returning(|_, _, _, _| Ok(()));
.returning(|_, _, _, _, _| Ok(()));
let node_id = "test-node";
let metastore = Arc::new(metastore);
let pipeline_id = IndexingPipelineId {
Expand Down Expand Up @@ -698,7 +703,12 @@ mod tests {
metastore
.expect_publish_splits()
.withf(
|index_uid, splits, replaced_split_ids, checkpoint_delta_opt| -> bool {
|index_uid,
splits,
replaced_split_ids,
checkpoint_delta_opt,
_publish_token_opt|
-> bool {
let checkpoint_delta = checkpoint_delta_opt.as_ref().unwrap();
index_uid.to_string() == "test-index:11111111111111111111111111"
&& splits.len() == 1
Expand All @@ -708,7 +718,7 @@ mod tests {
.ends_with(":(00000000000000000000..00000000000000001030])")
},
)
.returning(|_, _, _, _| Ok(()));
.returning(|_, _, _, _, _| Ok(()));
let universe = Universe::new();
let node_id = "test-node";
let metastore = Arc::new(metastore);
Expand Down Expand Up @@ -872,7 +882,12 @@ mod tests {
metastore
.expect_publish_splits()
.withf(
|index_uid, splits, replaced_split_ids, checkpoint_delta_opt| -> bool {
|index_uid,
splits,
replaced_split_ids,
checkpoint_delta_opt,
_publish_token_opt|
-> bool {
let checkpoint_delta = checkpoint_delta_opt.as_ref().unwrap();
index_uid.to_string() == "test-index:11111111111111111111111111"
&& splits.is_empty()
Expand All @@ -882,7 +897,7 @@ mod tests {
.ends_with(":(00000000000000000000..00000000000000001030])")
},
)
.returning(|_, _, _, _| Ok(()));
.returning(|_, _, _, _, _| Ok(()));
let universe = Universe::new();
let node_id = "test-node";
let metastore = Arc::new(metastore);
Expand Down
8 changes: 5 additions & 3 deletions quickwit/quickwit-indexing/src/actors/merge_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,12 @@ impl Handler<MergeScratch> for MergeExecutor {
ctx.send_message(
&self.merge_packager_mailbox,
IndexedSplitBatch {
batch_parent_span: merge_op.merge_parent_span.clone(),
splits: vec![indexed_split],
checkpoint_delta: Default::default(),
checkpoint_delta_opt: Default::default(),
publish_lock: PublishLock::default(),
merge_operation: Some(merge_op),
publish_token_opt: None,
batch_parent_span: merge_op.merge_parent_span.clone(),
merge_operation_opt: Some(merge_op),
},
)
.await?;
Expand Down Expand Up @@ -716,6 +717,7 @@ mod tests {
&[new_split_metadata.split_id()],
&[split_metadata.split_id()],
None,
None,
)
.await
.unwrap();
Expand Down
12 changes: 7 additions & 5 deletions quickwit/quickwit-indexing/src/actors/packager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ impl Handler<IndexedSplitBatch> for Packager {
"start-packaging-splits"
);
fail_point!("packager:before");
let mut packaged_splits = Vec::new();
let mut packaged_splits = Vec::with_capacity(batch.splits.len());
for split in batch.splits {
if batch.publish_lock.is_dead() {
// TODO: Remove the junk right away?
Expand All @@ -153,9 +153,10 @@ impl Handler<IndexedSplitBatch> for Packager {
&self.uploader_mailbox,
PackagedSplitBatch::new(
packaged_splits,
batch.checkpoint_delta,
batch.checkpoint_delta_opt,
batch.publish_lock,
batch.merge_operation,
batch.publish_token_opt,
batch.merge_operation_opt,
batch.batch_parent_span,
),
)
Expand Down Expand Up @@ -465,10 +466,11 @@ mod tests {
packager_mailbox
.send_message(IndexedSplitBatch {
splits: vec![indexed_split],
checkpoint_delta: IndexCheckpointDelta::for_test("source_id", 10..20).into(),
checkpoint_delta_opt: IndexCheckpointDelta::for_test("source_id", 10..20).into(),
publish_lock: PublishLock::default(),
publish_token_opt: None,
merge_operation_opt: None,
batch_parent_span: Span::none(),
merge_operation: None,
})
.await?;
assert_eq!(
Expand Down
33 changes: 25 additions & 8 deletions quickwit/quickwit-indexing/src/actors/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ impl Handler<SplitsUpdate> for Publisher {
replaced_split_ids,
checkpoint_delta_opt,
publish_lock,
merge_operation: _,
parent_span: _,
publish_token_opt,
..
} = split_update;

let split_ids: Vec<&str> = new_splits.iter().map(|split| split.split_id()).collect();
Expand All @@ -132,6 +132,7 @@ impl Handler<SplitsUpdate> for Publisher {
&split_ids[..],
&replaced_split_ids_ref_vec,
checkpoint_delta_opt.clone(),
publish_token_opt,
))
.await
.context("Failed to publish splits.")?;
Expand Down Expand Up @@ -205,7 +206,11 @@ mod tests {
mock_metastore
.expect_publish_splits()
.withf(
|index_uid, split_ids, replaced_split_ids, checkpoint_delta_opt| {
|index_uid,
split_ids,
replaced_split_ids,
checkpoint_delta_opt,
_publish_token_opt| {
let checkpoint_delta = checkpoint_delta_opt.as_ref().unwrap();
index_uid.to_string() == "index:11111111111111111111111111"
&& checkpoint_delta.source_id == "source"
Expand All @@ -215,7 +220,7 @@ mod tests {
},
)
.times(1)
.returning(|_, _, _, _| Ok(()));
.returning(|_, _, _, _, _| Ok(()));
let (merge_planner_mailbox, merge_planner_inbox) = universe.create_test_mailbox();

let (source_mailbox, source_inbox) = universe.create_test_mailbox();
Expand All @@ -241,6 +246,7 @@ mod tests {
source_delta: SourceCheckpointDelta::from_range(1..3),
}),
publish_lock: PublishLock::default(),
publish_token_opt: None,
merge_operation: None,
parent_span: tracing::Span::none(),
})
Expand Down Expand Up @@ -277,7 +283,11 @@ mod tests {
mock_metastore
.expect_publish_splits()
.withf(
|index_uid, split_ids, replaced_split_ids, checkpoint_delta_opt| {
|index_uid,
split_ids,
replaced_split_ids,
checkpoint_delta_opt,
_publish_token_opt| {
let checkpoint_delta = checkpoint_delta_opt.as_ref().unwrap();
index_uid.to_string() == "index:11111111111111111111111111"
&& checkpoint_delta.source_id == "source"
Expand All @@ -287,7 +297,7 @@ mod tests {
},
)
.times(1)
.returning(|_, _, _, _| Ok(()));
.returning(|_, _, _, _, _| Ok(()));
let (merge_planner_mailbox, merge_planner_inbox) = universe.create_test_mailbox();

let (source_mailbox, source_inbox) = universe.create_test_mailbox();
Expand All @@ -310,6 +320,7 @@ mod tests {
source_delta: SourceCheckpointDelta::from_range(1..3),
}),
publish_lock: PublishLock::default(),
publish_token_opt: None,
merge_operation: None,
parent_span: tracing::Span::none(),
})
Expand Down Expand Up @@ -347,15 +358,19 @@ mod tests {
mock_metastore
.expect_publish_splits()
.withf(
|index_uid, new_split_ids, replaced_split_ids, checkpoint_delta_opt| {
|index_uid,
new_split_ids,
replaced_split_ids,
checkpoint_delta_opt,
_publish_token_opt| {
index_uid.to_string() == "index:11111111111111111111111111"
&& new_split_ids[..] == ["split3"]
&& replaced_split_ids[..] == ["split1", "split2"]
&& checkpoint_delta_opt.is_none()
},
)
.times(1)
.returning(|_, _, _, _| Ok(()));
.returning(|_, _, _, _, _| Ok(()));
let (merge_planner_mailbox, merge_planner_inbox) = universe.create_test_mailbox();
let publisher = Publisher::new(
PublisherType::MainPublisher,
Expand All @@ -373,6 +388,7 @@ mod tests {
replaced_split_ids: vec!["split1".to_string(), "split2".to_string()],
checkpoint_delta_opt: None,
publish_lock: PublishLock::default(),
publish_token_opt: None,
merge_operation: None,
parent_span: Span::none(),
};
Expand Down Expand Up @@ -414,6 +430,7 @@ mod tests {
replaced_split_ids: Vec::new(),
checkpoint_delta_opt: None,
publish_lock,
publish_token_opt: None,
merge_operation: None,
parent_span: Span::none(),
})
Expand Down
Loading

0 comments on commit 4f5d268

Please sign in to comment.