From 97614af8d69d602d54b0839332ff9cb51121ebdb Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Tue, 3 Sep 2024 16:42:03 +0200 Subject: [PATCH] restart indexing pipeline on index update (#5265) * add integration test for indexer pipeline restart on update * change pipeline uid on index update --- quickwit/Cargo.lock | 1 + quickwit/quickwit-cluster/src/cluster.rs | 13 + quickwit/quickwit-config/Cargo.toml | 1 + .../quickwit-config/src/index_config/mod.rs | 18 +- .../src/merge_policy_config.rs | 6 +- .../src/control_plane.rs | 9 +- .../src/indexing_scheduler/mod.rs | 22 ++ .../src/indexing_scheduler/scheduling/mod.rs | 48 +++- .../quickwit-control-plane/src/model/mod.rs | 6 +- .../src/actors/indexing_service.rs | 13 + .../src/tests/update_tests/mod.rs | 1 + .../update_tests/restart_indexer_tests.rs | 241 ++++++++++++++++++ .../protos/quickwit/indexing.proto | 3 + .../src/codegen/quickwit/quickwit.indexing.rs | 4 + quickwit/quickwit-serve/src/lib.rs | 1 + 15 files changed, 376 insertions(+), 11 deletions(-) create mode 100644 quickwit/quickwit-integration-tests/src/tests/update_tests/restart_indexer_tests.rs diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 4c5aca93dc4..90dbed7d0b8 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -5846,6 +5846,7 @@ dependencies = [ "serde_json", "serde_with 3.9.0", "serde_yaml", + "siphasher", "tokio", "toml", "tracing", diff --git a/quickwit/quickwit-cluster/src/cluster.rs b/quickwit/quickwit-cluster/src/cluster.rs index 116297476b2..b34fe47ab29 100644 --- a/quickwit/quickwit-cluster/src/cluster.rs +++ b/quickwit/quickwit-cluster/src/cluster.rs @@ -515,6 +515,7 @@ fn indexing_task_to_chitchat_kv(indexing_task: &IndexingTask) -> (String, String source_id, shard_ids, pipeline_uid: _, + params_fingerprint: _, } = indexing_task; let index_uid = indexing_task.index_uid(); let key = format!("{INDEXING_TASK_PREFIX}{}", indexing_task.pipeline_uid()); @@ -543,6 +544,7 @@ fn chitchat_kv_to_indexing_task(key: &str, value: &str) -> Option source_id: source_id.to_string(), pipeline_uid: Some(pipeline_uid), shard_ids, + params_fingerprint: 0, }) } @@ -945,12 +947,14 @@ mod tests { index_uid: Some(index_uid.clone()), source_id: "source-1".to_string(), shard_ids: Vec::new(), + params_fingerprint: 0, }; let indexing_task2 = IndexingTask { pipeline_uid: Some(PipelineUid::for_test(2u128)), index_uid: Some(index_uid.clone()), source_id: "source-1".to_string(), shard_ids: Vec::new(), + params_fingerprint: 0, }; cluster2 .set_self_key_value(GRPC_ADVERTISE_ADDR_KEY, "127.0.0.1:1001") @@ -1032,6 +1036,7 @@ mod tests { ), source_id: format!("source-{source_id}"), shard_ids: Vec::new(), + params_fingerprint: 0, } }) .collect_vec(); @@ -1259,6 +1264,7 @@ mod tests { index_uid: Some(index_uid.clone()), source_id: "my-source1".to_string(), shard_ids: vec![ShardId::from(1), ShardId::from(2)], + params_fingerprint: 0, }], &mut node_state, ); @@ -1269,6 +1275,7 @@ mod tests { index_uid: Some(index_uid.clone()), source_id: "my-source1".to_string(), shard_ids: vec![ShardId::from(1), ShardId::from(2), ShardId::from(3)], + params_fingerprint: 0, }], &mut node_state, ); @@ -1279,12 +1286,14 @@ mod tests { index_uid: Some(index_uid.clone()), source_id: "my-source1".to_string(), shard_ids: vec![ShardId::from(1), ShardId::from(2)], + params_fingerprint: 0, }, IndexingTask { pipeline_uid: Some(PipelineUid::for_test(2u128)), index_uid: Some(index_uid.clone()), source_id: "my-source1".to_string(), shard_ids: vec![ShardId::from(3), ShardId::from(4)], + params_fingerprint: 0, 
                },
            ],
            &mut node_state,
@@ -1297,12 +1306,14 @@ mod tests {
                     index_uid: Some(index_uid.clone()),
                     source_id: "my-source1".to_string(),
                     shard_ids: vec![ShardId::from(1), ShardId::from(2)],
+                    params_fingerprint: 0,
                 },
                 IndexingTask {
                     pipeline_uid: Some(PipelineUid::for_test(2u128)),
                     index_uid: Some(IndexUid::for_test("test-index2", 0)),
                     source_id: "my-source1".to_string(),
                     shard_ids: vec![ShardId::from(3), ShardId::from(4)],
+                    params_fingerprint: 0,
                 },
             ],
             &mut node_state,
@@ -1315,12 +1326,14 @@ mod tests {
                     index_uid: Some(index_uid.clone()),
                     source_id: "my-source1".to_string(),
                     shard_ids: vec![ShardId::from(1), ShardId::from(2)],
+                    params_fingerprint: 0,
                 },
                 IndexingTask {
                     pipeline_uid: Some(PipelineUid::for_test(2u128)),
                     index_uid: Some(index_uid.clone()),
                     source_id: "my-source2".to_string(),
                     shard_ids: vec![ShardId::from(3), ShardId::from(4)],
+                    params_fingerprint: 0,
                 },
             ],
             &mut node_state,
diff --git a/quickwit/quickwit-config/Cargo.toml b/quickwit/quickwit-config/Cargo.toml
index a3f300742ea..7cf75818444 100644
--- a/quickwit/quickwit-config/Cargo.toml
+++ b/quickwit/quickwit-config/Cargo.toml
@@ -29,6 +29,7 @@ serde = { workspace = true }
 serde_json = { workspace = true }
 serde_with = { workspace = true }
 serde_yaml = { workspace = true }
+siphasher = { workspace = true }
 toml = { workspace = true }
 tracing = { workspace = true }
 utoipa = { workspace = true }
diff --git a/quickwit/quickwit-config/src/index_config/mod.rs b/quickwit/quickwit-config/src/index_config/mod.rs
index 08ebee23ff5..60ea2a9429e 100644
--- a/quickwit/quickwit-config/src/index_config/mod.rs
+++ b/quickwit/quickwit-config/src/index_config/mod.rs
@@ -19,6 +19,7 @@
 
 pub(crate) mod serialize;
 
+use std::hash::{Hash, Hasher};
 use std::str::FromStr;
 use std::sync::Arc;
 use std::time::Duration;
@@ -33,6 +34,7 @@ use quickwit_doc_mapper::{DefaultDocMapperBuilder, DocMapper, DocMapping};
 use quickwit_proto::types::IndexId;
 use serde::{Deserialize, Serialize};
 pub use serialize::{load_index_config_from_user_config, load_index_config_update};
+use siphasher::sip::SipHasher;
 use tracing::warn;
 
 use crate::index_config::serialize::VersionedIndexConfig;
@@ -57,6 +59,12 @@ impl PartialEq for IndexingResources {
     }
 }
 
+impl Hash for IndexingResources {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        self.heap_size.hash(state);
+    }
+}
+
 impl IndexingResources {
     fn default_heap_size() -> ByteSize {
         ByteSize::gb(2)
     }
@@ -90,7 +98,7 @@ impl Default for IndexingResources {
     }
 }
 
-#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Hash, utoipa::ToSchema)]
 #[serde(deny_unknown_fields)]
 pub struct IndexingSettings {
     #[schema(default = 60)]
@@ -253,6 +261,14 @@ pub struct IndexConfig {
 }
 
 impl IndexConfig {
+    /// Return a fingerprint of parameters relevant for indexers
+    pub fn indexing_params_fingerprint(&self) -> u64 {
+        let mut hasher = SipHasher::new();
+        self.doc_mapping.doc_mapping_uid.hash(&mut hasher);
+        self.indexing_settings.hash(&mut hasher);
+        hasher.finish()
+    }
+
     #[cfg(any(test, feature = "testsuite"))]
     pub fn for_test(index_id: &str, index_uri: &str) -> Self {
         let index_uri = Uri::from_str(index_uri).unwrap();
diff --git a/quickwit/quickwit-config/src/merge_policy_config.rs b/quickwit/quickwit-config/src/merge_policy_config.rs
index b692034c7d6..2f96ed49d78 100644
--- a/quickwit/quickwit-config/src/merge_policy_config.rs
+++ b/quickwit/quickwit-config/src/merge_policy_config.rs
@@ -21,7 +21,7 @@ use std::time::Duration;
 
 use serde::{de, Deserialize, Deserializer,
Serialize, Serializer}; -#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, utoipa::ToSchema)] +#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash, utoipa::ToSchema)] #[serde(deny_unknown_fields)] pub struct ConstWriteAmplificationMergePolicyConfig { /// Number of splits to merge together in a single merge operation. @@ -55,7 +55,7 @@ impl Default for ConstWriteAmplificationMergePolicyConfig { } } -#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, utoipa::ToSchema)] +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Hash, utoipa::ToSchema)] #[serde(deny_unknown_fields)] pub struct StableLogMergePolicyConfig { /// Number of docs below which all splits are considered as belonging to the same level. @@ -126,7 +126,7 @@ where S: Serializer { s.serialize_str(&value_str) } -#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq, utoipa::ToSchema)] +#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq, Hash, utoipa::ToSchema)] #[serde(tag = "type")] #[serde(deny_unknown_fields)] pub enum MergePolicyConfig { diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs index e04def73ccd..418c538c642 100644 --- a/quickwit/quickwit-control-plane/src/control_plane.rs +++ b/quickwit/quickwit-control-plane/src/control_plane.rs @@ -592,9 +592,12 @@ impl Handler for ControlPlane { return Err(ActorExitStatus::from(anyhow::anyhow!(serde_error))); } }; - self.model - .update_index_config(&index_uid, index_metadata.index_config)?; - // TODO: Handle doc mapping and/or indexing settings update here. + if self + .model + .update_index_config(&index_uid, index_metadata.index_config)? + { + let _rebuild_plan_notifier = self.rebuild_plan_debounced(ctx); + } info!(%index_uid, "updated index"); Ok(Ok(response)) } diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs index c396e1ac23a..f8df027d8e0 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs @@ -168,6 +168,10 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec { if !source_config.enabled { continue; } + let params_fingerprint = model + .index_metadata(&source_uid.index_uid) + .map(|index_meta| index_meta.index_config.indexing_params_fingerprint()) + .unwrap_or_default(); match source_config.source_params { SourceParams::File(FileSourceParams::Filepath(_)) | SourceParams::IngestCli @@ -181,6 +185,7 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec { sources.push(SourceToSchedule { source_uid, source_type: SourceToScheduleType::IngestV1, + params_fingerprint, }); } SourceParams::Ingest => { @@ -206,6 +211,7 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec { shard_ids, load_per_shard, }, + params_fingerprint, }); } SourceParams::Kafka(_) @@ -221,6 +227,7 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec { load_per_pipeline: NonZeroU32::new(PIPELINE_FULL_CAPACITY.cpu_millis()) .unwrap(), }, + params_fingerprint, }); } } @@ -680,18 +687,21 @@ mod tests { index_uid: Some(index_uid.clone()), source_id: "source-1".to_string(), shard_ids: Vec::new(), + params_fingerprint: 0, }; let task_1b = IndexingTask { pipeline_uid: Some(PipelineUid::for_test(11u128)), index_uid: Some(index_uid.clone()), source_id: "source-1".to_string(), shard_ids: Vec::new(), + params_fingerprint: 0, }; let task_2 = IndexingTask { pipeline_uid: 
Some(PipelineUid::for_test(20u128)), index_uid: Some(index_uid.clone()), source_id: "source-2".to_string(), shard_ids: Vec::new(), + params_fingerprint: 0, }; running_plan.insert( "indexer-1".to_string(), @@ -712,12 +722,14 @@ mod tests { index_uid: Some(index_uid.clone()), source_id: "source-1".to_string(), shard_ids: Vec::new(), + params_fingerprint: 0, }; let task_2 = IndexingTask { pipeline_uid: Some(PipelineUid::for_test(2u128)), index_uid: Some(index_uid.clone()), source_id: "source-2".to_string(), shard_ids: Vec::new(), + params_fingerprint: 0, }; running_plan.insert("indexer-1".to_string(), vec![task_1.clone()]); desired_plan.insert("indexer-1".to_string(), vec![task_2.clone()]); @@ -744,12 +756,14 @@ mod tests { index_uid: Some(index_uid.clone()), source_id: "source-1".to_string(), shard_ids: Vec::new(), + params_fingerprint: 0, }; let task_2 = IndexingTask { pipeline_uid: Some(PipelineUid::for_test(2u128)), index_uid: Some(index_uid2.clone()), source_id: "source-2".to_string(), shard_ids: Vec::new(), + params_fingerprint: 0, }; running_plan.insert("indexer-2".to_string(), vec![task_2.clone()]); desired_plan.insert("indexer-1".to_string(), vec![task_1.clone()]); @@ -784,18 +798,21 @@ mod tests { index_uid: Some(index_uid.clone()), source_id: "source-1".to_string(), shard_ids: Vec::new(), + params_fingerprint: 0, }; let task_1b = IndexingTask { pipeline_uid: Some(PipelineUid::for_test(11u128)), index_uid: Some(index_uid.clone()), source_id: "source-1".to_string(), shard_ids: Vec::new(), + params_fingerprint: 0, }; let task_1c = IndexingTask { pipeline_uid: Some(PipelineUid::for_test(12u128)), index_uid: Some(index_uid.clone()), source_id: "source-1".to_string(), shard_ids: Vec::new(), + params_fingerprint: 0, }; running_plan.insert("indexer-1".to_string(), vec![task_1a.clone()]); desired_plan.insert( @@ -938,6 +955,7 @@ mod tests { num_pipelines: 3, load_per_pipeline: NonZeroU32::new(1_000).unwrap(), }, + params_fingerprint: 0, }, SourceToSchedule { source_uid: source_2.clone(), @@ -945,6 +963,7 @@ mod tests { num_pipelines: 2, load_per_pipeline: NonZeroU32::new(1_000).unwrap(), }, + params_fingerprint: 0, }, ]; let mut indexer_max_loads = FnvHashMap::default(); @@ -968,18 +987,21 @@ mod tests { source_id: "my-source".to_string(), pipeline_uid: Some(PipelineUid::random()), shard_ids: vec!["shard1".into()], + params_fingerprint: 0, }; let task2 = IndexingTask { index_uid: Some(IndexUid::for_test("index2", 123)), source_id: "my-source".to_string(), pipeline_uid: Some(PipelineUid::random()), shard_ids: vec!["shard2".into(), "shard3".into()], + params_fingerprint: 0, }; let task3 = IndexingTask { index_uid: Some(IndexUid::for_test("index3", 123)), source_id: "my-source".to_string(), pipeline_uid: Some(PipelineUid::random()), shard_ids: vec!["shard6".into()], + params_fingerprint: 0, }; // order made to map with the debug for lisibility map.insert("indexer5", vec![&task2]); diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs index fe7c33d1b46..35a14d7f316 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs @@ -159,6 +159,7 @@ fn convert_physical_plan_to_solution( pub struct SourceToSchedule { pub source_uid: SourceUid, pub source_type: SourceToScheduleType, + pub params_fingerprint: u64, } #[derive(Debug)] @@ -240,11 +241,18 @@ fn 
convert_scheduling_solution_to_physical_plan_single_node_single_source( .cloned() .collect(); remaining_num_shards_to_schedule_on_node -= shard_ids.len() as u32; + let pipeline_uid = if previous_task.params_fingerprint == source.params_fingerprint + { + previous_task.pipeline_uid + } else { + Some(PipelineUid::random()) + }; let new_task = IndexingTask { index_uid: previous_task.index_uid.clone(), source_id: previous_task.source_id.clone(), - pipeline_uid: previous_task.pipeline_uid, + pipeline_uid, shard_ids, + params_fingerprint: source.params_fingerprint, }; new_tasks.push(new_task); if new_tasks.len() >= max_num_pipelines as usize { @@ -263,12 +271,19 @@ fn convert_scheduling_solution_to_physical_plan_single_node_single_source( .take(remaining_num_shards_to_schedule_on_node as usize) .map(|task| (*task).clone()) .collect(); + for indexing_task in &mut indexing_tasks { + if indexing_task.params_fingerprint != source.params_fingerprint { + indexing_task.params_fingerprint = source.params_fingerprint; + indexing_task.pipeline_uid = Some(PipelineUid::random()); + } + } indexing_tasks.resize_with(remaining_num_shards_to_schedule_on_node as usize, || { IndexingTask { index_uid: Some(source.source_uid.index_uid.clone()), source_id: source.source_uid.source_id.clone(), pipeline_uid: Some(PipelineUid::random()), shard_ids: Vec::new(), + params_fingerprint: source.params_fingerprint, } }); indexing_tasks @@ -277,7 +292,12 @@ fn convert_scheduling_solution_to_physical_plan_single_node_single_source( // Ingest V1 is simple. One pipeline per indexer node. if let Some(indexing_task) = previous_tasks.first() { // The pipeline already exists, let's reuse it. - vec![(*indexing_task).clone()] + let mut indexing_task = (*indexing_task).clone(); + if indexing_task.params_fingerprint != source.params_fingerprint { + indexing_task.params_fingerprint = source.params_fingerprint; + indexing_task.pipeline_uid = Some(PipelineUid::random()); + } + vec![indexing_task] } else { // The source is new, we need to create a new task. 
vec![IndexingTask { @@ -285,6 +305,7 @@ fn convert_scheduling_solution_to_physical_plan_single_node_single_source( source_id: source.source_uid.source_id.clone(), pipeline_uid: Some(PipelineUid::random()), shard_ids: Vec::new(), + params_fingerprint: source.params_fingerprint, }] } } @@ -427,6 +448,7 @@ fn convert_scheduling_solution_to_physical_plan( &source.source_uid, max_shard_per_pipeline, &mut new_physical_plan, + source.params_fingerprint, ); } } @@ -528,6 +550,7 @@ fn add_shard_to_indexer( source_uid: &SourceUid, max_shard_per_pipeline: NonZeroU32, new_physical_plan: &mut PhysicalIndexingPlan, + params_fingerprint: u64, ) { let indexer_tasks = new_physical_plan .indexing_tasks_per_indexer_mut() @@ -553,6 +576,7 @@ fn add_shard_to_indexer( source_id: source_uid.source_id.clone(), pipeline_uid: Some(PipelineUid::random()), shard_ids: vec![missing_shard], + params_fingerprint, }); } } @@ -773,6 +797,7 @@ mod tests { ], load_per_shard: NonZeroU32::new(1_000).unwrap(), }, + params_fingerprint: 0, }; let source_1 = SourceToSchedule { source_uid: source_uid1.clone(), @@ -780,10 +805,12 @@ mod tests { num_pipelines: 2, load_per_pipeline: NonZeroU32::new(3_200).unwrap(), }, + params_fingerprint: 0, }; let source_2 = SourceToSchedule { source_uid: source_uid2.clone(), source_type: SourceToScheduleType::IngestV1, + params_fingerprint: 0, }; let mut indexer_id_to_cpu_capacities = FnvHashMap::default(); indexer_id_to_cpu_capacities.insert(indexer1.clone(), mcpu(16_000)); @@ -844,6 +871,7 @@ mod tests { shard_ids: vec![shard_ids[i].clone()], load_per_shard: NonZeroU32::new(250).unwrap(), }, + params_fingerprint: 0, }) .collect(); @@ -883,6 +911,7 @@ mod tests { num_pipelines: 2, load_per_pipeline: NonZeroU32::new(1000).unwrap(), }, + params_fingerprint: 0, }; let sources = vec![source_1]; @@ -925,6 +954,7 @@ mod tests { source_id: source_uid.source_id.clone(), pipeline_uid: Some(*pipeline_uid), shard_ids: shard_ids.to_vec(), + params_fingerprint: 0, }); } plan @@ -958,6 +988,7 @@ mod tests { ], load_per_shard: NonZeroU32::new(1_000).unwrap(), }, + params_fingerprint: 0, }]; let mut indexer_id_to_cpu_capacities = FnvHashMap::default(); indexer_id_to_cpu_capacities.insert("node1".to_string(), mcpu(10_000)); @@ -997,6 +1028,7 @@ mod tests { shard_ids: shard_ids.iter().copied().map(ShardId::from).collect(), load_per_shard: NonZeroU32::new(load_per_shard.cpu_millis()).unwrap(), }, + params_fingerprint: 0, }]; const NODE: &str = "node1"; let mut indexer_id_to_cpu_capacities = FnvHashMap::default(); @@ -1175,6 +1207,7 @@ mod tests { source_id: "_ingest-api-source".to_string(), }, source_type: SourceToScheduleType::IngestV1, + params_fingerprint: 0, }, SourceToSchedule { source_uid: SourceUid { @@ -1188,6 +1221,7 @@ mod tests { shard_ids: vec![ShardId::from(1)], load_per_shard: NonZeroU32::new(250).unwrap(), }, + params_fingerprint: 0, }, ]; let mut capacities = FnvHashMap::default(); @@ -1207,6 +1241,7 @@ mod tests { source_id: source_uid.source_id.to_string(), pipeline_uid: Some(PipelineUid::random()), shard_ids: vec![ShardId::from(1), ShardId::from(4), ShardId::from(5)], + params_fingerprint: 0, }; let previous_task2 = IndexingTask { index_uid: Some(source_uid.index_uid.clone()), @@ -1219,6 +1254,7 @@ mod tests { ShardId::from(9), ShardId::from(10), ], + params_fingerprint: 0, }; { let sharded_source = SourceToSchedule { @@ -1232,6 +1268,7 @@ mod tests { ], load_per_shard: NonZeroU32::new(1_000).unwrap(), }, + params_fingerprint: 0, }; let tasks = 
convert_scheduling_solution_to_physical_plan_single_node_single_source( 4, @@ -1257,6 +1294,7 @@ mod tests { ], load_per_shard: NonZeroU32::new(250).unwrap(), }, + params_fingerprint: 0, }; let tasks = convert_scheduling_solution_to_physical_plan_single_node_single_source( 4, @@ -1281,6 +1319,7 @@ mod tests { source_id: source_uid.source_id.to_string(), pipeline_uid: Some(pipeline_uid1), shard_ids: Vec::new(), + params_fingerprint: 0, }; let pipeline_uid2 = PipelineUid::random(); let previous_task2 = IndexingTask { @@ -1288,6 +1327,7 @@ mod tests { source_id: source_uid.source_id.to_string(), pipeline_uid: Some(pipeline_uid2), shard_ids: Vec::new(), + params_fingerprint: 0, }; { let sharded_source = SourceToSchedule { @@ -1296,6 +1336,7 @@ mod tests { num_pipelines: 1, load_per_pipeline: NonZeroU32::new(4000).unwrap(), }, + params_fingerprint: 0, }; let tasks = convert_scheduling_solution_to_physical_plan_single_node_single_source( 1, @@ -1314,6 +1355,7 @@ mod tests { num_pipelines: 0, load_per_pipeline: NonZeroU32::new(1_000).unwrap(), }, + params_fingerprint: 0, }; let tasks = convert_scheduling_solution_to_physical_plan_single_node_single_source( 0, @@ -1329,6 +1371,7 @@ mod tests { num_pipelines: 2, load_per_pipeline: NonZeroU32::new(1_000).unwrap(), }, + params_fingerprint: 0, }; let tasks = convert_scheduling_solution_to_physical_plan_single_node_single_source( 2, @@ -1350,6 +1393,7 @@ mod tests { num_pipelines: 2, load_per_pipeline: NonZeroU32::new(1_000).unwrap(), }, + params_fingerprint: 0, }; let tasks = convert_scheduling_solution_to_physical_plan_single_node_single_source( 2, diff --git a/quickwit/quickwit-control-plane/src/model/mod.rs b/quickwit/quickwit-control-plane/src/model/mod.rs index b378931c5eb..ca314233f6a 100644 --- a/quickwit/quickwit-control-plane/src/model/mod.rs +++ b/quickwit/quickwit-control-plane/src/model/mod.rs @@ -210,13 +210,15 @@ impl ControlPlaneModel { &mut self, index_uid: &IndexUid, index_config: IndexConfig, - ) -> anyhow::Result<()> { + ) -> anyhow::Result { let Some(index_model) = self.index_table.get_mut(index_uid) else { bail!("index `{}` not found", index_uid.index_id); }; + let fp_changed = index_model.index_config.indexing_params_fingerprint() + != index_config.indexing_params_fingerprint(); index_model.index_config = index_config; self.update_metrics(); - Ok(()) + Ok(fp_changed) } pub(crate) fn delete_index(&mut self, index_uid: &IndexUid) { diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs index 87d827dd2e7..757d434adca 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs @@ -768,6 +768,7 @@ impl IndexingService { source_id: pipeline_handle.indexing_pipeline_id.source_id.clone(), pipeline_uid: Some(pipeline_handle.indexing_pipeline_id.pipeline_uid), shard_ids, + params_fingerprint: 0, } }) .collect(); @@ -1250,12 +1251,14 @@ mod tests { source_id: "test-indexing-service--source-1".to_string(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(0u128)), + params_fingerprint: 0, }, IndexingTask { index_uid: Some(metadata.index_uid.clone()), source_id: "test-indexing-service--source-1".to_string(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(1u128)), + params_fingerprint: 0, }, ]; indexing_service @@ -1294,24 +1297,28 @@ mod tests { source_id: INGEST_API_SOURCE_ID.to_string(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(3u128)), + 
params_fingerprint: 0, }, IndexingTask { index_uid: Some(metadata.index_uid.clone()), source_id: "test-indexing-service--source-1".to_string(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(1u128)), + params_fingerprint: 0, }, IndexingTask { index_uid: Some(metadata.index_uid.clone()), source_id: "test-indexing-service--source-1".to_string(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(2u128)), + params_fingerprint: 0, }, IndexingTask { index_uid: Some(metadata.index_uid.clone()), source_id: source_config_2.source_id.clone(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(4u128)), + params_fingerprint: 0, }, ]; indexing_service @@ -1352,18 +1359,21 @@ mod tests { source_id: INGEST_API_SOURCE_ID.to_string(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(3u128)), + params_fingerprint: 0, }, IndexingTask { index_uid: Some(metadata.index_uid.clone()), source_id: "test-indexing-service--source-1".to_string(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(1u128)), + params_fingerprint: 0, }, IndexingTask { index_uid: Some(metadata.index_uid.clone()), source_id: source_config_2.source_id.clone(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(4u128)), + params_fingerprint: 0, }, ]; indexing_service @@ -1799,18 +1809,21 @@ mod tests { source_id: "test-source".to_string(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(0)), + params_fingerprint: 0, }, IndexingTask { index_uid: Some(IndexUid::for_test("test-index-1", 0)), source_id: "test-source".to_string(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(1)), + params_fingerprint: 0, }, IndexingTask { index_uid: Some(IndexUid::for_test("test-index-2", 0)), source_id: "test-source".to_string(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(2)), + params_fingerprint: 0, }, ], }) diff --git a/quickwit/quickwit-integration-tests/src/tests/update_tests/mod.rs b/quickwit/quickwit-integration-tests/src/tests/update_tests/mod.rs index 4871be449ef..ad6bb67bcc5 100644 --- a/quickwit/quickwit-integration-tests/src/tests/update_tests/mod.rs +++ b/quickwit/quickwit-integration-tests/src/tests/update_tests/mod.rs @@ -64,4 +64,5 @@ async fn assert_hits_unordered( } mod doc_mapping_tests; +mod restart_indexer_tests; mod search_settings_tests; diff --git a/quickwit/quickwit-integration-tests/src/tests/update_tests/restart_indexer_tests.rs b/quickwit/quickwit-integration-tests/src/tests/update_tests/restart_indexer_tests.rs new file mode 100644 index 00000000000..2b1344bbb19 --- /dev/null +++ b/quickwit/quickwit-integration-tests/src/tests/update_tests/restart_indexer_tests.rs @@ -0,0 +1,241 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. 
If not, see <https://www.gnu.org/licenses/>.
+
+use std::fmt::Write;
+use std::time::Duration;
+
+use quickwit_config::service::QuickwitService;
+use quickwit_metastore::SplitState;
+use quickwit_proto::types::DocMappingUid;
+use quickwit_rest_client::models::IngestSource;
+use quickwit_rest_client::rest_client::CommitType;
+use quickwit_serve::ListSplitsQueryParams;
+use serde_json::json;
+
+use crate::test_utils::ClusterSandboxBuilder;
+
+#[tokio::test]
+async fn test_update_doc_mapping_restart_indexing_pipeline() {
+    let index_id = "update-restart-ingest";
+    quickwit_common::setup_logging_for_tests();
+    let sandbox = ClusterSandboxBuilder::default()
+        .add_node([
+            QuickwitService::Searcher,
+            QuickwitService::Metastore,
+            QuickwitService::Indexer,
+            QuickwitService::ControlPlane,
+            QuickwitService::Janitor,
+        ])
+        .build_and_start()
+        .await;
+
+    {
+        // Wait for the indexer to fully start.
+        // The startup time is a bit long for a cluster.
+        tokio::time::sleep(Duration::from_secs(3)).await;
+        let indexing_service_counters = sandbox
+            .indexer_rest_client
+            .node_stats()
+            .indexing()
+            .await
+            .unwrap();
+        assert_eq!(indexing_service_counters.num_running_pipelines, 0);
+    }
+
+    // usually these are chosen by quickwit, but the client can actually specify them,
+    // and we do so here to simplify the test
+    let initial_mapping_uid = DocMappingUid::for_test(1);
+    let final_mapping_uid = DocMappingUid::for_test(2);
+
+    // Create index
+    sandbox
+        .indexer_rest_client
+        .indexes()
+        .create(
+            json!({
+                "version": "0.8",
+                "index_id": index_id,
+                "doc_mapping": {
+                    "doc_mapping_uid": initial_mapping_uid,
+                    "field_mappings": [
+                        {"name": "body", "type": "u64"}
+                    ]
+                },
+                "indexing_settings": {
+                    "commit_timeout_secs": 1
+                },
+            })
+            .to_string(),
+            quickwit_config::ConfigFormat::Json,
+            false,
+        )
+        .await
+        .unwrap();
+
+    assert!(sandbox
+        .indexer_rest_client
+        .node_health()
+        .is_live()
+        .await
+        .unwrap());
+
+    // Wait until indexing pipelines are started.
+    sandbox.wait_for_indexing_pipelines(1).await.unwrap();
+
+    let payload = (0..1000).fold(String::new(), |mut buffer, id| {
+        writeln!(&mut buffer, "{{\"body\": {id}}}").unwrap();
+        buffer
+    });
+
+    // ingest some documents with the old doc mapping.
+    // we *don't* use local ingest, so that a normal indexing pipeline is exercised
+    sandbox
+        .indexer_rest_client
+        .ingest(
+            index_id,
+            IngestSource::Str(payload.clone()),
+            None,
+            None,
+            CommitType::Auto,
+        )
+        .await
+        .unwrap();
+
+    // we wait for a new split. We don't want to force commits, to let the pipeline
+    // behave as if in a steady state.
+    sandbox
+        .wait_for_splits(index_id, Some(vec![SplitState::Published]), 1)
+        .await
+        .unwrap();
+
+    // we ingest again, this might end up with the new or old doc mapping depending on how quickly
+    // the pipeline gets killed and restarted (in practice, as this cluster is very lightly loaded,
+    // it will almost always kill the pipeline before these documents are committed)
+    sandbox
+        .indexer_rest_client
+        .ingest(
+            index_id,
+            IngestSource::Str(payload.clone()),
+            None,
+            None,
+            CommitType::Auto,
+        )
+        .await
+        .unwrap();
+
+    // Update index
+    sandbox
+        .searcher_rest_client
+        .indexes()
+        .update(
+            index_id,
+            json!({
+                "version": "0.8",
+                "index_id": index_id,
+                "doc_mapping": {
+                    "doc_mapping_uid": final_mapping_uid,
+                    "field_mappings": [
+                        {"name": "body", "type": "i64"}
+                    ]
+                },
+                "indexing_settings": {
+                    "commit_timeout_secs": 1,
+                },
+            })
+            .to_string(),
+            quickwit_config::ConfigFormat::Json,
+        )
+        .await
+        .unwrap();
+
+    // we ingest again, this might end up with the new or old doc mapping depending on how quickly
+    // the pipeline gets killed and restarted. In practice this will almost always use the new
+    // mapping on a lightly loaded cluster.
+    sandbox
+        .indexer_rest_client
+        .ingest(
+            index_id,
+            IngestSource::Str(payload.clone()),
+            None,
+            None,
+            CommitType::Auto,
+        )
+        .await
+        .unwrap();
+
+    // we wait for a 2nd split, though it might already be there if it contains only batch 2
+    // and not batch 3.
+    sandbox
+        .wait_for_splits(index_id, Some(vec![SplitState::Published]), 2)
+        .await
+        .unwrap();
+
+    // we ingest again, definitely with the up-to-date doc mapper this time
+    sandbox
+        .indexer_rest_client
+        .ingest(
+            index_id,
+            IngestSource::Str(payload.clone()),
+            None,
+            None,
+            CommitType::Auto,
+        )
+        .await
+        .unwrap();
+
+    // wait for a last commit
+    sandbox
+        .wait_for_splits(index_id, Some(vec![SplitState::Published]), 3)
+        .await
+        .unwrap();
+
+    let splits = sandbox
+        .indexer_rest_client
+        .splits(index_id)
+        .list(ListSplitsQueryParams::default())
+        .await
+        .unwrap();
+
+    // we expect 3 splits, with all docs, and at least one split under the old mapping
+    // and one under the new mapping
+    assert_eq!(splits.len(), 3);
+    assert!(
+        splits
+            .iter()
+            .filter(|split| split.split_metadata.doc_mapping_uid == initial_mapping_uid)
+            .count()
+            > 0
+    );
+    assert!(
+        splits
+            .iter()
+            .filter(|split| split.split_metadata.doc_mapping_uid == final_mapping_uid)
+            .count()
+            > 0
+    );
+    assert_eq!(
+        splits
+            .iter()
+            .map(|split| split.split_metadata.num_docs)
+            .sum::<usize>(),
+        4000
+    );
+
+    sandbox.shutdown().await.unwrap();
+}
diff --git a/quickwit/quickwit-proto/protos/quickwit/indexing.proto b/quickwit/quickwit-proto/protos/quickwit/indexing.proto
index 2eeb31d784a..294f301bc4b 100644
--- a/quickwit/quickwit-proto/protos/quickwit/indexing.proto
+++ b/quickwit/quickwit-proto/protos/quickwit/indexing.proto
@@ -46,6 +46,9 @@ message IndexingTask {
   PipelineUid pipeline_uid = 4;
   // The shards assigned to the indexer.
   repeated quickwit.ingest.ShardId shard_ids = 3;
+  // Fingerprint of the pipeline parameters. Anything that should cause a pipeline restart (such
+  // as updating indexing settings or doc mapping) should influence this value.
+ uint64 params_fingerprint = 6; } message ApplyIndexingPlanResponse {} diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.indexing.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.indexing.rs index 60a31926f60..ae0ef465968 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.indexing.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.indexing.rs @@ -21,6 +21,10 @@ pub struct IndexingTask { /// The shards assigned to the indexer. #[prost(message, repeated, tag = "3")] pub shard_ids: ::prost::alloc::vec::Vec, + /// Fingerprint of the pipeline parameters. Anything that should cause a pipeline restart (such + /// as updating indexing settings or doc mapping) should influence this value. + #[prost(uint64, tag = "6")] + pub params_fingerprint: u64, } #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)] diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index 9b816333528..ab244c9297a 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -1437,6 +1437,7 @@ mod tests { index_uid: Some(IndexUid::for_test("test-index", 0)), source_id: "test-source".to_string(), shard_ids: Vec::new(), + params_fingerprint: 0, }; let updated_indexer_node = ClusterNode::for_test( "test-indexer-node",
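---

Reviewer note, not part of the patch: the mechanism above reduces to two steps. `IndexConfig::indexing_params_fingerprint` SipHashes the doc mapping UID together with the indexing settings, and the scheduler records that value on each `IndexingTask`; when a source's current fingerprint no longer matches the one on a previous task, the scheduler assigns a fresh `PipelineUid`, and the indexer reacts to the UID change by tearing the pipeline down and spawning a new one. Below is a minimal, self-contained sketch of that rule under simplified assumptions: `DocMapping`, `Settings`, and `Task` are hypothetical stand-ins for the real `DocMapping`/`IndexingSettings`/`IndexingTask` types, and plain `u128` values stand in for `PipelineUid`. Only the `siphasher` crate, which this patch adds to quickwit-config, is needed.

    use std::hash::{Hash, Hasher};

    use siphasher::sip::SipHasher;

    // Hypothetical stand-ins for the real Quickwit types.
    #[derive(Hash)]
    struct DocMapping {
        doc_mapping_uid: u128,
    }
    #[derive(Hash)]
    struct Settings {
        commit_timeout_secs: u64,
    }
    struct Task {
        pipeline_uid: u128,
        params_fingerprint: u64,
    }

    // Same idea as IndexConfig::indexing_params_fingerprint: hash only the
    // parameters whose change must restart indexing pipelines.
    fn indexing_params_fingerprint(doc_mapping: &DocMapping, settings: &Settings) -> u64 {
        let mut hasher = SipHasher::new();
        doc_mapping.doc_mapping_uid.hash(&mut hasher);
        settings.hash(&mut hasher);
        hasher.finish()
    }

    // Scheduler-side rule: an unchanged fingerprint keeps the previous pipeline
    // UID (no restart); a changed one gets a fresh UID, which the indexer treats
    // as "stop the old pipeline, start a new one".
    fn next_pipeline_uid(previous: &Task, current_fingerprint: u64, fresh_uid: u128) -> u128 {
        if previous.params_fingerprint == current_fingerprint {
            previous.pipeline_uid
        } else {
            fresh_uid
        }
    }

    fn main() {
        let before = indexing_params_fingerprint(
            &DocMapping { doc_mapping_uid: 1 },
            &Settings { commit_timeout_secs: 1 },
        );
        let after = indexing_params_fingerprint(
            &DocMapping { doc_mapping_uid: 2 },
            &Settings { commit_timeout_secs: 1 },
        );
        let task = Task { pipeline_uid: 42, params_fingerprint: before };
        // Unchanged parameters: the pipeline keeps its UID and keeps running.
        assert_eq!(next_pipeline_uid(&task, before, 7), 42);
        // Changed doc mapping: a fresh UID forces a pipeline restart.
        assert_eq!(next_pipeline_uid(&task, after, 7), 7);
    }

In the patch itself the fingerprint travels with `IndexingTask` (the new proto field, tag 6) between the control plane's desired plan and the indexer, so a restart is driven purely by the UID change; the integration test observes the effect as published splits carrying both the old and the new doc mapping UIDs.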