Skip to content

Commit

Permalink
Introducing pipeline uid in indexing tasks/ indexing pipelines.
Browse files Browse the repository at this point in the history
Introducing pipeline uid in indexing tasks/ indexing pipelines.

The scheduling algorithm is for the most part unchanged.
Only the allocation of shard to pipelines is modified.

Currently the logic yields similar results as before. In particular, the scheduling algorithm
may in some case, move shard from one pipeline to another.
In the future, we will prevent this behavior and instead close/open new shards.

Closes #4191
  • Loading branch information
fulmicoton authored Nov 29, 2023
1 parent 340522d commit f5f9661
Show file tree
Hide file tree
Showing 30 changed files with 800 additions and 750 deletions.
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use quickwit_metastore::IndexMetadataResponseExt;
use quickwit_proto::indexing::CpuCapacity;
use quickwit_proto::metastore::{IndexMetadataRequest, MetastoreService, MetastoreServiceClient};
use quickwit_proto::search::{CountHits, SearchResponse};
use quickwit_proto::types::NodeId;
use quickwit_proto::types::{NodeId, PipelineUid};
use quickwit_search::{single_node_search, SearchResponseRest};
use quickwit_serve::{
search_request_from_api_request, BodyFormat, SearchRequestQueryString, SortBy,
Expand Down Expand Up @@ -473,7 +473,7 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result<
.ask_for_res(SpawnPipeline {
index_id: args.index_id.clone(),
source_config,
pipeline_ord: 0,
pipeline_uid: PipelineUid::from_u128(0u128),
})
.await?;
let merge_pipeline_handle = indexing_server_mailbox
Expand Down Expand Up @@ -613,7 +613,7 @@ pub async fn merge_cli(args: MergeArgs) -> anyhow::Result<()> {
transform_config: None,
input_format: SourceInputFormat::Json,
},
pipeline_ord: 0,
pipeline_uid: PipelineUid::from_u128(0u128),
})
.await?;
let pipeline_handle: ActorHandle<MergePipeline> = indexing_service_mailbox
Expand Down
Loading

0 comments on commit f5f9661

Please sign in to comment.