diff --git a/config/tutorials/gh-archive/kafka-source.yaml b/config/tutorials/gh-archive/kafka-source.yaml index 3b0bd54b1eb..e4670538a29 100644 --- a/config/tutorials/gh-archive/kafka-source.yaml +++ b/config/tutorials/gh-archive/kafka-source.yaml @@ -1,8 +1,7 @@ -version: 0.7 +version: 0.8 source_id: kafka-source source_type: kafka -max_num_pipelines_per_indexer: 1 -desired_num_pipelines: 2 +num_pipelines: 2 params: topic: gh-archive client_params: diff --git a/docs/configuration/source-config.md b/docs/configuration/source-config.md index 04e0369f8c9..b588c4315fe 100644 --- a/docs/configuration/source-config.md +++ b/docs/configuration/source-config.md @@ -80,11 +80,10 @@ Short max poll interval durations may cause a source to crash when back pressure ```bash cat << EOF > source-config.yaml -version: 0.7 +version: 0.8 source_id: my-kafka-source source_type: kafka -max_num_pipelines_per_indexer: 1 -desired_num_pipelines: 2 +num_pipelines: 2 params: topic: my-topic client_params: @@ -164,38 +163,13 @@ EOF ./quickwit source create --index my-index --source-config source-config.yaml ``` -## Maximum number of pipelines per indexer - -The `max_num_pipelines_per_indexer` parameter is only available for sources that can be distributed: Kafka, GCP PubSub and Pulsar(coming soon). - -The maximum number of indexing pipelines defines the limit of pipelines spawned for the source on a given indexer. -This maximum can be reached only if there are enough `desired_num_pipelines` to run. - -:::note - -With the following parameters, only one pipeline will run on one indexer. - -- `max_num_pipelines_per_indexer=2` -- `desired_num_pipelines=1` - -::: - -## Desired number of pipelines - -`desired_num_pipelines` parameter is only available for sources that can be distributed: Kafka, GCP PubSub and Pulsar (coming soon). - -The desired number of indexing pipelines defines the number of pipelines to run on a cluster for the source. 
It is a "desired" -number as it cannot be reach it there is not enough indexers in -the cluster. - -:::note - -With the following parameters, only one pipeline will start on the sole indexer. +## Number of pipelines -- `max_num_pipelines_per_indexer=1` -- `desired_num_pipelines=2` +The `num_pipelines` parameter is only available for sources that can be distributed: Kafka, GCP PubSub and Pulsar (coming soon). -::: +It defines the number of pipelines to run on a cluster for the source. The actual placement of these pipelines on the different indexers +will be decided by the control plane. Note that the distribution of a source like Kafka is done by assigning a set of partitions to different pipelines. +As a result, it is recommended to make sure the number of partitions is a multiple of `num_pipelines`. ## Transform parameters diff --git a/docs/ingest-data/kafka.md b/docs/ingest-data/kafka.md index a92a8968f1a..3f1a458afeb 100644 --- a/docs/ingest-data/kafka.md +++ b/docs/ingest-data/kafka.md @@ -101,11 +101,10 @@ This tutorial assumes that the Kafka cluster is available locally on the default # # Kafka source config file. 
# -version: 0.7 +version: 0.8 source_id: kafka-source source_type: kafka -max_num_pipelines_per_indexer: 1 -desired_num_pipelines: 2 +num_pipelines: 2 params: topic: gh-archive client_params: diff --git a/quickwit/quickwit-cli/src/source.rs b/quickwit/quickwit-cli/src/source.rs index aa444d6576f..7a12cfc0821 100644 --- a/quickwit/quickwit-cli/src/source.rs +++ b/quickwit/quickwit-cli/src/source.rs @@ -741,8 +741,7 @@ mod tests { .collect(); let sources = vec![SourceConfig { source_id: "foo-source".to_string(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::file("path/to/file"), transform_config: None, @@ -802,8 +801,7 @@ mod tests { let sources = [ SourceConfig { source_id: "foo-source".to_string(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::stdin(), transform_config: None, @@ -811,8 +809,7 @@ mod tests { }, SourceConfig { source_id: "bar-source".to_string(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::stdin(), transform_config: None, diff --git a/quickwit/quickwit-cli/src/tool.rs b/quickwit/quickwit-cli/src/tool.rs index d9826c2ea56..4ccc20f163c 100644 --- a/quickwit/quickwit-cli/src/tool.rs +++ b/quickwit/quickwit-cli/src/tool.rs @@ -422,8 +422,7 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result< .map(|vrl_script| TransformConfig::new(vrl_script, None)); let source_config = SourceConfig { source_id: CLI_SOURCE_ID.to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).expect("1 is always non-zero."), - desired_num_pipelines: 
NonZeroUsize::new(1).expect("1 is always non-zero."), + num_pipelines: NonZeroUsize::new(1).expect("1 is always non-zero."), enabled: true, source_params, transform_config, @@ -608,8 +607,7 @@ pub async fn merge_cli(args: MergeArgs) -> anyhow::Result<()> { index_id: args.index_id, source_config: SourceConfig { source_id: args.source_id, - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::Vec(VecSourceParams::default()), transform_config: None, diff --git a/quickwit/quickwit-config/resources/tests/source_config/kafka-source.json b/quickwit/quickwit-config/resources/tests/source_config/kafka-source.json index 1b809299768..cd3380fe8db 100644 --- a/quickwit/quickwit-config/resources/tests/source_config/kafka-source.json +++ b/quickwit/quickwit-config/resources/tests/source_config/kafka-source.json @@ -2,7 +2,6 @@ "version": "0.7", "source_id": "hdfs-logs-kafka-source", "desired_num_pipelines": 2, - "max_num_pipelines_per_indexer": 2, "source_type": "kafka", "params": { "topic": "cloudera-cluster-logs", diff --git a/quickwit/quickwit-config/src/lib.rs b/quickwit/quickwit-config/src/lib.rs index ca003263686..1cb8119ee8e 100644 --- a/quickwit/quickwit-config/src/lib.rs +++ b/quickwit/quickwit-config/src/lib.rs @@ -72,7 +72,7 @@ pub use crate::node_config::{ enable_ingest_v2, IndexerConfig, IngestApiConfig, JaegerConfig, NodeConfig, SearcherConfig, SplitCacheLimits, DEFAULT_QW_CONFIG_PATH, }; -use crate::source_config::serialize::{SourceConfigV0_7, VersionedSourceConfig}; +use crate::source_config::serialize::{SourceConfigV0_7, SourceConfigV0_8, VersionedSourceConfig}; pub use crate::storage_config::{ AzureStorageConfig, FileStorageConfig, GoogleCloudStorageConfig, RamStorageConfig, S3StorageConfig, StorageBackend, StorageBackendFlavor, StorageConfig, StorageConfigs, @@ -88,6 +88,7 @@ pub use 
crate::storage_config::{ DocMapping, VersionedSourceConfig, SourceConfigV0_7, + SourceConfigV0_8, VersionedIndexConfig, IndexConfigV0_7, VersionedIndexTemplate, diff --git a/quickwit/quickwit-config/src/source_config/mod.rs b/quickwit/quickwit-config/src/source_config/mod.rs index e077f112b36..2bec4be7935 100644 --- a/quickwit/quickwit-config/src/source_config/mod.rs +++ b/quickwit/quickwit-config/src/source_config/mod.rs @@ -55,33 +55,14 @@ pub const RESERVED_SOURCE_IDS: &[&str] = pub struct SourceConfig { pub source_id: String, - /// Maximum number of indexing pipelines spawned for the source on a given indexer. - /// The maximum is reached only if there is enough `desired_num_pipelines` to run. - /// The value is only used by sources that Quickwit knows how to distribute across - /// pipelines/nodes, that is for Kafka sources only. - /// Example: - /// - `max_num_pipelines_per_indexer=2` - /// - `desired_num_pipelines=1` - /// => Only one pipeline will run on one indexer. - pub max_num_pipelines_per_indexer: NonZeroUsize, - /// Number of desired indexing pipelines to run on a cluster for the source. - /// This number could not be reach if there is not enough indexers. - /// The value is only used by sources that Quickwit knows how to distribute across - /// pipelines/nodes, that is for Kafka sources only. - /// Example: - /// - `max_num_pipelines_per_indexer=1` - /// - `desired_num_pipelines=2` - /// - 1 indexer - /// => Only one pipeline will start on the sole indexer. - pub desired_num_pipelines: NonZeroUsize, + /// Number of indexing pipelines to run on a cluster for the source. + pub num_pipelines: NonZeroUsize, // Denotes if this source is enabled. pub enabled: bool, pub source_params: SourceParams, - #[serde(skip_serializing_if = "Option::is_none")] - #[serde(rename = "transform")] pub transform_config: Option, // Denotes the input data format. 
@@ -126,8 +107,7 @@ impl SourceConfig { pub fn cli() -> Self { Self { source_id: CLI_SOURCE_ID.to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).expect("1 should be non-zero"), - desired_num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"), + num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"), enabled: true, source_params: SourceParams::IngestCli, transform_config: None, @@ -139,8 +119,7 @@ impl SourceConfig { pub fn ingest_v2() -> Self { Self { source_id: INGEST_V2_SOURCE_ID.to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).expect("1 should be non-zero"), - desired_num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"), + num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"), enabled: enable_ingest_v2(), source_params: SourceParams::Ingest, transform_config: None, @@ -152,8 +131,7 @@ impl SourceConfig { pub fn ingest_api_default() -> Self { Self { source_id: INGEST_API_SOURCE_ID.to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).expect("1 should be non-zero"), - desired_num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"), + num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"), enabled: true, source_params: SourceParams::IngestApi, transform_config: None, @@ -165,8 +143,7 @@ impl SourceConfig { pub fn for_test(source_id: &str, source_params: SourceParams) -> Self { Self { source_id: source_id.to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).expect("1 should be non-zero"), - desired_num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"), + num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"), enabled: true, source_params, transform_config: None, @@ -179,8 +156,7 @@ impl TestableForRegression for SourceConfig { fn sample_for_regression() -> Self { SourceConfig { source_id: "kafka-source".to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(2).unwrap(), - desired_num_pipelines: 
NonZeroUsize::new(2).unwrap(), + num_pipelines: NonZeroUsize::new(2).unwrap(), enabled: true, source_params: SourceParams::Kafka(KafkaSourceParams { topic: "kafka-topic".to_string(), @@ -572,8 +548,7 @@ mod tests { load_source_config_from_user_config(config_format, file_content.as_bytes()).unwrap(); let expected_source_config = SourceConfig { source_id: "hdfs-logs-kafka-source".to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(2).unwrap(), - desired_num_pipelines: NonZeroUsize::new(2).unwrap(), + num_pipelines: NonZeroUsize::new(2).unwrap(), enabled: true, source_params: SourceParams::Kafka(KafkaSourceParams { topic: "cloudera-cluster-logs".to_string(), @@ -588,7 +563,7 @@ mod tests { input_format: SourceInputFormat::Json, }; assert_eq!(source_config, expected_source_config); - assert_eq!(source_config.desired_num_pipelines.get(), 2); + assert_eq!(source_config.num_pipelines.get(), 2); } #[test] @@ -669,8 +644,7 @@ mod tests { load_source_config_from_user_config(config_format, file_content.as_bytes()).unwrap(); let expected_source_config = SourceConfig { source_id: "hdfs-logs-kinesis-source".to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).expect("1 should be non-zero"), - desired_num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"), + num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"), enabled: true, source_params: SourceParams::Kinesis(KinesisSourceParams { stream_name: "emr-cluster-logs".to_string(), @@ -684,7 +658,7 @@ mod tests { input_format: SourceInputFormat::Json, }; assert_eq!(source_config, expected_source_config); - assert_eq!(source_config.desired_num_pipelines.get(), 1); + assert_eq!(source_config.num_pipelines.get(), 1); } #[tokio::test] @@ -706,30 +680,29 @@ mod tests { .to_string() .contains("`desired_num_pipelines` must be")); } + // { + // let content = r#" + // { + // "version": "0.7", + // "source_id": "hdfs-logs-void-source", + // "desired_num_pipelines": 1, + // 
"max_num_pipelines_per_indexer": 0, + // "source_type": "void", + // "params": {} + // } + // "#; + // let error = load_source_config_from_user_config(ConfigFormat::Json, + // content.as_bytes()) .unwrap_err(); + // assert!(error + // .to_string() + // .contains("`max_num_pipelines_per_indexer` must be")); + // } { let content = r#" { - "version": "0.7", - "source_id": "hdfs-logs-void-source", - "desired_num_pipelines": 1, - "max_num_pipelines_per_indexer": 0, - "source_type": "void", - "params": {} - } - "#; - let error = load_source_config_from_user_config(ConfigFormat::Json, content.as_bytes()) - .unwrap_err(); - assert!(error - .to_string() - .contains("`max_num_pipelines_per_indexer` must be")); - } - { - let content = r#" - { - "version": "0.7", + "version": "0.8", "source_id": "hdfs-logs-void-source", - "desired_num_pipelines": 1, - "max_num_pipelines_per_indexer": 2, + "num_pipelines": 2, "source_type": "void", "params": {} } @@ -756,7 +729,7 @@ mod tests { } #[tokio::test] - async fn test_load_valid_distributed_source_config() { + async fn test_load_valid_distributed_source_config_0_7() { { let content = r#" { @@ -773,8 +746,7 @@ mod tests { let source_config = load_source_config_from_user_config(ConfigFormat::Json, content.as_bytes()) .unwrap(); - assert_eq!(source_config.desired_num_pipelines.get(), 3); - assert_eq!(source_config.max_num_pipelines_per_indexer.get(), 3); + assert_eq!(source_config.num_pipelines.get(), 3); } { let content = r#" @@ -793,11 +765,32 @@ mod tests { load_source_config_from_user_config(ConfigFormat::Json, content.as_bytes()) .unwrap_err(); // TODO: uncomment asserts once distributed indexing is activated for pulsar. 
- // assert_eq!(source_config.desired_num_pipelines(), 3); + // assert_eq!(source_config.num_pipelines(), 3); // assert_eq!(source_config.max_num_pipelines_per_indexer(), 3); } } + #[tokio::test] + async fn test_load_valid_distributed_source_config() { + { + let content = r#" + { + "version": "0.8", + "source_id": "hdfs-logs-kafka-source", + "num_pipelines": 3, + "source_type": "kafka", + "params": { + "topic": "my-topic" + } + } + "#; + let source_config = + load_source_config_from_user_config(ConfigFormat::Json, content.as_bytes()) + .unwrap(); + assert_eq!(source_config.num_pipelines.get(), 3); + } + } + #[test] fn test_file_source_params_serialization() { { @@ -1077,8 +1070,7 @@ mod tests { let source_config: SourceConfig = ConfigFormat::Json.parse(&file_content).unwrap(); let expected_source_config = SourceConfig { source_id: INGEST_API_SOURCE_ID.to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).expect("1 should be non-zero"), - desired_num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"), + num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"), enabled: true, source_params: SourceParams::IngestApi, transform_config: Some(TransformConfig { @@ -1088,7 +1080,7 @@ mod tests { input_format: SourceInputFormat::Json, }; assert_eq!(source_config, expected_source_config); - assert_eq!(source_config.desired_num_pipelines.get(), 1); + assert_eq!(source_config.num_pipelines.get(), 1); } #[test] diff --git a/quickwit/quickwit-config/src/source_config/serialize.rs b/quickwit/quickwit-config/src/source_config/serialize.rs index 3df558eb59d..3aacf5bf77f 100644 --- a/quickwit/quickwit-config/src/source_config/serialize.rs +++ b/quickwit/quickwit-config/src/source_config/serialize.rs @@ -25,7 +25,7 @@ use serde::{Deserialize, Serialize}; use super::{TransformConfig, RESERVED_SOURCE_IDS}; use crate::{validate_identifier, ConfigFormat, SourceConfig, SourceInputFormat, SourceParams}; -type SourceConfigForSerialization = 
SourceConfigV0_7; +type SourceConfigForSerialization = SourceConfigV0_8; #[derive(Serialize, Deserialize, utoipa::ToSchema)] #[serde(deny_unknown_fields)] @@ -37,12 +37,15 @@ pub enum VersionedSourceConfig { #[serde(alias = "0.5")] #[serde(alias = "0.4")] V0_7(SourceConfigV0_7), + #[serde(rename = "0.8")] + V0_8(SourceConfigV0_8), } impl From for SourceConfigForSerialization { fn from(versioned_source_config: VersionedSourceConfig) -> Self { match versioned_source_config { - VersionedSourceConfig::V0_7(v0_6) => v0_6, + VersionedSourceConfig::V0_7(v0_7) => v0_7.into(), + VersionedSourceConfig::V0_8(v0_8) => v0_8, } } } @@ -73,12 +76,8 @@ impl SourceConfigForSerialization { if !RESERVED_SOURCE_IDS.contains(&self.source_id.as_str()) { validate_identifier("Source ID", &self.source_id)?; } - let desired_num_pipelines = NonZeroUsize::new(self.desired_num_pipelines) + let num_pipelines = NonZeroUsize::new(self.num_pipelines) .ok_or_else(|| anyhow::anyhow!("`desired_num_pipelines` must be strictly positive"))?; - let max_num_pipelines_per_indexer = NonZeroUsize::new(self.max_num_pipelines_per_indexer) - .ok_or_else(|| { - anyhow::anyhow!("`max_num_pipelines_per_indexer` must be strictly positive") - })?; match &self.source_params { // We want to forbid source_config with no filepath SourceParams::File(file_params) => { @@ -102,7 +101,7 @@ impl SourceConfigForSerialization { match &self.source_params { SourceParams::PubSub(_) | SourceParams::Kafka(_) => {} _ => { - if self.desired_num_pipelines > 1 || self.max_num_pipelines_per_indexer > 1 { + if self.num_pipelines > 1 { bail!("Quickwit currently supports multiple pipelines only for GCP PubSub or Kafka sources. 
open an issue https://github.com/quickwit-oss/quickwit/issues if you need the feature for other source types"); } } @@ -120,8 +119,7 @@ impl SourceConfigForSerialization { Ok(SourceConfig { source_id: self.source_id, - max_num_pipelines_per_indexer, - desired_num_pipelines, + num_pipelines, enabled: self.enabled, source_params: self.source_params, transform_config: self.transform, @@ -130,12 +128,11 @@ impl SourceConfigForSerialization { } } -impl From for SourceConfigV0_7 { +impl From for SourceConfigV0_8 { fn from(source_config: SourceConfig) -> Self { - SourceConfigV0_7 { + SourceConfigV0_8 { source_id: source_config.source_id, - max_num_pipelines_per_indexer: source_config.max_num_pipelines_per_indexer.get(), - desired_num_pipelines: source_config.desired_num_pipelines.get(), + num_pipelines: source_config.num_pipelines.get(), enabled: source_config.enabled, source_params: source_config.source_params, transform: source_config.transform_config, @@ -146,7 +143,7 @@ impl From for SourceConfigV0_7 { impl From for VersionedSourceConfig { fn from(source_config: SourceConfig) -> Self { - VersionedSourceConfig::V0_7(source_config.into()) + VersionedSourceConfig::V0_8(source_config.into()) } } @@ -154,7 +151,7 @@ impl TryFrom for SourceConfig { type Error = anyhow::Error; fn try_from(versioned_source_config: VersionedSourceConfig) -> anyhow::Result { - let v1: SourceConfigV0_7 = versioned_source_config.into(); + let v1: SourceConfigV0_8 = versioned_source_config.into(); v1.validate_and_build() } } @@ -163,7 +160,7 @@ fn default_max_num_pipelines_per_indexer() -> usize { 1 } -fn default_desired_num_pipelines() -> usize { +fn default_num_pipelines() -> usize { 1 } @@ -172,6 +169,7 @@ fn default_source_enabled() -> bool { } #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)] +#[serde(deny_unknown_fields)] pub struct SourceConfigV0_7 { pub source_id: String, @@ -181,7 +179,7 @@ pub struct SourceConfigV0_7 { )] pub max_num_pipelines_per_indexer: 
usize, - #[serde(default = "default_desired_num_pipelines")] + #[serde(default = "default_num_pipelines")] pub desired_num_pipelines: usize, // Denotes if this source is enabled. @@ -198,3 +196,48 @@ pub struct SourceConfigV0_7 { #[serde(default)] pub input_format: SourceInputFormat, } + +#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)] +#[serde(deny_unknown_fields)] +pub struct SourceConfigV0_8 { + pub source_id: String, + + #[serde(default = "default_num_pipelines")] + pub num_pipelines: usize, + + // Denotes if this source is enabled. + #[serde(default = "default_source_enabled")] + pub enabled: bool, + + #[serde(flatten)] + pub source_params: SourceParams, + + #[serde(skip_serializing_if = "Option::is_none")] + pub transform: Option, + + // Denotes the input data format. + #[serde(default)] + pub input_format: SourceInputFormat, +} + +impl From for SourceConfigV0_8 { + fn from(source_config_v0_7: SourceConfigV0_7) -> Self { + let SourceConfigV0_7 { + source_id, + max_num_pipelines_per_indexer: _, + desired_num_pipelines, + enabled, + source_params, + transform, + input_format, + } = source_config_v0_7; + SourceConfigV0_8 { + source_id, + num_pipelines: desired_num_pipelines, + enabled, + source_params, + transform, + input_format, + } + } +} diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs index 0f7539009e5..be8379258a7 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs @@ -172,7 +172,7 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec { sources.push(SourceToSchedule { source_uid, source_type: SourceToScheduleType::NonSharded { - num_pipelines: source_config.desired_num_pipelines.get() as u32, + num_pipelines: source_config.num_pipelines.get() as u32, // FIXME load_per_pipeline: NonZeroU32::new(PIPELINE_FULL_CAPACITY.cpu_millis()) 
.unwrap(), @@ -681,8 +681,7 @@ mod tests { &index_uid, SourceConfig { source_id: "source_disabled".to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(3).unwrap(), - desired_num_pipelines: NonZeroUsize::new(3).unwrap(), + num_pipelines: NonZeroUsize::new(3).unwrap(), enabled: false, source_params: SourceParams::Kafka(kafka_source_params.clone()), transform_config: None, @@ -695,8 +694,7 @@ mod tests { &index_uid, SourceConfig { source_id: "source_enabled".to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(2).unwrap(), - desired_num_pipelines: NonZeroUsize::new(2).unwrap(), + num_pipelines: NonZeroUsize::new(2).unwrap(), enabled: true, source_params: SourceParams::Kafka(kafka_source_params.clone()), transform_config: None, @@ -709,8 +707,7 @@ mod tests { &index_uid, SourceConfig { source_id: "ingest_v1".to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(2).unwrap(), - desired_num_pipelines: NonZeroUsize::new(2).unwrap(), + num_pipelines: NonZeroUsize::new(2).unwrap(), enabled: true, // ingest v1 source_params: SourceParams::IngestApi, @@ -724,8 +721,7 @@ mod tests { &index_uid, SourceConfig { source_id: "ingest_v2".to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(2).unwrap(), - desired_num_pipelines: NonZeroUsize::new(2).unwrap(), + num_pipelines: NonZeroUsize::new(2).unwrap(), enabled: true, // ingest v2 source_params: SourceParams::Ingest, @@ -740,8 +736,7 @@ mod tests { &index_uid, SourceConfig { source_id: "ingest_v2_without_shard".to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(2).unwrap(), - desired_num_pipelines: NonZeroUsize::new(2).unwrap(), + num_pipelines: NonZeroUsize::new(2).unwrap(), enabled: true, // ingest v2 source_params: SourceParams::Ingest, @@ -755,8 +750,7 @@ mod tests { &index_uid, SourceConfig { source_id: "ingest_cli".to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(2).unwrap(), - desired_num_pipelines: NonZeroUsize::new(2).unwrap(), + num_pipelines: 
NonZeroUsize::new(2).unwrap(), enabled: true, // ingest v1 source_params: SourceParams::IngestCli, @@ -857,13 +851,12 @@ mod tests { prop_compose! { fn gen_kafka_source() - (index_idx in 0usize..100usize, desired_num_pipelines in 1usize..51usize, max_num_pipelines_per_indexer in 1usize..5usize) -> (IndexUid, SourceConfig) { + (index_idx in 0usize..100usize, num_pipelines in 1usize..51usize) -> (IndexUid, SourceConfig) { let index_uid = IndexUid::from_parts(&format!("index-id-{index_idx}"), 0 /* this is the index uid */); let source_id = quickwit_common::rand::append_random_suffix("kafka-source"); (index_uid, SourceConfig { source_id, - desired_num_pipelines: NonZeroUsize::new(desired_num_pipelines).unwrap(), - max_num_pipelines_per_indexer: NonZeroUsize::new(max_num_pipelines_per_indexer).unwrap(), + num_pipelines: NonZeroUsize::new(num_pipelines).unwrap(), enabled: true, source_params: kafka_source_params_for_test(), transform_config: None, diff --git a/quickwit/quickwit-control-plane/src/tests.rs b/quickwit/quickwit-control-plane/src/tests.rs index 830bb72c711..0375774d765 100644 --- a/quickwit/quickwit-control-plane/src/tests.rs +++ b/quickwit/quickwit-control-plane/src/tests.rs @@ -43,18 +43,12 @@ use crate::control_plane::{ControlPlane, CONTROL_PLAN_LOOP_INTERVAL}; use crate::indexing_scheduler::MIN_DURATION_BETWEEN_SCHEDULING; use crate::IndexerNodeInfo; -fn index_metadata_for_test( - index_id: &str, - source_id: &str, - desired_num_pipelines: usize, - max_num_pipelines_per_indexer: usize, -) -> IndexMetadata { +fn index_metadata_for_test(index_id: &str, source_id: &str, num_pipelines: usize) -> IndexMetadata { let mut index_metadata = IndexMetadata::for_test(index_id, "ram://indexes/test-index"); let source_config = SourceConfig { enabled: true, source_id: source_id.to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(max_num_pipelines_per_indexer).unwrap(), - desired_num_pipelines: NonZeroUsize::new(desired_num_pipelines).unwrap(), + 
num_pipelines: NonZeroUsize::new(num_pipelines).unwrap(), source_params: SourceParams::Kafka(KafkaSourceParams { topic: "topic".to_string(), client_log_level: None, @@ -115,8 +109,8 @@ async fn start_control_plane( let source_1 = "source-1"; let index_2 = "test-indexing-plan-2"; let source_2 = "source-2"; - let index_metadata_1 = index_metadata_for_test(index_1, source_1, 2, 2); - let mut index_metadata_2 = index_metadata_for_test(index_2, source_2, 1, 1); + let index_metadata_1 = index_metadata_for_test(index_1, source_1, 2); + let mut index_metadata_2 = index_metadata_for_test(index_2, source_2, 1); index_metadata_2.create_timestamp = index_metadata_1.create_timestamp + 1; let mut metastore = MetastoreServiceClient::mock(); metastore.expect_list_indexes_metadata().returning( diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs index ca3c911bb9e..23766d10ba7 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -702,8 +702,7 @@ mod tests { }; let source_config = SourceConfig { source_id: "test-source".to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::file(PathBuf::from(test_file)), transform_config: None, @@ -810,8 +809,7 @@ mod tests { }; let source_config = SourceConfig { source_id: "test-source".to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::file(PathBuf::from(test_file)), transform_config: None, @@ -887,8 +885,7 @@ mod tests { }; let source_config = SourceConfig { source_id: "test-source".to_string(), - max_num_pipelines_per_indexer: 
NonZeroUsize::new(1).unwrap(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::Void(VoidSourceParams), transform_config: None, @@ -1006,8 +1003,7 @@ mod tests { }; let source_config = SourceConfig { source_id: "test-source".to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::file(PathBuf::from(test_file)), transform_config: None, diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs index b9ca436dc3d..0675115b86b 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs @@ -964,8 +964,7 @@ mod tests { // Test `spawn_pipeline`. let source_config_0 = SourceConfig { source_id: "test-indexing-service--source-0".to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::void(), transform_config: None, @@ -1052,8 +1051,7 @@ mod tests { // Test `supervise_pipelines` let source_config = SourceConfig { source_id: "test-indexing-service--source".to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::Vec(VecSourceParams { docs: Vec::new(), @@ -1123,8 +1121,7 @@ mod tests { // Test `apply plan`. 
let source_config_1 = SourceConfig { source_id: "test-indexing-service--source-1".to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::void(), transform_config: None, @@ -1173,8 +1170,7 @@ mod tests { }; let source_config_2 = SourceConfig { source_id: "test-indexing-service--source-2".to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(2).unwrap(), - desired_num_pipelines: NonZeroUsize::new(2).unwrap(), + num_pipelines: NonZeroUsize::new(2).unwrap(), enabled: true, source_params: SourceParams::Kafka(kafka_params), transform_config: None, @@ -1330,8 +1326,7 @@ mod tests { let source_config = SourceConfig { source_id: "test-indexing-service--source".to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::void(), transform_config: None, @@ -1460,8 +1455,7 @@ mod tests { let mut index_metadata = IndexMetadata::for_test(&index_id, &index_uri); let source_config = SourceConfig { source_id: "test-indexing-service--source".to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::void(), transform_config: None, diff --git a/quickwit/quickwit-indexing/src/source/file_source.rs b/quickwit/quickwit-indexing/src/source/file_source.rs index 2c25eec96f8..b84153aa355 100644 --- a/quickwit/quickwit-indexing/src/source/file_source.rs +++ b/quickwit/quickwit-indexing/src/source/file_source.rs @@ -268,8 +268,7 @@ mod tests { }; let source_config = SourceConfig { source_id: "test-file-source".to_string(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), - 
max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::File(params.clone()), transform_config: None, @@ -351,8 +350,7 @@ mod tests { let source_config = SourceConfig { source_id: "test-file-source".to_string(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::File(params.clone()), transform_config: None, @@ -460,8 +458,7 @@ mod tests { let source_config = SourceConfig { source_id: "test-file-source".to_string(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::File(params.clone()), transform_config: None, diff --git a/quickwit/quickwit-indexing/src/source/gcp_pubsub_source.rs b/quickwit/quickwit-indexing/src/source/gcp_pubsub_source.rs index 70263b67739..bf0ccb72702 100644 --- a/quickwit/quickwit-indexing/src/source/gcp_pubsub_source.rs +++ b/quickwit/quickwit-indexing/src/source/gcp_pubsub_source.rs @@ -314,8 +314,7 @@ mod gcp_pubsub_emulator_tests { let source_id = append_random_suffix("test-gcp-pubsub-source--source"); SourceConfig { source_id, - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::PubSub(PubSubSourceParams { project_id: Some(GCP_TEST_PROJECT.to_string()), diff --git a/quickwit/quickwit-indexing/src/source/ingest_api_source.rs b/quickwit/quickwit-indexing/src/source/ingest_api_source.rs index ceb22c78cdf..6abdd9c4a73 100644 --- a/quickwit/quickwit-indexing/src/source/ingest_api_source.rs +++ b/quickwit/quickwit-indexing/src/source/ingest_api_source.rs @@ -283,8 +283,7 @@ mod 
tests { fn make_source_config() -> SourceConfig { SourceConfig { source_id: INGEST_API_SOURCE_ID.to_string(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::IngestApi, transform_config: None, diff --git a/quickwit/quickwit-indexing/src/source/kafka_source.rs b/quickwit/quickwit-indexing/src/source/kafka_source.rs index ff435041b1f..b1181da8b6f 100644 --- a/quickwit/quickwit-indexing/src/source/kafka_source.rs +++ b/quickwit/quickwit-indexing/src/source/kafka_source.rs @@ -897,8 +897,7 @@ mod kafka_broker_tests { let source_id = append_random_suffix("test-kafka-source--source"); let source_config = SourceConfig { source_id: source_id.clone(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::Kafka(KafkaSourceParams { topic: topic.to_string(), diff --git a/quickwit/quickwit-indexing/src/source/mod.rs b/quickwit/quickwit-indexing/src/source/mod.rs index b55ad9c8b99..819f6223dfc 100644 --- a/quickwit/quickwit-indexing/src/source/mod.rs +++ b/quickwit/quickwit-indexing/src/source/mod.rs @@ -549,8 +549,7 @@ mod tests { { let source_config = SourceConfig { source_id: "void".to_string(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::void(), transform_config: None, @@ -561,8 +560,7 @@ mod tests { { let source_config = SourceConfig { source_id: "vec".to_string(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: 
SourceParams::Vec(VecSourceParams::default()), transform_config: None, @@ -573,8 +571,7 @@ mod tests { { let source_config = SourceConfig { source_id: "file".to_string(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::file("file-does-not-exist.json"), transform_config: None, @@ -589,8 +586,7 @@ mod tests { { let source_config = SourceConfig { source_id: "file".to_string(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::file("data/test_corpus.json"), transform_config: None, diff --git a/quickwit/quickwit-indexing/src/source/pulsar_source.rs b/quickwit/quickwit-indexing/src/source/pulsar_source.rs index 6c2c6e1cfc5..0dc6f38b7fc 100644 --- a/quickwit/quickwit-indexing/src/source/pulsar_source.rs +++ b/quickwit/quickwit-indexing/src/source/pulsar_source.rs @@ -546,8 +546,7 @@ mod pulsar_broker_tests { let source_id = append_random_suffix("test-pulsar-source--source"); let source_config = SourceConfig { source_id: source_id.clone(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::Pulsar(PulsarSourceParams { topics: topics.into_iter().map(|v| v.as_ref().to_string()).collect(), diff --git a/quickwit/quickwit-indexing/src/source/source_factory.rs b/quickwit/quickwit-indexing/src/source/source_factory.rs index 76b372a4158..4bc6a29ed31 100644 --- a/quickwit/quickwit-indexing/src/source/source_factory.rs +++ b/quickwit/quickwit-indexing/src/source/source_factory.rs @@ -134,8 +134,7 @@ mod tests { let source_loader = quickwit_supported_sources(); let source_config = SourceConfig { source_id: 
"test-source".to_string(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::void(), transform_config: None, diff --git a/quickwit/quickwit-indexing/src/source/vec_source.rs b/quickwit/quickwit-indexing/src/source/vec_source.rs index 06b0b84883c..efbf3c13162 100644 --- a/quickwit/quickwit-indexing/src/source/vec_source.rs +++ b/quickwit/quickwit-indexing/src/source/vec_source.rs @@ -163,8 +163,7 @@ mod tests { }; let source_config = SourceConfig { source_id: "test-vec-source".to_string(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::Vec(params.clone()), transform_config: None, @@ -224,8 +223,7 @@ mod tests { let source_config = SourceConfig { source_id: "test-vec-source".to_string(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::Vec(params.clone()), transform_config: None, diff --git a/quickwit/quickwit-indexing/src/source/void_source.rs b/quickwit/quickwit-indexing/src/source/void_source.rs index 7a8db8d9000..2166986b183 100644 --- a/quickwit/quickwit-indexing/src/source/void_source.rs +++ b/quickwit/quickwit-indexing/src/source/void_source.rs @@ -88,8 +88,7 @@ mod tests { async fn test_void_source_loading() { let source_config = SourceConfig { source_id: "test-void-source".to_string(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::void(), transform_config: None, @@ -114,8 +113,7 @@ mod tests { let universe = 
Universe::with_accelerated_time(); let source_config = SourceConfig { source_id: "test-void-source".to_string(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::void(), transform_config: None, diff --git a/quickwit/quickwit-indexing/src/test_utils.rs b/quickwit/quickwit-indexing/src/test_utils.rs index 5953e1e8674..4dbe288fd9c 100644 --- a/quickwit/quickwit-indexing/src/test_utils.rs +++ b/quickwit/quickwit-indexing/src/test_utils.rs @@ -158,8 +158,7 @@ impl TestSandbox { let add_docs_id = self.add_docs_id.fetch_add(1, Ordering::SeqCst); let source_config = SourceConfig { source_id: self.index_uid.index_id.to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(), - desired_num_pipelines: NonZeroUsize::new(1).unwrap(), + num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::Vec(VecSourceParams { docs, diff --git a/quickwit/quickwit-lambda/src/indexer/ingest.rs b/quickwit/quickwit-lambda/src/indexer/ingest.rs index b449c81b84f..9237b523777 100644 --- a/quickwit/quickwit-lambda/src/indexer/ingest.rs +++ b/quickwit/quickwit-lambda/src/indexer/ingest.rs @@ -132,8 +132,7 @@ pub async fn ingest(args: IngestArgs) -> anyhow::Result { .map(|vrl_script| TransformConfig::new(vrl_script, None)); let source_config = SourceConfig { source_id: CLI_SOURCE_ID.to_string(), - max_num_pipelines_per_indexer: NonZeroUsize::new(1).expect("1 is always non-zero."), - desired_num_pipelines: NonZeroUsize::new(1).expect("1 is always non-zero."), + num_pipelines: NonZeroUsize::new(1).expect("1 is always non-zero."), enabled: true, source_params, transform_config, diff --git a/quickwit/quickwit-metastore/src/tests/source.rs b/quickwit/quickwit-metastore/src/tests/source.rs index 30a42d8d8dc..ba46cbad37e 100644 --- a/quickwit/quickwit-metastore/src/tests/source.rs +++ 
b/quickwit/quickwit-metastore/src/tests/source.rs @@ -55,8 +55,7 @@ pub async fn test_metastore_add_source