diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 5cdf8ef0f3b..4b31ba8ae98 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -5414,6 +5414,7 @@ dependencies = [ "tonic 0.9.2", "tower", "tracing", + "ulid", ] [[package]] diff --git a/quickwit/quickwit-cli/src/tool.rs b/quickwit/quickwit-cli/src/tool.rs index b3949149658..7e4d0552216 100644 --- a/quickwit/quickwit-cli/src/tool.rs +++ b/quickwit/quickwit-cli/src/tool.rs @@ -52,7 +52,7 @@ use quickwit_metastore::IndexMetadataResponseExt; use quickwit_proto::indexing::CpuCapacity; use quickwit_proto::metastore::{IndexMetadataRequest, MetastoreService, MetastoreServiceClient}; use quickwit_proto::search::{CountHits, SearchResponse}; -use quickwit_proto::types::NodeId; +use quickwit_proto::types::{NodeId, PipelineUid}; use quickwit_search::{single_node_search, SearchResponseRest}; use quickwit_serve::{ search_request_from_api_request, BodyFormat, SearchRequestQueryString, SortBy, @@ -473,7 +473,7 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result< .ask_for_res(SpawnPipeline { index_id: args.index_id.clone(), source_config, - pipeline_ord: 0, + pipeline_uid: PipelineUid::from_u128(0u128), }) .await?; let merge_pipeline_handle = indexing_server_mailbox @@ -613,7 +613,7 @@ pub async fn merge_cli(args: MergeArgs) -> anyhow::Result<()> { transform_config: None, input_format: SourceInputFormat::Json, }, - pipeline_ord: 0, + pipeline_uid: PipelineUid::from_u128(0u128), }) .await?; let pipeline_handle: ActorHandle = indexing_service_mailbox diff --git a/quickwit/quickwit-cluster/src/cluster.rs b/quickwit/quickwit-cluster/src/cluster.rs index dd615701c13..460575fddc4 100644 --- a/quickwit/quickwit-cluster/src/cluster.rs +++ b/quickwit/quickwit-cluster/src/cluster.rs @@ -20,6 +20,7 @@ use std::collections::{BTreeMap, HashMap, HashSet}; use std::fmt::{Debug, Display}; use std::net::SocketAddr; +use std::str::FromStr; use std::sync::Arc; use std::time::Duration; @@ -32,7 +33,7 @@ use chitchat::{ use futures::Stream; use itertools::Itertools; use quickwit_proto::indexing::{IndexingPipelineId, IndexingTask, PipelineMetrics}; -use quickwit_proto::types::NodeId; +use quickwit_proto::types::{NodeId, PipelineUid, ShardId}; use serde::{Deserialize, Serialize}; use tokio::sync::{mpsc, watch, Mutex, RwLock}; use tokio::time::timeout; @@ -61,8 +62,8 @@ const MARKED_FOR_DELETION_GRACE_PERIOD: usize = if cfg!(any(test, feature = "tes }; // An indexing task key is formatted as -// `{INDEXING_TASK_PREFIX}{INDEXING_TASK_SEPARATOR}{index_id}{INDEXING_TASK_SEPARATOR}{source_id}`. -const INDEXING_TASK_PREFIX: &str = "indexing_task:"; +// `{INDEXING_TASK_PREFIX}{PIPELINE_ULID}`. +const INDEXING_TASK_PREFIX: &str = "indexer.task:"; #[derive(Clone)] pub struct Cluster { @@ -432,45 +433,16 @@ pub fn parse_indexing_tasks(node_state: &NodeState) -> Vec { None } }) - .flat_map(|(indexing_task_key, pipeline_shard_str)| { - parse_indexing_task_key_value(indexing_task_key, pipeline_shard_str) + .flat_map(|(key, value)| { + let indexing_task_opt = chitchat_kv_to_indexing_task(key, value); + if indexing_task_opt.is_none() { + warn!(key=%key, value=%value, "failed to parse indexing task from chitchat kv"); + } + indexing_task_opt }) .collect() } -/// Parses indexing task key into the IndexingTask. -/// Malformed keys and values are ignored, just warnings are emitted. 
-fn parse_indexing_task_key_value( - indexing_task_key: &str, - pipeline_shards_str: &str, -) -> Vec { - let Some(index_uid_source_id) = indexing_task_key.strip_prefix(INDEXING_TASK_PREFIX) else { - warn!( - indexing_task_key = indexing_task_key, - "indexing task must start by the prefix `{INDEXING_TASK_PREFIX}`" - ); - return Vec::new(); - }; - let Some((index_uid_str, source_id_str)) = index_uid_source_id.rsplit_once(':') else { - warn!(index_uid_source_id=%index_uid_source_id, "invalid index task format, cannot find index_uid and source_id"); - return Vec::new(); - }; - match deserialize_pipeline_shards(pipeline_shards_str) { - Ok(pipeline_shards) => pipeline_shards - .into_iter() - .map(|shard_ids| IndexingTask { - index_uid: index_uid_str.to_string(), - source_id: source_id_str.to_string(), - shard_ids, - }) - .collect(), - Err(error) => { - warn!(error=%error, "failed to parse pipeline shard list"); - Vec::new() - } - } -} - /// Writes the given indexing tasks in the given node state. /// /// If previous indexing tasks were present in the node state but were not in the given tasks, they @@ -483,73 +455,53 @@ pub fn set_indexing_tasks_in_node_state( .iter_prefix(INDEXING_TASK_PREFIX) .map(|(key, _)| key.to_string()) .collect(); - let mut indexing_tasks_grouped_by_source: HashMap<(&str, &str), Vec<&IndexingTask>> = - HashMap::new(); for indexing_task in indexing_tasks { - indexing_tasks_grouped_by_source - .entry(( - indexing_task.index_uid.as_str(), - indexing_task.source_id.as_str(), - )) - .or_default() - .push(indexing_task); - } - for ((index_uid, source_id), indexing_tasks) in indexing_tasks_grouped_by_source { - let shards_per_pipeline = indexing_tasks - .iter() - .map(|indexing_task| &indexing_task.shard_ids[..]); - let pipeline_shards_str: String = serialize_pipeline_shards(shards_per_pipeline); - let key = format!("{INDEXING_TASK_PREFIX}{index_uid}:{source_id}"); + let (key, value) = indexing_task_to_chitchat_kv(indexing_task); current_indexing_tasks_keys.remove(&key); - node_state.set(key, pipeline_shards_str); + node_state.set(key, value); } for obsolete_task_key in current_indexing_tasks_keys { node_state.mark_for_deletion(&obsolete_task_key); } } -/// Given a list of list of shards (one list per pipeline), serializes it as a string to be stored -/// as a value in the chitchat state. -/// -/// The format is as follows `[1,2,3][4,5]`. -fn serialize_pipeline_shards<'a>(pipeline_shards: impl Iterator) -> String { - let mut pipeline_shards_str = String::new(); - for shards in pipeline_shards { - pipeline_shards_str.push('['); - pipeline_shards_str.push_str(&shards.iter().join(",")); - pipeline_shards_str.push(']'); - } - pipeline_shards_str +fn indexing_task_to_chitchat_kv(indexing_task: &IndexingTask) -> (String, String) { + let IndexingTask { + pipeline_uid: _, + index_uid, + source_id, + shard_ids, + } = indexing_task; + let key = format!("{INDEXING_TASK_PREFIX}{}", indexing_task.pipeline_uid()); + let shards_str = shard_ids.iter().sorted().join(","); + let value = format!("{index_uid}:{source_id}:{shards_str}"); + (key, value) } -/// Deserializes the list of shards from a string stored in the chitchat state, as -/// serialized by `serialize_pipeline_shards`. -/// -/// This function will make sure the pipeline shard lists are sorted. 
-pub fn deserialize_pipeline_shards(pipeline_shards_str: &str) -> anyhow::Result>> { - if pipeline_shards_str.is_empty() { - return Ok(Vec::new()); +fn parse_shards_str(shards_str: &str) -> Option> { + if shards_str.is_empty() { + return Some(Vec::new()); } - let mut pipeline_shards: Vec> = Vec::new(); - for single_pipeline_shard_str in pipeline_shards_str.split(']') { - if single_pipeline_shard_str.is_empty() { - continue; - } - let Some(comma_sep_shards_str) = single_pipeline_shard_str.strip_prefix('[') else { - anyhow::bail!("invalid pipeline shards string: `{pipeline_shards_str}`"); - }; - let mut shards: Vec = Vec::new(); - if !comma_sep_shards_str.is_empty() { - for shard_str in comma_sep_shards_str.split(',') { - let shard_id: u64 = shard_str.parse::()?; - shards.push(shard_id); - } - } - shards.sort(); - pipeline_shards.push(shards); + let mut shard_ids = Vec::new(); + for shard_str in shards_str.split(',') { + let shard_id = shard_str.parse::().ok()?; + shard_ids.push(shard_id); } - pipeline_shards.sort_by_key(|shards| shards.first().copied()); - Ok(pipeline_shards) + Some(shard_ids) +} + +fn chitchat_kv_to_indexing_task(key: &str, value: &str) -> Option { + let pipeline_uid_str = key.strip_prefix(INDEXING_TASK_PREFIX)?; + let pipeline_uid = PipelineUid::from_str(pipeline_uid_str).ok()?; + let (source_uid, shards_str) = value.rsplit_once(':')?; + let (index_uid, source_id) = source_uid.rsplit_once(':')?; + let shard_ids = parse_shards_str(shards_str)?; + Some(IndexingTask { + index_uid: index_uid.to_string(), + source_id: source_id.to_string(), + pipeline_uid: Some(pipeline_uid), + shard_ids, + }) } async fn spawn_ready_nodes_change_stream_task(cluster: Cluster) { @@ -717,7 +669,6 @@ pub async fn create_cluster_for_test( transport: &dyn Transport, self_node_readiness: bool, ) -> anyhow::Result { - use std::str::FromStr; use std::sync::atomic::{AtomicU16, Ordering}; use quickwit_config::service::QuickwitService; @@ -944,7 +895,14 @@ mod tests { ) .await .unwrap(); - let indexing_task = IndexingTask { + let indexing_task1 = IndexingTask { + pipeline_uid: Some(PipelineUid::from_u128(1u128)), + index_uid: "index-1:11111111111111111111111111".to_string(), + source_id: "source-1".to_string(), + shard_ids: Vec::new(), + }; + let indexing_task2 = IndexingTask { + pipeline_uid: Some(PipelineUid::from_u128(2u128)), index_uid: "index-1:11111111111111111111111111".to_string(), source_id: "source-1".to_string(), shard_ids: Vec::new(), @@ -953,7 +911,7 @@ mod tests { .set_self_key_value(GRPC_ADVERTISE_ADDR_KEY, "127.0.0.1:1001") .await; cluster2 - .update_self_node_indexing_tasks(&[indexing_task.clone(), indexing_task.clone()]) + .update_self_node_indexing_tasks(&[indexing_task1.clone(), indexing_task2.clone()]) .await .unwrap(); cluster1 @@ -982,9 +940,10 @@ mod tests { member_node_2.enabled_services, HashSet::from_iter([QuickwitService::Indexer, QuickwitService::Metastore].into_iter()) ); + assert_eq!( - member_node_2.indexing_tasks, - vec![indexing_task.clone(), indexing_task.clone()] + &member_node_2.indexing_tasks, + &[indexing_task1, indexing_task2] ); } @@ -1017,10 +976,11 @@ mod tests { let mut random_generator = rand::thread_rng(); // TODO: increase it back to 1000 when https://github.com/quickwit-oss/chitchat/issues/81 is fixed let indexing_tasks = (0..500) - .map(|_| { + .map(|pipeline_id| { let index_id = random_generator.gen_range(0..=10_000); let source_id = random_generator.gen_range(0..=100); IndexingTask { + pipeline_uid: Some(PipelineUid::from_u128(pipeline_id as u128)), 
index_uid: format!("index-{index_id}:11111111111111111111111111"), source_id: format!("source-{source_id}"), shard_ids: Vec::new(), @@ -1130,19 +1090,19 @@ mod tests { let chitchat_handle = node.inner.read().await.chitchat_handle.chitchat(); let mut chitchat_guard = chitchat_handle.lock().await; chitchat_guard.self_node_state().set( - format!("{INDEXING_TASK_PREFIX}my_good_index:my_source:11111111111111111111111111"), - "[][]".to_string(), + format!("{INDEXING_TASK_PREFIX}01BX5ZZKBKACTAV9WEVGEMMVS0"), + "my_index:uid:my_source:1,3".to_string(), ); chitchat_guard.self_node_state().set( - format!("{INDEXING_TASK_PREFIX}my_bad_index:my_source:11111111111111111111111111"), - "malformatted value".to_string(), + format!("{INDEXING_TASK_PREFIX}01BX5ZZKBKACTAV9WEVGEMMVS1"), + "my_index:uid:my_source:3a,5".to_string(), ); } node.wait_for_ready_members(|members| members.len() == 1, Duration::from_secs(5)) .await .unwrap(); let ready_members = node.ready_members().await; - assert_eq!(ready_members[0].indexing_tasks.len(), 2); + assert_eq!(ready_members[0].indexing_tasks.len(), 1); } #[tokio::test] @@ -1233,40 +1193,6 @@ mod tests { Ok(()) } - fn test_serialize_pipeline_shards_aux(pipeline_shards: &[&[u64]], expected_str: &str) { - let pipeline_shards_str = serialize_pipeline_shards(pipeline_shards.iter().copied()); - assert_eq!(pipeline_shards_str, expected_str); - let ser_deser_pipeline_shards = deserialize_pipeline_shards(&pipeline_shards_str).unwrap(); - assert_eq!(pipeline_shards, ser_deser_pipeline_shards); - } - - #[test] - fn test_serialize_pipeline_shards() { - test_serialize_pipeline_shards_aux(&[], ""); - test_serialize_pipeline_shards_aux(&[&[]], "[]"); - test_serialize_pipeline_shards_aux(&[&[1]], "[1]"); - test_serialize_pipeline_shards_aux(&[&[1, 2]], "[1,2]"); - test_serialize_pipeline_shards_aux(&[&[], &[1, 2]], "[][1,2]"); - test_serialize_pipeline_shards_aux(&[&[], &[]], "[][]"); - test_serialize_pipeline_shards_aux(&[&[1], &[3, 4, 5, 6]], "[1][3,4,5,6]"); - } - - #[test] - fn test_deserialize_pipeline_shards_sorts() { - assert_eq!( - deserialize_pipeline_shards("[2,1]").unwrap(), - vec![vec![1, 2]] - ); - assert_eq!( - deserialize_pipeline_shards("[1][]").unwrap(), - vec![vec![], vec![1]] - ); - assert_eq!( - deserialize_pipeline_shards("[3][2]").unwrap(), - vec![vec![2], vec![3]] - ); - } - fn test_serialize_indexing_tasks_aux( indexing_tasks: &[IndexingTask], node_state: &mut NodeState, @@ -1282,6 +1208,7 @@ mod tests { test_serialize_indexing_tasks_aux(&[], &mut node_state); test_serialize_indexing_tasks_aux( &[IndexingTask { + pipeline_uid: Some(PipelineUid::from_u128(1u128)), index_uid: "test:test1".to_string(), source_id: "my-source1".to_string(), shard_ids: vec![1, 2], @@ -1291,6 +1218,7 @@ mod tests { // change in the set of shards test_serialize_indexing_tasks_aux( &[IndexingTask { + pipeline_uid: Some(PipelineUid::from_u128(2u128)), index_uid: "test:test1".to_string(), source_id: "my-source1".to_string(), shard_ids: vec![1, 2, 3], @@ -1300,11 +1228,13 @@ mod tests { test_serialize_indexing_tasks_aux( &[ IndexingTask { + pipeline_uid: Some(PipelineUid::from_u128(1u128)), index_uid: "test:test1".to_string(), source_id: "my-source1".to_string(), shard_ids: vec![1, 2], }, IndexingTask { + pipeline_uid: Some(PipelineUid::from_u128(2u128)), index_uid: "test:test1".to_string(), source_id: "my-source1".to_string(), shard_ids: vec![3, 4], @@ -1316,11 +1246,13 @@ mod tests { test_serialize_indexing_tasks_aux( &[ IndexingTask { + pipeline_uid: Some(PipelineUid::from_u128(1u128)), 
index_uid: "test:test1".to_string(), source_id: "my-source1".to_string(), shard_ids: vec![1, 2], }, IndexingTask { + pipeline_uid: Some(PipelineUid::from_u128(2u128)), index_uid: "test:test2".to_string(), source_id: "my-source1".to_string(), shard_ids: vec![3, 4], @@ -1332,11 +1264,13 @@ mod tests { test_serialize_indexing_tasks_aux( &[ IndexingTask { + pipeline_uid: Some(PipelineUid::from_u128(1u128)), index_uid: "test:test1".to_string(), source_id: "my-source1".to_string(), shard_ids: vec![1, 2], }, IndexingTask { + pipeline_uid: Some(PipelineUid::from_u128(2u128)), index_uid: "test:test1".to_string(), source_id: "my-source2".to_string(), shard_ids: vec![3, 4], @@ -1345,4 +1279,32 @@ mod tests { &mut node_state, ); } + + #[test] + fn test_parse_shards_str() { + assert!(parse_shards_str("").unwrap().is_empty()); + assert_eq!(parse_shards_str("12").unwrap(), vec![12]); + assert_eq!(parse_shards_str("12,23").unwrap(), vec![12, 23]); + assert!(parse_shards_str("12,23,").is_none()); + assert!(parse_shards_str("12,23a,32").is_none()); + } + + #[test] + fn test_parse_chitchat_kv() { + assert!( + chitchat_kv_to_indexing_task("invalidulid", "my_index:uid:my_source:1,3").is_none() + ); + let task = super::chitchat_kv_to_indexing_task( + "indexer.task:01BX5ZZKBKACTAV9WEVGEMMVS0", + "my_index:uid:my_source:1,3", + ) + .unwrap(); + assert_eq!( + task.pipeline_uid(), + PipelineUid::from_str("01BX5ZZKBKACTAV9WEVGEMMVS0").unwrap() + ); + assert_eq!(&task.index_uid, "my_index:uid"); + assert_eq!(&task.source_id, "my_source"); + assert_eq!(&task.shard_ids, &[1, 3]); + } } diff --git a/quickwit/quickwit-control-plane/Cargo.toml b/quickwit/quickwit-control-plane/Cargo.toml index 38589ced4af..7ed4b15a785 100644 --- a/quickwit/quickwit-control-plane/Cargo.toml +++ b/quickwit/quickwit-control-plane/Cargo.toml @@ -30,6 +30,7 @@ tokio-stream = { workspace = true } tonic = { workspace = true } tower = { workspace = true } tracing = { workspace = true } +ulid = { workspace = true } quickwit-actors = { workspace = true } quickwit-common = { workspace = true } diff --git a/quickwit/quickwit-control-plane/src/indexing_plan.rs b/quickwit/quickwit-control-plane/src/indexing_plan.rs index f266b572940..1aa6b9747bb 100644 --- a/quickwit/quickwit-control-plane/src/indexing_plan.rs +++ b/quickwit/quickwit-control-plane/src/indexing_plan.rs @@ -52,6 +52,11 @@ impl PhysicalIndexingPlan { &self.indexing_tasks_per_indexer_id } + /// Returns the hashmap of (indexer ID, indexing tasks). + pub fn indexing_tasks_per_indexer_mut(&mut self) -> &mut FnvHashMap> { + &mut self.indexing_tasks_per_indexer_id + } + /// Returns the hashmap of (indexer ID, indexing tasks). 
pub fn indexer(&self, indexer_id: &str) -> Option<&[IndexingTask]> { self.indexing_tasks_per_indexer_id @@ -65,7 +70,17 @@ impl PhysicalIndexingPlan { left.index_uid .cmp(&right.index_uid) .then_with(|| left.source_id.cmp(&right.source_id)) + .then_with(|| { + left.shard_ids + .first() + .copied() + .cmp(&right.shard_ids.first().copied()) + }) + .then_with(|| left.pipeline_uid().cmp(&right.pipeline_uid())) }); + for task in tasks { + task.shard_ids.sort(); + } } } } diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs index cd46c889509..f52de199ceb 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs @@ -470,7 +470,7 @@ mod tests { use proptest::{prop_compose, proptest}; use quickwit_config::{IndexConfig, KafkaSourceParams, SourceConfig, SourceParams}; use quickwit_metastore::IndexMetadata; - use quickwit_proto::types::{IndexUid, SourceUid}; + use quickwit_proto::types::{IndexUid, PipelineUid, SourceUid}; use super::*; #[test] @@ -485,22 +485,30 @@ mod tests { let mut running_plan = FnvHashMap::default(); let mut desired_plan = FnvHashMap::default(); let task_1 = IndexingTask { + pipeline_uid: Some(PipelineUid::from_u128(10u128)), + index_uid: "index-1:11111111111111111111111111".to_string(), + source_id: "source-1".to_string(), + shard_ids: Vec::new(), + }; + let task_1b = IndexingTask { + pipeline_uid: Some(PipelineUid::from_u128(11u128)), index_uid: "index-1:11111111111111111111111111".to_string(), source_id: "source-1".to_string(), shard_ids: Vec::new(), }; let task_2 = IndexingTask { + pipeline_uid: Some(PipelineUid::from_u128(20u128)), index_uid: "index-1:11111111111111111111111111".to_string(), source_id: "source-2".to_string(), shard_ids: Vec::new(), }; running_plan.insert( "indexer-1".to_string(), - vec![task_1.clone(), task_1.clone(), task_2.clone()], + vec![task_1.clone(), task_1b.clone(), task_2.clone()], ); desired_plan.insert( "indexer-1".to_string(), - vec![task_2, task_1.clone(), task_1], + vec![task_2, task_1.clone(), task_1b.clone()], ); let indexing_plans_diff = get_indexing_plans_diff(&running_plan, &desired_plan); assert!(indexing_plans_diff.is_empty()); @@ -509,11 +517,13 @@ mod tests { let mut running_plan = FnvHashMap::default(); let mut desired_plan = FnvHashMap::default(); let task_1 = IndexingTask { + pipeline_uid: Some(PipelineUid::from_u128(1u128)), index_uid: "index-1:11111111111111111111111111".to_string(), source_id: "source-1".to_string(), shard_ids: Vec::new(), }; let task_2 = IndexingTask { + pipeline_uid: Some(PipelineUid::from_u128(2u128)), index_uid: "index-1:11111111111111111111111111".to_string(), source_id: "source-2".to_string(), shard_ids: Vec::new(), @@ -539,11 +549,13 @@ mod tests { let mut running_plan = FnvHashMap::default(); let mut desired_plan = FnvHashMap::default(); let task_1 = IndexingTask { + pipeline_uid: Some(PipelineUid::from_u128(1u128)), index_uid: "index-1:11111111111111111111111111".to_string(), source_id: "source-1".to_string(), shard_ids: Vec::new(), }; let task_2 = IndexingTask { + pipeline_uid: Some(PipelineUid::from_u128(2u128)), index_uid: "index-2:11111111111111111111111111".to_string(), source_id: "source-2".to_string(), shard_ids: Vec::new(), @@ -576,48 +588,37 @@ mod tests { // Diff with 3 same tasks running but only one on the desired plan. 
let mut running_plan = FnvHashMap::default(); let mut desired_plan = FnvHashMap::default(); - let task_1 = IndexingTask { + let task_1a = IndexingTask { + pipeline_uid: Some(PipelineUid::from_u128(10u128)), index_uid: "index-1:11111111111111111111111111".to_string(), source_id: "source-1".to_string(), shard_ids: Vec::new(), }; - running_plan.insert("indexer-1".to_string(), vec![task_1.clone()]); - desired_plan.insert( - "indexer-1".to_string(), - vec![task_1.clone(), task_1.clone(), task_1.clone()], - ); - - let indexing_plans_diff = get_indexing_plans_diff(&running_plan, &desired_plan); - assert!(!indexing_plans_diff.is_empty()); - assert!(indexing_plans_diff.has_same_nodes()); - assert!(!indexing_plans_diff.has_same_tasks()); - assert_eq!( - indexing_plans_diff.missing_tasks_by_node_id, - FnvHashMap::from_iter([("indexer-1", vec![&task_1, &task_1])]) - ); - } - { - // Diff with 3 same tasks on desired plan but only one running. - let mut running_plan = FnvHashMap::default(); - let mut desired_plan = FnvHashMap::default(); - let task_1 = IndexingTask { + let task_1b = IndexingTask { + pipeline_uid: Some(PipelineUid::from_u128(11u128)), index_uid: "index-1:11111111111111111111111111".to_string(), source_id: "source-1".to_string(), shard_ids: Vec::new(), }; - running_plan.insert( + let task_1c = IndexingTask { + pipeline_uid: Some(PipelineUid::from_u128(12u128)), + index_uid: "index-1:11111111111111111111111111".to_string(), + source_id: "source-1".to_string(), + shard_ids: Vec::new(), + }; + running_plan.insert("indexer-1".to_string(), vec![task_1a.clone()]); + desired_plan.insert( "indexer-1".to_string(), - vec![task_1.clone(), task_1.clone(), task_1.clone()], + vec![task_1a.clone(), task_1b.clone(), task_1c.clone()], ); - desired_plan.insert("indexer-1".to_string(), vec![task_1.clone()]); let indexing_plans_diff = get_indexing_plans_diff(&running_plan, &desired_plan); assert!(!indexing_plans_diff.is_empty()); assert!(indexing_plans_diff.has_same_nodes()); assert!(!indexing_plans_diff.has_same_tasks()); assert_eq!( - indexing_plans_diff.unplanned_tasks_by_node_id, - FnvHashMap::from_iter([("indexer-1", vec![&task_1, &task_1])]) + indexing_plans_diff.missing_tasks_by_node_id, + FnvHashMap::from_iter([("indexer-1", vec![&task_1b, &task_1c])]) ); } } diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs index 63f12a87817..398f55f8251 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs @@ -20,19 +20,17 @@ pub mod scheduling_logic; pub mod scheduling_logic_model; -use std::collections::hash_map::Entry; use std::num::NonZeroU32; -use fnv::FnvHashMap; +use fnv::{FnvHashMap, FnvHashSet}; use quickwit_proto::indexing::{CpuCapacity, IndexingTask}; -use quickwit_proto::types::{IndexUid, ShardId, SourceUid}; +use quickwit_proto::types::{IndexUid, PipelineUid, ShardId, SourceUid}; use scheduling_logic_model::{IndexerOrd, SourceOrd}; -use tracing::error; -use tracing::log::warn; +use tracing::{error, warn}; use crate::indexing_plan::PhysicalIndexingPlan; use crate::indexing_scheduler::scheduling::scheduling_logic_model::{ - SchedulingProblem, SchedulingSolution, + IndexerAssignment, SchedulingProblem, SchedulingSolution, }; /// If we have several pipelines below this threshold we @@ -51,43 +49,6 @@ const CPU_PER_PIPELINE_LOAD_THRESHOLD: CpuCapacity = CpuCapacity::from_cpu_milli /// 
That's 80% of a period const MAX_LOAD_PER_PIPELINE: CpuCapacity = CpuCapacity::from_cpu_millis(3_200); -fn indexing_task(source_uid: SourceUid, shard_ids: Vec) -> IndexingTask { - IndexingTask { - index_uid: source_uid.index_uid.to_string(), - source_id: source_uid.source_id, - shard_ids, - } -} -fn create_shard_to_indexer_map( - physical_plan: &PhysicalIndexingPlan, - id_to_ord_map: &IdToOrdMap, -) -> FnvHashMap> { - let mut source_to_shard_to_indexer: FnvHashMap> = - Default::default(); - for (indexer_id, indexing_tasks) in physical_plan.indexing_tasks_per_indexer().iter() { - for indexing_task in indexing_tasks { - let index_uid = IndexUid::from(indexing_task.index_uid.clone()); - let Some(indexer_ord) = id_to_ord_map.indexer_ord(indexer_id) else { - continue; - }; - let source_uid = SourceUid { - index_uid, - source_id: indexing_task.source_id.clone(), - }; - let Some(source_ord) = id_to_ord_map.source_ord(&source_uid) else { - continue; - }; - for &shard_id in &indexing_task.shard_ids { - source_to_shard_to_indexer - .entry(source_ord) - .or_default() - .insert(shard_id, indexer_ord); - } - } - } - source_to_shard_to_indexer -} - fn populate_problem( source: &SourceToSchedule, problem: &mut SchedulingProblem, @@ -135,9 +96,6 @@ impl IdToOrdMap { fn source_ord(&self, source_uid: &SourceUid) -> Option { self.source_uid_to_source_ord.get(source_uid).copied() } - fn indexer_id(&self, indexer_ord: IndexerOrd) -> &String { - &self.indexer_ids[indexer_ord] - } fn indexer_ord(&self, indexer_id: &str) -> Option { self.indexer_id_to_indexer_ord.get(indexer_id).copied() @@ -173,66 +131,6 @@ fn convert_physical_plan_to_solution( } } -/// Spreads the list of shard_ids optimally amongst the different indexers. -/// This function also receives a `previous_shard_to_indexer_ord` map, informing -/// use of the previous configuration. -/// -/// Whenever possible this function tries to keep shards on the same indexer. -/// -/// Contract: -/// The sum of the number of shards (values of `indexer_num_shards`) should match -/// the length of shard_ids. -/// Note that all shards are not necessarily in previous_shard_to_indexer_ord. -fn spread_shards_optimally( - shard_ids: &[ShardId], - mut indexer_num_shards: FnvHashMap, - previous_shard_to_indexer_ord: FnvHashMap, -) -> FnvHashMap> { - assert_eq!( - shard_ids.len(), - indexer_num_shards - .values() - .map(|num_shards| num_shards.get() as usize) - .sum::(), - ); - let mut shard_ids_per_indexer: FnvHashMap> = Default::default(); - let mut unassigned_shard_ids: Vec = Vec::new(); - for &shard_id in shard_ids { - if let Some(previous_indexer_ord) = previous_shard_to_indexer_ord.get(&shard_id).cloned() { - if let Entry::Occupied(mut num_shards_entry) = - indexer_num_shards.entry(previous_indexer_ord) - { - if let Some(new_num_shards) = NonZeroU32::new(num_shards_entry.get().get() - 1u32) { - *num_shards_entry.get_mut() = new_num_shards; - } else { - num_shards_entry.remove(); - } - // We keep the shard on the indexer it used to be. - shard_ids_per_indexer - .entry(previous_indexer_ord) - .or_default() - .push(shard_id); - continue; - } - } - unassigned_shard_ids.push(shard_id); - } - - // Finally, we need to add the missing shards. - for (indexer_ord, num_shards) in indexer_num_shards { - assert!(unassigned_shard_ids.len() >= num_shards.get() as usize); - shard_ids_per_indexer - .entry(indexer_ord) - .or_default() - .extend(unassigned_shard_ids.drain(..num_shards.get() as usize)); - } - - // At this point, we should have applied all of the missing shards. 
- assert!(unassigned_shard_ids.is_empty()); - - shard_ids_per_indexer -} - #[derive(Debug)] pub struct SourceToSchedule { pub source_uid: SourceUid, @@ -253,202 +151,289 @@ pub enum SourceToScheduleType { IngestV1, } -fn group_shards_into_pipelines( - source_uid: &SourceUid, - shard_ids: &[ShardId], - previous_indexing_tasks: &[IndexingTask], - cpu_load_per_shard: CpuCapacity, -) -> Vec { - let num_shards = shard_ids.len() as u32; - if num_shards == 0 { - return Vec::new(); +fn compute_max_num_shards_per_pipeline(source_type: &SourceToScheduleType) -> NonZeroU32 { + match &source_type { + SourceToScheduleType::Sharded { + shards: _, + load_per_shard, + } => { + NonZeroU32::new(MAX_LOAD_PER_PIPELINE.cpu_millis() / load_per_shard.get()) + .unwrap_or_else(|| { + // We throttle shard at ingestion to ensure that a shard does not + // exceed 5MB/s. + // + // This value has been chosen to make sure that one full pipeline + // should always be able to handle the load of one shard. + // + // However it is possible for the system to take more than this + // when it is playing catch up. + // + // This is a transitory state, and not a problem per se. + warn!("load per shard is higher than `MAX_LOAD_PER_PIPELINE`"); + NonZeroU32::MIN // also colloquially known as `1` + }) + } + SourceToScheduleType::IngestV1 | SourceToScheduleType::NonSharded { .. } => { + NonZeroU32::new(1u32).unwrap() + } } - let max_num_shards_per_pipeline: NonZeroU32 = - NonZeroU32::new(MAX_LOAD_PER_PIPELINE.cpu_millis() / cpu_load_per_shard.cpu_millis()) - .unwrap_or_else(|| { - // We throttle shard at ingestion to ensure that a shard does not - // exceed 5MB/s. - // - // This value has been chosen to make sure that one full pipeline - // should always be able to handle the load of one shard. - // - // However it is possible for the system to take more than this - // when it is playing catch up. - // - // This is a transitory state, and not a problem per se. - warn!("load per shard is higher than `MAX_LOAD_PER_PIPELINE`"); - NonZeroU32::MIN // also colloquially known as `1` - }); - - // We compute the number of pipelines we will create, cooking in some hysteresis effect here. - // We have two different threshold to increase and to decrease the number of pipelines. - let min_num_pipelines: u32 = - (num_shards + max_num_shards_per_pipeline.get() - 1) / max_num_shards_per_pipeline; - assert!(min_num_pipelines > 0); - let max_num_pipelines: u32 = min_num_pipelines.max( - num_shards * cpu_load_per_shard.cpu_millis() / CPU_PER_PIPELINE_LOAD_THRESHOLD.cpu_millis(), - ); - let previous_num_pipelines = previous_indexing_tasks.len() as u32; - let num_pipelines: u32 = if previous_num_pipelines > min_num_pipelines { - previous_num_pipelines.min(max_num_pipelines) - } else { - min_num_pipelines - }; - - let mut pipelines: Vec> = std::iter::repeat_with(Vec::new) - .take((previous_num_pipelines as usize).max(num_pipelines as usize)) - .collect(); +} - let mut unassigned_shard_ids: Vec = Vec::new(); - let previous_pipeline_map: FnvHashMap = previous_indexing_tasks - .iter() - .enumerate() - .flat_map(|(pipeline_ord, indexing_task)| { - indexing_task - .shard_ids +fn convert_scheduling_solution_to_physical_plan_single_node_single_source( + mut remaining_num_shards_to_schedule_on_node: u32, + // Specific to the source. 
+ mut previous_tasks: &[&IndexingTask], + source: &SourceToSchedule, +) -> Vec { + match &source.source_type { + SourceToScheduleType::Sharded { + shards, + load_per_shard, + } => { + // For the moment we do something voluntarily suboptimal. + let max_num_pipelines = (shards.len() as u32) * load_per_shard.get() + / CPU_PER_PIPELINE_LOAD_THRESHOLD.cpu_millis(); + if previous_tasks.len() > max_num_pipelines as usize { + previous_tasks = &previous_tasks[..max_num_pipelines as usize]; + } + let max_num_shards_per_pipeline: NonZeroU32 = + compute_max_num_shards_per_pipeline(&source.source_type); + let mut new_tasks = Vec::new(); + for previous_task in previous_tasks { + let max_shard_in_pipeline = max_num_shards_per_pipeline + .get() + .min(remaining_num_shards_to_schedule_on_node) + as usize; + let shard_ids: Vec = previous_task + .shard_ids + .iter() + .copied() + .filter(|shard_id| shards.contains(shard_id)) + .take(max_shard_in_pipeline) + .collect(); + remaining_num_shards_to_schedule_on_node -= shard_ids.len() as u32; + if remaining_num_shards_to_schedule_on_node == 0 { + break; + } + let new_task = IndexingTask { + index_uid: previous_task.index_uid.clone(), + source_id: previous_task.source_id.clone(), + pipeline_uid: previous_task.pipeline_uid, + shard_ids, + }; + new_tasks.push(new_task); + } + new_tasks + } + SourceToScheduleType::NonSharded { .. } => { + // For non-sharded pipelines, we just need `num_shards` is a number of pipelines. + let mut indexing_tasks: Vec = previous_tasks .iter() - .map(move |shard_id| (*shard_id, pipeline_ord)) - }) - .collect(); - - for &shard in shard_ids { - if let Some(pipeline_ord) = previous_pipeline_map.get(&shard).copied() { - // Whenever possible we allocate to the previous pipeline. - let best_pipeline_for_shard = &mut pipelines[pipeline_ord]; - if best_pipeline_for_shard.len() < max_num_shards_per_pipeline.get() as usize { - best_pipeline_for_shard.push(shard); + .take(remaining_num_shards_to_schedule_on_node as usize) + .map(|task| (*task).clone()) + .collect(); + indexing_tasks.resize_with(remaining_num_shards_to_schedule_on_node as usize, || { + IndexingTask { + index_uid: source.source_uid.index_uid.to_string(), + source_id: source.source_uid.source_id.clone(), + pipeline_uid: Some(PipelineUid::new()), + shard_ids: Vec::new(), + } + }); + indexing_tasks + } + SourceToScheduleType::IngestV1 => { + // Ingest V1 is simple. One pipeline per indexer node. + if let Some(indexing_task) = previous_tasks.first() { + // The pipeline already exists, let's reuse it. + vec![(*indexing_task).clone()] } else { - unassigned_shard_ids.push(shard); + // The source is new, we need to create a new task. + vec![IndexingTask { + index_uid: source.source_uid.index_uid.to_string(), + source_id: source.source_uid.source_id.clone(), + pipeline_uid: Some(PipelineUid::new()), + shard_ids: Vec::new(), + }] } - } else { - unassigned_shard_ids.push(shard); } } +} - // If needed, let's remove some pipelines. We just remove the pipelines that have - // the least number of shards. - pipelines.sort_by_key(|shards| std::cmp::Reverse(shards.len())); - for removed_pipeline_shards in pipelines.drain(num_pipelines as usize..) { - unassigned_shard_ids.extend(removed_pipeline_shards); - } - - // Now we need to allocate the unallocated shards. - // We just allocate them to the current pipeline that has the lowest load. 
- for shard in unassigned_shard_ids { - let best_pipeline_for_shard: &mut Vec = pipelines - .iter_mut() - .min_by_key(|shards| shards.len()) - .unwrap(); - best_pipeline_for_shard.push(shard); +fn convert_scheduling_solution_to_physical_plan_single_node( + indexer_assigment: &IndexerAssignment, + previous_tasks: &[IndexingTask], + sources: &[SourceToSchedule], + id_to_ord_map: &IdToOrdMap, +) -> Vec { + let mut tasks = Vec::new(); + for source in sources { + let source_num_shards = + if let Some(source_ord) = id_to_ord_map.source_ord(&source.source_uid) { + indexer_assigment.num_shards(source_ord) + } else { + // This can happen for IngestV1 + 1u32 + }; + let source_pipelines: Vec<&IndexingTask> = previous_tasks + .iter() + .filter(|task| { + task.index_uid == source.source_uid.index_uid.as_str() + && task.source_id == source.source_uid.source_id + }) + .collect(); + let source_tasks = convert_scheduling_solution_to_physical_plan_single_node_single_source( + source_num_shards, + &source_pipelines[..], + source, + ); + tasks.extend(source_tasks); } - - let mut indexing_tasks: Vec = pipelines - .into_iter() - .map(|mut shard_ids| { - shard_ids.sort(); - IndexingTask { - index_uid: source_uid.index_uid.to_string(), - source_id: source_uid.source_id.clone(), - shard_ids, - } - }) - .collect(); - - indexing_tasks.sort_by_key(|indexing_task| indexing_task.shard_ids[0]); - - indexing_tasks + // code goes here. + tasks.sort_by(|left: &IndexingTask, right: &IndexingTask| { + left.index_uid + .cmp(&right.index_uid) + .then_with(|| left.source_id.cmp(&right.source_id)) + }); + tasks } /// This function takes a scheduling solution (which abstracts the notion of pipelines, -/// and shard ids) and builds a physical plan. +/// and shard ids) and builds a physical plan, attempting to make as little change as possible +/// to the existing pipelines. +/// +/// We do not support moving shard from one pipeline to another, so if required this function may +/// also return instruction about deleting / adding new shards. fn convert_scheduling_solution_to_physical_plan( - solution: &SchedulingSolution, - problem: &SchedulingProblem, + mut solution: SchedulingSolution, id_to_ord_map: &IdToOrdMap, sources: &[SourceToSchedule], previous_plan_opt: Option<&PhysicalIndexingPlan>, ) -> PhysicalIndexingPlan { - let mut previous_shard_to_indexer_map: FnvHashMap> = - previous_plan_opt - .map(|previous_plan| create_shard_to_indexer_map(previous_plan, id_to_ord_map)) - .unwrap_or_default(); - - let mut physical_indexing_plan = - PhysicalIndexingPlan::with_indexer_ids(&id_to_ord_map.indexer_ids); + let mut new_physical_plan = PhysicalIndexingPlan::with_indexer_ids(&id_to_ord_map.indexer_ids); + for (indexer_id, indexer_assignment) in id_to_ord_map + .indexer_ids + .iter() + .zip(&mut solution.indexer_assignments) + { + let previous_tasks_for_indexer = previous_plan_opt + .and_then(|previous_plan| previous_plan.indexer(indexer_id)) + .unwrap_or(&[]); + // First we attempt to recycle existing pipelines. + let new_plan_indexing_tasks_for_indexer: Vec = + convert_scheduling_solution_to_physical_plan_single_node( + indexer_assignment, + previous_tasks_for_indexer, + sources, + id_to_ord_map, + ); + for indexing_task in new_plan_indexing_tasks_for_indexer { + new_physical_plan.add_indexing_task(indexer_id, indexing_task); + } + } for source in sources { - match &source.source_type { - SourceToScheduleType::Sharded { - shards, - load_per_shard: _load_per_shard, - } => { - // That's ingest v2. - // The logic is complicated here. 
At this point we know the number of shards to - // be assign to each indexer, but we want to convert that number into a list of - // shard ids, without moving a shard from a indexer to another - // whenever possible. - let source_ord = id_to_ord_map.source_ord(&source.source_uid).unwrap(); - let indexer_num_shards: FnvHashMap = - solution.indexer_shards(source_ord).collect(); - - let shard_to_indexer_ord = previous_shard_to_indexer_map - .remove(&source_ord) - .unwrap_or_default(); - - let load_per_shard = problem.source_load_per_shard(source_ord); - let shard_ids_per_node: FnvHashMap> = - spread_shards_optimally(shards, indexer_num_shards, shard_to_indexer_ord); - - for (node_ord, shard_ids_for_node) in shard_ids_per_node { - let node_id = id_to_ord_map.indexer_id(node_ord); - let indexing_tasks: &[IndexingTask] = previous_plan_opt - .and_then(|previous_plan| previous_plan.indexer(node_id)) - .unwrap_or(&[]); - let indexing_tasks = group_shards_into_pipelines( - &source.source_uid, - &shard_ids_for_node, - indexing_tasks, - CpuCapacity::from_cpu_millis(load_per_shard.get()), - ); - for indexing_task in indexing_tasks { - physical_indexing_plan.add_indexing_task(node_id, indexing_task); - } - } - } - SourceToScheduleType::NonSharded { .. } => { - // These are the sources that are not sharded (Kafka-like). - // - // Here one shard is one pipeline. - let source_ord = id_to_ord_map.source_ord(&source.source_uid).unwrap(); - - let indexer_num_shards: FnvHashMap = - solution.indexer_shards(source_ord).collect(); - - for (indexer_ord, num_shards) in indexer_num_shards { - let indexer_id = id_to_ord_map.indexer_id(indexer_ord); - for _ in 0..num_shards.get() { - let indexing_task = indexing_task(source.source_uid.clone(), Vec::new()); - physical_indexing_plan.add_indexing_task(indexer_id, indexing_task); - } + let SourceToScheduleType::Sharded { + shards, + load_per_shard: _, + } = &source.source_type + else { + continue; + }; + let source_ord = id_to_ord_map.source_ord(&source.source_uid).unwrap(); + let mut scheduled_shards: FnvHashSet = FnvHashSet::default(); + let mut remaining_capacity_per_node: Vec<(String, u32)> = Vec::default(); + for (indexer, indexing_tasks) in new_physical_plan.indexing_tasks_per_indexer_mut() { + let indexer_ord = id_to_ord_map.indexer_ord(indexer).unwrap(); + let mut num_shards_for_indexer_source: u32 = + solution.indexer_assignments[indexer_ord].num_shards(source_ord); + for indexing_task in indexing_tasks { + if indexing_task.index_uid == source.source_uid.index_uid.as_str() + && indexing_task.source_id == source.source_uid.source_id + { + indexing_task.shard_ids.retain(|&shard| { + let shard_added = scheduled_shards.insert(shard); + if shard_added { + error!( + "this should never happen. shard was allocated into two pipelines." + ); + true + } else { + false + } + }); + num_shards_for_indexer_source -= 1; } - continue; } - SourceToScheduleType::IngestV1 => { - // Ingest V1 requires to start one pipeline on each indexer. - // This pipeline is off-the-grid: it is not taken in account in the - // indexer capacity. We start it to ensure backward compatibility - // a little, but we want to remove it rapidly. 
- for indexer_id in &id_to_ord_map.indexer_ids { - let indexing_task = indexing_task(source.source_uid.clone(), Vec::new()); - physical_indexing_plan.add_indexing_task(indexer_id, indexing_task); - } + remaining_capacity_per_node.push((indexer.to_string(), num_shards_for_indexer_source)); + } + + // Missing shards is the list of shards that is not scheduled into a pipeline yet. + let missing_shards: Vec = shards + .iter() + .filter(|&shard| !scheduled_shards.contains(shard)) + .copied() + .collect(); + + // Let's allocate the missing shards. + // TODO that's the logic that has to change. Eventually we want to remove shards that + // were previously allocated and create new shards to replace them. + let max_shard_per_pipeline = compute_max_num_shards_per_pipeline(&source.source_type); + for missing_shard in missing_shards { + let (last_indexer_str, remaining_shard) = + remaining_capacity_per_node.last_mut().unwrap(); + *remaining_shard -= 1; + add_shard_to_indexer( + missing_shard, + last_indexer_str, + &source.source_uid, + max_shard_per_pipeline, + &mut new_physical_plan, + ); + if *remaining_shard == 0 { + remaining_capacity_per_node.pop(); } } } - // We sort the tasks by `source_uid`. - physical_indexing_plan.normalize(); - physical_indexing_plan + new_physical_plan.normalize(); + + new_physical_plan +} + +fn add_shard_to_indexer( + missing_shard: ShardId, + indexer: &str, + source_uid: &SourceUid, + max_shard_per_pipeline: NonZeroU32, + new_physical_plan: &mut PhysicalIndexingPlan, +) { + let indexer_tasks = new_physical_plan + .indexing_tasks_per_indexer_mut() + .entry(indexer.to_string()) + .or_default(); + + let indexing_task_opt = indexer_tasks + .iter_mut() + .filter(|indexing_task| { + indexing_task.index_uid == source_uid.index_uid.as_str() + && indexing_task.source_id == source_uid.source_id + }) + .filter(|task| task.shard_ids.len() < max_shard_per_pipeline.get() as usize) + .min_by_key(|task| task.shard_ids.len()); + + if let Some(indexing_task) = indexing_task_opt { + indexing_task.shard_ids.push(missing_shard); + } else { + // We haven't found any pipeline with remaining room. + // It is time to create a new pipeline. + indexer_tasks.push(IndexingTask { + index_uid: source_uid.index_uid.to_string(), + source_id: source_uid.source_id.clone(), + pipeline_uid: Some(PipelineUid::new()), + shard_ids: vec![missing_shard], + }); + } } /// Creates a physical plan given the current situation of the cluster and the list of sources @@ -473,8 +458,6 @@ pub fn build_physical_indexing_plan( indexer_id_to_cpu_capacities: &FnvHashMap, previous_plan_opt: Option<&PhysicalIndexingPlan>, ) -> PhysicalIndexingPlan { - // TODO make the load per node something that can be configured on each node. - // Convert our problem to a scheduling problem. let mut id_to_ord_map = IdToOrdMap::default(); @@ -513,8 +496,7 @@ pub fn build_physical_indexing_plan( // Convert the new scheduling solution back to a physical plan. 
convert_scheduling_solution_to_physical_plan( - &new_solution, - &problem, + new_solution, &id_to_ord_map, sources, previous_plan_opt, @@ -528,28 +510,11 @@ mod tests { use std::sync::atomic::{AtomicUsize, Ordering}; use fnv::FnvHashMap; - use quickwit_proto::indexing::{mcpu, IndexingTask}; - use quickwit_proto::types::{IndexUid, ShardId, SourceUid}; + use quickwit_proto::indexing::{mcpu, CpuCapacity, IndexingTask}; + use quickwit_proto::types::{IndexUid, PipelineUid, ShardId, SourceUid}; - use super::{ - build_physical_indexing_plan, group_shards_into_pipelines, indexing_task, - spread_shards_optimally, SourceToSchedule, SourceToScheduleType, - }; - - #[test] - fn test_spread_shard_optimally() { - let mut indexer_num_shards = FnvHashMap::default(); - indexer_num_shards.insert(0, NonZeroU32::new(2).unwrap()); - indexer_num_shards.insert(1, NonZeroU32::new(3).unwrap()); - let mut shard_to_indexer_ord = FnvHashMap::default(); - shard_to_indexer_ord.insert(0, 0); - shard_to_indexer_ord.insert(1, 2); - shard_to_indexer_ord.insert(3, 0); - let indexer_to_shards = - spread_shards_optimally(&[0, 1, 2, 3, 4], indexer_num_shards, shard_to_indexer_ord); - assert_eq!(indexer_to_shards.get(&0), Some(&vec![0, 3])); - assert_eq!(indexer_to_shards.get(&1), Some(&vec![1, 2, 4])); - } + use super::{build_physical_indexing_plan, SourceToSchedule, SourceToScheduleType}; + use crate::indexing_plan::PhysicalIndexingPlan; fn source_id() -> SourceUid { static COUNTER: AtomicUsize = AtomicUsize::new(0); @@ -597,27 +562,25 @@ mod tests { assert_eq!(indexing_plan.indexing_tasks_per_indexer().len(), 2); let node1_plan = indexing_plan.indexer(&indexer1).unwrap(); + let node2_plan = indexing_plan.indexer(&indexer2).unwrap(); // both non-sharded pipelines get scheduled on the same node. 
- assert_eq!( - &node1_plan, - &[ - indexing_task(source_uid1.clone(), vec![]), - indexing_task(source_uid1.clone(), vec![]), - indexing_task(source_uid2.clone(), vec![]), - ] - ); - - let node2_plan = indexing_plan.indexer(&indexer2).unwrap(); - assert_eq!( - &node2_plan, - &[ - indexing_task(source_uid0.clone(), vec![0, 3, 6]), - indexing_task(source_uid0.clone(), vec![1, 4, 7]), - indexing_task(source_uid0.clone(), vec![2, 5]), - indexing_task(source_uid2.clone(), vec![]), - ] - ); + assert_eq!(node1_plan.len(), 3); + assert_eq!(&node1_plan[0].source_id, &source_uid1.source_id); + assert!(&node1_plan[0].shard_ids.is_empty()); + assert_eq!(&node1_plan[1].source_id, &source_uid1.source_id); + assert!(&node1_plan[1].shard_ids.is_empty()); + assert_eq!(&node1_plan[2].source_id, &source_uid2.source_id); + assert!(&node1_plan[2].shard_ids.is_empty()); + + assert_eq!(node2_plan.len(), 4); + assert_eq!(&node2_plan[0].source_id, &source_uid0.source_id); + assert_eq!(&node2_plan[0].shard_ids, &[0, 1, 2]); + assert_eq!(&node2_plan[1].source_id, &source_uid0.source_id); + assert_eq!(&node2_plan[1].shard_ids, &[3, 4, 5]); + assert_eq!(&node2_plan[2].source_id, &source_uid0.source_id); + assert_eq!(&node2_plan[2].shard_ids, &[6, 7]); + assert_eq!(&node2_plan[3].source_id, &source_uid2.source_id); } #[tokio::test] @@ -640,10 +603,8 @@ mod tests { let physical_plan = build_physical_indexing_plan(&sources, &indexer_max_loads, None); assert_eq!(physical_plan.indexing_tasks_per_indexer().len(), 1); let expected_tasks = physical_plan.indexer(&indexer1).unwrap(); - assert_eq!( - expected_tasks, - &[indexing_task(source_uid1.clone(), Vec::new())] - ); + assert_eq!(expected_tasks.len(), 1); + assert_eq!(&expected_tasks[0].source_id, &source_uid1.source_id); } { indexer_max_loads.insert(indexer1.clone(), mcpu(2_000)); @@ -651,106 +612,159 @@ mod tests { let physical_plan = build_physical_indexing_plan(&sources, &indexer_max_loads, None); assert_eq!(physical_plan.indexing_tasks_per_indexer().len(), 1); let expected_tasks = physical_plan.indexer(&indexer1).unwrap(); - assert_eq!( - expected_tasks, - &[ - indexing_task(source_uid1.clone(), Vec::new()), - indexing_task(source_uid1.clone(), Vec::new()), - ] - ) + assert_eq!(expected_tasks.len(), 2); + assert_eq!(&expected_tasks[0].source_id, &source_uid1.source_id); + assert!(expected_tasks[0].shard_ids.is_empty()); + assert_eq!(&expected_tasks[1].source_id, &source_uid1.source_id); + assert!(expected_tasks[1].shard_ids.is_empty()); } } - #[test] - fn test_group_shards_empty() { - let source_uid = source_id(); - let indexing_tasks = group_shards_into_pipelines(&source_uid, &[], &[], mcpu(250)); - assert!(indexing_tasks.is_empty()); - } - fn make_indexing_tasks( source_uid: &SourceUid, - shard_ids_grp: &[&[ShardId]], + shards: &[(PipelineUid, &[ShardId])], ) -> Vec { - shard_ids_grp - .iter() - .copied() - .map(|shard_ids| IndexingTask { + let mut plan = Vec::new(); + for (pipeline_uid, shard_ids) in shards { + plan.push(IndexingTask { index_uid: source_uid.index_uid.to_string(), source_id: source_uid.source_id.clone(), + pipeline_uid: Some(*pipeline_uid), shard_ids: shard_ids.to_vec(), - }) - .collect::>() + }); + } + plan } #[test] fn test_group_shards_into_pipeline_simple() { let source_uid = source_id(); - let previous_indexing_tasks: Vec = - make_indexing_tasks(&source_uid, &[&[1, 2], &[3, 4, 5]]); - let indexing_tasks = group_shards_into_pipelines( + let indexing_tasks = make_indexing_tasks( &source_uid, - &[0, 1, 3, 4, 5], - &previous_indexing_tasks, - 
mcpu(1_000), + &[ + (PipelineUid::from_u128(1u128), &[1, 2]), + (PipelineUid::from_u128(2u128), &[3, 4, 5]), + ], + ); + let sources = vec![SourceToSchedule { + source_uid: source_uid.clone(), + source_type: SourceToScheduleType::Sharded { + shards: vec![0, 1, 3, 4, 5], + load_per_shard: NonZeroU32::new(1_000).unwrap(), + }, + }]; + let mut indexer_id_to_cpu_capacities = FnvHashMap::default(); + indexer_id_to_cpu_capacities.insert("node1".to_string(), mcpu(10_000)); + let mut indexing_plan = PhysicalIndexingPlan::with_indexer_ids(&["node1".to_string()]); + for indexing_task in indexing_tasks { + indexing_plan.add_indexing_task("node1", indexing_task); + } + let new_plan = build_physical_indexing_plan( + &sources, + &indexer_id_to_cpu_capacities, + Some(&indexing_plan), ); + let indexing_tasks = new_plan.indexer("node1").unwrap(); assert_eq!(indexing_tasks.len(), 2); assert_eq!(&indexing_tasks[0].shard_ids, &[0, 1]); assert_eq!(&indexing_tasks[1].shard_ids, &[3, 4, 5]); } + fn group_shards_into_pipelines_aux( + source_uid: &SourceUid, + shard_ids: &[ShardId], + previous_pipeline_shards: &[(PipelineUid, &[ShardId])], + load_per_shard: CpuCapacity, + ) -> Vec { + let indexing_tasks = make_indexing_tasks(source_uid, previous_pipeline_shards); + let sources = vec![SourceToSchedule { + source_uid: source_uid.clone(), + source_type: SourceToScheduleType::Sharded { + shards: shard_ids.to_vec(), + load_per_shard: NonZeroU32::new(load_per_shard.cpu_millis()).unwrap(), + }, + }]; + const NODE: &str = "node1"; + let mut indexer_id_to_cpu_capacities = FnvHashMap::default(); + indexer_id_to_cpu_capacities.insert(NODE.to_string(), mcpu(10_000)); + let mut indexing_plan = PhysicalIndexingPlan::with_indexer_ids(&["node1".to_string()]); + for indexing_task in indexing_tasks { + indexing_plan.add_indexing_task(NODE, indexing_task); + } + let new_plan = build_physical_indexing_plan( + &sources, + &indexer_id_to_cpu_capacities, + Some(&indexing_plan), + ); + let indexing_tasks = new_plan.indexer(NODE).unwrap(); + indexing_tasks.to_vec() + } + #[test] fn test_group_shards_load_per_shard_too_high() { let source_uid = source_id(); - let indexing_tasks = group_shards_into_pipelines(&source_uid, &[1, 2], &[], mcpu(4_000)); + let indexing_tasks = + group_shards_into_pipelines_aux(&source_uid, &[1, 2], &[], mcpu(4_000)); assert_eq!(indexing_tasks.len(), 2); } #[test] fn test_group_shards_into_pipeline_hysteresis() { let source_uid = source_id(); - let previous_indexing_tasks: Vec = make_indexing_tasks(&source_uid, &[]); - let indexing_tasks_1 = group_shards_into_pipelines( + let indexing_tasks_1 = group_shards_into_pipelines_aux( &source_uid, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10], - &previous_indexing_tasks, + &[], mcpu(400), ); assert_eq!(indexing_tasks_1.len(), 2); - assert_eq!(&indexing_tasks_1[0].shard_ids, &[0, 2, 4, 6, 8, 10]); - assert_eq!(&indexing_tasks_1[1].shard_ids, &[1, 3, 5, 7, 9]); + assert_eq!(&indexing_tasks_1[0].shard_ids, &[0, 1, 2, 3, 4, 5, 6, 7]); + assert_eq!(&indexing_tasks_1[1].shard_ids, &[8, 9, 10]); + + let pipeline_tasks1: Vec<(PipelineUid, &[ShardId])> = indexing_tasks_1 + .iter() + .map(|task| (task.pipeline_uid(), &task.shard_ids[..])) + .collect(); + // With the same set of shards, an increase of load triggers the creation of a new task. 
- let indexing_tasks_2 = group_shards_into_pipelines( + let indexing_tasks_2 = group_shards_into_pipelines_aux( &source_uid, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10], - &indexing_tasks_1, + &pipeline_tasks1[..], mcpu(600), ); assert_eq!(indexing_tasks_2.len(), 3); - assert_eq!(&indexing_tasks_2[0].shard_ids, &[0, 2, 4, 6, 8]); - assert_eq!(&indexing_tasks_2[1].shard_ids, &[1, 3, 5, 7, 9]); - assert_eq!(&indexing_tasks_2[2].shard_ids, &[10]); + assert_eq!(&indexing_tasks_2[0].shard_ids, &[0, 1, 2, 3, 4]); + assert_eq!(&indexing_tasks_2[1].shard_ids, &[7]); + assert_eq!(&indexing_tasks_2[2].shard_ids, &[5, 6, 8, 9, 10]); // Now the load comes back to normal // The hysteresis takes effect. We do not switch back to 2 pipelines. - let indexing_tasks_3 = group_shards_into_pipelines( + let pipeline_tasks2: Vec<(PipelineUid, &[ShardId])> = indexing_tasks_1 + .iter() + .map(|task| (task.pipeline_uid(), &task.shard_ids[..])) + .collect(); + let indexing_tasks_3 = group_shards_into_pipelines_aux( &source_uid, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10], - &indexing_tasks_2, + &pipeline_tasks2, mcpu(400), ); - assert_eq!(indexing_tasks_3.len(), 3); - assert_eq!(&indexing_tasks_3[0].shard_ids, &[0, 2, 4, 6, 8]); - assert_eq!(&indexing_tasks_3[1].shard_ids, &[1, 3, 5, 7, 9]); - assert_eq!(&indexing_tasks_3[2].shard_ids, &[10]); - // Now a further lower load.. - let indexing_tasks_4 = group_shards_into_pipelines( + assert_eq!(indexing_tasks_3.len(), 2); + assert_eq!(&indexing_tasks_3[0].shard_ids, &[0, 1, 2, 3, 4, 5, 6, 7,]); + assert_eq!(&indexing_tasks_3[1].shard_ids, &[8, 9, 10]); + let pipeline_tasks3: Vec<(PipelineUid, &[ShardId])> = indexing_tasks_2 + .iter() + .map(|task| (task.pipeline_uid(), &task.shard_ids[..])) + .collect(); + // Now a further lower load. + let indexing_tasks_4 = group_shards_into_pipelines_aux( &source_uid, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10], - &indexing_tasks_3, + &pipeline_tasks3, mcpu(320), ); assert_eq!(indexing_tasks_4.len(), 2); - assert_eq!(&indexing_tasks_4[0].shard_ids, &[0, 2, 4, 6, 8, 10]); - assert_eq!(&indexing_tasks_4[1].shard_ids, &[1, 3, 5, 7, 9]); + assert_eq!(&indexing_tasks_4[0].shard_ids, &[0, 1, 2, 3, 4, 10]); + assert_eq!(&indexing_tasks_4[1].shard_ids, &[5, 6, 7, 8, 9]); } } diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic_model.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic_model.rs index 7e61a5cb3ca..3ce5816141d 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic_model.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic_model.rs @@ -160,24 +160,7 @@ impl SchedulingSolution { indexer_assignments: (0..num_indexers).map(IndexerAssignment::new).collect(), } } - pub fn num_indexers(&self) -> usize { self.indexer_assignments.len() } - - pub fn indexer_shards( - &self, - source_ord: SourceOrd, - ) -> impl Iterator + '_ { - self.indexer_assignments - .iter() - .filter_map(move |indexer_assignment| { - let num_shards: NonZeroU32 = indexer_assignment - .num_shards_per_source - .get(&source_ord) - .copied() - .and_then(NonZeroU32::new)?; - Some((indexer_assignment.indexer_ord, num_shards)) - }) - } } diff --git a/quickwit/quickwit-indexing/failpoints/mod.rs b/quickwit/quickwit-indexing/failpoints/mod.rs index 95254a5461a..cf6a1935801 100644 --- a/quickwit/quickwit-indexing/failpoints/mod.rs +++ b/quickwit/quickwit-indexing/failpoints/mod.rs @@ -300,7 +300,7 @@ async fn 
test_merge_executor_controlled_directory_kill_switch() -> anyhow::Resul index_uid: IndexUid::new_with_random_ulid(index_id), source_id: "test-source".to_string(), node_id: "test-node".to_string(), - pipeline_ord: 0, + pipeline_uid: PipelineUid::default(), }; let universe = test_index_builder.universe(); diff --git a/quickwit/quickwit-indexing/src/actors/indexer.rs b/quickwit/quickwit-indexing/src/actors/indexer.rs index b13823fbf01..62c8b9db056 100644 --- a/quickwit/quickwit-indexing/src/actors/indexer.rs +++ b/quickwit/quickwit-indexing/src/actors/indexer.rs @@ -186,7 +186,7 @@ impl IndexerState { let batch_parent_span = info_span!(target: "quickwit-indexing", "index-doc-batches", index_id=%self.pipeline_id.index_uid.index_id(), source_id=%self.pipeline_id.source_id, - pipeline_ord=%self.pipeline_id.pipeline_ord, + pipeline_uid=%self.pipeline_id.pipeline_uid, workbench_id=%workbench_id, ); let indexing_span = info_span!(parent: batch_parent_span.id(), "indexer"); @@ -682,7 +682,7 @@ mod tests { use quickwit_doc_mapper::{default_doc_mapper_for_test, DefaultDocMapper}; use quickwit_metastore::checkpoint::SourceCheckpointDelta; use quickwit_proto::metastore::{EmptyResponse, LastDeleteOpstampResponse}; - use quickwit_proto::types::IndexUid; + use quickwit_proto::types::{IndexUid, PipelineUid}; use tantivy::{doc, DateTime}; use super::*; @@ -724,7 +724,7 @@ mod tests { index_uid: index_uid.clone(), source_id: "test-source".to_string(), node_id: "test-node".to_string(), - pipeline_ord: 0, + pipeline_uid: PipelineUid::default(), }; let doc_mapper = Arc::new(default_doc_mapper_for_test()); let last_delete_opstamp = 10; @@ -861,7 +861,7 @@ mod tests { index_uid: index_uid.clone(), source_id: "test-source".to_string(), node_id: "test-node".to_string(), - pipeline_ord: 0, + pipeline_uid: PipelineUid::default(), }; let doc_mapper = Arc::new(default_doc_mapper_for_test()); let last_delete_opstamp = 10; @@ -938,7 +938,7 @@ mod tests { index_uid: IndexUid::new_with_random_ulid("test-index"), source_id: "test-source".to_string(), node_id: "test-node".to_string(), - pipeline_ord: 0, + pipeline_uid: PipelineUid::default(), }; let doc_mapper = Arc::new(default_doc_mapper_for_test()); let last_delete_opstamp = 10; @@ -1022,7 +1022,7 @@ mod tests { index_uid: IndexUid::new_with_random_ulid("test-index"), source_id: "test-source".to_string(), node_id: "test-node".to_string(), - pipeline_ord: 0, + pipeline_uid: PipelineUid::default(), }; let doc_mapper = Arc::new(default_doc_mapper_for_test()); let last_delete_opstamp = 10; @@ -1097,7 +1097,7 @@ mod tests { index_uid: IndexUid::new_with_random_ulid("test-index"), source_id: "test-source".to_string(), node_id: "test-node".to_string(), - pipeline_ord: 0, + pipeline_uid: PipelineUid::default(), }; let doc_mapper = Arc::new(default_doc_mapper_for_test()); let schema = doc_mapper.schema(); @@ -1176,7 +1176,7 @@ mod tests { index_uid: IndexUid::new_with_random_ulid("test-index"), source_id: "test-source".to_string(), node_id: "test-node".to_string(), - pipeline_ord: 0, + pipeline_uid: PipelineUid::default(), }; let doc_mapper: Arc = Arc::new( serde_json::from_str::(DOCMAPPER_WITH_PARTITION_JSON).unwrap(), @@ -1274,7 +1274,7 @@ mod tests { index_uid: IndexUid::new_with_random_ulid("test-index"), source_id: "test-source".to_string(), node_id: "test-node".to_string(), - pipeline_ord: 0, + pipeline_uid: PipelineUid::default(), }; let doc_mapper: Arc = Arc::new(serde_json::from_str::(DOCMAPPER_SIMPLE_JSON).unwrap()); @@ -1344,7 +1344,7 @@ mod tests { index_uid: 
IndexUid::new_with_random_ulid("test-index"), source_id: "test-source".to_string(), node_id: "test-node".to_string(), - pipeline_ord: 0, + pipeline_uid: PipelineUid::default(), }; let doc_mapper: Arc = Arc::new(serde_json::from_str::(DOCMAPPER_SIMPLE_JSON).unwrap()); @@ -1416,7 +1416,7 @@ mod tests { index_uid: IndexUid::new_with_random_ulid("test-index"), source_id: "test-source".to_string(), node_id: "test-node".to_string(), - pipeline_ord: 0, + pipeline_uid: PipelineUid::default(), }; let doc_mapper: Arc = Arc::new(serde_json::from_str::(DOCMAPPER_SIMPLE_JSON).unwrap()); @@ -1481,7 +1481,7 @@ mod tests { index_uid: IndexUid::new_with_random_ulid("test-index"), source_id: "test-source".to_string(), node_id: "test-node".to_string(), - pipeline_ord: 0, + pipeline_uid: PipelineUid::default(), }; let doc_mapper: Arc = Arc::new(serde_json::from_str::(DOCMAPPER_SIMPLE_JSON).unwrap()); @@ -1542,7 +1542,7 @@ mod tests { index_uid: IndexUid::new_with_random_ulid("test-index"), source_id: "test-source".to_string(), node_id: "test-node".to_string(), - pipeline_ord: 0, + pipeline_uid: PipelineUid::default(), }; let doc_mapper = Arc::new(default_doc_mapper_for_test()); let last_delete_opstamp = 10; diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs index 886e75dc3ac..ae65a142659 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -311,7 +311,7 @@ impl IndexingPipeline { info!( index_id=%index_id, source_id=%source_id, - pipeline_ord=%self.params.pipeline_id.pipeline_ord, + pipeline_uid=%self.params.pipeline_id.pipeline_uid, "spawning indexing pipeline", ); let (source_mailbox, source_inbox) = ctx @@ -598,7 +598,7 @@ mod tests { EmptyResponse, IndexMetadataResponse, LastDeleteOpstampResponse, ListSplitsResponse, MetastoreError, }; - use quickwit_proto::types::IndexUid; + use quickwit_proto::types::{IndexUid, PipelineUid}; use quickwit_storage::RamStorage; use super::{IndexingPipeline, *}; @@ -670,7 +670,7 @@ mod tests { index_uid: "test-index:11111111111111111111111111".to_string().into(), source_id: "test-source".to_string(), node_id: node_id.to_string(), - pipeline_ord: 0, + pipeline_uid: PipelineUid::from_u128(0u128), }; let source_config = SourceConfig { source_id: "test-source".to_string(), @@ -770,7 +770,7 @@ mod tests { index_uid: "test-index:11111111111111111111111111".to_string().into(), source_id: "test-source".to_string(), node_id: node_id.to_string(), - pipeline_ord: 0, + pipeline_uid: PipelineUid::from_u128(0u128), }; let source_config = SourceConfig { source_id: "test-source".to_string(), @@ -837,7 +837,7 @@ mod tests { index_uid: IndexUid::new_with_random_ulid("test-index"), source_id: "test-source".to_string(), node_id: node_id.to_string(), - pipeline_ord: 0, + pipeline_uid: PipelineUid::from_u128(0u128), }; let source_config = SourceConfig { source_id: "test-source".to_string(), @@ -955,7 +955,7 @@ mod tests { index_uid: "test-index:11111111111111111111111111".to_string().into(), source_id: "test-source".to_string(), node_id: node_id.to_string(), - pipeline_ord: 0, + pipeline_uid: PipelineUid::from_u128(0u128), }; let source_config = SourceConfig { source_id: "test-source".to_string(), diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs index 87826d76852..19710d24704 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs +++ 
b/quickwit/quickwit-indexing/src/actors/indexing_service.rs @@ -18,12 +18,13 @@ // along with this program. If not, see . use std::collections::{HashMap, HashSet}; -use std::fmt::Debug; +use std::fmt::{Debug, Display, Formatter}; use std::path::PathBuf; use std::sync::Arc; use anyhow::Context; use async_trait::async_trait; +use fnv::FnvHashSet; use futures::future::try_join_all; use itertools::Itertools; use quickwit_actors::{ @@ -48,7 +49,7 @@ use quickwit_proto::indexing::{ use quickwit_proto::metastore::{ IndexMetadataRequest, ListIndexesMetadataRequest, MetastoreService, MetastoreServiceClient, }; -use quickwit_proto::types::{IndexId, IndexUid}; +use quickwit_proto::types::{IndexId, IndexUid, PipelineUid}; use quickwit_storage::StorageResolver; use serde::{Deserialize, Serialize}; use tokio::sync::Semaphore; @@ -80,6 +81,12 @@ pub struct MergePipelineId { source_id: String, } +impl Display for MergePipelineId { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + write!(f, "merge:{}:{}", self.index_uid, self.source_id) + } +} + impl<'a> From<&'a IndexingPipelineId> for MergePipelineId { fn from(pipeline_id: &'a IndexingPipelineId) -> Self { MergePipelineId { @@ -94,6 +101,12 @@ struct MergePipelineHandle { handle: ActorHandle, } +struct PipelineHandle { + mailbox: Mailbox, + handle: ActorHandle, + indexing_pipeline_id: IndexingPipelineId, +} + /// The indexing service is (single) actor service running on indexer and in charge /// of executing the indexing plans received from the control plane. /// @@ -109,8 +122,7 @@ pub struct IndexingService { ingest_api_service_opt: Option>, ingester_pool: IngesterPool, storage_resolver: StorageResolver, - indexing_pipelines: - HashMap, ActorHandle)>, + indexing_pipelines: HashMap, counters: IndexingServiceCounters, local_split_store: Arc, max_concurrent_split_uploads: usize, @@ -180,17 +192,14 @@ impl IndexingService { async fn detach_pipeline( &mut self, - pipeline_id: &IndexingPipelineId, + pipeline_uid: PipelineUid, ) -> Result, IndexingError> { - let (_pipeline_mailbox, pipeline_handle) = self + let pipeline_handle = self .indexing_pipelines - .remove(pipeline_id) - .ok_or_else(|| IndexingError::MissingPipeline { - index_id: pipeline_id.index_uid.index_id().to_string(), - source_id: pipeline_id.source_id.clone(), - })?; + .remove(&pipeline_uid) + .ok_or(IndexingError::MissingPipeline { pipeline_uid })?; self.counters.num_running_pipelines -= 1; - Ok(pipeline_handle) + Ok(pipeline_handle.handle) } async fn detach_merge_pipeline( @@ -200,9 +209,8 @@ impl IndexingService { let pipeline_handle = self .merge_pipeline_handles .remove(pipeline_id) - .ok_or_else(|| IndexingError::MissingPipeline { - index_id: pipeline_id.index_uid.index_id().to_string(), - source_id: pipeline_id.source_id.clone(), + .ok_or_else(|| IndexingError::MissingMergePipeline { + merge_pipeline_id: pipeline_id.to_string(), })?; self.counters.num_running_merge_pipelines -= 1; Ok(pipeline_handle.handle) @@ -210,15 +218,13 @@ impl IndexingService { async fn observe_pipeline( &mut self, - pipeline_id: &IndexingPipelineId, + pipeline_uid: PipelineUid, ) -> Result, IndexingError> { - let (_pipeline_mailbox, pipeline_handle) = self + let pipeline_handle = &self .indexing_pipelines - .get(pipeline_id) - .ok_or_else(|| IndexingError::MissingPipeline { - index_id: pipeline_id.index_uid.index_id().to_string(), - source_id: pipeline_id.source_id.clone(), - })?; + .get(&pipeline_uid) + .ok_or(IndexingError::MissingPipeline { pipeline_uid })? 
+ .handle; let observation = pipeline_handle.observe().await; Ok(observation) } @@ -228,14 +234,14 @@ impl IndexingService { ctx: &ActorContext, index_id: IndexId, source_config: SourceConfig, - pipeline_ord: usize, + pipeline_uid: PipelineUid, ) -> Result { let index_metadata = self.index_metadata(ctx, &index_id).await?; let pipeline_id = IndexingPipelineId { index_uid: index_metadata.index_uid.clone(), source_id: source_config.source_id.clone(), node_id: self.node_id.clone(), - pipeline_ord, + pipeline_uid, }; let index_config = index_metadata.into_index_config(); self.spawn_pipeline_inner(ctx, pipeline_id.clone(), index_config, source_config) @@ -250,18 +256,22 @@ impl IndexingService { index_config: IndexConfig, source_config: SourceConfig, ) -> Result<(), IndexingError> { - if self.indexing_pipelines.contains_key(&pipeline_id) { + if self + .indexing_pipelines + .contains_key(&pipeline_id.pipeline_uid) + { return Err(IndexingError::PipelineAlreadyExists { index_id: pipeline_id.index_uid.index_id().to_string(), source_id: pipeline_id.source_id, - pipeline_ord: pipeline_id.pipeline_ord, + pipeline_uid: pipeline_id.pipeline_uid, }); } + let pipeline_uid_str = pipeline_id.pipeline_uid.to_string(); let indexing_directory = temp_dir::Builder::default() .join(pipeline_id.index_uid.index_id()) .join(pipeline_id.index_uid.incarnation_id()) .join(&pipeline_id.source_id) - .join(&pipeline_id.pipeline_ord.to_string()) + .join(&pipeline_uid_str) .tempdir_in(&self.indexing_root_directory) .map_err(IndexingError::Io)?; let storage = self @@ -327,8 +337,13 @@ impl IndexingService { }; let pipeline = IndexingPipeline::new(pipeline_params); let (pipeline_mailbox, pipeline_handle) = ctx.spawn_actor().spawn(pipeline); + let pipeline_handle = PipelineHandle { + mailbox: pipeline_mailbox, + handle: pipeline_handle, + indexing_pipeline_id: pipeline_id.clone(), + }; self.indexing_pipelines - .insert(pipeline_id, (pipeline_mailbox, pipeline_handle)); + .insert(pipeline_id.pipeline_uid, pipeline_handle); self.counters.num_running_pipelines += 1; Ok(()) } @@ -353,14 +368,12 @@ impl IndexingService { async fn handle_supervise(&mut self) -> Result<(), ActorExitStatus> { self.indexing_pipelines - .retain(|pipeline_id, (_pipeline_mailbox, pipeline_handle)| { - match pipeline_handle.state() { + .retain(|pipeline_uid, pipeline_handle| { + match pipeline_handle.handle.state() { ActorState::Idle | ActorState::Paused | ActorState::Processing => true, ActorState::Success => { info!( - index_id=%pipeline_id.index_uid.index_id(), - source_id=%pipeline_id.source_id, - pipeline_ord=%pipeline_id.pipeline_ord, + pipeline_uid=%pipeline_uid, "Indexing pipeline exited successfully." ); self.counters.num_successful_pipelines += 1; @@ -371,9 +384,7 @@ impl IndexingService { // This should never happen: Indexing Pipelines are not supposed to fail, // and are themselves in charge of supervising the pipeline actors. error!( - index_id=%pipeline_id.index_uid.index_id(), - source_id=%pipeline_id.source_id, - pipeline_ord=%pipeline_id.pipeline_ord, + pipeline_uid=%pipeline_uid, "Indexing pipeline exited with failure. This should never happen." ); self.counters.num_failed_pipelines += 1; @@ -385,8 +396,8 @@ impl IndexingService { // Evict and kill merge pipelines that are not needed. 
let needed_merge_pipeline_ids: HashSet = self .indexing_pipelines - .keys() - .map(MergePipelineId::from) + .values() + .map(|pipeline_handle| MergePipelineId::from(&pipeline_handle.indexing_pipeline_id)) .collect(); let current_merge_pipeline_ids: HashSet = self.merge_pipeline_handles.keys().cloned().collect(); @@ -417,11 +428,11 @@ impl IndexingService { let pipeline_metrics: HashMap<&IndexingPipelineId, PipelineMetrics> = self .indexing_pipelines - .iter() - .filter_map(|(pipeline_id, (_, pipeline_handle))| { - let indexing_statistics = pipeline_handle.last_observation(); + .values() + .filter_map(|pipeline_handle| { + let indexing_statistics = pipeline_handle.handle.last_observation(); let pipeline_metrics = indexing_statistics.pipeline_metrics_opt?; - Some((pipeline_id, pipeline_metrics)) + Some((&pipeline_handle.indexing_pipeline_id, pipeline_metrics)) }) .collect(); self.cluster @@ -465,31 +476,42 @@ impl IndexingService { ctx: &ActorContext, physical_indexing_plan_request: ApplyIndexingPlanRequest, ) -> Result { - let mut updated_pipeline_ids: HashSet = HashSet::new(); - let mut pipeline_ordinals: HashMap<&IndexingTask, usize> = HashMap::new(); - for indexing_task in &physical_indexing_plan_request.indexing_tasks { - let pipeline_ord = pipeline_ordinals.entry(indexing_task).or_insert(0); - let pipeline_id = IndexingPipelineId { - node_id: self.node_id.clone(), - index_uid: IndexUid::from(indexing_task.index_uid.to_string()), - source_id: indexing_task.source_id.clone(), - pipeline_ord: *pipeline_ord, - }; - *pipeline_ord += 1; - updated_pipeline_ids.insert(pipeline_id); - } - - let running_pipeline_ids: HashSet = - self.indexing_pipelines.keys().cloned().collect(); + let pipelines_uid_in_plan: FnvHashSet = physical_indexing_plan_request + .indexing_tasks + .iter() + .map(|indexing_task| indexing_task.pipeline_uid()) + .collect(); + let pipeline_to_add: FnvHashSet<&IndexingTask> = physical_indexing_plan_request + .indexing_tasks + .iter() + .filter(|indexing_task| { + let pipeline_uid = indexing_task.pipeline_uid(); + !self.indexing_pipelines.contains_key(&pipeline_uid) + }) + .collect::>(); + let pipeline_uid_to_remove: Vec = self + .indexing_pipelines + .keys() + .cloned() + .filter(|pipeline_uid| !pipelines_uid_in_plan.contains(pipeline_uid)) + .collect::>(); + let indexing_pipeline_ids_to_add: Vec = pipeline_to_add + .iter() + .flat_map(|indexing_task| { + let pipeline_uid = indexing_task.pipeline_uid(); + let index_uid = IndexUid::parse(indexing_task.index_uid.clone()).ok()?; + Some(IndexingPipelineId { + node_id: self.node_id.clone(), + index_uid, + source_id: indexing_task.source_id.clone(), + pipeline_uid, + }) + }) + .collect(); // Spawn new pipeline in the new plan that are not currently running let failed_spawning_pipeline_ids = self - .spawn_pipelines( - ctx, - updated_pipeline_ids - .difference(&running_pipeline_ids) - .collect(), - ) + .spawn_pipelines(ctx, &indexing_pipeline_ids_to_add[..]) .await?; // TODO: Temporary hack to assign shards to pipelines. 
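A minimal, hypothetical sketch of the reconciliation step above, not taken from this patch: with pipelines keyed by `PipelineUid`, deciding what to spawn and what to shut down reduces to two set differences between the UIDs currently running and the UIDs in the incoming plan. The names and the `u128` stand-in type below are illustrative only.

use std::collections::HashSet;

// Stand-in for the real `PipelineUid` newtype, just to keep the sketch self-contained.
type PipelineUid = u128;

// Splits a new indexing plan into (pipelines to spawn, pipelines to shut down).
fn reconcile_plan(
    running: &HashSet<PipelineUid>,
    plan: &HashSet<PipelineUid>,
) -> (Vec<PipelineUid>, Vec<PipelineUid>) {
    // Pipelines requested by the plan but not running yet must be spawned.
    let to_spawn: Vec<PipelineUid> = plan.difference(running).copied().collect();
    // Pipelines currently running but absent from the plan must be shut down.
    let to_shutdown: Vec<PipelineUid> = running.difference(plan).copied().collect();
    (to_spawn, to_shutdown)
}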
@@ -497,15 +519,8 @@ impl IndexingService { if indexing_task.shard_ids.is_empty() { continue; } - let pipeline_id = IndexingPipelineId { - node_id: self.node_id.clone(), - index_uid: indexing_task.index_uid.clone().into(), - source_id: indexing_task.source_id.clone(), - pipeline_ord: 0, - }; - let Some((pipeline_mailbox, _pipeline_handle)) = - self.indexing_pipelines.get(&pipeline_id) - else { + let pipeline_uid = indexing_task.pipeline_uid(); + let Some(pipeline_handle) = self.indexing_pipelines.get(&pipeline_uid) else { continue; }; let assignment = Assignment { @@ -513,17 +528,13 @@ impl IndexingService { }; let message = AssignShards(assignment); - if let Err(error) = pipeline_mailbox.send_message(message).await { - error!("failed to assign shards to indexing pipeline: {}", error); + if let Err(error) = pipeline_handle.mailbox.send_message(message).await { + error!(error=%error, "failed to assign shards to indexing pipeline"); } } + // Shut down currently running pipelines that are missing in the new plan. - self.shutdown_pipelines( - running_pipeline_ids - .difference(&updated_pipeline_ids) - .collect(), - ) - .await; + self.shutdown_pipelines(&pipeline_uid_to_remove).await; self.update_cluster_running_indexing_tasks().await; @@ -540,7 +551,7 @@ impl IndexingService { async fn spawn_pipelines( &mut self, ctx: &ActorContext, - added_pipeline_ids: Vec<&IndexingPipelineId>, + added_pipeline_ids: &[IndexingPipelineId], ) -> Result, IndexingError> { // We fetch the new indexes metadata. let indexes_metadata_futures = added_pipeline_ids @@ -592,9 +603,16 @@ impl IndexingService { } /// Shuts down the pipelines with supplied ids and performs necessary cleanup. - async fn shutdown_pipelines(&mut self, pipeline_ids: Vec<&IndexingPipelineId>) { - for pipeline_id_to_remove in pipeline_ids.clone() { - match self.detach_pipeline(pipeline_id_to_remove).await { + async fn shutdown_pipelines(&mut self, pipeline_uids: &[PipelineUid]) { + let should_gc_ingest_api_queues = pipeline_uids + .iter() + .flat_map(|pipeline_uid| self.indexing_pipelines.get(pipeline_uid)) + .any(|pipeline_handle| { + pipeline_handle.indexing_pipeline_id.source_id == INGEST_API_SOURCE_ID + }); + + for &pipeline_uid_to_remove in pipeline_uids { + match self.detach_pipeline(pipeline_uid_to_remove).await { Ok(pipeline_handle) => { // Killing the pipeline ensure that all pipeline actors will stop. pipeline_handle.kill().await; @@ -603,7 +621,7 @@ impl IndexingService { // Just log the detach error, it can only come from a missing pipeline in the // `indexing_pipeline_handles`. error!( - pipeline_id=?pipeline_id_to_remove, + pipeline_id=?pipeline_uid_to_remove, err=?error, "Detach error.", ); @@ -614,9 +632,6 @@ impl IndexingService { // If at least one ingest source has been removed, the related index has possibly been // deleted. Thus we run a garbage collect to remove queues of potentially deleted // indexes. 
- let should_gc_ingest_api_queues = pipeline_ids - .iter() - .any(|pipeline_id_to_remove| pipeline_id_to_remove.source_id == INGEST_API_SOURCE_ID); if should_gc_ingest_api_queues { if let Err(error) = self.run_ingest_api_queues_gc().await { warn!( @@ -631,10 +646,12 @@ impl IndexingService { async fn update_cluster_running_indexing_tasks(&self) { let indexing_tasks = self .indexing_pipelines - .keys() + .values() + .map(|pipeline_handle| &pipeline_handle.indexing_pipeline_id) .map(|pipeline_id| IndexingTask { index_uid: pipeline_id.index_uid.to_string(), source_id: pipeline_id.source_id.clone(), + pipeline_uid: Some(pipeline_id.pipeline_uid), shard_ids: Vec::new(), }) // Sort indexing tasks so it's more readable for debugging purpose. @@ -710,7 +727,7 @@ impl Handler for IndexingService { msg: ObservePipeline, _ctx: &ActorContext, ) -> Result { - let observation = self.observe_pipeline(&msg.pipeline_id).await; + let observation = self.observe_pipeline(msg.pipeline_id.pipeline_uid).await; Ok(observation) } } @@ -724,7 +741,7 @@ impl Handler for IndexingService { msg: DetachIndexingPipeline, _ctx: &ActorContext, ) -> Result { - Ok(self.detach_pipeline(&msg.pipeline_id).await) + Ok(self.detach_pipeline(msg.pipeline_id.pipeline_uid).await) } } @@ -787,7 +804,7 @@ impl Handler for IndexingService { ctx, message.index_id, message.source_config, - message.pipeline_ord, + message.pipeline_uid, ) .await) } @@ -925,10 +942,10 @@ mod tests { }; let spawn_pipeline_msg = SpawnPipeline { index_id: index_id.clone(), - pipeline_ord: 0, + pipeline_uid: PipelineUid::from_u128(1111u128), source_config: source_config_0.clone(), }; - let pipeline_id = indexing_service + let pipeline_id: IndexingPipelineId = indexing_service .ask_for_res(spawn_pipeline_msg.clone()) .await .unwrap(); @@ -939,7 +956,7 @@ mod tests { assert_eq!(pipeline_id.index_uid.index_id(), index_id); assert_eq!(pipeline_id.source_id, source_config_0.source_id); assert_eq!(pipeline_id.node_id, "test-node"); - assert_eq!(pipeline_id.pipeline_ord, 0); + assert_eq!(pipeline_id.pipeline_uid, PipelineUid::from_u128(1111u128)); assert_eq!( indexing_service_handle .observe() @@ -1018,7 +1035,7 @@ mod tests { .ask_for_res(SpawnPipeline { index_id: index_id.clone(), source_config, - pipeline_ord: 0, + pipeline_uid: PipelineUid::default(), }) .await .unwrap(); @@ -1095,11 +1112,13 @@ mod tests { index_uid: metadata.index_uid.to_string(), source_id: "test-indexing-service--source-1".to_string(), shard_ids: Vec::new(), + pipeline_uid: Some(PipelineUid::from_u128(0u128)), }, IndexingTask { index_uid: metadata.index_uid.to_string(), source_id: "test-indexing-service--source-1".to_string(), shard_ids: Vec::new(), + pipeline_uid: Some(PipelineUid::from_u128(1u128)), }, ]; indexing_service @@ -1139,21 +1158,25 @@ mod tests { index_uid: metadata.index_uid.to_string(), source_id: INGEST_API_SOURCE_ID.to_string(), shard_ids: Vec::new(), + pipeline_uid: Some(PipelineUid::from_u128(3u128)), }, IndexingTask { index_uid: metadata.index_uid.to_string(), source_id: "test-indexing-service--source-1".to_string(), shard_ids: Vec::new(), + pipeline_uid: Some(PipelineUid::from_u128(1u128)), }, IndexingTask { index_uid: metadata.index_uid.to_string(), source_id: "test-indexing-service--source-1".to_string(), shard_ids: Vec::new(), + pipeline_uid: Some(PipelineUid::from_u128(2u128)), }, IndexingTask { index_uid: metadata.index_uid.to_string(), source_id: source_config_2.source_id.clone(), shard_ids: Vec::new(), + pipeline_uid: Some(PipelineUid::from_u128(4u128)), }, ]; 
indexing_service @@ -1193,16 +1216,19 @@ mod tests { index_uid: metadata.index_uid.to_string(), source_id: INGEST_API_SOURCE_ID.to_string(), shard_ids: Vec::new(), + pipeline_uid: Some(PipelineUid::from_u128(3u128)), }, IndexingTask { index_uid: metadata.index_uid.to_string(), source_id: "test-indexing-service--source-1".to_string(), shard_ids: Vec::new(), + pipeline_uid: Some(PipelineUid::from_u128(1u128)), }, IndexingTask { index_uid: metadata.index_uid.to_string(), source_id: source_config_2.source_id.clone(), shard_ids: Vec::new(), + pipeline_uid: Some(PipelineUid::from_u128(4u128)), }, ]; indexing_service @@ -1324,7 +1350,7 @@ mod tests { .ask_for_res(SpawnPipeline { index_id: index_id.clone(), source_config, - pipeline_ord: 0, + pipeline_uid: PipelineUid::default(), }) .await .unwrap(); @@ -1382,9 +1408,9 @@ mod tests { ) -> Result { Ok(self .indexing_pipelines - .get(&message.0) + .get(&message.0.pipeline_uid) .unwrap() - .1 + .handle .check_health(true)) } } @@ -1442,7 +1468,7 @@ mod tests { .ask_for_res(SpawnPipeline { index_id: index_id.clone(), source_config, - pipeline_ord: 0, + pipeline_uid: PipelineUid::default(), }) .await .unwrap(); diff --git a/quickwit/quickwit-indexing/src/actors/merge_executor.rs b/quickwit/quickwit-indexing/src/actors/merge_executor.rs index 9444a05c22c..81f4f642f12 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_executor.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_executor.rs @@ -39,6 +39,7 @@ use quickwit_proto::metastore::{ DeleteTask, ListDeleteTasksRequest, MarkSplitsForDeletionRequest, MetastoreService, MetastoreServiceClient, }; +use quickwit_proto::types::PipelineUid; use quickwit_query::get_quickwit_fastfield_normalizer_manager; use quickwit_query::query_ast::QueryAst; use tantivy::directory::{Advice, DirectoryClone, MmapDirectory, RamDirectory}; @@ -431,7 +432,7 @@ impl MergeExecutor { let index_pipeline_id = IndexingPipelineId { index_uid: split.index_uid, node_id: split.node_id.clone(), - pipeline_ord: 0, + pipeline_uid: PipelineUid::new(), source_id: split.source_id.clone(), }; let indexed_split = IndexedSplit { @@ -591,7 +592,7 @@ mod tests { index_uid: index_uid.clone(), source_id: "test-source".to_string(), node_id: "test-node".to_string(), - pipeline_ord: 0, + pipeline_uid: PipelineUid::from_u128(0u128), }; for split_id in 0..4 { let single_doc = std::iter::once( @@ -715,7 +716,7 @@ mod tests { let pipeline_id = IndexingPipelineId { index_uid: index_uid.clone(), node_id: "unknown".to_string(), - pipeline_ord: 0, + pipeline_uid: PipelineUid::from_u128(0u128), source_id: "unknown".to_string(), }; test_sandbox.add_documents(docs).await?; diff --git a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs index be4e816871e..c4509e08a2b 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs @@ -214,7 +214,7 @@ impl MergePipeline { info!( index_id=%self.params.pipeline_id.index_uid.index_id(), source_id=%self.params.pipeline_id.source_id, - pipeline_ord=%self.params.pipeline_id.pipeline_ord, + pipeline_uid=%self.params.pipeline_id.pipeline_uid, root_dir=%self.params.indexing_directory.path().display(), merge_policy=?self.params.merge_policy, "spawn merge pipeline", @@ -489,7 +489,7 @@ mod tests { use quickwit_metastore::{ListSplitsRequestExt, ListSplitsResponseExt}; use quickwit_proto::indexing::IndexingPipelineId; use quickwit_proto::metastore::{ListSplitsResponse, MetastoreServiceClient}; 
- use quickwit_proto::types::IndexUid; + use quickwit_proto::types::{IndexUid, PipelineUid}; use quickwit_storage::RamStorage; use crate::actors::merge_pipeline::{MergePipeline, MergePipelineParams}; @@ -504,7 +504,7 @@ mod tests { index_uid: index_uid.clone(), source_id: "test-source".to_string(), node_id: "test-node".to_string(), - pipeline_ord: 0, + pipeline_uid: PipelineUid::default(), }; metastore .expect_list_splits() diff --git a/quickwit/quickwit-indexing/src/actors/merge_planner.rs b/quickwit/quickwit-indexing/src/actors/merge_planner.rs index 43f6fe28123..f735e10af97 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_planner.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_planner.rs @@ -470,7 +470,7 @@ mod tests { use quickwit_config::IndexingSettings; use quickwit_metastore::{SplitMaturity, SplitMetadata}; use quickwit_proto::indexing::IndexingPipelineId; - use quickwit_proto::types::IndexUid; + use quickwit_proto::types::{IndexUid, PipelineUid}; use tantivy::TrackedObject; use time::OffsetDateTime; @@ -513,7 +513,7 @@ mod tests { index_uid: index_uid.clone(), source_id: "test-source".to_string(), node_id: "test-node".to_string(), - pipeline_ord: 0, + pipeline_uid: PipelineUid::default(), }; let merge_policy = Arc::new(StableLogMergePolicy::new( StableLogMergePolicyConfig { @@ -599,7 +599,7 @@ mod tests { index_uid: index_uid.clone(), source_id: "test-source".to_string(), node_id: "test-node".to_string(), - pipeline_ord: 0, + pipeline_uid: PipelineUid::default(), }; let merge_policy_config = ConstWriteAmplificationMergePolicyConfig { merge_factor: 2, @@ -651,7 +651,7 @@ mod tests { index_uid: index_uid.clone(), source_id: "test-source".to_string(), node_id: "test-node".to_string(), - pipeline_ord: 0, + pipeline_uid: PipelineUid::default(), }; let merge_policy_config = ConstWriteAmplificationMergePolicyConfig { merge_factor: 2, @@ -750,7 +750,7 @@ mod tests { index_uid: index_uid.clone(), source_id: "test-source".to_string(), node_id: "test-node".to_string(), - pipeline_ord: 0, + pipeline_uid: PipelineUid::default(), }; let merge_policy_config = ConstWriteAmplificationMergePolicyConfig { merge_factor: 2, @@ -828,7 +828,7 @@ mod tests { index_uid, source_id: "test-source".to_string(), node_id: "test-node".to_string(), - pipeline_ord: 0, + pipeline_uid: PipelineUid::default(), }; let merge_policy_config = ConstWriteAmplificationMergePolicyConfig { merge_factor: 2, @@ -892,7 +892,7 @@ mod tests { index_uid: index_uid.clone(), source_id: "test-source".to_string(), node_id: "test-node".to_string(), - pipeline_ord: 0, + pipeline_uid: PipelineUid::default(), }; let merge_policy_config = ConstWriteAmplificationMergePolicyConfig { merge_factor: 2, @@ -972,7 +972,7 @@ mod tests { index_uid: index_uid.clone(), source_id: "test-source".to_string(), node_id: "test-node".to_string(), - pipeline_ord: 0, + pipeline_uid: PipelineUid::default(), }; let merge_policy_config = ConstWriteAmplificationMergePolicyConfig { merge_factor: 2, diff --git a/quickwit/quickwit-indexing/src/actors/packager.rs b/quickwit/quickwit-indexing/src/actors/packager.rs index 24a66e5fde7..658a498b91e 100644 --- a/quickwit/quickwit-indexing/src/actors/packager.rs +++ b/quickwit/quickwit-indexing/src/actors/packager.rs @@ -338,7 +338,7 @@ mod tests { use quickwit_actors::{ObservationType, Universe}; use quickwit_metastore::checkpoint::IndexCheckpointDelta; use quickwit_proto::indexing::IndexingPipelineId; - use quickwit_proto::types::IndexUid; + use quickwit_proto::types::{IndexUid, PipelineUid}; use 
tantivy::directory::MmapDirectory; use tantivy::schema::{NumericOptions, Schema, FAST, STRING, TEXT}; use tantivy::{doc, DateTime, IndexBuilder, IndexSettings}; @@ -413,7 +413,7 @@ mod tests { index_uid: IndexUid::new_with_random_ulid("test-index"), source_id: "test-source".to_string(), node_id: "test-node".to_string(), - pipeline_ord: 0, + pipeline_uid: PipelineUid::default(), }; // TODO: In the future we would like that kind of segment flush to emit a new split, diff --git a/quickwit/quickwit-indexing/src/actors/uploader.rs b/quickwit/quickwit-indexing/src/actors/uploader.rs index e70653bce4e..8892c220f90 100644 --- a/quickwit/quickwit-indexing/src/actors/uploader.rs +++ b/quickwit/quickwit-indexing/src/actors/uploader.rs @@ -490,6 +490,7 @@ mod tests { use quickwit_metastore::checkpoint::{IndexCheckpointDelta, SourceCheckpointDelta}; use quickwit_proto::indexing::IndexingPipelineId; use quickwit_proto::metastore::EmptyResponse; + use quickwit_proto::types::PipelineUid; use quickwit_storage::RamStorage; use tantivy::DateTime; use tokio::sync::oneshot; @@ -507,7 +508,7 @@ mod tests { index_uid: IndexUid::new_with_random_ulid("test-index"), source_id: "test-source".to_string(), node_id: "test-node".to_string(), - pipeline_ord: 0, + pipeline_uid: PipelineUid::default(), }; let (sequencer_mailbox, sequencer_inbox) = universe.create_test_mailbox::>(); @@ -618,7 +619,7 @@ mod tests { index_uid: IndexUid::new_with_random_ulid("test-index"), source_id: "test-source".to_string(), node_id: "test-node".to_string(), - pipeline_ord: 0, + pipeline_uid: PipelineUid::default(), }; let universe = Universe::new(); let (sequencer_mailbox, sequencer_inbox) = @@ -765,7 +766,7 @@ mod tests { index_uid: IndexUid::from("test-index-no-sequencer:11111111111111111111111111"), source_id: "test-source".to_string(), node_id: "test-node".to_string(), - pipeline_ord: 0, + pipeline_uid: PipelineUid::default(), }; let universe = Universe::new(); let (publisher_mailbox, publisher_inbox) = universe.create_test_mailbox::(); @@ -945,7 +946,7 @@ mod tests { index_uid: IndexUid::new_with_random_ulid("test-index"), source_id: "test-source".to_string(), node_id: "test-node".to_string(), - pipeline_ord: 0, + pipeline_uid: PipelineUid::default(), }; let mut mock_metastore = MetastoreServiceClient::mock(); mock_metastore diff --git a/quickwit/quickwit-indexing/src/merge_policy/mod.rs b/quickwit/quickwit-indexing/src/merge_policy/mod.rs index d264f8a9f20..0f30a482242 100644 --- a/quickwit/quickwit-indexing/src/merge_policy/mod.rs +++ b/quickwit/quickwit-indexing/src/merge_policy/mod.rs @@ -170,7 +170,7 @@ pub mod tests { use proptest::prelude::*; use quickwit_actors::Universe; use quickwit_proto::indexing::IndexingPipelineId; - use quickwit_proto::types::IndexUid; + use quickwit_proto::types::{IndexUid, PipelineUid}; use rand::seq::SliceRandom; use tantivy::TrackedObject; use time::OffsetDateTime; @@ -335,7 +335,7 @@ pub mod tests { index_uid: IndexUid::new_with_random_ulid("test_index"), source_id: "test_source".to_string(), node_id: "test_node".to_string(), - pipeline_ord: 0, + pipeline_uid: PipelineUid::from_u128(0u128), }; let split_attrs = merge_split_attrs(merged_split_id, &pipeline_id, splits); create_split_metadata(merge_policy, &split_attrs, tags, 0..0) @@ -366,7 +366,7 @@ pub mod tests { index_uid: IndexUid::new_with_random_ulid("test-index"), source_id: "test-source".to_string(), node_id: "test-node".to_string(), - pipeline_ord: 0, + pipeline_uid: PipelineUid::default(), }; let merge_planner = MergePlanner::new( 
pipeline_id, diff --git a/quickwit/quickwit-indexing/src/models/indexed_split.rs b/quickwit/quickwit-indexing/src/models/indexed_split.rs index d5182800f61..81aa51b8af7 100644 --- a/quickwit/quickwit-indexing/src/models/indexed_split.rs +++ b/quickwit/quickwit-indexing/src/models/indexed_split.rs @@ -123,7 +123,7 @@ impl IndexedSplitBuilder { index_id=%self.split_attrs.pipeline_id.index_uid.index_id(), source_id=%self.split_attrs.pipeline_id.source_id, node_id=%self.split_attrs.pipeline_id.node_id, - pipeline_id=%self.split_attrs.pipeline_id.pipeline_ord, + pipeline_uid=%self.split_attrs.pipeline_id.pipeline_uid, split_id=%self.split_attrs.split_id, partition_id=%self.split_attrs.partition_id, num_docs=%self.split_attrs.num_docs, diff --git a/quickwit/quickwit-indexing/src/models/indexing_service_message.rs b/quickwit/quickwit-indexing/src/models/indexing_service_message.rs index 4c2de58aabe..14afa430028 100644 --- a/quickwit/quickwit-indexing/src/models/indexing_service_message.rs +++ b/quickwit/quickwit-indexing/src/models/indexing_service_message.rs @@ -19,6 +19,7 @@ use quickwit_config::SourceConfig; use quickwit_proto::indexing::IndexingPipelineId; +use quickwit_proto::types::PipelineUid; use crate::actors::MergePipelineId; @@ -26,7 +27,7 @@ use crate::actors::MergePipelineId; pub struct SpawnPipeline { pub index_id: String, pub source_config: SourceConfig, - pub pipeline_ord: usize, + pub pipeline_uid: PipelineUid, } #[derive(Clone, Debug)] diff --git a/quickwit/quickwit-indexing/src/source/ingest/mod.rs b/quickwit/quickwit-indexing/src/source/ingest/mod.rs index 352d4962f9f..254e62c7c87 100644 --- a/quickwit/quickwit-indexing/src/source/ingest/mod.rs +++ b/quickwit/quickwit-indexing/src/source/ingest/mod.rs @@ -84,7 +84,7 @@ impl TypedSourceFactory for IngestSourceFactory { struct ClientId { node_id: NodeId, source_uid: SourceUid, - pipeline_ord: usize, + pipeline_uid: String, } impl fmt::Display for ClientId { @@ -92,17 +92,17 @@ impl fmt::Display for ClientId { write!( formatter, "indexer/{}/{}/{}/{}", - self.node_id, self.source_uid.index_uid, self.source_uid.source_id, self.pipeline_ord + self.node_id, self.source_uid.index_uid, self.source_uid.source_id, self.pipeline_uid ) } } impl ClientId { - fn new(node_id: NodeId, source_uid: SourceUid, pipeline_ord: usize) -> Self { - Self { + fn new(node_id: NodeId, source_uid: SourceUid, pipeline_uid: String) -> Self { + ClientId { node_id, source_uid, - pipeline_ord, + pipeline_uid, } } @@ -163,7 +163,7 @@ impl IngestSource { index_uid: runtime_args.index_uid().clone(), source_id: runtime_args.source_id().to_string(), }, - runtime_args.pipeline_ord(), + runtime_args.pipeline_uid().to_string(), ); let metastore = runtime_args.metastore.clone(); let ingester_pool = runtime_args.ingester_pool.clone(); @@ -522,6 +522,7 @@ mod tests { use quickwit_proto::ingest::ingester::{IngesterServiceClient, TruncateShardsResponse}; use quickwit_proto::ingest::{IngestV2Error, MRecordBatch, Shard, ShardState}; use quickwit_proto::metastore::{AcquireShardsResponse, AcquireShardsSubresponse}; + use quickwit_proto::types::PipelineUid; use quickwit_storage::StorageResolver; use tokio::sync::mpsc::error::TryRecvError; use tokio::sync::watch; @@ -536,11 +537,11 @@ mod tests { node_id: "test-node".to_string(), index_uid: "test-index:0".into(), source_id: "test-source".to_string(), - pipeline_ord: 0, + pipeline_uid: PipelineUid::default(), }; let source_config = SourceConfig::for_test("test-source", SourceParams::Ingest); - let publish_token = - 
"indexer/test-node/test-index:0/test-source/0/00000000000000000000000000"; + let publish_token = "indexer/test-node/test-index:0/test-source/\ + 00000000000000000000000000/00000000000000000000000000"; let mut mock_metastore = MetastoreServiceClient::mock(); mock_metastore @@ -581,7 +582,7 @@ mod tests { .returning(|request| { assert_eq!( request.client_id, - "indexer/test-node/test-index:0/test-source/0" + "indexer/test-node/test-index:0/test-source/00000000000000000000000000" ); assert_eq!(request.index_uid, "test-index:0"); assert_eq!(request.source_id, "test-source"); @@ -693,11 +694,11 @@ mod tests { node_id: "test-node".to_string(), index_uid: "test-index:0".into(), source_id: "test-source".to_string(), - pipeline_ord: 0, + pipeline_uid: PipelineUid::default(), }; let source_config = SourceConfig::for_test("test-source", SourceParams::Ingest); - let publish_token = - "indexer/test-node/test-index:0/test-source/0/00000000000000000000000000"; + let publish_token = "indexer/test-node/test-index:0/test-source/\ + 00000000000000000000000000/00000000000000000000000000"; let mut mock_metastore = MetastoreServiceClient::mock(); mock_metastore @@ -812,11 +813,11 @@ mod tests { node_id: "test-node".to_string(), index_uid: "test-index:0".into(), source_id: "test-source".to_string(), - pipeline_ord: 0, + pipeline_uid: PipelineUid::default(), }; let source_config = SourceConfig::for_test("test-source", SourceParams::Ingest); - let publish_token = - "indexer/test-node/test-index:0/test-source/0/00000000000000000000000000"; + let publish_token = "indexer/test-node/test-index:0/test-source/\ + 00000000000000000000000000/00000000000000000000000000"; let mut mock_metastore = MetastoreServiceClient::mock(); mock_metastore @@ -974,7 +975,7 @@ mod tests { node_id: "test-node".to_string(), index_uid: "test-index:0".into(), source_id: "test-source".to_string(), - pipeline_ord: 0, + pipeline_uid: PipelineUid::default(), }; let source_config = SourceConfig::for_test("test-source", SourceParams::Ingest); let mock_metastore = MetastoreServiceClient::mock(); @@ -1138,7 +1139,7 @@ mod tests { node_id: "test-node".to_string(), index_uid: "test-index:0".into(), source_id: "test-source".to_string(), - pipeline_ord: 0, + pipeline_uid: PipelineUid::default(), }; let source_config = SourceConfig::for_test("test-source", SourceParams::Ingest); let mock_metastore = MetastoreServiceClient::mock(); diff --git a/quickwit/quickwit-indexing/src/source/mod.rs b/quickwit/quickwit-indexing/src/source/mod.rs index c5bd193d152..55217865e53 100644 --- a/quickwit/quickwit-indexing/src/source/mod.rs +++ b/quickwit/quickwit-indexing/src/source/mod.rs @@ -96,7 +96,7 @@ use quickwit_ingest::IngesterPool; use quickwit_metastore::checkpoint::{SourceCheckpoint, SourceCheckpointDelta}; use quickwit_proto::indexing::IndexingPipelineId; use quickwit_proto::metastore::MetastoreServiceClient; -use quickwit_proto::types::{IndexUid, ShardId}; +use quickwit_proto::types::{IndexUid, PipelineUid, ShardId}; use quickwit_storage::StorageResolver; use serde_json::Value as JsonValue; pub use source_factory::{SourceFactory, SourceLoader, TypedSourceFactory}; @@ -155,8 +155,8 @@ impl SourceRuntimeArgs { &self.pipeline_id.source_id } - pub fn pipeline_ord(&self) -> usize { - self.pipeline_id.pipeline_ord + pub fn pipeline_uid(&self) -> PipelineUid { + self.pipeline_id.pipeline_uid } #[cfg(test)] @@ -171,7 +171,7 @@ impl SourceRuntimeArgs { node_id: "test-node".to_string(), index_uid, source_id: source_config.source_id.clone(), - pipeline_ord: 0, + 
pipeline_uid: PipelineUid::from_u128(0u128), }; Arc::new(SourceRuntimeArgs { pipeline_id, diff --git a/quickwit/quickwit-indexing/src/test_utils.rs b/quickwit/quickwit-indexing/src/test_utils.rs index 784cc1b60be..1069def83f6 100644 --- a/quickwit/quickwit-indexing/src/test_utils.rs +++ b/quickwit/quickwit-indexing/src/test_utils.rs @@ -39,7 +39,7 @@ use quickwit_metastore::{ CreateIndexRequestExt, MetastoreResolver, Split, SplitMetadata, SplitState, }; use quickwit_proto::metastore::{CreateIndexRequest, MetastoreService, MetastoreServiceClient}; -use quickwit_proto::types::IndexUid; +use quickwit_proto::types::{IndexUid, PipelineUid}; use quickwit_storage::{Storage, StorageResolver}; use serde_json::Value as JsonValue; @@ -173,7 +173,7 @@ impl TestSandbox { .ask_for_res(SpawnPipeline { index_id: self.index_uid.index_id().to_string(), source_config, - pipeline_ord: 0, + pipeline_uid: PipelineUid::from_u128(0u128), }) .await?; let pipeline_handle = self diff --git a/quickwit/quickwit-jaeger/src/integration_tests.rs b/quickwit/quickwit-jaeger/src/integration_tests.rs index 67a59b09e72..5069da86983 100644 --- a/quickwit/quickwit-jaeger/src/integration_tests.rs +++ b/quickwit/quickwit-jaeger/src/integration_tests.rs @@ -54,7 +54,7 @@ use quickwit_proto::opentelemetry::proto::trace::v1::span::{Event as OtlpEvent, use quickwit_proto::opentelemetry::proto::trace::v1::{ ResourceSpans, ScopeSpans, Span as OtlpSpan, Status as OtlpStatus, }; -use quickwit_proto::types::IndexUid; +use quickwit_proto::types::{IndexUid, PipelineUid}; use quickwit_search::{ start_searcher_service, SearchJobPlacer, SearchService, SearchServiceClient, SearcherContext, SearcherPool, @@ -427,7 +427,7 @@ async fn setup_traces_index( let spawn_pipeline_request = SpawnPipeline { index_id: index_id.clone(), source_config, - pipeline_ord: 0, + pipeline_uid: PipelineUid::default(), }; indexer_service .ask_for_res(spawn_pipeline_request) diff --git a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs index 78196e48bce..d64db0b4ba5 100644 --- a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs +++ b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs @@ -39,7 +39,7 @@ use quickwit_indexing::{IndexingSplitStore, PublisherType, SplitsUpdateMailbox}; use quickwit_metastore::IndexMetadataResponseExt; use quickwit_proto::indexing::IndexingPipelineId; use quickwit_proto::metastore::{IndexMetadataRequest, MetastoreService, MetastoreServiceClient}; -use quickwit_proto::types::IndexUid; +use quickwit_proto::types::{IndexUid, PipelineUid}; use quickwit_search::SearchJobPlacer; use quickwit_storage::Storage; use serde::Serialize; @@ -191,7 +191,7 @@ impl DeleteTaskPipeline { let index_pipeline_id = IndexingPipelineId { index_uid: self.index_uid.clone(), node_id: "unknown".to_string(), - pipeline_ord: 0, + pipeline_uid: PipelineUid::from_u128(0u128), source_id: "unknown".to_string(), }; let throughput_limit: f64 = index_config diff --git a/quickwit/quickwit-proto/protos/quickwit/indexing.proto b/quickwit/quickwit-proto/protos/quickwit/indexing.proto index e6aafee7600..555cf56ad6e 100644 --- a/quickwit/quickwit-proto/protos/quickwit/indexing.proto +++ b/quickwit/quickwit-proto/protos/quickwit/indexing.proto @@ -39,6 +39,8 @@ message IndexingTask { string index_uid = 1; // The task's source ID. string source_id = 2; + // pipeline id + PipelineUid pipeline_uid = 4; // The shards assigned to the indexer. 
repeated uint64 shard_ids = 3; } diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.indexing.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.indexing.rs index deccabf4b09..e7d3715d5d2 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.indexing.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.indexing.rs @@ -15,6 +15,9 @@ pub struct IndexingTask { /// The task's source ID. #[prost(string, tag = "2")] pub source_id: ::prost::alloc::string::String, + /// pipeline id + #[prost(message, optional, tag = "4")] + pub pipeline_uid: ::core::option::Option, /// The shards assigned to the indexer. #[prost(uint64, repeated, tag = "3")] pub shard_ids: ::prost::alloc::vec::Vec, diff --git a/quickwit/quickwit-proto/src/indexing/mod.rs b/quickwit/quickwit-proto/src/indexing/mod.rs index 88bcc82c4b0..3a10f336aa7 100644 --- a/quickwit/quickwit-proto/src/indexing/mod.rs +++ b/quickwit/quickwit-proto/src/indexing/mod.rs @@ -28,7 +28,7 @@ use quickwit_common::pubsub::Event; use serde::{Deserialize, Serialize}; use thiserror; -use crate::types::{IndexUid, Position, ShardId, SourceId, SourceUid}; +use crate::types::{IndexUid, PipelineUid, Position, ShardId, SourceId, SourceUid}; use crate::{ServiceError, ServiceErrorCode}; include!("../codegen/quickwit/quickwit.indexing.rs"); @@ -37,15 +37,17 @@ pub type IndexingResult = std::result::Result; #[derive(Debug, thiserror::Error)] pub enum IndexingError { - #[error("indexing pipeline `{index_id}` for source `{source_id}` does not exist")] - MissingPipeline { index_id: String, source_id: String }, + #[error("indexing pipeline `{pipeline_uid}` does not exist")] + MissingPipeline { pipeline_uid: PipelineUid }, + #[error("indexing merge pipeline `{merge_pipeline_id}` does not exist")] + MissingMergePipeline { merge_pipeline_id: String }, #[error( - "pipeline #{pipeline_ord} for index `{index_id}` and source `{source_id}` already exists" + "pipeline #{pipeline_uid} for index `{index_id}` and source `{source_id}` already exists" )] PipelineAlreadyExists { index_id: String, source_id: SourceId, - pipeline_ord: usize, + pipeline_uid: PipelineUid, }, #[error("I/O error `{0}`")] Io(io::Error), @@ -68,16 +70,18 @@ pub enum IndexingError { impl From for tonic::Status { fn from(error: IndexingError) -> Self { match error { - IndexingError::MissingPipeline { - index_id, - source_id, - } => tonic::Status::not_found(format!("missing pipeline {index_id}/{source_id}")), + IndexingError::MissingPipeline { pipeline_uid } => { + tonic::Status::not_found(format!("missing pipeline `{pipeline_uid}`")) + } + IndexingError::MissingMergePipeline { merge_pipeline_id } => { + tonic::Status::not_found(format!("missing merge pipeline `{merge_pipeline_id}`")) + } IndexingError::PipelineAlreadyExists { index_id, source_id, - pipeline_ord, + pipeline_uid, } => tonic::Status::already_exists(format!( - "pipeline {index_id}/{source_id} {pipeline_ord} already exists " + "pipeline {index_id}/{source_id} {pipeline_uid} already exists " )), IndexingError::Io(error) => tonic::Status::internal(error.to_string()), IndexingError::InvalidParams(error) => { @@ -103,13 +107,12 @@ impl From for IndexingError { IndexingError::InvalidParams(anyhow!(status.message().to_string())) } tonic::Code::NotFound => IndexingError::MissingPipeline { - index_id: "".to_string(), - source_id: "".to_string(), + pipeline_uid: PipelineUid::default(), }, tonic::Code::AlreadyExists => IndexingError::PipelineAlreadyExists { index_id: "".to_string(), source_id: "".to_string(), - 
pipeline_ord: 0, + pipeline_uid: PipelineUid::default(), }, tonic::Code::Unavailable => IndexingError::Unavailable, _ => IndexingError::InvalidParams(anyhow!(status.message().to_string())), @@ -121,6 +124,7 @@ impl ServiceError for IndexingError { fn error_code(&self) -> ServiceErrorCode { match self { Self::MissingPipeline { .. } => ServiceErrorCode::NotFound, + Self::MissingMergePipeline { .. } => ServiceErrorCode::NotFound, Self::PipelineAlreadyExists { .. } => ServiceErrorCode::BadRequest, Self::InvalidParams(_) => ServiceErrorCode::BadRequest, Self::SpawnPipelinesError { .. } => ServiceErrorCode::Internal, @@ -150,7 +154,7 @@ pub struct IndexingPipelineId { pub node_id: String, pub index_uid: IndexUid, pub source_id: SourceId, - pub pipeline_ord: usize, + pub pipeline_uid: PipelineUid, } impl Display for IndexingPipelineId { @@ -333,6 +337,13 @@ pub struct ShardPositionsUpdate { impl Event for ShardPositionsUpdate {} +impl IndexingTask { + pub fn pipeline_uid(&self) -> PipelineUid { + self.pipeline_uid + .expect("Pipeline UID should always be present.") + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/quickwit/quickwit-proto/src/types/pipeline_uid.rs b/quickwit/quickwit-proto/src/types/pipeline_uid.rs index 5f14f3358d2..279e0249ed5 100644 --- a/quickwit/quickwit-proto/src/types/pipeline_uid.rs +++ b/quickwit/quickwit-proto/src/types/pipeline_uid.rs @@ -17,21 +17,46 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +use std::fmt::{Display, Formatter}; +use std::str::FromStr; + use serde::{Deserialize, Serialize}; use ulid::Ulid; /// The size of a ULID in bytes. const ULID_SIZE: usize = 16; +/// A pipeline uid identifies an indexing pipeline and an indexing task. #[derive(Debug, Clone, Copy, Default, Hash, Eq, PartialEq, Ord, PartialOrd)] pub struct PipelineUid(Ulid); impl PipelineUid { + pub fn from_u128(ulid_u128: u128) -> PipelineUid { + PipelineUid(Ulid::from_bytes(ulid_u128.to_le_bytes())) + } + + /// Creates a new random pipeline uid. pub fn new() -> Self { Self(Ulid::new()) } } +impl FromStr for PipelineUid { + type Err = &'static str; + + fn from_str(pipeline_uid_str: &str) -> Result { + let pipeline_ulid = + Ulid::from_string(pipeline_uid_str).map_err(|_| "invalid pipeline uid")?; + Ok(PipelineUid(pipeline_ulid)) + } +} + +impl Display for PipelineUid { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + self.0.fmt(f) + } +} + impl Serialize for PipelineUid { fn serialize(&self, serializer: S) -> Result { serializer.collect_str(&self.0) diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index 8444b8de7ee..42ec6043e89 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -913,6 +913,7 @@ mod tests { use quickwit_metastore::{metastore_for_test, IndexMetadata}; use quickwit_proto::indexing::IndexingTask; use quickwit_proto::metastore::ListIndexesMetadataResponse; + use quickwit_proto::types::PipelineUid; use quickwit_search::Job; use tokio::sync::{mpsc, watch}; use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream}; @@ -1015,6 +1016,7 @@ mod tests { assert!(new_indexer_node_info.indexing_tasks.is_empty()); let new_indexing_task = IndexingTask { + pipeline_uid: Some(PipelineUid::from_u128(0u128)), index_uid: "test-index:0".to_string(), source_id: "test-source".to_string(), shard_ids: Vec::new(),