Skip to content

Commit

Permalink
Add maturity date in split metadata and use it to filter mature split…
Browse files Browse the repository at this point in the history
…s for merges (#3585)

* Add maturity timestamp in split metadata and use it to filter mature splits for merges.

* Clean.

* Simplify merge policy.

* Refactor time to maturity.

* Fix docs.

* Fix OpenAPI.

* Clean docs.

* Review serialization of split maturity.

* Clean and better docs.

* Refactor split maturity to hide (postgresql) implementation details and avoid confusing the reader.

* Clean.

* Use Bound for maturity filter.

* Use serde_with and use "millis" as the abbreviation for milliseconds.
  • Loading branch information
fmassot authored Jul 6, 2023
1 parent a026084 commit 8a684d1
Show file tree
Hide file tree
Showing 37 changed files with 569 additions and 234 deletions.
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions quickwit/quickwit-cli/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1124,13 +1124,13 @@ mod test {
split_metadata: split_metadata_1,
split_state: quickwit_metastore::SplitState::Published,
update_timestamp: 0,
publish_timestamp: Some(0),
publish_timestamp: Some(10),
};
let split_data_2 = Split {
split_metadata: split_metadata_2,
split_state: quickwit_metastore::SplitState::MarkedForDeletion,
update_timestamp: 0,
publish_timestamp: Some(0),
publish_timestamp: Some(10),
};

let index_stats =
Expand Down Expand Up @@ -1162,8 +1162,8 @@ mod test {
let split_id = "stat-test-split".to_string();
let template_split = Split {
split_state: quickwit_metastore::SplitState::Published,
update_timestamp: 0,
publish_timestamp: Some(0),
update_timestamp: 10,
publish_timestamp: Some(10),
split_metadata: SplitMetadata::default(),
};

Expand Down
7 changes: 3 additions & 4 deletions quickwit/quickwit-indexing/src/actors/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -767,10 +767,9 @@ mod tests {
let batch = messages.into_iter().next().unwrap();
assert_eq!(batch.commit_trigger, CommitTrigger::NumDocsLimit);
assert_eq!(batch.splits[0].split_attrs.num_docs, 4);
assert_eq!(
batch.splits[0].split_attrs.delete_opstamp,
last_delete_opstamp
);
for split in batch.splits.iter() {
assert_eq!(split.split_attrs.delete_opstamp, last_delete_opstamp);
}
let index_checkpoint = batch.checkpoint_delta.unwrap();
assert_eq!(index_checkpoint.source_id, "test-source");
assert_eq!(
Expand Down
7 changes: 7 additions & 0 deletions quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use crate::actors::publisher::PublisherType;
use crate::actors::sequencer::Sequencer;
use crate::actors::uploader::UploaderType;
use crate::actors::{Indexer, Packager, Publisher, Uploader};
use crate::merge_policy::MergePolicy;
use crate::models::{IndexingPipelineId, IndexingStatistics, Observe};
use crate::source::{quickwit_supported_sources, SourceActor, SourceExecutionContext};
use crate::split_store::IndexingSplitStore;
Expand Down Expand Up @@ -298,6 +299,7 @@ impl IndexingPipeline {
let uploader = Uploader::new(
UploaderType::IndexUploader,
self.params.metastore.clone(),
self.params.merge_policy.clone(),
self.params.split_store.clone(),
SplitsUpdateMailbox::Sequencer(sequencer_mailbox),
self.params.max_concurrent_split_uploads_index,
Expand Down Expand Up @@ -529,6 +531,7 @@ pub struct IndexingPipelineParams {
pub metastore: Arc<dyn Metastore>,
pub storage: Arc<dyn Storage>,
pub split_store: IndexingSplitStore,
pub merge_policy: Arc<dyn MergePolicy>,
pub max_concurrent_split_uploads_index: usize,
pub max_concurrent_split_uploads_merge: usize,
pub cooperative_indexing_permits: Option<Arc<Semaphore>>,
Expand Down Expand Up @@ -639,6 +642,7 @@ mod tests {
metastore: metastore.clone(),
storage,
split_store,
merge_policy: default_merge_policy(),
queues_dir_path: PathBuf::from("./queues"),
max_concurrent_split_uploads_index: 4,
max_concurrent_split_uploads_merge: 5,
Expand Down Expand Up @@ -734,6 +738,7 @@ mod tests {
queues_dir_path: PathBuf::from("./queues"),
storage,
split_store,
merge_policy: default_merge_policy(),
max_concurrent_split_uploads_index: 4,
max_concurrent_split_uploads_merge: 5,
cooperative_indexing_permits: None,
Expand Down Expand Up @@ -808,6 +813,7 @@ mod tests {
queues_dir_path: PathBuf::from("./queues"),
storage,
split_store,
merge_policy: default_merge_policy(),
max_concurrent_split_uploads_index: 4,
max_concurrent_split_uploads_merge: 5,
cooperative_indexing_permits: None,
Expand Down Expand Up @@ -924,6 +930,7 @@ mod tests {
queues_dir_path: PathBuf::from("./queues"),
storage,
split_store,
merge_policy: default_merge_policy(),
max_concurrent_split_uploads_index: 4,
max_concurrent_split_uploads_merge: 5,
cooperative_indexing_permits: None,
Expand Down
9 changes: 3 additions & 6 deletions quickwit/quickwit-indexing/src/actors/indexing_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,11 +281,7 @@ impl IndexingService {
.await?;
let merge_policy =
crate::merge_policy::merge_policy_from_settings(&index_config.indexing_settings);
let split_store = IndexingSplitStore::new(
storage.clone(),
merge_policy.clone(),
self.local_split_store.clone(),
);
let split_store = IndexingSplitStore::new(storage.clone(), self.local_split_store.clone());

let doc_mapper = build_doc_mapper(&index_config.doc_mapping, &index_config.search_settings)
.map_err(IndexingServiceError::InvalidParams)?;
Expand All @@ -296,7 +292,7 @@ impl IndexingService {
indexing_directory: indexing_directory.clone(),
metastore: self.metastore.clone(),
split_store: split_store.clone(),
merge_policy,
merge_policy: merge_policy.clone(),
merge_max_io_num_bytes_per_sec: index_config
.indexing_settings
.resources
Expand All @@ -322,6 +318,7 @@ impl IndexingService {
metastore: self.metastore.clone(),
storage,
split_store,
merge_policy,
max_concurrent_split_uploads_index,
max_concurrent_split_uploads_merge,
queues_dir_path: self.queue_dir_path.clone(),
Expand Down
32 changes: 25 additions & 7 deletions quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use quickwit_common::temp_dir::TempDirectory;
use quickwit_common::KillSwitch;
use quickwit_doc_mapper::DocMapper;
use quickwit_metastore::{ListSplitsQuery, Metastore, MetastoreError, SplitState};
use time::OffsetDateTime;
use tokio::join;
use tracing::{debug, error, info, instrument};

Expand Down Expand Up @@ -199,7 +200,8 @@ impl MergePipeline {
"Spawning merge pipeline.",
);
let query = ListSplitsQuery::for_index(self.params.pipeline_id.index_uid.clone())
.with_split_state(SplitState::Published);
.with_split_state(SplitState::Published)
.retain_immature(OffsetDateTime::now_utc());
let published_splits = ctx
.protect_future(self.params.metastore.list_splits(query))
.await?
Expand All @@ -223,6 +225,7 @@ impl MergePipeline {
let merge_uploader = Uploader::new(
UploaderType::MergeUploader,
self.params.metastore.clone(),
self.params.merge_policy.clone(),
self.params.split_store.clone(),
merge_publisher_mailbox.into(),
self.params.max_concurrent_split_uploads,
Expand Down Expand Up @@ -438,6 +441,7 @@ pub struct MergePipelineParams {

#[cfg(test)]
mod tests {
use std::ops::Bound;
use std::sync::Arc;

use quickwit_actors::{ActorExitStatus, Universe};
Expand All @@ -455,17 +459,31 @@ mod tests {
#[tokio::test]
async fn test_merge_pipeline_simple() -> anyhow::Result<()> {
let mut metastore = MockMetastore::default();
metastore
.expect_list_splits()
.times(1)
.returning(|_| Ok(Vec::new()));
let universe = Universe::with_accelerated_time();
let index_uid = IndexUid::new("test-index");
let pipeline_id = IndexingPipelineId {
index_uid: IndexUid::new("test-index"),
index_uid: index_uid.clone(),
source_id: "test-source".to_string(),
node_id: "test-node".to_string(),
pipeline_ord: 0,
};
metastore
.expect_list_splits()
.times(1)
.returning(move |list_split_query| {
assert_eq!(list_split_query.index_uid, index_uid);
assert_eq!(
list_split_query.split_states,
vec![quickwit_metastore::SplitState::Published]
);
match list_split_query.mature {
Bound::Excluded(_) => {}
_ => {
panic!("Expected excluded bound.");
}
}
Ok(Vec::new())
});
let universe = Universe::with_accelerated_time();
let storage = Arc::new(RamStorage::default());
let split_store = IndexingSplitStore::create_without_local_store(storage.clone());
let pipeline_params = MergePipelineParams {
Expand Down
8 changes: 6 additions & 2 deletions quickwit/quickwit-indexing/src/actors/merge_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, Qu
use quickwit_metastore::SplitMetadata;
use serde::Serialize;
use tantivy::Inventory;
use time::OffsetDateTime;
use tracing::info;

use crate::actors::MergeSplitDownloader;
Expand Down Expand Up @@ -148,7 +149,7 @@ impl MergePlanner {
}

fn record_split(&mut self, new_split: SplitMetadata) {
if self.merge_policy.is_mature(&new_split) {
if new_split.is_mature(OffsetDateTime::now_utc()) {
return;
}
let splits_for_partition: &mut Vec<SplitMetadata> = self
Expand Down Expand Up @@ -305,7 +306,7 @@ mod tests {
ConstWriteAmplificationMergePolicyConfig, MergePolicyConfig, StableLogMergePolicyConfig,
};
use quickwit_config::IndexingSettings;
use quickwit_metastore::SplitMetadata;
use quickwit_metastore::{SplitMaturity, SplitMetadata};
use quickwit_proto::IndexUid;
use tantivy::TrackedObject;
use time::OffsetDateTime;
Expand All @@ -331,6 +332,9 @@ mod tests {
partition_id,
num_merge_ops,
create_timestamp: OffsetDateTime::now_utc().unix_timestamp(),
maturity: SplitMaturity::Immature {
maturation_period: Duration::from_secs(3600),
},
..Default::default()
}
}
Expand Down
16 changes: 15 additions & 1 deletion quickwit/quickwit-indexing/src/actors/uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use tracing::{info, instrument, warn, Instrument, Span};

use crate::actors::sequencer::{Sequencer, SequencerCommand};
use crate::actors::Publisher;
use crate::merge_policy::MergeOperation;
use crate::merge_policy::{MergeOperation, MergePolicy};
use crate::metrics::INDEXER_METRICS;
use crate::models::{
create_split_metadata, EmptySplit, PackagedSplit, PackagedSplitBatch, PublishLock, SplitsUpdate,
Expand Down Expand Up @@ -163,6 +163,7 @@ impl SplitsUpdateSender {
pub struct Uploader {
uploader_type: UploaderType,
metastore: Arc<dyn Metastore>,
merge_policy: Arc<dyn MergePolicy>,
split_store: IndexingSplitStore,
split_update_mailbox: SplitsUpdateMailbox,
max_concurrent_split_uploads: usize,
Expand All @@ -173,13 +174,15 @@ impl Uploader {
pub fn new(
uploader_type: UploaderType,
metastore: Arc<dyn Metastore>,
merge_policy: Arc<dyn MergePolicy>,
split_store: IndexingSplitStore,
split_update_mailbox: SplitsUpdateMailbox,
max_concurrent_split_uploads: usize,
) -> Uploader {
Uploader {
uploader_type,
metastore,
merge_policy,
split_store,
split_update_mailbox,
max_concurrent_split_uploads,
Expand Down Expand Up @@ -291,6 +294,7 @@ impl Handler<PackagedSplitBatch> for Uploader {
let counters = self.counters.clone();
let index_uid = batch.index_uid();
let ctx_clone = ctx.clone();
let merge_policy = self.merge_policy.clone();
info!(split_ids=?split_ids, "start-stage-and-store-splits");
tokio::spawn(
async move {
Expand All @@ -310,6 +314,7 @@ impl Handler<PackagedSplitBatch> for Uploader {
&packaged_split.hotcache_bytes,
)?;
let split_metadata = create_split_metadata(
&merge_policy,
&packaged_split.split_attrs,
packaged_split.tags.clone(),
split_streamer.footer_range.start..split_streamer.footer_range.end,
Expand Down Expand Up @@ -465,10 +470,12 @@ mod tests {
use tokio::sync::oneshot;

use super::*;
use crate::merge_policy::{default_merge_policy, NopMergePolicy};
use crate::models::{IndexingPipelineId, SplitAttrs, SplitsUpdate};

#[tokio::test]
async fn test_uploader_with_sequencer() -> anyhow::Result<()> {
quickwit_common::setup_logging_for_tests();
let universe = Universe::new();
let pipeline_id = IndexingPipelineId {
index_uid: IndexUid::new("test-index"),
Expand All @@ -492,9 +499,11 @@ mod tests {
let ram_storage = RamStorage::default();
let split_store =
IndexingSplitStore::create_without_local_store(Arc::new(ram_storage.clone()));
let merge_policy = Arc::new(NopMergePolicy);
let uploader = Uploader::new(
UploaderType::IndexUploader,
Arc::new(mock_metastore),
merge_policy,
split_store,
SplitsUpdateMailbox::Sequencer(sequencer_mailbox),
4,
Expand Down Expand Up @@ -600,9 +609,11 @@ mod tests {
let ram_storage = RamStorage::default();
let split_store =
IndexingSplitStore::create_without_local_store(Arc::new(ram_storage.clone()));
let merge_policy = Arc::new(NopMergePolicy);
let uploader = Uploader::new(
UploaderType::IndexUploader,
Arc::new(mock_metastore),
merge_policy,
split_store,
SplitsUpdateMailbox::Sequencer(sequencer_mailbox),
4,
Expand Down Expand Up @@ -736,9 +747,11 @@ mod tests {
let ram_storage = RamStorage::default();
let split_store =
IndexingSplitStore::create_without_local_store(Arc::new(ram_storage.clone()));
let merge_policy = Arc::new(NopMergePolicy);
let uploader = Uploader::new(
UploaderType::IndexUploader,
Arc::new(mock_metastore),
merge_policy,
split_store,
SplitsUpdateMailbox::Publisher(publisher_mailbox),
4,
Expand Down Expand Up @@ -805,6 +818,7 @@ mod tests {
let uploader = Uploader::new(
UploaderType::IndexUploader,
Arc::new(mock_metastore),
default_merge_policy(),
split_store,
SplitsUpdateMailbox::Sequencer(sequencer_mailbox),
4,
Expand Down
Loading

0 comments on commit 8a684d1

Please sign in to comment.