Skip to content

Commit

Permalink
Add maturity timestamp in split metadata and use it to filter mature …
Browse files Browse the repository at this point in the history
…split for merges.
  • Loading branch information
fmassot committed Jun 28, 2023
1 parent ccfa5e4 commit e1cd0b2
Show file tree
Hide file tree
Showing 35 changed files with 465 additions and 224 deletions.
8 changes: 4 additions & 4 deletions quickwit/quickwit-cli/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1124,13 +1124,13 @@ mod test {
split_metadata: split_metadata_1,
split_state: quickwit_metastore::SplitState::Published,
update_timestamp: 0,
publish_timestamp: Some(0),
publish_timestamp: Some(10),
};
let split_data_2 = Split {
split_metadata: split_metadata_2,
split_state: quickwit_metastore::SplitState::MarkedForDeletion,
update_timestamp: 0,
publish_timestamp: Some(0),
publish_timestamp: Some(10),
};

let index_stats =
Expand Down Expand Up @@ -1162,8 +1162,8 @@ mod test {
let split_id = "stat-test-split".to_string();
let template_split = Split {
split_state: quickwit_metastore::SplitState::Published,
update_timestamp: 0,
publish_timestamp: Some(0),
update_timestamp: 10,
publish_timestamp: Some(10),
split_metadata: SplitMetadata::default(),
};

Expand Down
9 changes: 5 additions & 4 deletions quickwit/quickwit-indexing/src/actors/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,7 @@ impl Handler<NewPublishLock> for Indexer {
}

impl Indexer {
#[allow(clippy::too_many_arguments)]
pub fn new(
pipeline_id: IndexingPipelineId,
doc_mapper: Arc<dyn DocMapper>,
Expand Down Expand Up @@ -554,6 +555,7 @@ impl Indexer {
// Dropping the indexing permit explicitly here for enhanced readability.
drop(indexing_permit);

// Update the time to maturity of the splits as they won't receive documents anymore.
let mut splits: Vec<IndexedSplitBuilder> = indexed_splits.into_values().collect();

if let Some(other_split) = other_indexed_split_opt {
Expand Down Expand Up @@ -762,10 +764,9 @@ mod tests {
let batch = messages.into_iter().next().unwrap();
assert_eq!(batch.commit_trigger, CommitTrigger::NumDocsLimit);
assert_eq!(batch.splits[0].split_attrs.num_docs, 4);
assert_eq!(
batch.splits[0].split_attrs.delete_opstamp,
last_delete_opstamp
);
for split in batch.splits.iter() {
assert_eq!(split.split_attrs.delete_opstamp, last_delete_opstamp);
}
let index_checkpoint = batch.checkpoint_delta.unwrap();
assert_eq!(index_checkpoint.source_id, "test-source");
assert_eq!(
Expand Down
7 changes: 7 additions & 0 deletions quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use crate::actors::publisher::PublisherType;
use crate::actors::sequencer::Sequencer;
use crate::actors::uploader::UploaderType;
use crate::actors::{Indexer, Packager, Publisher, Uploader};
use crate::merge_policy::MergePolicy;
use crate::models::{IndexingPipelineId, IndexingStatistics, Observe};
use crate::source::{quickwit_supported_sources, SourceActor, SourceExecutionContext};
use crate::split_store::IndexingSplitStore;
Expand Down Expand Up @@ -298,6 +299,7 @@ impl IndexingPipeline {
let uploader = Uploader::new(
UploaderType::IndexUploader,
self.params.metastore.clone(),
self.params.merge_policy.clone(),
self.params.split_store.clone(),
SplitsUpdateMailbox::Sequencer(sequencer_mailbox),
self.params.max_concurrent_split_uploads_index,
Expand Down Expand Up @@ -529,6 +531,7 @@ pub struct IndexingPipelineParams {
pub metastore: Arc<dyn Metastore>,
pub storage: Arc<dyn Storage>,
pub split_store: IndexingSplitStore,
pub merge_policy: Arc<dyn MergePolicy>,
pub max_concurrent_split_uploads_index: usize,
pub max_concurrent_split_uploads_merge: usize,
pub cooperative_indexing_permits: Option<Arc<Semaphore>>,
Expand Down Expand Up @@ -639,6 +642,7 @@ mod tests {
metastore: metastore.clone(),
storage,
split_store,
merge_policy: default_merge_policy(),
queues_dir_path: PathBuf::from("./queues"),
max_concurrent_split_uploads_index: 4,
max_concurrent_split_uploads_merge: 5,
Expand Down Expand Up @@ -734,6 +738,7 @@ mod tests {
queues_dir_path: PathBuf::from("./queues"),
storage,
split_store,
merge_policy: default_merge_policy(),
max_concurrent_split_uploads_index: 4,
max_concurrent_split_uploads_merge: 5,
cooperative_indexing_permits: None,
Expand Down Expand Up @@ -808,6 +813,7 @@ mod tests {
queues_dir_path: PathBuf::from("./queues"),
storage,
split_store,
merge_policy: default_merge_policy(),
max_concurrent_split_uploads_index: 4,
max_concurrent_split_uploads_merge: 5,
cooperative_indexing_permits: None,
Expand Down Expand Up @@ -924,6 +930,7 @@ mod tests {
queues_dir_path: PathBuf::from("./queues"),
storage,
split_store,
merge_policy: default_merge_policy(),
max_concurrent_split_uploads_index: 4,
max_concurrent_split_uploads_merge: 5,
cooperative_indexing_permits: None,
Expand Down
9 changes: 3 additions & 6 deletions quickwit/quickwit-indexing/src/actors/indexing_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,11 +281,7 @@ impl IndexingService {
.await?;
let merge_policy =
crate::merge_policy::merge_policy_from_settings(&index_config.indexing_settings);
let split_store = IndexingSplitStore::new(
storage.clone(),
merge_policy.clone(),
self.local_split_store.clone(),
);
let split_store = IndexingSplitStore::new(storage.clone(), self.local_split_store.clone());

let doc_mapper = build_doc_mapper(&index_config.doc_mapping, &index_config.search_settings)
.map_err(IndexingServiceError::InvalidParams)?;
Expand All @@ -296,7 +292,7 @@ impl IndexingService {
indexing_directory: indexing_directory.clone(),
metastore: self.metastore.clone(),
split_store: split_store.clone(),
merge_policy,
merge_policy: merge_policy.clone(),
merge_max_io_num_bytes_per_sec: index_config
.indexing_settings
.resources
Expand All @@ -322,6 +318,7 @@ impl IndexingService {
metastore: self.metastore.clone(),
storage,
split_store,
merge_policy,
max_concurrent_split_uploads_index,
max_concurrent_split_uploads_merge,
queues_dir_path: self.queue_dir_path.clone(),
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-indexing/src/actors/merge_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ pub fn merge_split_attrs(
.map(|split| split.delete_opstamp)
.min()
.unwrap_or(0);
let num_merge_ops = max_merge_ops(splits) + 1;
SplitAttrs {
split_id: merge_split_id,
partition_id,
Expand All @@ -252,7 +253,7 @@ pub fn merge_split_attrs(
num_docs,
uncompressed_docs_size_in_bytes,
delete_opstamp,
num_merge_ops: max_merge_ops(splits) + 1,
num_merge_ops,
}
}

Expand Down
40 changes: 33 additions & 7 deletions quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use quickwit_common::temp_dir::TempDirectory;
use quickwit_common::KillSwitch;
use quickwit_doc_mapper::DocMapper;
use quickwit_metastore::{ListSplitsQuery, Metastore, MetastoreError, SplitState};
use time::OffsetDateTime;
use tokio::join;
use tracing::{debug, error, info, instrument};

Expand Down Expand Up @@ -199,7 +200,8 @@ impl MergePipeline {
"Spawning merge pipeline.",
);
let query = ListSplitsQuery::for_index(self.params.pipeline_id.index_uid.clone())
.with_split_state(SplitState::Published);
.with_split_state(SplitState::Published)
.with_maturity_timestamp_lte(OffsetDateTime::now_utc().unix_timestamp());
let published_splits = ctx
.protect_future(self.params.metastore.list_splits(query))
.await?
Expand All @@ -223,6 +225,7 @@ impl MergePipeline {
let merge_uploader = Uploader::new(
UploaderType::MergeUploader,
self.params.metastore.clone(),
self.params.merge_policy.clone(),
self.params.split_store.clone(),
merge_publisher_mailbox.into(),
self.params.max_concurrent_split_uploads,
Expand Down Expand Up @@ -438,6 +441,7 @@ pub struct MergePipelineParams {

#[cfg(test)]
mod tests {
use std::ops::Bound;
use std::sync::Arc;

use quickwit_actors::{ActorExitStatus, Universe};
Expand All @@ -446,6 +450,7 @@ mod tests {
use quickwit_metastore::MockMetastore;
use quickwit_proto::IndexUid;
use quickwit_storage::RamStorage;
use time::OffsetDateTime;

use crate::actors::merge_pipeline::{MergePipeline, MergePipelineParams};
use crate::merge_policy::default_merge_policy;
Expand All @@ -455,17 +460,38 @@ mod tests {
#[tokio::test]
async fn test_merge_pipeline_simple() -> anyhow::Result<()> {
let mut metastore = MockMetastore::default();
metastore
.expect_list_splits()
.times(1)
.returning(|_| Ok(Vec::new()));
let universe = Universe::with_accelerated_time();
let index_uid = IndexUid::new("test-index");
let pipeline_id = IndexingPipelineId {
index_uid: IndexUid::new("test-index"),
index_uid: index_uid.clone(),
source_id: "test-source".to_string(),
node_id: "test-node".to_string(),
pipeline_ord: 0,
};
metastore
.expect_list_splits()
.times(1)
.returning(move |list_split_query| {
assert_eq!(list_split_query.index_uid, index_uid);
assert_eq!(
list_split_query.split_states,
vec![quickwit_metastore::SplitState::Published]
);
match list_split_query.maturity_timestamp.end {
Bound::Included(maturity_timestamp_end) => {
assert!(
maturity_timestamp_end
< OffsetDateTime::now_utc().unix_timestamp() + 3600
);
assert!(
maturity_timestamp_end
> OffsetDateTime::now_utc().unix_timestamp() - 3600
)
}
_ => panic!("Expected unbounded maturity timestamp."),
}
Ok(Vec::new())
});
let universe = Universe::with_accelerated_time();
let storage = Arc::new(RamStorage::default());
let split_store = IndexingSplitStore::create_without_local_store(storage.clone());
let pipeline_params = MergePipelineParams {
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-indexing/src/actors/merge_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ impl MergePlanner {
}

fn record_split(&mut self, new_split: SplitMetadata) {
if self.merge_policy.is_mature(&new_split) {
if new_split.is_mature() {
return;
}
let splits_for_partition: &mut Vec<SplitMetadata> = self
Expand Down Expand Up @@ -331,6 +331,7 @@ mod tests {
partition_id,
num_merge_ops,
create_timestamp: OffsetDateTime::now_utc().unix_timestamp(),
maturity_timestamp: OffsetDateTime::now_utc().unix_timestamp() + 3600,
..Default::default()
}
}
Expand Down
16 changes: 15 additions & 1 deletion quickwit/quickwit-indexing/src/actors/uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use tracing::{info, instrument, warn, Instrument, Span};

use crate::actors::sequencer::{Sequencer, SequencerCommand};
use crate::actors::Publisher;
use crate::merge_policy::MergeOperation;
use crate::merge_policy::{MergeOperation, MergePolicy};
use crate::metrics::INDEXER_METRICS;
use crate::models::{
create_split_metadata, EmptySplit, PackagedSplit, PackagedSplitBatch, PublishLock, SplitsUpdate,
Expand Down Expand Up @@ -163,6 +163,7 @@ impl SplitsUpdateSender {
pub struct Uploader {
uploader_type: UploaderType,
metastore: Arc<dyn Metastore>,
merge_policy: Arc<dyn MergePolicy>,
split_store: IndexingSplitStore,
split_update_mailbox: SplitsUpdateMailbox,
max_concurrent_split_uploads: usize,
Expand All @@ -173,13 +174,15 @@ impl Uploader {
pub fn new(
uploader_type: UploaderType,
metastore: Arc<dyn Metastore>,
merge_policy: Arc<dyn MergePolicy>,
split_store: IndexingSplitStore,
split_update_mailbox: SplitsUpdateMailbox,
max_concurrent_split_uploads: usize,
) -> Uploader {
Uploader {
uploader_type,
metastore,
merge_policy,
split_store,
split_update_mailbox,
max_concurrent_split_uploads,
Expand Down Expand Up @@ -291,6 +294,7 @@ impl Handler<PackagedSplitBatch> for Uploader {
let counters = self.counters.clone();
let index_uid = batch.index_uid();
let ctx_clone = ctx.clone();
let merge_policy = self.merge_policy.clone();
info!(split_ids=?split_ids, "start-stage-and-store-splits");
tokio::spawn(
async move {
Expand All @@ -310,6 +314,7 @@ impl Handler<PackagedSplitBatch> for Uploader {
&packaged_split.hotcache_bytes,
)?;
let split_metadata = create_split_metadata(
&merge_policy,
&packaged_split.split_attrs,
packaged_split.tags.clone(),
split_streamer.footer_range.start..split_streamer.footer_range.end,
Expand Down Expand Up @@ -465,10 +470,12 @@ mod tests {
use tokio::sync::oneshot;

use super::*;
use crate::merge_policy::{default_merge_policy, NopMergePolicy};
use crate::models::{IndexingPipelineId, SplitAttrs, SplitsUpdate};

#[tokio::test]
async fn test_uploader_with_sequencer() -> anyhow::Result<()> {
quickwit_common::setup_logging_for_tests();
let universe = Universe::new();
let pipeline_id = IndexingPipelineId {
index_uid: IndexUid::new("test-index"),
Expand All @@ -492,9 +499,11 @@ mod tests {
let ram_storage = RamStorage::default();
let split_store =
IndexingSplitStore::create_without_local_store(Arc::new(ram_storage.clone()));
let merge_policy = Arc::new(NopMergePolicy);
let uploader = Uploader::new(
UploaderType::IndexUploader,
Arc::new(mock_metastore),
merge_policy,
split_store,
SplitsUpdateMailbox::Sequencer(sequencer_mailbox),
4,
Expand Down Expand Up @@ -600,9 +609,11 @@ mod tests {
let ram_storage = RamStorage::default();
let split_store =
IndexingSplitStore::create_without_local_store(Arc::new(ram_storage.clone()));
let merge_policy = Arc::new(NopMergePolicy);
let uploader = Uploader::new(
UploaderType::IndexUploader,
Arc::new(mock_metastore),
merge_policy,
split_store,
SplitsUpdateMailbox::Sequencer(sequencer_mailbox),
4,
Expand Down Expand Up @@ -736,9 +747,11 @@ mod tests {
let ram_storage = RamStorage::default();
let split_store =
IndexingSplitStore::create_without_local_store(Arc::new(ram_storage.clone()));
let merge_policy = Arc::new(NopMergePolicy);
let uploader = Uploader::new(
UploaderType::IndexUploader,
Arc::new(mock_metastore),
merge_policy,
split_store,
SplitsUpdateMailbox::Publisher(publisher_mailbox),
4,
Expand Down Expand Up @@ -805,6 +818,7 @@ mod tests {
let uploader = Uploader::new(
UploaderType::IndexUploader,
Arc::new(mock_metastore),
default_merge_policy(),
split_store,
SplitsUpdateMailbox::Sequencer(sequencer_mailbox),
4,
Expand Down
Loading

0 comments on commit e1cd0b2

Please sign in to comment.