Skip to content

Commit

Permalink
restart indexing pipeline on index update (#5265)
Browse files Browse the repository at this point in the history
* add integration test for indexer pipeline restart on update

* change pipeline uid on index update
  • Loading branch information
trinity-1686a authored Sep 3, 2024
1 parent 9d8fab0 commit 97614af
Show file tree
Hide file tree
Showing 15 changed files with 376 additions and 11 deletions.
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions quickwit/quickwit-cluster/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,7 @@ fn indexing_task_to_chitchat_kv(indexing_task: &IndexingTask) -> (String, String
source_id,
shard_ids,
pipeline_uid: _,
params_fingerprint: _,
} = indexing_task;
let index_uid = indexing_task.index_uid();
let key = format!("{INDEXING_TASK_PREFIX}{}", indexing_task.pipeline_uid());
Expand Down Expand Up @@ -543,6 +544,7 @@ fn chitchat_kv_to_indexing_task(key: &str, value: &str) -> Option<IndexingTask>
source_id: source_id.to_string(),
pipeline_uid: Some(pipeline_uid),
shard_ids,
params_fingerprint: 0,
})
}

Expand Down Expand Up @@ -945,12 +947,14 @@ mod tests {
index_uid: Some(index_uid.clone()),
source_id: "source-1".to_string(),
shard_ids: Vec::new(),
params_fingerprint: 0,
};
let indexing_task2 = IndexingTask {
pipeline_uid: Some(PipelineUid::for_test(2u128)),
index_uid: Some(index_uid.clone()),
source_id: "source-1".to_string(),
shard_ids: Vec::new(),
params_fingerprint: 0,
};
cluster2
.set_self_key_value(GRPC_ADVERTISE_ADDR_KEY, "127.0.0.1:1001")
Expand Down Expand Up @@ -1032,6 +1036,7 @@ mod tests {
),
source_id: format!("source-{source_id}"),
shard_ids: Vec::new(),
params_fingerprint: 0,
}
})
.collect_vec();
Expand Down Expand Up @@ -1259,6 +1264,7 @@ mod tests {
index_uid: Some(index_uid.clone()),
source_id: "my-source1".to_string(),
shard_ids: vec![ShardId::from(1), ShardId::from(2)],
params_fingerprint: 0,
}],
&mut node_state,
);
Expand All @@ -1269,6 +1275,7 @@ mod tests {
index_uid: Some(index_uid.clone()),
source_id: "my-source1".to_string(),
shard_ids: vec![ShardId::from(1), ShardId::from(2), ShardId::from(3)],
params_fingerprint: 0,
}],
&mut node_state,
);
Expand All @@ -1279,12 +1286,14 @@ mod tests {
index_uid: Some(index_uid.clone()),
source_id: "my-source1".to_string(),
shard_ids: vec![ShardId::from(1), ShardId::from(2)],
params_fingerprint: 0,
},
IndexingTask {
pipeline_uid: Some(PipelineUid::for_test(2u128)),
index_uid: Some(index_uid.clone()),
source_id: "my-source1".to_string(),
shard_ids: vec![ShardId::from(3), ShardId::from(4)],
params_fingerprint: 0,
},
],
&mut node_state,
Expand All @@ -1297,12 +1306,14 @@ mod tests {
index_uid: Some(index_uid.clone()),
source_id: "my-source1".to_string(),
shard_ids: vec![ShardId::from(1), ShardId::from(2)],
params_fingerprint: 0,
},
IndexingTask {
pipeline_uid: Some(PipelineUid::for_test(2u128)),
index_uid: Some(IndexUid::for_test("test-index2", 0)),
source_id: "my-source1".to_string(),
shard_ids: vec![ShardId::from(3), ShardId::from(4)],
params_fingerprint: 0,
},
],
&mut node_state,
Expand All @@ -1315,12 +1326,14 @@ mod tests {
index_uid: Some(index_uid.clone()),
source_id: "my-source1".to_string(),
shard_ids: vec![ShardId::from(1), ShardId::from(2)],
params_fingerprint: 0,
},
IndexingTask {
pipeline_uid: Some(PipelineUid::for_test(2u128)),
index_uid: Some(index_uid.clone()),
source_id: "my-source2".to_string(),
shard_ids: vec![ShardId::from(3), ShardId::from(4)],
params_fingerprint: 0,
},
],
&mut node_state,
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ serde = { workspace = true }
serde_json = { workspace = true }
serde_with = { workspace = true }
serde_yaml = { workspace = true }
siphasher = { workspace = true }
toml = { workspace = true }
tracing = { workspace = true }
utoipa = { workspace = true }
Expand Down
18 changes: 17 additions & 1 deletion quickwit/quickwit-config/src/index_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

pub(crate) mod serialize;

use std::hash::{Hash, Hasher};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -33,6 +34,7 @@ use quickwit_doc_mapper::{DefaultDocMapperBuilder, DocMapper, DocMapping};
use quickwit_proto::types::IndexId;
use serde::{Deserialize, Serialize};
pub use serialize::{load_index_config_from_user_config, load_index_config_update};
use siphasher::sip::SipHasher;
use tracing::warn;

use crate::index_config::serialize::VersionedIndexConfig;
Expand All @@ -57,6 +59,12 @@ impl PartialEq for IndexingResources {
}
}

// Manual `Hash` impl that hashes only `heap_size`, mirroring the custom
// `PartialEq` impl for this type so the `Hash`/`Eq` contract
// (a == b implies hash(a) == hash(b)) holds.
// NOTE(review): the `PartialEq` body is not visible in this diff — confirm it
// also compares only `heap_size`; hashing a subset of compared fields would
// break hash-map lookups.
impl Hash for IndexingResources {
fn hash<H: Hasher>(&self, state: &mut H) {
self.heap_size.hash(state);
}
}

impl IndexingResources {
fn default_heap_size() -> ByteSize {
ByteSize::gb(2)
Expand Down Expand Up @@ -90,7 +98,7 @@ impl Default for IndexingResources {
}
}

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Hash, utoipa::ToSchema)]
#[serde(deny_unknown_fields)]
pub struct IndexingSettings {
#[schema(default = 60)]
Expand Down Expand Up @@ -253,6 +261,14 @@ pub struct IndexConfig {
}

impl IndexConfig {
/// Returns a 64-bit SipHash fingerprint of the parts of this config that are
/// relevant to indexers: the doc mapping UID and the indexing settings.
/// Presumably used to detect when an index update requires restarting
/// indexing pipelines — confirm against callers.
pub fn indexing_params_fingerprint(&self) -> u64 {
    // Order matters for the resulting hash: doc mapping UID first, then
    // indexing settings, matching the established fingerprint layout.
    let mut sip = SipHasher::new();
    Hash::hash(&self.doc_mapping.doc_mapping_uid, &mut sip);
    Hash::hash(&self.indexing_settings, &mut sip);
    sip.finish()
}

#[cfg(any(test, feature = "testsuite"))]
pub fn for_test(index_id: &str, index_uri: &str) -> Self {
let index_uri = Uri::from_str(index_uri).unwrap();
Expand Down
6 changes: 3 additions & 3 deletions quickwit/quickwit-config/src/merge_policy_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::time::Duration;

use serde::{de, Deserialize, Deserializer, Serialize, Serializer};

#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, utoipa::ToSchema)]
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash, utoipa::ToSchema)]
#[serde(deny_unknown_fields)]
pub struct ConstWriteAmplificationMergePolicyConfig {
/// Number of splits to merge together in a single merge operation.
Expand Down Expand Up @@ -55,7 +55,7 @@ impl Default for ConstWriteAmplificationMergePolicyConfig {
}
}

#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, utoipa::ToSchema)]
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Hash, utoipa::ToSchema)]
#[serde(deny_unknown_fields)]
pub struct StableLogMergePolicyConfig {
/// Number of docs below which all splits are considered as belonging to the same level.
Expand Down Expand Up @@ -126,7 +126,7 @@ where S: Serializer {
s.serialize_str(&value_str)
}

#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq, utoipa::ToSchema)]
#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq, Hash, utoipa::ToSchema)]
#[serde(tag = "type")]
#[serde(deny_unknown_fields)]
pub enum MergePolicyConfig {
Expand Down
9 changes: 6 additions & 3 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -592,9 +592,12 @@ impl Handler<UpdateIndexRequest> for ControlPlane {
return Err(ActorExitStatus::from(anyhow::anyhow!(serde_error)));
}
};
self.model
.update_index_config(&index_uid, index_metadata.index_config)?;
// TODO: Handle doc mapping and/or indexing settings update here.
if self
.model
.update_index_config(&index_uid, index_metadata.index_config)?
{
let _rebuild_plan_notifier = self.rebuild_plan_debounced(ctx);
}
info!(%index_uid, "updated index");
Ok(Ok(response))
}
Expand Down
22 changes: 22 additions & 0 deletions quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> {
if !source_config.enabled {
continue;
}
let params_fingerprint = model
.index_metadata(&source_uid.index_uid)
.map(|index_meta| index_meta.index_config.indexing_params_fingerprint())
.unwrap_or_default();
match source_config.source_params {
SourceParams::File(FileSourceParams::Filepath(_))
| SourceParams::IngestCli
Expand All @@ -181,6 +185,7 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> {
sources.push(SourceToSchedule {
source_uid,
source_type: SourceToScheduleType::IngestV1,
params_fingerprint,
});
}
SourceParams::Ingest => {
Expand All @@ -206,6 +211,7 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> {
shard_ids,
load_per_shard,
},
params_fingerprint,
});
}
SourceParams::Kafka(_)
Expand All @@ -221,6 +227,7 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> {
load_per_pipeline: NonZeroU32::new(PIPELINE_FULL_CAPACITY.cpu_millis())
.unwrap(),
},
params_fingerprint,
});
}
}
Expand Down Expand Up @@ -680,18 +687,21 @@ mod tests {
index_uid: Some(index_uid.clone()),
source_id: "source-1".to_string(),
shard_ids: Vec::new(),
params_fingerprint: 0,
};
let task_1b = IndexingTask {
pipeline_uid: Some(PipelineUid::for_test(11u128)),
index_uid: Some(index_uid.clone()),
source_id: "source-1".to_string(),
shard_ids: Vec::new(),
params_fingerprint: 0,
};
let task_2 = IndexingTask {
pipeline_uid: Some(PipelineUid::for_test(20u128)),
index_uid: Some(index_uid.clone()),
source_id: "source-2".to_string(),
shard_ids: Vec::new(),
params_fingerprint: 0,
};
running_plan.insert(
"indexer-1".to_string(),
Expand All @@ -712,12 +722,14 @@ mod tests {
index_uid: Some(index_uid.clone()),
source_id: "source-1".to_string(),
shard_ids: Vec::new(),
params_fingerprint: 0,
};
let task_2 = IndexingTask {
pipeline_uid: Some(PipelineUid::for_test(2u128)),
index_uid: Some(index_uid.clone()),
source_id: "source-2".to_string(),
shard_ids: Vec::new(),
params_fingerprint: 0,
};
running_plan.insert("indexer-1".to_string(), vec![task_1.clone()]);
desired_plan.insert("indexer-1".to_string(), vec![task_2.clone()]);
Expand All @@ -744,12 +756,14 @@ mod tests {
index_uid: Some(index_uid.clone()),
source_id: "source-1".to_string(),
shard_ids: Vec::new(),
params_fingerprint: 0,
};
let task_2 = IndexingTask {
pipeline_uid: Some(PipelineUid::for_test(2u128)),
index_uid: Some(index_uid2.clone()),
source_id: "source-2".to_string(),
shard_ids: Vec::new(),
params_fingerprint: 0,
};
running_plan.insert("indexer-2".to_string(), vec![task_2.clone()]);
desired_plan.insert("indexer-1".to_string(), vec![task_1.clone()]);
Expand Down Expand Up @@ -784,18 +798,21 @@ mod tests {
index_uid: Some(index_uid.clone()),
source_id: "source-1".to_string(),
shard_ids: Vec::new(),
params_fingerprint: 0,
};
let task_1b = IndexingTask {
pipeline_uid: Some(PipelineUid::for_test(11u128)),
index_uid: Some(index_uid.clone()),
source_id: "source-1".to_string(),
shard_ids: Vec::new(),
params_fingerprint: 0,
};
let task_1c = IndexingTask {
pipeline_uid: Some(PipelineUid::for_test(12u128)),
index_uid: Some(index_uid.clone()),
source_id: "source-1".to_string(),
shard_ids: Vec::new(),
params_fingerprint: 0,
};
running_plan.insert("indexer-1".to_string(), vec![task_1a.clone()]);
desired_plan.insert(
Expand Down Expand Up @@ -938,13 +955,15 @@ mod tests {
num_pipelines: 3,
load_per_pipeline: NonZeroU32::new(1_000).unwrap(),
},
params_fingerprint: 0,
},
SourceToSchedule {
source_uid: source_2.clone(),
source_type: SourceToScheduleType::NonSharded {
num_pipelines: 2,
load_per_pipeline: NonZeroU32::new(1_000).unwrap(),
},
params_fingerprint: 0,
},
];
let mut indexer_max_loads = FnvHashMap::default();
Expand All @@ -968,18 +987,21 @@ mod tests {
source_id: "my-source".to_string(),
pipeline_uid: Some(PipelineUid::random()),
shard_ids: vec!["shard1".into()],
params_fingerprint: 0,
};
let task2 = IndexingTask {
index_uid: Some(IndexUid::for_test("index2", 123)),
source_id: "my-source".to_string(),
pipeline_uid: Some(PipelineUid::random()),
shard_ids: vec!["shard2".into(), "shard3".into()],
params_fingerprint: 0,
};
let task3 = IndexingTask {
index_uid: Some(IndexUid::for_test("index3", 123)),
source_id: "my-source".to_string(),
pipeline_uid: Some(PipelineUid::random()),
shard_ids: vec!["shard6".into()],
params_fingerprint: 0,
};
// ordering chosen to match the debug output, for readability
map.insert("indexer5", vec![&task2]);
Expand Down
Loading

0 comments on commit 97614af

Please sign in to comment.