Deprecate max pipeline per indexer (#4754)
* Changed the default of `default_merge_concurrency` to `2 * num_cpus / 3`

We have observed at least one case (admittedly a rather extreme one) where
the default settings did not allow merging to keep up with
indexing.
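
As a rough illustration of the new default, the sketch below evaluates `2 * num_cpus / 3` with integer arithmetic. The function name `default_merge_concurrency_for` and the clamp to a minimum of 1 are assumptions made for this example; they are not taken from the Quickwit source.

```rust
/// Hypothetical helper: evaluates the `2 * num_cpus / 3` default described above.
/// Integer division and the floor of 1 are assumptions of this sketch.
fn default_merge_concurrency_for(num_cpus: usize) -> usize {
    (2 * num_cpus / 3).max(1)
}
```

Under these assumptions, an 8-core node would get a merge concurrency of 5 and a 12-core node would get 8.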

* Added documentation for indexer's cpu_capacity

Closes #4716

* This PR deprecates `max_num_pipelines_per_indexer` in Kafka's source
config.

It also renames `desired_num_pipelines` to `num_pipelines`.

Closes #4624
fulmicoton authored Mar 18, 2024
1 parent fc8ac18 commit 0868894
Showing 32 changed files with 198 additions and 247 deletions.
5 changes: 2 additions & 3 deletions config/tutorials/gh-archive/kafka-source.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
version: 0.7
version: 0.8
source_id: kafka-source
source_type: kafka
max_num_pipelines_per_indexer: 1
desired_num_pipelines: 2
num_pipelines: 2
params:
topic: gh-archive
client_params:
40 changes: 7 additions & 33 deletions docs/configuration/source-config.md
@@ -80,11 +80,10 @@ Short max poll interval durations may cause a source to crash when back pressure

```bash
cat << EOF > source-config.yaml
version: 0.7
version: 0.8
source_id: my-kafka-source
source_type: kafka
max_num_pipelines_per_indexer: 1
desired_num_pipelines: 2
num_pipelines: 2
params:
topic: my-topic
client_params:
@@ -164,38 +163,13 @@ EOF
./quickwit source create --index my-index --source-config source-config.yaml
```

## Maximum number of pipelines per indexer

The `max_num_pipelines_per_indexer` parameter is only available for sources that can be distributed: Kafka, GCP PubSub and Pulsar(coming soon).

The maximum number of indexing pipelines defines the limit of pipelines spawned for the source on a given indexer.
This maximum can be reached only if there are enough `desired_num_pipelines` to run.

:::note

With the following parameters, only one pipeline will run on one indexer.

- `max_num_pipelines_per_indexer=2`
- `desired_num_pipelines=1`

:::

## Desired number of pipelines

`desired_num_pipelines` parameter is only available for sources that can be distributed: Kafka, GCP PubSub and Pulsar (coming soon).

The desired number of indexing pipelines defines the number of pipelines to run on a cluster for the source. It is a "desired"
number as it cannot be reach it there is not enough indexers in
the cluster.

:::note

With the following parameters, only one pipeline will start on the sole indexer.
## Number of pipelines

- `max_num_pipelines_per_indexer=1`
- `desired_num_pipelines=2`
The `num_pipelines` parameter is only available for sources that can be distributed: Kafka, GCP PubSub and Pulsar (coming soon).

:::
It defines the number of pipelines to run on a cluster for the source. The actual placement of these pipelines on the different indexers
is decided by the control plane. Note that a source like Kafka is distributed by assigning a set of partitions to each pipeline.
As a result, it is recommended to make the number of partitions a multiple of `num_pipelines`.
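
To see why a multiple is recommended, here is a minimal sketch that spreads partitions over pipelines in round-robin fashion. The functions `assign_partitions` and `partitions_per_pipeline` are hypothetical and exist only for this illustration; the actual assignment in Kafka depends on the consumer group's partition assignor, and pipeline placement is decided by the control plane.

```rust
/// Hypothetical round-robin assignment of partitions to pipelines.
/// Real Kafka consumer-group assignment depends on the configured assignor;
/// this only illustrates why a multiple gives an even split.
fn assign_partitions(num_partitions: usize, num_pipelines: usize) -> Vec<Vec<usize>> {
    assert!(num_pipelines > 0, "num_pipelines must be non-zero");
    let mut assignments = vec![Vec::new(); num_pipelines];
    for partition in 0..num_partitions {
        assignments[partition % num_pipelines].push(partition);
    }
    assignments
}

/// Number of partitions handled by each pipeline.
fn partitions_per_pipeline(num_partitions: usize, num_pipelines: usize) -> Vec<usize> {
    assign_partitions(num_partitions, num_pipelines)
        .iter()
        .map(Vec::len)
        .collect()
}
```

With 6 partitions and `num_pipelines: 2`, each pipeline in this sketch handles 3 partitions. With 5 partitions, one pipeline handles 3 and the other 2, so the load is uneven.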

## Transform parameters

5 changes: 2 additions & 3 deletions docs/ingest-data/kafka.md
@@ -101,11 +101,10 @@ This tutorial assumes that the Kafka cluster is available locally on the default
#
# Kafka source config file.
#
version: 0.7
version: 0.8
source_id: kafka-source
source_type: kafka
max_num_pipelines_per_indexer: 1
desired_num_pipelines: 2
num_pipelines: 2
params:
topic: gh-archive
client_params:
9 changes: 3 additions & 6 deletions quickwit/quickwit-cli/src/source.rs
@@ -741,8 +741,7 @@ mod tests {
.collect();
let sources = vec![SourceConfig {
source_id: "foo-source".to_string(),
desired_num_pipelines: NonZeroUsize::new(1).unwrap(),
max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(),
num_pipelines: NonZeroUsize::new(1).unwrap(),
enabled: true,
source_params: SourceParams::file("path/to/file"),
transform_config: None,
@@ -802,17 +801,15 @@ mod tests {
let sources = [
SourceConfig {
source_id: "foo-source".to_string(),
desired_num_pipelines: NonZeroUsize::new(1).unwrap(),
max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(),
num_pipelines: NonZeroUsize::new(1).unwrap(),
enabled: true,
source_params: SourceParams::stdin(),
transform_config: None,
input_format: SourceInputFormat::Json,
},
SourceConfig {
source_id: "bar-source".to_string(),
desired_num_pipelines: NonZeroUsize::new(1).unwrap(),
max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(),
num_pipelines: NonZeroUsize::new(1).unwrap(),
enabled: true,
source_params: SourceParams::stdin(),
transform_config: None,
6 changes: 2 additions & 4 deletions quickwit/quickwit-cli/src/tool.rs
@@ -422,8 +422,7 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result<
.map(|vrl_script| TransformConfig::new(vrl_script, None));
let source_config = SourceConfig {
source_id: CLI_SOURCE_ID.to_string(),
max_num_pipelines_per_indexer: NonZeroUsize::new(1).expect("1 is always non-zero."),
desired_num_pipelines: NonZeroUsize::new(1).expect("1 is always non-zero."),
num_pipelines: NonZeroUsize::new(1).expect("1 is always non-zero."),
enabled: true,
source_params,
transform_config,
@@ -608,8 +607,7 @@ pub async fn merge_cli(args: MergeArgs) -> anyhow::Result<()> {
index_id: args.index_id,
source_config: SourceConfig {
source_id: args.source_id,
max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(),
desired_num_pipelines: NonZeroUsize::new(1).unwrap(),
num_pipelines: NonZeroUsize::new(1).unwrap(),
enabled: true,
source_params: SourceParams::Vec(VecSourceParams::default()),
transform_config: None,
@@ -2,7 +2,6 @@
"version": "0.7",
"source_id": "hdfs-logs-kafka-source",
"desired_num_pipelines": 2,
"max_num_pipelines_per_indexer": 2,
"source_type": "kafka",
"params": {
"topic": "cloudera-cluster-logs",
3 changes: 2 additions & 1 deletion quickwit/quickwit-config/src/lib.rs
@@ -72,7 +72,7 @@ pub use crate::node_config::{
enable_ingest_v2, IndexerConfig, IngestApiConfig, JaegerConfig, NodeConfig, SearcherConfig,
SplitCacheLimits, DEFAULT_QW_CONFIG_PATH,
};
use crate::source_config::serialize::{SourceConfigV0_7, VersionedSourceConfig};
use crate::source_config::serialize::{SourceConfigV0_7, SourceConfigV0_8, VersionedSourceConfig};
pub use crate::storage_config::{
AzureStorageConfig, FileStorageConfig, GoogleCloudStorageConfig, RamStorageConfig,
S3StorageConfig, StorageBackend, StorageBackendFlavor, StorageConfig, StorageConfigs,
@@ -88,6 +88,7 @@ pub use crate::storage_config::{
DocMapping,
VersionedSourceConfig,
SourceConfigV0_7,
SourceConfigV0_8,
VersionedIndexConfig,
IndexConfigV0_7,
VersionedIndexTemplate,