From 9ff8411a85807ea3c406651e1ba34efd2cd80a7a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Fran=C3=A7ois=20Massot?=
Date: Wed, 12 Jul 2023 16:23:38 +0900
Subject: [PATCH] Add http source.

---
 quickwit/Cargo.lock                                |  18 +-
 quickwit/Cargo.toml                                |   2 +
 quickwit/quickwit-common/src/uri.rs                |   6 +
 quickwit/quickwit-config/src/lib.rs                |   9 +-
 .../quickwit-config/src/source_config/mod.rs       |  44 ++
 .../src/source_config/serialize.rs                 |   3 +-
 quickwit/quickwit-indexing/Cargo.toml              |   4 +
 .../src/actors/indexing_pipeline.rs                |   2 +-
 .../src/source/file_source.rs                      |  25 +-
 .../src/source/http_source.rs                      | 512 ++++++++++++++++++
 .../src/source/ingest_api_source.rs                |  52 +-
 .../src/source/kafka_source.rs                     |  73 ++-
 quickwit/quickwit-indexing/src/source/mod.rs       |  11 +-
 .../src/source/pulsar_source.rs                    |  26 +-
 .../src/source/source_factory.rs                   |   9 +-
 .../src/source/vec_source.rs                       |  17 +-
 .../src/source/void_source.rs                      |  17 +-
 17 files changed, 775 insertions(+), 55 deletions(-)
 create mode 100644 quickwit/quickwit-indexing/src/source/http_source.rs

diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock
index 02ca64566c8..28715531047 100644
--- a/quickwit/Cargo.lock
+++ b/quickwit/Cargo.lock
@@ -262,6 +262,19 @@ dependencies = [
  "tokio",
 ]
 
+[[package]]
+name = "async-compression"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "62b74f44609f0f91493e3082d3734d98497e094777144380ea4db9f9905dd5b6"
+dependencies = [
+ "flate2",
+ "futures-core",
+ "futures-io",
+ "memchr",
+ "pin-project-lite",
+]
+
 [[package]]
 name = "async-speed-limit"
 version = "0.4.0"
@@ -4722,6 +4735,7 @@ version = "0.6.2"
 dependencies = [
  "anyhow",
  "arc-swap",
+ "async-compression 0.4.1",
  "async-trait",
  "aws-config",
  "aws-sdk-kinesis",
@@ -4735,6 +4749,7 @@ dependencies = [
  "flume",
  "fnv",
  "futures",
+ "futures-util",
  "itertools",
  "libz-sys",
  "mockall",
@@ -4758,6 +4773,7 @@ dependencies = [
  "quickwit-storage",
  "rand 0.8.5",
  "rdkafka",
+ "regex",
  "reqwest",
  "serde",
  "serde_json",
@@ -7125,7 +7141,7 @@ version = "0.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "5d1d42a9b3f3ec46ba828e8d376aec14592ea199f70a06a548587ecd1c4ab658"
 dependencies = [
- "async-compression",
+ "async-compression 0.3.15",
  "bitflags",
  "bytes",
  "futures-core",
diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml
index d410d9759b0..22373d1249a 100644
--- a/quickwit/Cargo.toml
+++ b/quickwit/Cargo.toml
@@ -44,6 +44,7 @@ arc-swap = "1.6"
 assert-json-diff = "2"
 async-speed-limit = "0.4"
 async-trait = "0.1"
+async-compression = { version = "0.4.1", features = ["gzip", "futures-io"] }
 atty = "0.2"
 backoff = { version = "0.4", features = ["tokio"] }
 base64 = "0.21"
@@ -133,6 +134,7 @@ regex = "1.9.1"
 reqwest = { version = "0.11", default-features = false, features = [
   "json",
   "rustls-tls",
+  "stream",
 ] }
 rust-embed = "6.8.1"
 serde = { version = "1.0", features = ["derive", "rc"] }
diff --git a/quickwit/quickwit-common/src/uri.rs b/quickwit/quickwit-common/src/uri.rs
index a6d52bd8ea9..04fd0cfcb0d 100644
--- a/quickwit/quickwit-common/src/uri.rs
+++ b/quickwit/quickwit-common/src/uri.rs
@@ -37,6 +37,8 @@ pub enum Protocol {
     Azure,
     File,
     Grpc,
+    Https,
+    Http,
     PostgreSQL,
     Ram,
     S3,
@@ -48,6 +50,8 @@ impl Protocol {
             Protocol::Azure => "azure",
             Protocol::File => "file",
             Protocol::Grpc => "grpc",
+            Protocol::Https => "https",
+            Protocol::Http => "http",
             Protocol::PostgreSQL => "postgresql",
             Protocol::Ram => "ram",
             Protocol::S3 => "s3",
@@ -105,6 +109,8 @@ impl FromStr for Protocol {
             "azure" => Ok(Protocol::Azure),
             "file" => Ok(Protocol::File),
             "grpc" => Ok(Protocol::Grpc),
+            "https" => Ok(Protocol::Https),
+            "http" => Ok(Protocol::Http),
             "pg" | "postgres" | "postgresql" => Ok(Protocol::PostgreSQL),
             "ram" => Ok(Protocol::Ram),
             "s3" => Ok(Protocol::S3),
diff --git a/quickwit/quickwit-config/src/lib.rs b/quickwit/quickwit-config/src/lib.rs
index 303dab7b5da..84bb3d4bff3 100644
--- a/quickwit/quickwit-config/src/lib.rs
+++ b/quickwit/quickwit-config/src/lib.rs
@@ -50,10 +50,10 @@ use serde::de::DeserializeOwned;
 use serde::Serialize;
 use serde_json::Value as JsonValue;
 pub use source_config::{
-    load_source_config_from_user_config, FileSourceParams, KafkaSourceParams, KinesisSourceParams,
-    PulsarSourceAuth, PulsarSourceParams, RegionOrEndpoint, SourceConfig, SourceInputFormat,
-    SourceParams, TransformConfig, VecSourceParams, VoidSourceParams, CLI_INGEST_SOURCE_ID,
-    INGEST_API_SOURCE_ID,
+    load_source_config_from_user_config, FileSourceParams, HttpSourceParams, KafkaSourceParams,
+    KinesisSourceParams, PulsarSourceAuth, PulsarSourceParams, RegionOrEndpoint, SourceConfig,
+    SourceInputFormat, SourceParams, TransformConfig, VecSourceParams, VoidSourceParams,
+    CLI_INGEST_SOURCE_ID, INGEST_API_SOURCE_ID,
 };
 use tracing::warn;
@@ -88,6 +88,7 @@ pub use crate::storage_config::{
     SourceInputFormat,
     SourceParams,
     FileSourceParams,
+    HttpSourceParams,
     KafkaSourceParams,
     KinesisSourceParams,
     PulsarSourceParams,
diff --git a/quickwit/quickwit-config/src/source_config/mod.rs b/quickwit/quickwit-config/src/source_config/mod.rs
index b51d7fa4d92..0e66e7b9338 100644
--- a/quickwit/quickwit-config/src/source_config/mod.rs
+++ b/quickwit/quickwit-config/src/source_config/mod.rs
@@ -89,6 +89,7 @@ impl SourceConfig {
     pub fn source_type(&self) -> &str {
         match self.source_params {
             SourceParams::File(_) => "file",
+            SourceParams::Http(_) => "http",
             SourceParams::Kafka(_) => "kafka",
             SourceParams::Kinesis(_) => "kinesis",
             SourceParams::Vec(_) => "vec",
@@ -103,6 +104,7 @@ impl SourceConfig {
     pub fn params(&self) -> JsonValue {
         match &self.source_params {
             SourceParams::File(params) => serde_json::to_value(params),
+            SourceParams::Http(params) => serde_json::to_value(params),
             SourceParams::Kafka(params) => serde_json::to_value(params),
             SourceParams::Kinesis(params) => serde_json::to_value(params),
             SourceParams::Vec(params) => serde_json::to_value(params),
@@ -206,6 +208,8 @@ impl FromStr for SourceInputFormat {
 pub enum SourceParams {
     #[serde(rename = "file")]
     File(FileSourceParams),
+    #[serde(rename = "http")]
+    Http(HttpSourceParams),
     #[serde(rename = "kafka")]
     Kafka(KafkaSourceParams),
     #[serde(rename = "kinesis")]
     Kinesis(KinesisSourceParams),
@@ -476,6 +480,34 @@ impl TransformConfig {
     }
 }
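+
+/// Parameters for the HTTP source: each URI is ingested as its own partition.
+///
+/// URIs can be given as an explicit list (`uris`) and/or generated from a
+/// `uri_pattern` using the `{start..end}` range syntax. Illustrative source
+/// config snippet (the URI value is an example, not a default):
+///
+/// ```yaml
+/// source_type: http
+/// params:
+///   uri_pattern: https://data.gharchive.org/2015-01-01-{0..24}.json.gz
+/// ```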
"http://localhost:9000/a.json"}"#; + assert_eq!( + serde_json::from_str::(file_content).unwrap(), + HttpSourceParams { + uris: vec![Uri::from_well_formed("http://localhost:9000/file.json")], + uri_pattern: Some(Uri::from_well_formed("http://localhost:9000/a.json")), + } + ); + } } diff --git a/quickwit/quickwit-config/src/source_config/serialize.rs b/quickwit/quickwit-config/src/source_config/serialize.rs index 9e86b7a7b75..20e355fd6d8 100644 --- a/quickwit/quickwit-config/src/source_config/serialize.rs +++ b/quickwit/quickwit-config/src/source_config/serialize.rs @@ -91,6 +91,7 @@ impl SourceConfigForSerialization { ) } } + SourceParams::Http(_) => {} SourceParams::Kafka(_) | SourceParams::Kinesis(_) | SourceParams::Pulsar(_) => { // TODO consider any validation opportunity } @@ -100,7 +101,7 @@ impl SourceConfigForSerialization { | SourceParams::IngestCli => {} } match &self.source_params { - SourceParams::Kafka(_) => {} + SourceParams::Kafka(_) | SourceParams::Http(_) => {} _ => { if self.desired_num_pipelines > 1 || self.max_num_pipelines_per_indexer > 1 { bail!("Quickwit currently supports multiple pipelines only for Kafka sources. Open an issue https://github.com/quickwit-oss/quickwit/issues if you need the feature for other source types."); diff --git a/quickwit/quickwit-indexing/Cargo.toml b/quickwit/quickwit-indexing/Cargo.toml index d1ffd5c6f7d..36c81c8911c 100644 --- a/quickwit/quickwit-indexing/Cargo.toml +++ b/quickwit/quickwit-indexing/Cargo.toml @@ -16,6 +16,7 @@ aws-sdk-kinesis = { workspace = true, optional = true } anyhow = { workspace = true } arc-swap = { workspace = true } +async-compression = { workspace = true } async-trait = { workspace = true } backoff = { workspace = true, optional = true } byte-unit = { workspace = true } @@ -25,6 +26,7 @@ fail = { workspace = true } flume = { workspace = true } fnv = { workspace = true } futures = { workspace = true } +futures-util = {workspace = true } itertools = { workspace = true } libz-sys = { workspace = true, optional = true } once_cell = { workspace = true } @@ -33,6 +35,8 @@ openssl = { workspace = true, optional = true } pulsar = { workspace = true, optional = true } quickwit-query = { workspace = true } rdkafka = { workspace = true, optional = true } +regex = { workspace = true } +reqwest = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } tantivy = { workspace = true } diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs index 736ff51726e..c6c766b217a 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -380,7 +380,7 @@ impl IndexingPipeline { .protect_future(quickwit_supported_sources().load_source( Arc::new(SourceExecutionContext { metastore: self.params.metastore.clone(), - index_uid: self.params.pipeline_id.index_uid.clone(), + pipeline_id: self.params.pipeline_id.clone(), queues_dir_path: self.params.queues_dir_path.clone(), source_config: self.params.source_config.clone(), }), diff --git a/quickwit/quickwit-indexing/src/source/file_source.rs b/quickwit/quickwit-indexing/src/source/file_source.rs index d07f6e669d5..85755db7e9a 100644 --- a/quickwit/quickwit-indexing/src/source/file_source.rs +++ b/quickwit/quickwit-indexing/src/source/file_source.rs @@ -181,6 +181,7 @@ mod tests { use quickwit_proto::IndexUid; use super::*; + use crate::models::IndexingPipelineId; use crate::source::SourceActor; #[tokio::test] @@ 
@@ -190,10 +191,16 @@ mod tests {
         let params = FileSourceParams::file("data/test_corpus.json");
         let metastore = metastore_for_test();
+        let pipeline_id = IndexingPipelineId {
+            index_uid: IndexUid::new("test-index"),
+            source_id: "test-file-source".to_string(),
+            node_id: "test-node".to_string(),
+            pipeline_ord: 0,
+        };
         let file_source = FileSourceFactory::typed_create_source(
             SourceExecutionContext::for_test(
                 metastore,
-                IndexUid::new("test-index"),
+                pipeline_id,
                 PathBuf::from("./queues"),
                 SourceConfig {
                     source_id: "test-file-source".to_string(),
@@ -256,10 +263,16 @@ mod tests {
             .to_string();
         let metastore = metastore_for_test();
+        let pipeline_id = IndexingPipelineId {
+            index_uid: IndexUid::new("test-index"),
+            source_id: "test-file-source".to_string(),
+            node_id: "test-node".to_string(),
+            pipeline_ord: 0,
+        };
         let source = FileSourceFactory::typed_create_source(
             SourceExecutionContext::for_test(
                 metastore,
-                IndexUid::new("test-index"),
+                pipeline_id,
                 PathBuf::from("./queues"),
                 SourceConfig {
                     source_id: "test-file-source".to_string(),
@@ -346,10 +359,16 @@ mod tests {
         checkpoint.try_apply_delta(checkpoint_delta).unwrap();
 
         let metastore = metastore_for_test();
+        let pipeline_id = IndexingPipelineId {
+            index_uid: IndexUid::new("test-index"),
+            source_id: "test-file-source".to_string(),
+            node_id: "test-node".to_string(),
+            pipeline_ord: 0,
+        };
         let source = FileSourceFactory::typed_create_source(
             SourceExecutionContext::for_test(
                 metastore,
-                IndexUid::new("test-index"),
+                pipeline_id,
                 PathBuf::from("./queues"),
                 SourceConfig {
                     source_id: "test-file-source".to_string(),
diff --git a/quickwit/quickwit-indexing/src/source/http_source.rs b/quickwit/quickwit-indexing/src/source/http_source.rs
new file mode 100644
index 00000000000..ad61e4ca584
--- /dev/null
+++ b/quickwit/quickwit-indexing/src/source/http_source.rs
@@ -0,0 +1,512 @@
+// Copyright (C) 2023 Quickwit, Inc.
+//
+// Quickwit is offered under the AGPL v3.0 and as commercial software.
+// For commercial licensing, contact us at hello@quickwit.io.
+//
+// AGPL:
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
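+
+//! Streams newline-delimited, gzipped documents over HTTP(S).
+//!
+//! Each URI is one partition: the source checkpoints a byte offset per URI and
+//! records `u64::MAX` once a partition has been fully ingested. When several
+//! pipelines run, each pipeline keeps the URIs whose index satisfies
+//! `idx % desired_num_pipelines == node_idx`, where `node_idx` is derived from
+//! the trailing digits of the node id.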
+
+use std::collections::BTreeMap;
+use std::fmt;
+use std::ops::Range;
+use std::sync::Arc;
+use std::time::Duration;
+
+use anyhow::Context;
+use async_compression::futures::bufread::GzipDecoder;
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures::AsyncRead;
+use futures_util::io::{BufReader, Lines};
+use futures_util::{AsyncBufReadExt, StreamExt, TryStreamExt};
+use itertools::Itertools;
+use once_cell::sync::Lazy;
+use quickwit_actors::{ActorExitStatus, Mailbox};
+use quickwit_config::HttpSourceParams;
+use quickwit_metastore::checkpoint::{PartitionId, Position, SourceCheckpoint};
+use regex::Regex;
+use serde::Serialize;
+use tracing::log::warn;
+use tracing::{debug, info};
+
+use crate::actors::DocProcessor;
+use crate::models::RawDocBatch;
+use crate::source::{Source, SourceContext, SourceExecutionContext, TypedSourceFactory};
+
+/// Matches the `{start..end}` range syntax used to expand a URI pattern.
+static URI_EXPAND_PATTERN: Lazy<Regex> = Lazy::new(|| Regex::new(r"(\{\d+\.\.\d+})").unwrap());
+
+#[derive(Default, Clone, Debug, Eq, PartialEq, Serialize)]
+pub struct HttpSourceCounters {
+    /// Number of bytes read from the HTTP stream during the current fetch
+    /// (reset to 0 whenever the URI is fetched again).
+    pub http_offset: u64,
+    /// Offset at the start of the last emitted batch (initially the
+    /// checkpointed position).
+    pub previous_offset: u64,
+    /// Position reached by the last emitted batch; `u64::MAX` once the
+    /// partition is fully processed.
+    pub current_offset: u64,
+    pub num_lines_processed: u64,
+}
+
+pub struct HttpSource {
+    ctx: Arc<SourceExecutionContext>,
+    current_counters: BTreeMap<PartitionId, HttpSourceCounters>,
+    current_reader: Option<Lines<BufReader<Box<dyn AsyncRead + Send + Sync + Unpin>>>>,
+}
+
+impl fmt::Debug for HttpSource {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(
+            f,
+            "HttpSource {{ source_id: {} }}",
+            self.ctx.pipeline_id.source_id
+        )
+    }
+}
+
+/// Extracts the trailing numeric index from a node id, e.g. `searcher-1` -> `1`.
+static NODE_IDX_PATTERN: Lazy<Regex> =
+    Lazy::new(|| Regex::new(r"^[a-zA-Z0-9-_\.]+[_-](\d+)$").unwrap());
+
+impl HttpSource {
+    pub fn new(
+        ctx: Arc<SourceExecutionContext>,
+        params: HttpSourceParams,
+        checkpoint: SourceCheckpoint,
+    ) -> Self {
+        let node_idx = parse_node_idx(ctx.pipeline_id.node_id.as_str());
+        let mut uris = params
+            .uris
+            .iter()
+            .map(|uri| uri.as_str().to_string())
+            .collect_vec();
+        if let Some(uri_pattern) = params.uri_pattern {
+            uris.extend(expand_uris(uri_pattern.as_str()));
+        }
+        let desired_num_pipelines = ctx.source_config.desired_num_pipelines;
+        let mut current_counters = BTreeMap::new();
+        debug!("`HttpSource` partitions expressed as URIs: {:?}", uris);
+        info!(
+            "`HttpSource` will select partitions that satisfy `idx % desired_num_pipelines.get() \
+             == node_idx` with node_idx={node_idx} and \
+             desired_num_pipelines={desired_num_pipelines:?}"
+        );
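+        // For each selected partition, resume from the checkpointed byte offset
+        // when one exists; `u64::MAX` marks a partition that was fully ingested.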
+        for (idx, uri) in uris.iter().enumerate() {
+            if idx % desired_num_pipelines.get() != node_idx {
+                continue;
+            }
+            let partition_id = PartitionId::from(uri.to_string());
+            let position_opt = checkpoint.position_for_partition(&partition_id).cloned();
+            if let Some(position) = &position_opt {
+                let next_offset = position_to_u64(position);
+                if next_offset == u64::MAX {
+                    // Skip partitions that have been fully processed.
+                    continue;
+                }
+                let counters = HttpSourceCounters {
+                    previous_offset: next_offset,
+                    current_offset: next_offset,
+                    num_lines_processed: 0,
+                    http_offset: 0,
+                };
+                current_counters.insert(partition_id.clone(), counters);
+            } else {
+                let counters = HttpSourceCounters {
+                    previous_offset: 0,
+                    current_offset: 0,
+                    num_lines_processed: 0,
+                    http_offset: 0,
+                };
+                current_counters.insert(partition_id.clone(), counters);
+            }
+        }
+        info!("HttpSource current counters: {current_counters:?}");
+        Self {
+            ctx,
+            current_counters,
+            current_reader: None,
+        }
+    }
+}
+
+fn position_to_u64(position: &Position) -> u64 {
+    match position {
+        Position::Beginning => 0,
+        Position::Offset(offset_str) => offset_str
+            .parse()
+            .expect("Failed to parse checkpoint position to u64."),
+    }
+}
+
+/// Number of bytes after which a new batch is cut.
+pub(crate) const BATCH_NUM_BYTES_LIMIT: u64 = 5_000_000u64;
+
+#[async_trait]
+impl Source for HttpSource {
+    async fn emit_batches(
+        &mut self,
+        doc_processor_mailbox: &Mailbox<DocProcessor>,
+        ctx: &SourceContext,
+    ) -> Result<Duration, ActorExitStatus> {
+        // Get the next partition and position to read from.
+        let Some((partition, counters)) = self
+            .current_counters
+            .iter_mut()
+            .find(|(_, partition_counters)| partition_counters.current_offset < u64::MAX)
+        else {
+            info!("No more partitions to read from, stopping source.");
+            ctx.send_exit_with_success(doc_processor_mailbox).await?;
+            return Err(ActorExitStatus::Success);
+        };
+        let uri = partition.0.as_str();
+        let client = reqwest::Client::new();
+        let mut lines = match self.current_reader.take() {
+            Some(lines) => lines,
+            None => {
+                let stream = client
+                    .get(uri)
+                    .send()
+                    .await
+                    .context(format!("Failed to get response from uri: {uri:?}"))?
+                    .error_for_status()
+                    .context("Invalid status code returned.")?
+                    .bytes_stream()
+                    .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
+                    .into_async_read();
+                counters.http_offset = 0;
+                let reader = Box::new(GzipDecoder::new(BufReader::new(stream)))
+                    as Box<dyn AsyncRead + Send + Sync + Unpin>;
+                BufReader::new(reader).lines()
+            }
+        };
+        let mut doc_batch = RawDocBatch::default();
+        let mut reach_eof = true;
+        while let Some(line_res) = lines.next().await {
+            // Propagate read/decode errors instead of silently treating them as EOF.
+            let line = line_res.context("Failed to read line from HTTP stream.")?;
+            counters.http_offset += 1 + line.as_bytes().len() as u64; // +1 for newline
+            counters.num_lines_processed += 1;
+            // Skip lines that were already processed according to the checkpoint.
+            if counters.previous_offset >= counters.http_offset {
+                continue;
+            }
+            if counters.http_offset - counters.current_offset > BATCH_NUM_BYTES_LIMIT {
+                reach_eof = false;
+                break;
+            }
+            doc_batch.docs.push(Bytes::from(line));
+        }
+
+        if !doc_batch.docs.is_empty() {
+            let final_offset: u64 = if reach_eof {
+                u64::MAX
+            } else {
+                counters.http_offset
+            };
+            counters.previous_offset = counters.current_offset;
+            counters.current_offset = final_offset;
+            doc_batch
+                .checkpoint_delta
+                .record_partition_delta(
+                    partition.clone(),
+                    Position::from(counters.previous_offset),
+                    Position::from(counters.current_offset),
+                )
+                .unwrap();
+            ctx.send_message(doc_processor_mailbox, doc_batch).await?;
+        } else if reach_eof {
+            // Mark the partition as fully processed even when no new documents
+            // were emitted, so the source does not refetch it indefinitely.
+            counters.current_offset = u64::MAX;
+        }
+        if !reach_eof {
+            self.current_reader = Some(lines);
+        }
+        Ok(Duration::default())
+    }
+
+    fn name(&self) -> String {
+        format!("HttpSource{{source_id={}}}", self.ctx.pipeline_id.source_id)
+    }
+
+    fn observable_state(&self) -> serde_json::Value {
+        serde_json::to_value(&self.current_counters).unwrap()
+    }
+}
+
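+/// Factory for [`HttpSource`], registered in the source loader under the
+/// `http` source type.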
+pub struct HttpSourceFactory;
+
+#[async_trait]
+impl TypedSourceFactory for HttpSourceFactory {
+    type Source = HttpSource;
+    type Params = HttpSourceParams;
+
+    async fn typed_create_source(
+        ctx: Arc<SourceExecutionContext>,
+        params: HttpSourceParams,
+        checkpoint: SourceCheckpoint,
+    ) -> anyhow::Result<Self::Source> {
+        Ok(HttpSource::new(ctx, params, checkpoint))
+    }
+}
+
+/// Derives the pipeline's node index from the trailing digits of the node id,
+/// falling back to 0 when the node id does not end with digits.
+fn parse_node_idx(node_id: &str) -> usize {
+    match NODE_IDX_PATTERN.captures(node_id) {
+        Some(captured) => {
+            let idx = captured.get(1).unwrap().as_str().parse::<usize>().unwrap();
+            idx
+        }
+        None => {
+            warn!("`HttpSource` cannot use `{node_id}` to select a subset of partitions.");
+            0
+        }
+    }
+}
+
+pub struct RangeExpand<'a> {
+    replace_str: &'a str,
+    range: Range<usize>,
+    zero_pad_by: usize,
+}
+
+/// Expands a URI containing `{start..end}` ranges into the full list of URIs.
+fn expand_uris(uri: &str) -> Vec<String> {
+    let mut total_variants = 0;
+    let mut ranges = Vec::new();
+    for capture in URI_EXPAND_PATTERN.captures_iter(uri) {
+        let cap = capture.get(0).unwrap();
+        let replace_str = cap.as_str();
+
+        let range_str = replace_str.trim_matches('{').trim_matches('}');
+        let (start, end) = range_str.split_once("..").unwrap();
+        // A leading zero (e.g. `{00..2}`) enables zero-padding to the width of `start`.
+        let pad_start = start.starts_with('0');
+        let zero_pad_by = if pad_start { start.len() } else { 0 };
+        let start = start.parse::<usize>().unwrap();
+        let end = end.parse::<usize>().unwrap();
+        let range = start..end;
+
+        total_variants += range.len();
+        ranges.push(RangeExpand {
+            replace_str,
+            range,
+            zero_pad_by,
+        })
+    }
+
+    if ranges.is_empty() {
+        // A pattern without any range expands to itself.
+        return vec![uri.to_string()];
+    }
+    let mut uris = Vec::with_capacity(total_variants);
+    populate_uri(uri, &ranges, &mut uris);
+    uris
+}
+
+fn populate_uri(uri: &str, range_expand: &[RangeExpand], uris: &mut Vec<String>) {
+    assert!(!range_expand.is_empty());
+    let uri_clone = uri.to_string();
+    let current_expand_range = &range_expand[0];
+    for i in current_expand_range.range.clone() {
+        let value = format!("{i:0>pad_by$}", pad_by = current_expand_range.zero_pad_by);
+        let updated_uri = uri_clone.replacen(range_expand[0].replace_str, &value, 1);
+        if range_expand.len() > 1 {
+            populate_uri(&updated_uri, &range_expand[1..], uris);
+        } else {
+            uris.push(updated_uri);
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::num::NonZeroUsize;
+    use std::path::PathBuf;
+
+    use quickwit_actors::{Command, Universe};
+    use quickwit_common::uri::Uri;
+    use quickwit_config::{SourceConfig, SourceInputFormat, SourceParams};
+    use quickwit_metastore::checkpoint::{SourceCheckpoint, SourceCheckpointDelta};
+    use quickwit_metastore::metastore_for_test;
+    use quickwit_proto::IndexUid;
+
+    use super::*;
+    use crate::models::IndexingPipelineId;
+    use crate::source::SourceActor;
+
+    #[test]
+    fn test_parse_node_idx() {
+        assert_eq!(parse_node_idx("kafka-node-0"), 0);
+        assert_eq!(parse_node_idx("searcher-1"), 1);
+        assert_eq!(parse_node_idx("kafka_node_020"), 20);
+    }
+
+    #[test]
+    fn test_uri_expand() {
+        let uri = "http://localhost:3000/{00..2}-{0..3}.json";
+        let uris = expand_uris(uri);
+
+        assert_eq!(
+            uris,
+            vec![
+                "http://localhost:3000/00-0.json",
+                "http://localhost:3000/00-1.json",
+                "http://localhost:3000/00-2.json",
+                "http://localhost:3000/01-0.json",
+                "http://localhost:3000/01-1.json",
+                "http://localhost:3000/01-2.json",
+            ]
+        )
+    }
+
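+    // NOTE: the two tests below download real gzipped archives from
+    // data.gharchive.org, so they require network access to run.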
+    #[tokio::test]
+    async fn test_http_source() -> anyhow::Result<()> {
+        quickwit_common::setup_logging_for_tests();
+        let universe = Universe::with_accelerated_time();
+        let (doc_processor_mailbox, indexer_inbox) = universe.create_test_mailbox();
+        let params = HttpSourceParams::from_pattern(Uri::from_well_formed(
+            "https://data.gharchive.org/2015-01-01-{0..2}.json.gz",
+        ));
+
+        let metastore = metastore_for_test();
+        let pipeline_id = IndexingPipelineId {
+            index_uid: IndexUid::new("test-index"),
+            source_id: "kafka-file-source".to_string(),
+            node_id: "kafka-node".to_string(),
+            pipeline_ord: 0,
+        };
+        let http_source = HttpSourceFactory::typed_create_source(
+            SourceExecutionContext::for_test(
+                metastore,
+                pipeline_id,
+                PathBuf::from("./queues"),
+                SourceConfig {
+                    source_id: "test-http-source".to_string(),
+                    desired_num_pipelines: NonZeroUsize::new(1).unwrap(),
+                    max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(),
+                    enabled: true,
+                    source_params: SourceParams::Http(params.clone()),
+                    transform_config: None,
+                    input_format: SourceInputFormat::Json,
+                },
+            ),
+            params,
+            SourceCheckpoint::default(),
+        )
+        .await?;
+        let http_source_actor = SourceActor {
+            source: Box::new(http_source),
+            doc_processor_mailbox,
+        };
+        let (_http_source_mailbox, http_source_handle) =
+            universe.spawn_builder().spawn(http_source_actor);
+        let (actor_termination, positions) = http_source_handle.join().await;
+        assert!(actor_termination.is_success());
+        assert_eq!(
+            positions,
+            serde_json::json!({
+                "https://data.gharchive.org/2015-01-01-0.json.gz": {
+                    "current_offset": 18446744073709551615u64,
+                    "http_offset": 18023797,
+                    "num_lines_processed": 7702,
+                    "previous_offset": 15002237,
+                },
+                "https://data.gharchive.org/2015-01-01-1.json.gz": {
+                    "current_offset": 18446744073709551615u64,
+                    "num_lines_processed": 7427,
+                    "http_offset": 17649671,
+                    "previous_offset": 15007652,
+                }
+            })
+        );
+        let indexer_msgs = indexer_inbox.drain_for_test();
+        assert_eq!(indexer_msgs.len(), 9);
+        let batch1 = indexer_msgs[0].downcast_ref::<RawDocBatch>().unwrap();
+        let batch2 = indexer_msgs[1].downcast_ref::<RawDocBatch>().unwrap();
+        let command = indexer_msgs[8].downcast_ref::<Command>().unwrap();
+        assert_eq!(
+            format!("{:?}", &batch1.checkpoint_delta),
+            format!(
+                "∆({}:{})",
+                "https://data.gharchive.org/2015-01-01-0.json.gz",
+                "(00000000000000000000..00000000000005000080]"
+            )
+        );
+        assert_eq!(
+            &extract_position_delta(&batch2.checkpoint_delta).unwrap(),
+            "00000000000005000080..00000000000010000360"
+        );
+        assert!(matches!(command, &Command::ExitWithSuccess));
+        Ok(())
+    }
+
+    fn extract_position_delta(checkpoint_delta: &SourceCheckpointDelta) -> Option<String> {
+        let checkpoint_delta_str = format!("{checkpoint_delta:?}");
+        let (_left, right) =
+            &checkpoint_delta_str[..checkpoint_delta_str.len() - 2].rsplit_once('(')?;
+        Some(right.to_string())
+    }
+
+    #[tokio::test]
+    async fn test_http_source_resume_from_checkpoint() {
+        quickwit_common::setup_logging_for_tests();
+        let universe = Universe::with_accelerated_time();
+        let (doc_processor_mailbox, doc_processor_inbox) = universe.create_test_mailbox();
+        let source_uri = "https://data.gharchive.org/2015-01-01-0.json.gz";
+        let params = HttpSourceParams::from_list(vec![Uri::from_well_formed(
+            "https://data.gharchive.org/2015-01-01-0.json.gz",
+        )]);
+        let mut checkpoint = SourceCheckpoint::default();
+        let partition_id = PartitionId::from(source_uri.to_string());
+        let checkpoint_delta = SourceCheckpointDelta::from_partition_delta(
+            partition_id,
+            Position::from(0u64),
+            Position::from(18017456u64),
+        )
+        .unwrap();
+        checkpoint.try_apply_delta(checkpoint_delta).unwrap();
+
+        let metastore = metastore_for_test();
+        let pipeline_id = IndexingPipelineId {
+            index_uid: IndexUid::new("test-index"),
+            source_id: "test-file-source".to_string(),
+            node_id: "test-node".to_string(),
+            pipeline_ord: 0,
+        };
+        let source = HttpSourceFactory::typed_create_source(
+            SourceExecutionContext::for_test(
+                metastore,
+                pipeline_id,
+                PathBuf::from("./queues"),
+                SourceConfig {
+                    source_id: "test-http-source".to_string(),
+                    desired_num_pipelines: NonZeroUsize::new(1).unwrap(),
+                    max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(),
+                    enabled: true,
+                    source_params: SourceParams::Http(params.clone()),
+                    transform_config: None,
+                    input_format: SourceInputFormat::Json,
+                },
+            ),
+            params,
+            checkpoint,
+        )
+        .await
+        .unwrap();
+        let http_source_actor = SourceActor {
+            source: Box::new(source),
+            doc_processor_mailbox,
+        };
+        let (_http_source_mailbox, http_source_handle) =
+            universe.spawn_builder().spawn(http_source_actor);
+        let (actor_termination, counters) = http_source_handle.join().await;
+        assert!(actor_termination.is_success());
+        assert_eq!(
+            counters,
+            serde_json::json!({
+                "https://data.gharchive.org/2015-01-01-0.json.gz": {
+                    "current_offset": 18446744073709551615u64,
+                    "http_offset": 18023797,
+                    "num_lines_processed": 7702,
+                    "previous_offset": 18017456,
+                }
+            })
+        );
+        let indexer_messages: Vec<RawDocBatch> = doc_processor_inbox.drain_for_test_typed();
+        assert!(&indexer_messages[0].docs[0].starts_with(b"{\"id\""));
+    }
+}
diff --git a/quickwit/quickwit-indexing/src/source/ingest_api_source.rs b/quickwit/quickwit-indexing/src/source/ingest_api_source.rs
index 610dc658fb1..33dc64df42d 100644
--- a/quickwit/quickwit-indexing/src/source/ingest_api_source.rs
+++ b/quickwit/quickwit-indexing/src/source/ingest_api_source.rs
@@ -80,7 +80,7 @@ impl IngestApiSource {
         // Ensure a queue for this index exists.
         let create_queue_req = CreateQueueIfNotExistsRequest {
-            queue_id: ctx.index_uid.index_id().to_string(),
+            queue_id: ctx.pipeline_id.index_uid.index_id().to_string(),
         };
         ingest_api_service.ask_for_res(create_queue_req).await?;
@@ -121,7 +121,7 @@ impl Source for IngestApiSource {
         ctx: &SourceContext,
     ) -> Result<Duration, ActorExitStatus> {
         let fetch_req = FetchRequest {
-            index_id: self.ctx.index_uid.index_id().to_string(),
+            index_id: self.ctx.pipeline_id.index_uid.index_id().to_string(),
             start_after: self.counters.current_offset,
             num_bytes_limit: None,
         };
@@ -180,7 +180,7 @@ impl Source for IngestApiSource {
         {
             let up_to_position_included = offset_str.parse::<u64>()?;
             let suggest_truncate_req = SuggestTruncateRequest {
-                index_id: self.ctx.index_uid.index_id().to_string(),
+                index_id: self.ctx.pipeline_id.index_uid.index_id().to_string(),
                 up_to_position_included,
             };
             ctx.ask_for_res(&self.ingest_api_service, suggest_truncate_req)
@@ -231,6 +231,7 @@ mod tests {
     use quickwit_proto::IndexUid;
 
     use super::*;
+    use crate::models::IndexingPipelineId;
     use crate::source::SourceActor;
 
     fn make_ingest_request(
@@ -275,7 +276,6 @@ mod tests {
         let universe = Universe::with_accelerated_time();
         let metastore = metastore_for_test();
         let index_id = append_random_suffix("test-ingest-api-source");
-        let index_uid = IndexUid::new(&index_id);
         let temp_dir = tempfile::tempdir()?;
         let queues_dir_path = temp_dir.path();
 
@@ -283,9 +283,15 @@ mod tests {
             init_ingest_api(&universe, queues_dir_path, &IngestApiConfig::default()).await?;
         let (doc_processor_mailbox, doc_processor_inbox) = universe.create_test_mailbox();
         let source_config = make_source_config();
+        let pipeline_id = IndexingPipelineId {
+            index_uid: IndexUid::new(&index_id),
+            source_id: "kafka-file-source".to_string(),
+            node_id: "kafka-node".to_string(),
+            pipeline_ord: 0,
+        };
         let ctx = SourceExecutionContext::for_test(
             metastore,
-            index_uid,
+            pipeline_id,
             queues_dir_path.to_path_buf(),
             source_config,
         );
@@ -364,7 +370,6 @@ mod tests {
         let universe = Universe::with_accelerated_time();
         let metastore = metastore_for_test();
         let index_id = append_random_suffix("test-ingest-api-source");
-        let index_uid = IndexUid::new(&index_id);
         let temp_dir = tempfile::tempdir()?;
         let queues_dir_path = temp_dir.path();
         let ingest_api_service =
@@ -382,9 +387,15 @@ mod tests {
         checkpoint.try_apply_delta(checkpoint_delta).unwrap();
 
         let source_config = make_source_config();
+        let pipeline_id = IndexingPipelineId {
+            index_uid: IndexUid::new(&index_id),
+            source_id: "kafka-file-source".to_string(),
+            node_id: "kafka-node".to_string(),
+            pipeline_ord: 0,
+        };
         let ctx = SourceExecutionContext::for_test(
             metastore,
-            index_uid,
+            pipeline_id,
             queues_dir_path.to_path_buf(),
             source_config,
         );
@@ -434,7 +445,6 @@ mod tests {
         let universe = Universe::with_accelerated_time();
         let metastore = metastore_for_test();
         let index_id = append_random_suffix("test-ingest-api-source");
-        let index_uid = IndexUid::new(&index_id);
         let temp_dir = tempfile::tempdir()?;
         let queues_dir_path = temp_dir.path();
         let ingest_api_service =
@@ -442,9 +452,15 @@ mod tests {
         let (doc_processor_mailbox, doc_processor_inbox) = universe.create_test_mailbox();
 
         let source_config = make_source_config();
+        let pipeline_id = IndexingPipelineId {
+            index_uid: IndexUid::new(&index_id),
+            source_id: "kafka-file-source".to_string(),
+            node_id: "kafka-node".to_string(),
+            pipeline_ord: 0,
+        };
         let ctx = SourceExecutionContext::for_test(
             metastore,
-            index_uid,
+            pipeline_id,
             queues_dir_path.to_path_buf(),
             source_config,
         );
@@ -488,7 +504,6 @@ mod tests {
         let universe = Universe::with_accelerated_time();
         let metastore = metastore_for_test();
         let index_id = append_random_suffix("test-ingest-api-source");
-        let index_uid = IndexUid::new(&index_id);
         let temp_dir = tempfile::tempdir()?;
         let queues_dir_path = temp_dir.path();
 
@@ -496,9 +511,15 @@ mod tests {
             init_ingest_api(&universe, queues_dir_path, &IngestApiConfig::default()).await?;
         let (doc_processor_mailbox, doc_processor_inbox) = universe.create_test_mailbox();
         let source_config = make_source_config();
+        let pipeline_id = IndexingPipelineId {
+            index_uid: IndexUid::new(&index_id),
+            source_id: "kafka-file-source".to_string(),
+            node_id: "kafka-node".to_string(),
+            pipeline_ord: 0,
+        };
         let ctx = SourceExecutionContext::for_test(
             metastore,
-            index_uid,
+            pipeline_id,
             queues_dir_path.to_path_buf(),
             source_config,
         );
@@ -555,7 +576,6 @@ mod tests {
         let universe = Universe::with_accelerated_time();
         let metastore = metastore_for_test();
         let index_id = append_random_suffix("test-ingest-api-source");
-        let index_uid = IndexUid::new(&index_id);
         let temp_dir = tempfile::tempdir()?;
         let queues_dir_path = temp_dir.path();
 
@@ -563,9 +583,15 @@ mod tests {
             init_ingest_api(&universe, queues_dir_path, &IngestApiConfig::default()).await?;
         let (doc_processor_mailbox, doc_processor_inbox) = universe.create_test_mailbox();
         let source_config = make_source_config();
+        let pipeline_id = IndexingPipelineId {
+            index_uid: IndexUid::new(&index_id),
+            source_id: "kafka-file-source".to_string(),
+            node_id: "kafka-node".to_string(),
+            pipeline_ord: 0,
+        };
         let ctx = SourceExecutionContext::for_test(
             metastore,
-            index_uid,
+            pipeline_id,
             queues_dir_path.to_path_buf(),
             source_config,
         );
diff --git a/quickwit/quickwit-indexing/src/source/kafka_source.rs b/quickwit/quickwit-indexing/src/source/kafka_source.rs
index f21d88847f5..810e8d24ce9 100644
--- a/quickwit/quickwit-indexing/src/source/kafka_source.rs
+++ b/quickwit/quickwit-indexing/src/source/kafka_source.rs
@@ -229,7 +229,7 @@ impl fmt::Debug for KafkaSource {
     fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
         formatter
             .debug_struct("KafkaSource")
-            .field("index_id", &self.ctx.index_uid.index_id())
+            .field("index_id", &self.ctx.pipeline_id.index_uid.index_id())
             .field("source_id", &self.ctx.source_config.source_id)
             .field("topic", &self.topic)
             .finish()
@@ -248,7 +248,7 @@ impl KafkaSource {
         let (events_tx, events_rx) = mpsc::channel(100);
         let (client_config, consumer) = create_consumer(
-            &ctx.index_uid,
+            &ctx.pipeline_id.index_uid,
             &ctx.source_config.source_id,
             params,
             events_tx.clone(),
@@ -266,7 +266,7 @@ impl KafkaSource {
         let publish_lock = PublishLock::default();
 
         info!(
-            index_id=%ctx.index_uid.index_id(),
+            index_id=%ctx.pipeline_id.index_uid.index_id(),
             source_id=%ctx.source_config.source_id,
             topic=%topic,
             group_id=%group_id,
@@ -349,13 +349,13 @@ impl KafkaSource {
             .protect_future(
                 self.ctx
                     .metastore
-                    .index_metadata_strict(&self.ctx.index_uid),
+                    .index_metadata_strict(&self.ctx.pipeline_id.index_uid),
             )
             .await
             .with_context(|| {
                 format!(
                     "Failed to fetch index metadata for index `{}`.",
-                    self.ctx.index_uid.index_id()
+                    self.ctx.pipeline_id.index_uid.index_id()
                 )
             })?;
         let checkpoint = index_metadata
@@ -392,7 +392,7 @@ impl KafkaSource {
             next_offsets.push((partition, next_offset));
         }
         info!(
-            index_id=%self.ctx.index_uid.index_id(),
+            index_id=%self.ctx.pipeline_id.index_uid.index_id(),
             source_id=%self.ctx.source_config.source_id,
             topic=%self.topic,
             partitions=?partitions,
@@ -562,7 +562,7 @@ impl Source for KafkaSource {
             .sorted()
             .collect();
         json!({
-            "index_id": self.ctx.index_uid.index_id(),
+            "index_id": self.ctx.pipeline_id.index_uid.index_id(),
             "source_id": self.ctx.source_config.source_id,
             "topic": self.topic,
             "assigned_partitions": assigned_partitions,
@@ -778,6 +778,7 @@ mod kafka_broker_tests {
     use tokio::sync::watch;
 
     use super::*;
+    use crate::models::IndexingPipelineId;
     use crate::new_split_id;
     use crate::source::{quickwit_supported_sources, SourceActor};
 
@@ -955,16 +956,21 @@ mod kafka_broker_tests {
         let metastore = metastore_for_test();
         let index_id = append_random_suffix("test-kafka-source--process-message--index");
-        let index_uid = IndexUid::new(&index_id);
 
         let (_source_id, source_config) = get_source_config(&topic);
         let params = if let SourceParams::Kafka(params) = source_config.clone().source_params {
             params
         } else {
             unreachable!()
         };
+        let pipeline_id = IndexingPipelineId {
+            index_uid: IndexUid::new(&index_id),
+            source_id: "kafka-file-source".to_string(),
+            node_id: "kafka-node".to_string(),
+            pipeline_ord: 0,
+        };
         let ctx = SourceExecutionContext::for_test(
             metastore,
-            index_uid,
+            pipeline_id,
             PathBuf::from("./queues"),
             source_config,
         );
@@ -1082,15 +1088,20 @@ mod kafka_broker_tests {
         let (source_id, source_config) = get_source_config(&topic);
         let index_uid = setup_index(metastore.clone(), &index_id, &source_id, &[(2, -1, 42)]).await;
-
         let params = if let SourceParams::Kafka(params) = source_config.clone().source_params {
             params
         } else {
             unreachable!()
         };
+        let pipeline_id = IndexingPipelineId {
+            index_uid,
+            source_id: "kafka-file-source".to_string(),
+            node_id: "kafka-node".to_string(),
+            pipeline_ord: 0,
+        };
         let ctx = Arc::new(SourceExecutionContext {
             metastore,
-            index_uid,
+            pipeline_id,
             queues_dir_path: PathBuf::from("./queues"),
             source_config,
         });
@@ -1149,9 +1160,15 @@ mod kafka_broker_tests {
         } else {
             unreachable!()
         };
+        let pipeline_id = IndexingPipelineId {
+            index_uid,
+            source_id: "kafka-file-source".to_string(),
+            node_id: "kafka-node".to_string(),
+            pipeline_ord: 0,
+        };
         let ctx = SourceExecutionContext::for_test(
             metastore,
-            index_uid,
+            pipeline_id,
             PathBuf::from("./queues"),
             source_config,
         );
@@ -1206,9 +1223,15 @@ mod kafka_broker_tests {
         } else {
             unreachable!()
         };
+        let pipeline_id = IndexingPipelineId {
+            index_uid,
+            source_id: "kafka-file-source".to_string(),
+            node_id: "kafka-node".to_string(),
+            pipeline_ord: 0,
+        };
         let ctx = SourceExecutionContext::for_test(
             metastore,
-            index_uid,
+            pipeline_id,
             PathBuf::from("./queues"),
             source_config,
         );
@@ -1242,11 +1265,17 @@ mod kafka_broker_tests {
         let index_id = append_random_suffix("test-kafka-source--index");
         let (source_id, source_config) = get_source_config(&topic);
         let index_uid = setup_index(metastore.clone(), &index_id, &source_id, &[]).await;
+        let pipeline_id = IndexingPipelineId {
+            index_uid,
+            source_id: "kafka-file-source".to_string(),
+            node_id: "kafka-node".to_string(),
+            pipeline_ord: 0,
+        };
         let source = source_loader
             .load_source(
                 SourceExecutionContext::for_test(
                     metastore,
-                    index_uid,
+                    pipeline_id,
                     PathBuf::from("./queues"),
                     source_config,
                 ),
@@ -1301,11 +1330,17 @@ mod kafka_broker_tests {
         let index_id = append_random_suffix("test-kafka-source--index");
         let (source_id, source_config) = get_source_config(&topic);
         let index_uid = setup_index(metastore.clone(), &index_id, &source_id, &[]).await;
+        let pipeline_id = IndexingPipelineId {
+            index_uid,
+            source_id: "kafka-file-source".to_string(),
+            node_id: "kafka-node".to_string(),
+            pipeline_ord: 0,
+        };
         let source = source_loader
             .load_source(
                 Arc::new(SourceExecutionContext {
                     metastore,
-                    index_uid,
+                    pipeline_id,
                     queues_dir_path: PathBuf::from("./queues"),
                     source_config,
                 }),
@@ -1370,11 +1405,17 @@ mod kafka_broker_tests {
             &[(0, -1, 0), (1, -1, 2)],
         )
         .await;
+        let pipeline_id = IndexingPipelineId {
+            index_uid,
+            source_id: "kafka-file-source".to_string(),
+            node_id: "kafka-node".to_string(),
+            pipeline_ord: 0,
+        };
         let source = source_loader
             .load_source(
                 Arc::new(SourceExecutionContext {
                     metastore,
-                    index_uid,
+                    pipeline_id,
                     queues_dir_path: PathBuf::from("./queues"),
                     source_config,
                 }),
diff --git a/quickwit/quickwit-indexing/src/source/mod.rs b/quickwit/quickwit-indexing/src/source/mod.rs
index 65a69b47ab3..d9389577f60 100644
--- a/quickwit/quickwit-indexing/src/source/mod.rs
+++ b/quickwit/quickwit-indexing/src/source/mod.rs
@@ -58,6 +58,7 @@
 //! - the kafka source: the partition id is a kafka topic partition id, and the position is a kafka
 //!   offset.
 mod file_source;
+mod http_source;
 mod ingest_api_source;
 #[cfg(feature = "kafka")]
 mod kafka_source;
@@ -88,7 +89,6 @@
 use quickwit_common::runtimes::RuntimeType;
 use quickwit_config::{SourceConfig, SourceParams};
 use quickwit_metastore::checkpoint::SourceCheckpoint;
 use quickwit_metastore::Metastore;
-use quickwit_proto::IndexUid;
 use serde_json::Value as JsonValue;
 pub use source_factory::{SourceFactory, SourceLoader, TypedSourceFactory};
 use tokio::runtime::Handle;
@@ -97,12 +97,14 @@
 pub use vec_source::{VecSource, VecSourceFactory};
 pub use void_source::{VoidSource, VoidSourceFactory};
 
 use crate::actors::DocProcessor;
+use crate::models::IndexingPipelineId;
+use crate::source::http_source::HttpSourceFactory;
 use crate::source::ingest_api_source::IngestApiSourceFactory;
 
 /// Runtime configuration used during execution of a source actor.
 pub struct SourceExecutionContext {
     pub metastore: Arc<dyn Metastore>,
-    pub index_uid: IndexUid,
+    pub pipeline_id: IndexingPipelineId,
     // Ingest API queues directory path.
     pub queues_dir_path: PathBuf,
     pub source_config: SourceConfig,
 }
@@ -112,13 +114,13 @@ impl SourceExecutionContext {
     #[cfg(test)]
     fn for_test(
         metastore: Arc<dyn Metastore>,
-        index_uid: IndexUid,
+        pipeline_id: IndexingPipelineId,
         queues_dir_path: PathBuf,
         source_config: SourceConfig,
     ) -> Arc<SourceExecutionContext> {
         Arc::new(Self {
             metastore,
-            index_uid,
+            pipeline_id,
             queues_dir_path,
             source_config,
         })
@@ -296,6 +298,7 @@ pub fn quickwit_supported_sources() -> &'static SourceLoader {
     source_factory.add_source("vec", VecSourceFactory);
     source_factory.add_source("void", VoidSourceFactory);
     source_factory.add_source("ingest-api", IngestApiSourceFactory);
+    source_factory.add_source("http", HttpSourceFactory);
     source_factory
 })
}
diff --git a/quickwit/quickwit-indexing/src/source/pulsar_source.rs b/quickwit/quickwit-indexing/src/source/pulsar_source.rs
index 215bd95a0ef..049f1b2d264 100644
--- a/quickwit/quickwit-indexing/src/source/pulsar_source.rs
+++ b/quickwit/quickwit-indexing/src/source/pulsar_source.rs
@@ -106,9 +106,10 @@ impl PulsarSource {
         params: PulsarSourceParams,
         checkpoint: SourceCheckpoint,
     ) -> anyhow::Result<Self> {
-        let subscription_name = subscription_name(&ctx.index_uid, &ctx.source_config.source_id);
+        let subscription_name =
+            subscription_name(&ctx.pipeline_id.index_uid, &ctx.source_config.source_id);
         info!(
-            index_id=%ctx.index_uid.index_id(),
+            index_id=%ctx.pipeline_id.index_uid.index_id(),
             source_id=%ctx.source_config.source_id,
             topics=?params.topics,
             subscription_name=%subscription_name,
@@ -284,7 +285,7 @@ impl Source for PulsarSource {
 
     fn observable_state(&self) -> JsonValue {
         json!({
-            "index_id": self.ctx.index_uid.index_id(),
+            "index_id": self.ctx.pipeline_id.index_uid.index_id(),
             "source_id": self.ctx.source_config.source_id,
             "topics": self.params.topics,
             "subscription_name": self.subscription_name,
@@ -474,6 +475,7 @@ mod pulsar_broker_tests {
     use reqwest::StatusCode;
 
     use super::*;
+    use crate::models::IndexingPipelineId;
     use crate::new_split_id;
     use crate::source::pulsar_source::{msg_id_from_position, msg_id_to_position};
     use crate::source::{quickwit_supported_sources, SuggestTruncate};
@@ -709,9 +711,15 @@ mod pulsar_broker_tests {
         source_config: SourceConfig,
         start_checkpoint: SourceCheckpoint,
     ) -> anyhow::Result<(ActorHandle<SourceActor>, Inbox<DocProcessor>)> {
+        let pipeline_id = IndexingPipelineId {
+            index_uid: index_uid.clone(),
+            source_id: source_config.source_id.to_string(),
+            node_id: "kafka-node".to_string(),
+            pipeline_ord: 0,
+        };
         let ctx = SourceExecutionContext::for_test(
             metastore,
-            index_uid,
+            pipeline_id,
             PathBuf::from("./queues"),
             source_config,
         );
@@ -823,8 +831,6 @@ mod pulsar_broker_tests {
         let metastore = metastore_for_test();
         let topic = append_random_suffix("test-pulsar-source-topic");
 
-        let index_id = append_random_suffix("test-pulsar-source-index");
-        let index_uid = IndexUid::new(&index_id);
         let (_source_id, source_config) = get_source_config([&topic]);
         let params = if let SourceParams::Pulsar(params) = source_config.clone().source_params {
             params
         } else {
             unreachable!()
         };
 
+        let pipeline_id = IndexingPipelineId {
+            index_uid: IndexUid::new("test-index"),
+            source_id: "kafka-file-source".to_string(),
+            node_id: "kafka-node".to_string(),
+            pipeline_ord: 0,
+        };
         let ctx = SourceExecutionContext::for_test(
             metastore,
-            index_uid,
+            pipeline_id,
             PathBuf::from("./queues"),
             source_config,
         );
diff --git a/quickwit/quickwit-indexing/src/source/source_factory.rs b/quickwit/quickwit-indexing/src/source/source_factory.rs
index be4f0ee32f9..01ef3b6833a 100644
--- a/quickwit/quickwit-indexing/src/source/source_factory.rs
+++ b/quickwit/quickwit-indexing/src/source/source_factory.rs
@@ -127,6 +127,7 @@ mod tests {
     use quickwit_proto::IndexUid;
 
     use super::*;
+    use crate::models::IndexingPipelineId;
     use crate::source::quickwit_supported_sources;
 
     #[tokio::test]
@@ -142,11 +143,17 @@ mod tests {
             transform_config: None,
             input_format: SourceInputFormat::Json,
         };
+        let pipeline_id = IndexingPipelineId {
+            index_uid: IndexUid::new("test-index"),
+            source_id: "kafka-file-source".to_string(),
+            node_id: "kafka-node".to_string(),
+            pipeline_ord: 0,
+        };
         source_loader
             .load_source(
                 SourceExecutionContext::for_test(
                     metastore,
-                    IndexUid::new("test-index"),
+                    pipeline_id,
                    PathBuf::from("./queues"),
                    source_config,
                 ),
diff --git a/quickwit/quickwit-indexing/src/source/vec_source.rs b/quickwit/quickwit-indexing/src/source/vec_source.rs
index be88fdbac70..3a1fd5aefa0 100644
--- a/quickwit/quickwit-indexing/src/source/vec_source.rs
+++ b/quickwit/quickwit-indexing/src/source/vec_source.rs
@@ -136,6 +136,7 @@ mod tests {
     use serde_json::json;
 
     use super::*;
+    use crate::models::IndexingPipelineId;
     use crate::source::SourceActor;
 
     #[tokio::test]
@@ -151,10 +152,16 @@ mod tests {
             partition: "partition".to_string(),
         };
         let metastore = metastore_for_test();
+        let pipeline_id = IndexingPipelineId {
+            index_uid: IndexUid::new("test-index"),
+            source_id: "kafka-file-source".to_string(),
+            node_id: "kafka-node".to_string(),
+            pipeline_ord: 0,
+        };
         let vec_source = VecSourceFactory::typed_create_source(
             SourceExecutionContext::for_test(
                 metastore,
-                IndexUid::new("test-index"),
+                pipeline_id,
                 PathBuf::from("./queues"),
                 SourceConfig {
                     source_id: "test-vec-source".to_string(),
@@ -211,10 +218,16 @@ mod tests {
         checkpoint.try_apply_delta(SourceCheckpointDelta::from_range(0u64..2u64))?;
 
         let metastore = metastore_for_test();
+        let pipeline_id = IndexingPipelineId {
+            index_uid: IndexUid::new("test-index"),
+            source_id: "kafka-file-source".to_string(),
+            node_id: "kafka-node".to_string(),
+            pipeline_ord: 0,
+        };
         let vec_source = VecSourceFactory::typed_create_source(
             SourceExecutionContext::for_test(
                 metastore,
-                IndexUid::new("test-index"),
+                pipeline_id,
                 PathBuf::from("./queues"),
                 SourceConfig {
                     source_id: "test-vec-source".to_string(),
diff --git a/quickwit/quickwit-indexing/src/source/void_source.rs b/quickwit/quickwit-indexing/src/source/void_source.rs
index ed58ab2192d..80f9226f12f 100644
--- a/quickwit/quickwit-indexing/src/source/void_source.rs
+++ b/quickwit/quickwit-indexing/src/source/void_source.rs
@@ -82,6 +82,7 @@ mod tests {
     use serde_json::json;
 
     use super::*;
+    use crate::models::IndexingPipelineId;
     use crate::source::{quickwit_supported_sources, SourceActor, SourceConfig};
 
     #[tokio::test]
@@ -96,9 +97,15 @@ mod tests {
             input_format: SourceInputFormat::Json,
         };
         let metastore = metastore_for_test();
+        let pipeline_id = IndexingPipelineId {
+            index_uid: IndexUid::new("test-index"),
+            source_id: "kafka-file-source".to_string(),
+            node_id: "kafka-node".to_string(),
+            pipeline_ord: 0,
+        };
         let ctx = SourceExecutionContext::for_test(
             metastore,
-            IndexUid::new("test-index"),
+            pipeline_id,
             PathBuf::from("./queues"),
             source_config,
         );
@@ -113,10 +120,16 @@ mod tests {
     async fn test_void_source_running() -> anyhow::Result<()> {
         let universe = Universe::with_accelerated_time();
         let metastore = metastore_for_test();
+        let pipeline_id = IndexingPipelineId {
+            index_uid: IndexUid::new("test-index"),
+            source_id: "kafka-file-source".to_string(),
+            node_id: "kafka-node".to_string(),
+            pipeline_ord: 0,
+        };
         let void_source = VoidSourceFactory::typed_create_source(
             SourceExecutionContext::for_test(
                 metastore,
-                IndexUid::new("test-index"),
+                pipeline_id,
                 PathBuf::from("./queues"),
                 SourceConfig {
                     source_id: "test-void-source".to_string(),