diff --git a/docker-compose.yml b/docker-compose.yml
index dc026729815..4fdf9dc2903 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -77,7 +77,7 @@ services:
       - "${MAP_HOST_PULSAR:-127.0.0.1}:6650:6650"
       - "${MAP_HOST_PULSAR:-127.0.0.1}:8081:8080"
     environment:
-      PULSAR_MEM: "-Xms256M -Xmx256M"
+      PULSAR_MEM: "-Xms384M -Xmx384M"
     profiles:
       - all
      - pulsar
diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock
index 559fae6233c..536d69f428b 100644
--- a/quickwit/Cargo.lock
+++ b/quickwit/Cargo.lock
@@ -333,9 +333,9 @@ checksum = "f1fdabc7756949593fe60f30ec81974b613357de856987752631dea1e3394c80"
 
 [[package]]
 name = "aws-config"
-version = "1.2.1"
+version = "1.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b2a4707646259764ab59fd9a50e9de2e92c637b28b36285d6f6fa030e915fbd9"
+checksum = "baaa0be6ee7d90b775ae6ccb6d2ba182b91219ec2001f92338773a094246af1d"
 dependencies = [
  "aws-credential-types",
  "aws-runtime",
@@ -400,9 +400,9 @@ dependencies = [
 
 [[package]]
 name = "aws-sdk-kinesis"
-version = "1.21.0"
+version = "1.22.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "928bf256d2aa89753732ce6876dea0debeab663af1cc860725d93a96d54f41a4"
+checksum = "8bf9c1d36172650239d32e8f8cd87a825d4e598f86c44abf50a02ef7e6dd8c8f"
 dependencies = [
  "aws-credential-types",
  "aws-runtime",
@@ -422,9 +422,9 @@ dependencies = [
 
 [[package]]
 name = "aws-sdk-s3"
-version = "1.24.0"
+version = "1.25.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7f522b68eb0294c59f7beb0defa30e84fed24ebc50ee219e111d6c33eaea96a8"
+checksum = "cedc97499da49c3e36cde578340f9925284685073cb3e512aaf9ab16cd9a2541"
 dependencies = [
  "ahash 0.8.11",
  "aws-credential-types",
@@ -457,9 +457,9 @@ dependencies = [
 
 [[package]]
 name = "aws-sdk-sso"
-version = "1.21.0"
+version = "1.22.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3d70fb493f4183f5102d8a8d0cc9b57aec29a762f55c0e7bf527e0f7177bb408"
+checksum = "ca3d6c4cba4e009391b72b0fcf12aff04ea3c9c3aa2ecaafa330326a8bd7e601"
 dependencies = [
  "aws-credential-types",
  "aws-runtime",
@@ -479,9 +479,9 @@ dependencies = [
 
 [[package]]
 name = "aws-sdk-ssooidc"
-version = "1.21.0"
+version = "1.22.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "de3f37549b3e38b7ea5efd419d4d7add6ea1e55223506eb0b4fef9d25e7cc90d"
+checksum = "73400dc239d14f63d932f4ca7b55af5e9ef1f857f7d70655249ccc287adb2570"
 dependencies = [
  "aws-credential-types",
  "aws-runtime",
@@ -501,9 +501,9 @@ dependencies = [
 
 [[package]]
 name = "aws-sdk-sts"
-version = "1.21.0"
+version = "1.22.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3b2ff219a5d4b795cd33251c19dbe9c4b401f2b2cbe513e07c76ada644eaf34e"
+checksum = "10f8858308af76fba3e5ffcf1bb56af5471574d2bdfaf0159470c25bc2f760e5"
 dependencies = [
  "aws-credential-types",
  "aws-runtime",
@@ -652,9 +652,9 @@ dependencies = [
 
 [[package]]
 name = "aws-smithy-runtime"
-version = "1.3.1"
+version = "1.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "44e7945379821074549168917e89e60630647e186a69243248f08c6d168b975a"
+checksum = "1cf64e73ef8d4dac6c933230d56d136b75b252edcf82ed36e37d603090cd7348"
 dependencies = [
  "aws-smithy-async",
  "aws-smithy-http",
@@ -683,9 +683,9 @@ dependencies = [
 
 [[package]]
 name = "aws-smithy-runtime-api"
-version = "1.4.0"
+version = "1.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4cc56a5c96ec741de6c5e6bf1ce6948be969d6506dfa9c39cffc284e31e4979b"
+checksum = "8c19fdae6e3d5ac9cd01f2d6e6c359c5f5a3e028c2d148a8f5b90bf3399a18a7"
 dependencies = [
  "aws-smithy-async",
  "aws-smithy-types",
diff --git a/quickwit/quickwit-common/src/rand.rs b/quickwit/quickwit-common/src/rand.rs
index a9c876f5dcc..d66704ec279 100644
--- a/quickwit/quickwit-common/src/rand.rs
+++ b/quickwit/quickwit-common/src/rand.rs
@@ -23,12 +23,14 @@ use rand::Rng;
 /// Appends a random suffix composed of a hyphen and five random alphanumeric characters.
 pub fn append_random_suffix(string: &str) -> String {
     let rng = rand::thread_rng();
-    let slug: String = rng
-        .sample_iter(&Alphanumeric)
-        .take(5)
-        .map(char::from)
-        .collect();
-    format!("{string}-{slug}")
+    let mut randomized_string = String::with_capacity(string.len() + 6);
+    randomized_string.push_str(string);
+    randomized_string.push('-');
+
+    for random_byte in rng.sample_iter(&Alphanumeric).take(5) {
+        randomized_string.push(char::from(random_byte));
+    }
+    randomized_string
 }
 
 #[cfg(test)]
diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
index 6c64ccb3580..8e4a98f8920 100644
--- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
+++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
@@ -33,11 +33,8 @@ use quickwit_common::KillSwitch;
 use quickwit_config::{IndexingSettings, SourceConfig};
 use quickwit_doc_mapper::DocMapper;
 use quickwit_ingest::IngesterPool;
-use quickwit_metastore::IndexMetadataResponseExt;
 use quickwit_proto::indexing::IndexingPipelineId;
-use quickwit_proto::metastore::{
-    IndexMetadataRequest, MetastoreError, MetastoreService, MetastoreServiceClient,
-};
+use quickwit_proto::metastore::{MetastoreError, MetastoreServiceClient};
 use quickwit_proto::types::ShardId;
 use quickwit_storage::{Storage, StorageResolver};
 use tokio::sync::Semaphore;
@@ -53,7 +50,7 @@ use crate::actors::{Indexer, Packager, Publisher, Uploader};
 use crate::merge_policy::MergePolicy;
 use crate::models::IndexingStatistics;
 use crate::source::{
-    quickwit_supported_sources, AssignShards, Assignment, SourceActor, SourceRuntimeArgs,
+    quickwit_supported_sources, AssignShards, Assignment, SourceActor, SourceRuntime,
 };
 use crate::split_store::IndexingSplitStore;
 use crate::SplitsUpdateMailbox;
@@ -431,31 +428,17 @@
             )
             .set_kill_switch(self.kill_switch.clone())
             .spawn(doc_processor);
-
-        // Fetch index_metadata to be sure to have the last updated checkpoint.
-        let index_metadata_request = IndexMetadataRequest::for_index_id(index_id.to_string());
-        let index_metadata = ctx
-            .protect_future(self.params.metastore.index_metadata(index_metadata_request))
-            .await?
-            .deserialize_index_metadata()?;
-        let source_checkpoint = index_metadata
-            .checkpoint
-            .source_checkpoint(source_id)
-            .cloned()
-            .unwrap_or_default(); // TODO Have a stricter check.
+        let source_runtime = SourceRuntime {
+            pipeline_id: self.params.pipeline_id.clone(),
+            source_config: self.params.source_config.clone(),
+            metastore: self.params.metastore.clone(),
+            ingester_pool: self.params.ingester_pool.clone(),
+            queues_dir_path: self.params.queues_dir_path.clone(),
+            storage_resolver: self.params.source_storage_resolver.clone(),
+            event_broker: self.params.event_broker.clone(),
+        };
         let source = ctx
-            .protect_future(quickwit_supported_sources().load_source(
-                Arc::new(SourceRuntimeArgs {
-                    pipeline_id: self.params.pipeline_id.clone(),
-                    source_config: self.params.source_config.clone(),
-                    metastore: self.params.metastore.clone(),
-                    ingester_pool: self.params.ingester_pool.clone(),
-                    queues_dir_path: self.params.queues_dir_path.clone(),
-                    storage_resolver: self.params.source_storage_resolver.clone(),
-                    event_broker: self.params.event_broker.clone(),
-                }),
-                source_checkpoint,
-            ))
+            .protect_future(quickwit_supported_sources().load_source(source_runtime))
             .await?;
         let actor_source = SourceActor {
             source,
@@ -617,7 +600,7 @@ mod tests {
     use quickwit_config::{IndexingSettings, SourceInputFormat, SourceParams, VoidSourceParams};
     use quickwit_doc_mapper::{default_doc_mapper_for_test, DefaultDocMapper};
     use quickwit_metastore::checkpoint::IndexCheckpointDelta;
-    use quickwit_metastore::{IndexMetadata, PublishSplitsRequestExt};
+    use quickwit_metastore::{IndexMetadata, IndexMetadataResponseExt, PublishSplitsRequestExt};
     use quickwit_proto::metastore::{
         EmptyResponse, IndexMetadataResponse, LastDeleteOpstampResponse, MetastoreError,
         MockMetastoreService,
@@ -643,20 +626,40 @@ mod tests {
         mut num_fails: usize,
         test_file: &str,
     ) -> anyhow::Result<()> {
-        let universe = Universe::new();
+        let node_id = "test-node".to_string();
+        let index_uid = IndexUid::for_test("test-index", 2);
+        let pipeline_id = IndexingPipelineId {
+            node_id,
+            index_uid,
+            source_id: "test-source".to_string(),
+            pipeline_uid: PipelineUid::for_test(0u128),
+        };
+        let source_config = SourceConfig {
+            source_id: "test-source".to_string(),
+            num_pipelines: NonZeroUsize::MIN,
+            enabled: true,
+            source_params: SourceParams::file(PathBuf::from(test_file)),
+            transform_config: None,
+            input_format: SourceInputFormat::Json,
+        };
+        let source_config_clone = source_config.clone();
+
         let mut mock_metastore = MockMetastoreService::new();
         mock_metastore
             .expect_index_metadata()
             .withf(|index_metadata_request| {
-                index_metadata_request.index_id.as_ref().unwrap() == "test-index"
+                index_metadata_request.index_uid.as_ref().unwrap() == &("test-index", 2)
             })
             .returning(move |_| {
                 if num_fails == 0 {
-                    let index_metadata =
+                    let mut index_metadata =
                         IndexMetadata::for_test("test-index", "ram:///indexes/test-index");
-                    return Ok(
-                        IndexMetadataResponse::try_from_index_metadata(&index_metadata).unwrap(),
-                    );
+                    index_metadata
+                        .add_source(source_config_clone.clone())
+                        .unwrap();
+                    let response =
+                        IndexMetadataResponse::try_from_index_metadata(&index_metadata).unwrap();
+                    return Ok(response);
                 }
                 num_fails -= 1;
                 Err(MetastoreError::Timeout("timeout error".to_string()))
            })
@@ -670,10 +673,7 @@ mod tests {
         mock_metastore
             .expect_stage_splits()
             .withf(|stage_splits_request| -> bool {
-                stage_splits_request.index_uid()
-                    == &"test-index:00000000000000000000000002"
-                        .parse::<IndexUid>()
-                        .unwrap()
+                stage_splits_request.index_uid() == &("test-index", 2)
             })
             .returning(|_| Ok(EmptyResponse {}));
         mock_metastore
@@ -683,10 +683,7 @@ mod tests {
             .expect_publish_splits()
             .withf(|publish_splits_request| {
                 let checkpoint_delta = publish_splits_request
                     .deserialize_index_checkpoint()
                     .unwrap()
                     .unwrap();
-                publish_splits_request.index_uid()
-                    == &"test-index:00000000000000000000000002"
-                        .parse::<IndexUid>()
-                        .unwrap()
+                publish_splits_request.index_uid() == &("test-index", 2)
                     && checkpoint_delta.source_id == "test-source"
                     && publish_splits_request.staged_split_ids.len() == 1
                     && publish_splits_request.replaced_split_ids.is_empty()
                         .ends_with(":(00000000000000000000..00000000000000001030])")
             })
             .returning(|_| Ok(EmptyResponse {}));
-        let node_id = "test-node";
-        let pipeline_id = IndexingPipelineId {
-            index_uid: IndexUid::for_test("test-index", 2),
-            source_id: "test-source".to_string(),
-            node_id: node_id.to_string(),
-            pipeline_uid: PipelineUid::for_test(0u128),
-        };
-        let source_config = SourceConfig {
-            source_id: "test-source".to_string(),
-            num_pipelines: NonZeroUsize::new(1).unwrap(),
-            enabled: true,
-            source_params: SourceParams::file(PathBuf::from(test_file)),
-            transform_config: None,
-            input_format: SourceInputFormat::Json,
-        };
+
+        let universe = Universe::new();
+        let (merge_planner_mailbox, _) = universe.create_test_mailbox();
         let storage = Arc::new(RamStorage::default());
         let split_store = IndexingSplitStore::create_without_local_store_for_test(storage.clone());
-        let (merge_planner_mailbox, _) = universe.create_test_mailbox();
-        let event_broker = EventBroker::default();
         let pipeline_params = IndexingPipelineParams {
             pipeline_id,
             doc_mapper: Arc::new(default_doc_mapper_for_test()),
@@ -730,7 +713,7 @@ mod tests {
             max_concurrent_split_uploads_merge: 5,
             cooperative_indexing_permits: None,
             merge_planner_mailbox,
-            event_broker,
+            event_broker: EventBroker::default(),
         };
         let pipeline = IndexingPipeline::new(pipeline_params);
         let (_pipeline_mailbox, pipeline_handle) = universe.spawn_builder().spawn(pipeline);
@@ -762,16 +745,36 @@ mod tests {
     }
 
     async fn indexing_pipeline_simple(test_file: &str) -> anyhow::Result<()> {
+        let node_id = "test-node".to_string();
         let index_uid: IndexUid = IndexUid::for_test("test-index", 1);
+        let pipeline_id = IndexingPipelineId {
+            node_id,
+            index_uid: index_uid.clone(),
+            source_id: "test-source".to_string(),
+            pipeline_uid: PipelineUid::for_test(0u128),
+        };
+        let source_config = SourceConfig {
+            source_id: "test-source".to_string(),
+            num_pipelines: NonZeroUsize::MIN,
+            enabled: true,
+            source_params: SourceParams::file(PathBuf::from(test_file)),
+            transform_config: None,
+            input_format: SourceInputFormat::Json,
+        };
+        let source_config_clone = source_config.clone();
+
         let mut mock_metastore = MockMetastoreService::new();
         mock_metastore
             .expect_index_metadata()
             .withf(|index_metadata_request| {
-                index_metadata_request.index_id.as_ref().unwrap() == "test-index"
+                index_metadata_request.index_uid.as_ref().unwrap() == &("test-index", 1)
             })
-            .returning(|_| {
-                let index_metadata =
+            .returning(move |_| {
+                let mut index_metadata =
                     IndexMetadata::for_test("test-index", "ram:///indexes/test-index");
+                index_metadata
+                    .add_source(source_config_clone.clone())
+                    .unwrap();
                 Ok(IndexMetadataResponse::try_from_index_metadata(&index_metadata).unwrap())
             });
         let index_uid_clone = index_uid.clone();
@@ -800,22 +803,8 @@ mod tests {
                         .ends_with(":(00000000000000000000..00000000000000001030])")
             })
             .returning(|_| Ok(EmptyResponse {}));
+        let universe = Universe::new();
-        let node_id = "test-node";
-        let pipeline_id = IndexingPipelineId {
-            index_uid: index_uid.clone(),
-            source_id: "test-source".to_string(),
-            node_id: node_id.to_string(),
-            pipeline_uid: PipelineUid::for_test(0u128),
-        };
-        let source_config = SourceConfig {
-            source_id: "test-source".to_string(),
-            num_pipelines: NonZeroUsize::new(1).unwrap(),
-            enabled: true,
-            source_params: SourceParams::file(PathBuf::from(test_file)),
-            transform_config: None,
-            input_format: SourceInputFormat::Json,
-        };
         let storage = Arc::new(RamStorage::default());
         let split_store = IndexingSplitStore::create_without_local_store_for_test(storage.clone());
         let (merge_planner_mailbox, _) = universe.create_test_mailbox();
@@ -861,38 +850,44 @@ mod tests {
     #[tokio::test]
     async fn test_merge_pipeline_does_not_stop_on_indexing_pipeline_failure() {
-        let mut mock_metastore = MockMetastoreService::new();
-        mock_metastore
-            .expect_index_metadata()
-            .withf(|index_metadata_request| {
-                index_metadata_request.index_id.as_ref().unwrap() == "test-index"
-            })
-            .returning(|_| {
-                let index_metadata =
-                    IndexMetadata::for_test("test-index", "ram:///indexes/test-index");
-                Ok(IndexMetadataResponse::try_from_index_metadata(&index_metadata).unwrap())
-            });
-        mock_metastore
-            .expect_list_splits()
-            .returning(|_| Ok(ServiceStream::empty()));
-        let universe = Universe::with_accelerated_time();
-        let node_id = "test-node";
-        let doc_mapper = Arc::new(default_doc_mapper_for_test());
+        let node_id = "test-node".to_string();
         let pipeline_id = IndexingPipelineId {
+            node_id,
             index_uid: IndexUid::new_with_random_ulid("test-index"),
             source_id: "test-source".to_string(),
-            node_id: node_id.to_string(),
             pipeline_uid: PipelineUid::for_test(0u128),
         };
         let source_config = SourceConfig {
             source_id: "test-source".to_string(),
-            num_pipelines: NonZeroUsize::new(1).unwrap(),
+            num_pipelines: NonZeroUsize::MIN,
             enabled: true,
             source_params: SourceParams::Void(VoidSourceParams),
             transform_config: None,
             input_format: SourceInputFormat::Json,
         };
+        let source_config_clone = source_config.clone();
+
+        let mut mock_metastore = MockMetastoreService::new();
+        mock_metastore
+            .expect_index_metadata()
+            .withf(|index_metadata_request| {
+                index_metadata_request.index_uid.as_ref().unwrap() == &("test-index", 2)
+            })
+            .returning(move |_| {
+                let mut index_metadata =
+                    IndexMetadata::for_test("test-index", "ram:///indexes/test-index");
+                index_metadata
+                    .add_source(source_config_clone.clone())
+                    .unwrap();
+                Ok(IndexMetadataResponse::try_from_index_metadata(&index_metadata).unwrap())
+            });
+        mock_metastore
+            .expect_list_splits()
+            .returning(|_| Ok(ServiceStream::empty()));
         let metastore = MetastoreServiceClient::from_mock(mock_metastore);
+
+        let universe = Universe::with_accelerated_time();
+        let doc_mapper = Arc::new(default_doc_mapper_for_test());
         let storage = Arc::new(RamStorage::default());
         let split_store = IndexingSplitStore::create_without_local_store_for_test(storage.clone());
         let merge_pipeline_params = MergePipelineParams {
@@ -957,16 +952,37 @@ mod tests {
     }
 
     async fn indexing_pipeline_all_failures_handling(test_file: &str) -> anyhow::Result<()> {
+        let node_id = "test-node".to_string();
         let index_uid: IndexUid = IndexUid::for_test("test-index", 2);
+        let pipeline_id = IndexingPipelineId {
+            node_id,
+            index_uid: index_uid.clone(),
+            source_id: "test-source".to_string(),
+            pipeline_uid: PipelineUid::for_test(0u128),
+        };
+        let source_config = SourceConfig {
+            source_id: "test-source".to_string(),
+            num_pipelines: NonZeroUsize::MIN,
+            enabled: true,
+            source_params: SourceParams::file(PathBuf::from(test_file)),
+            transform_config: None,
+            input_format: SourceInputFormat::Json,
+        };
+        let source_config_clone = source_config.clone();
+
         let mut mock_metastore = MockMetastoreService::new();
         mock_metastore
             .expect_index_metadata()
             .withf(|index_metadata_request| {
-                index_metadata_request.index_id.as_ref().unwrap() == "test-index"
+                index_metadata_request.index_uid.as_ref().unwrap() == &("test-index", 2)
             })
-            .returning(|_| {
-                let index_metadata =
+            .returning(move |_| {
+                let mut index_metadata =
                     IndexMetadata::for_test("test-index", "ram:///indexes/test-index");
+                index_metadata
+                    .add_source(source_config_clone.clone())
+                    .unwrap();
+                Ok(IndexMetadataResponse::try_from_index_metadata(&index_metadata).unwrap())
             });
         let index_uid_clone = index_uid.clone();
@@ -995,21 +1011,6 @@ mod tests {
             })
             .returning(|_| Ok(EmptyResponse {}));
         let universe = Universe::new();
-        let node_id = "test-node";
-        let pipeline_id = IndexingPipelineId {
-            index_uid: index_uid.clone(),
-            source_id: "test-source".to_string(),
-            node_id: node_id.to_string(),
-            pipeline_uid: PipelineUid::for_test(0u128),
-        };
-        let source_config = SourceConfig {
-            source_id: "test-source".to_string(),
-            num_pipelines: NonZeroUsize::new(1).unwrap(),
-            enabled: true,
-            source_params: SourceParams::file(PathBuf::from(test_file)),
-            transform_config: None,
-            input_format: SourceInputFormat::Json,
-        };
         let storage = Arc::new(RamStorage::default());
         let split_store = IndexingSplitStore::create_without_local_store_for_test(storage.clone());
         let (merge_planner_mailbox, _) = universe.create_test_mailbox();
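Note: `SourceRuntime` replaces `Arc<SourceRuntimeArgs>` plus an explicitly threaded `SourceCheckpoint`; each source now pulls its own checkpoint via `fetch_checkpoint()`. The struct's definition is not part of this diff. The sketch below is inferred from the construction sites and accessors used in it, so field visibility and the method body are assumptions; the body simply mirrors the checkpoint-fetch logic this diff deletes from `IndexingPipeline` and `KafkaSource`:

    use std::path::PathBuf;

    use quickwit_common::pubsub::EventBroker;
    use quickwit_config::SourceConfig;
    use quickwit_ingest::IngesterPool;
    use quickwit_metastore::checkpoint::SourceCheckpoint;
    use quickwit_metastore::IndexMetadataResponseExt;
    use quickwit_proto::indexing::IndexingPipelineId;
    use quickwit_proto::metastore::{
        IndexMetadataRequest, MetastoreService, MetastoreServiceClient,
    };
    use quickwit_storage::StorageResolver;

    pub struct SourceRuntime {
        pub pipeline_id: IndexingPipelineId,
        pub source_config: SourceConfig,
        pub metastore: MetastoreServiceClient,
        pub ingester_pool: IngesterPool,
        pub queues_dir_path: PathBuf,
        pub storage_resolver: StorageResolver,
        pub event_broker: EventBroker,
    }

    impl SourceRuntime {
        pub fn source_id(&self) -> &str {
            &self.source_config.source_id
        }

        // Mirrors the deleted inline logic: read the index metadata, then
        // return this source's checkpoint, defaulting to an empty one.
        pub async fn fetch_checkpoint(&self) -> anyhow::Result<SourceCheckpoint> {
            let request =
                IndexMetadataRequest::for_index_uid(self.pipeline_id.index_uid.clone());
            let index_metadata = self
                .metastore
                .clone()
                .index_metadata(request)
                .await?
                .deserialize_index_metadata()?;
            Ok(index_metadata
                .checkpoint
                .source_checkpoint(self.source_id())
                .cloned()
                .unwrap_or_default())
        }
    }

This also explains why the tests above now call `index_metadata.add_source(...)`: without a registered source, a checkpoint lookup through the metastore would have nothing to resolve against.
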
diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs
index 70a7433e680..04dd2124190 100644
--- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs
+++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs
@@ -975,7 +975,7 @@ mod tests {
         // Test `spawn_pipeline`.
         let source_config_0 = SourceConfig {
             source_id: "test-indexing-service--source-0".to_string(),
-            num_pipelines: NonZeroUsize::new(1).unwrap(),
+            num_pipelines: NonZeroUsize::MIN,
             enabled: true,
             source_params: SourceParams::void(),
             transform_config: None,
@@ -1050,19 +1050,9 @@ mod tests {
         let index_uri = format!("ram:///indexes/{index_id}");
         let index_config = IndexConfig::for_test(&index_id, &index_uri);
-        let create_index_request =
-            CreateIndexRequest::try_from_index_config(&index_config).unwrap();
-        metastore.create_index(create_index_request).await.unwrap();
-
-        let universe = Universe::new();
-        let temp_dir = tempfile::tempdir().unwrap();
-        let (indexing_service, indexing_server_handle) =
-            spawn_indexing_service_for_test(temp_dir.path(), &universe, metastore, cluster).await;
-
-        // Test `supervise_pipelines`
         let source_config = SourceConfig {
             source_id: "test-indexing-service--source".to_string(),
-            num_pipelines: NonZeroUsize::new(1).unwrap(),
+            num_pipelines: NonZeroUsize::MIN,
             enabled: true,
             source_params: SourceParams::Vec(VecSourceParams {
                 docs: Vec::new(),
                 partition: "partition".to_string(),
             }),
             transform_config: None,
             input_format: SourceInputFormat::Json,
         };
+        let create_index_request = CreateIndexRequest::try_from_index_and_source_configs(
+            &index_config,
+            &[source_config.clone()],
+        )
+        .unwrap();
+        metastore.create_index(create_index_request).await.unwrap();
+
+        let universe = Universe::new();
+        let temp_dir = tempfile::tempdir().unwrap();
+        let (indexing_service, indexing_server_handle) =
+            spawn_indexing_service_for_test(temp_dir.path(), &universe, metastore, cluster).await;
+
         indexing_service
             .ask_for_res(SpawnPipeline {
                 index_id: index_id.clone(),
@@ -1132,7 +1134,7 @@ mod tests {
         // Test `apply plan`.
         let source_config_1 = SourceConfig {
             source_id: "test-indexing-service--source-1".to_string(),
-            num_pipelines: NonZeroUsize::new(1).unwrap(),
+            num_pipelines: NonZeroUsize::MIN,
             enabled: true,
             source_params: SourceParams::void(),
             transform_config: None,
@@ -1337,7 +1339,7 @@ mod tests {
 
         let source_config = SourceConfig {
             source_id: "test-indexing-service--source".to_string(),
-            num_pipelines: NonZeroUsize::new(1).unwrap(),
+            num_pipelines: NonZeroUsize::MIN,
             enabled: true,
             source_params: SourceParams::void(),
             transform_config: None,
@@ -1466,7 +1468,7 @@ mod tests {
         let mut index_metadata = IndexMetadata::for_test(&index_id, &index_uri);
         let source_config = SourceConfig {
             source_id: "test-indexing-service--source".to_string(),
-            num_pipelines: NonZeroUsize::new(1).unwrap(),
+            num_pipelines: NonZeroUsize::MIN,
            enabled: true,
             source_params: SourceParams::void(),
             transform_config: None,
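Note: the recurring `num_pipelines` change in these tests is purely cosmetic, since `NonZeroUsize::MIN` is the constant 1 (stable since Rust 1.70) and therefore interchangeable with `NonZeroUsize::new(1).unwrap()`:

    use std::num::NonZeroUsize;

    fn main() {
        assert_eq!(NonZeroUsize::MIN, NonZeroUsize::new(1).unwrap());
        assert_eq!(NonZeroUsize::MIN.get(), 1);
    }

The substantive change in this file is ordering: the `SourceConfig` is now built first and registered at index-creation time through `CreateIndexRequest::try_from_index_and_source_configs`, so checkpoint lookups made through the new `SourceRuntime` can find the source.
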
diff --git a/quickwit/quickwit-indexing/src/source/file_source.rs b/quickwit/quickwit-indexing/src/source/file_source.rs
index b84153aa355..0faf35b4f83 100644
--- a/quickwit/quickwit-indexing/src/source/file_source.rs
+++ b/quickwit/quickwit-indexing/src/source/file_source.rs
@@ -17,9 +17,9 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program. If not, see <http://www.gnu.org/licenses/>.
 
+use std::borrow::Borrow;
 use std::ffi::OsStr;
 use std::path::Path;
-use std::sync::Arc;
 use std::time::Duration;
 use std::{fmt, io};
@@ -30,7 +30,7 @@ use bytes::Bytes;
 use quickwit_actors::{ActorExitStatus, Mailbox};
 use quickwit_common::uri::Uri;
 use quickwit_config::FileSourceParams;
-use quickwit_metastore::checkpoint::{PartitionId, SourceCheckpoint};
+use quickwit_metastore::checkpoint::PartitionId;
 use quickwit_proto::metastore::SourceType;
 use quickwit_proto::types::Position;
 use serde::Serialize;
@@ -39,7 +39,7 @@ use tracing::info;
 
 use super::BatchBuilder;
 use crate::actors::DocProcessor;
-use crate::source::{Source, SourceContext, SourceRuntimeArgs, TypedSourceFactory};
+use crate::source::{Source, SourceContext, SourceRuntime, TypedSourceFactory};
 
 /// Number of bytes after which a new batch is cut.
 pub(crate) const BATCH_NUM_BYTES_LIMIT: u64 = 500_000u64;
@@ -113,7 +113,7 @@ impl Source for FileSource {
                 .await?;
         }
         if reached_eof {
-            info!("EOF");
+            info!("reached end of file");
             ctx.send_exit_with_success(doc_processor_mailbox).await?;
             return Err(ActorExitStatus::Success);
         }
@@ -121,7 +121,7 @@ impl Source for FileSource {
     }
 
     fn name(&self) -> String {
-        format!("FileSource{{source_id={}}}", self.source_id)
+        format!("{:?}", self)
     }
 
     fn observable_state(&self) -> serde_json::Value {
@@ -136,15 +136,15 @@ impl TypedSourceFactory for FileSourceFactory {
     type Source = FileSource;
     type Params = FileSourceParams;
 
-    // TODO handle checkpoint for files.
     async fn typed_create_source(
-        ctx: Arc<SourceRuntimeArgs>,
+        source_runtime: SourceRuntime,
         params: FileSourceParams,
-        checkpoint: SourceCheckpoint,
     ) -> anyhow::Result<Self::Source> {
+        let checkpoint = source_runtime.fetch_checkpoint().await?;
         let mut offset = 0;
+
         let reader: FileSourceReader = if let Some(filepath) = &params.filepath {
-            let partition_id = PartitionId::from(filepath.to_string_lossy().to_string());
+            let partition_id = PartitionId::from(filepath.to_string_lossy().borrow());
             offset = checkpoint
                 .position_for_partition(&partition_id)
                 .map(|position| {
@@ -154,7 +154,7 @@ impl TypedSourceFactory for FileSourceFactory {
                 })
                 .unwrap_or(0);
             let (dir_uri, file_name) = dir_and_filename(filepath)?;
-            let storage = ctx.storage_resolver.resolve(&dir_uri).await?;
+            let storage = source_runtime.storage_resolver.resolve(&dir_uri).await?;
             let file_size = storage.file_num_bytes(file_name).await?.try_into().unwrap();
             // If it's a gzip file, we can't seek to a specific offset, we need to start from the
             // beginning of the file, decompress and skip the first `offset` bytes.
@@ -172,7 +172,7 @@ impl TypedSourceFactory for FileSourceFactory {
             FileSourceReader::new(Box::new(tokio::io::stdin()), 0)
         };
         let file_source = FileSource {
-            source_id: ctx.source_id().to_string(),
+            source_id: source_runtime.source_id().to_string(),
             counters: FileSourceCounters {
                 previous_offset: offset as u64,
                 current_offset: offset as u64,
@@ -239,17 +239,16 @@ pub(crate) fn dir_and_filename(filepath: &Path) -> anyhow::Result<(Uri, &Path)>
 mod tests {
     use std::io::{Cursor, Write};
     use std::num::NonZeroUsize;
-    use std::path::PathBuf;
 
     use async_compression::tokio::write::GzipEncoder;
     use quickwit_actors::{Command, Universe};
     use quickwit_config::{SourceConfig, SourceInputFormat, SourceParams};
-    use quickwit_metastore::checkpoint::{SourceCheckpoint, SourceCheckpointDelta};
-    use quickwit_metastore::metastore_for_test;
+    use quickwit_metastore::checkpoint::SourceCheckpointDelta;
     use quickwit_proto::types::IndexUid;
 
     use super::*;
     use crate::models::RawDocBatch;
+    use crate::source::tests::SourceRuntimeBuilder;
     use crate::source::SourceActor;
 
     #[tokio::test]
 
             transform_config: None,
             input_format: SourceInputFormat::Json,
         };
-        let metastore = metastore_for_test();
-        let file_source = FileSourceFactory::typed_create_source(
-            SourceRuntimeArgs::for_test(
-                IndexUid::new_with_random_ulid("test-index"),
-                source_config,
-                metastore,
-                PathBuf::from("./queues"),
-            ),
-            params,
-            SourceCheckpoint::default(),
-        )
-        .await
-        .unwrap();
+        let index_uid = IndexUid::new_with_random_ulid("test-index");
+        let source_runtime = SourceRuntimeBuilder::new(index_uid, source_config).build();
+        let file_source = FileSourceFactory::typed_create_source(source_runtime, params)
+            .await
+            .unwrap();
         let file_source_actor = SourceActor {
             source: Box::new(file_source),
             doc_processor_mailbox,
 
             transform_config: None,
             input_format: SourceInputFormat::Json,
         };
-        let metastore = metastore_for_test();
-        let source = FileSourceFactory::typed_create_source(
-            SourceRuntimeArgs::for_test(
-                IndexUid::new_with_random_ulid("test-index"),
-                source_config,
-                metastore,
-                PathBuf::from("./queues"),
-            ),
-            params,
-            SourceCheckpoint::default(),
-        )
-        .await
-        .unwrap();
+        let index_uid = IndexUid::new_with_random_ulid("test-index");
+        let source_runtime = SourceRuntimeBuilder::new(index_uid, source_config).build();
+        let file_source = FileSourceFactory::typed_create_source(source_runtime, params)
+            .await
+            .unwrap();
         let file_source_actor = SourceActor {
-            source: Box::new(source),
+            source: Box::new(file_source),
             doc_processor_mailbox,
         };
         let (_file_source_mailbox, file_source_handle) =
 
         temp_file.flush().unwrap();
         let params = FileSourceParams::file(&temp_file_path);
-        let mut checkpoint = SourceCheckpoint::default();
-        let partition_id = PartitionId::from(temp_file_path.to_string_lossy().to_string());
-        let checkpoint_delta = SourceCheckpointDelta::from_partition_delta(
-            partition_id,
-            Position::offset(0u64),
-            Position::offset(4u64),
-        )
-        .unwrap();
-        checkpoint.try_apply_delta(checkpoint_delta).unwrap();
-
         let source_config = SourceConfig {
             source_id: "test-file-source".to_string(),
             num_pipelines: NonZeroUsize::new(1).unwrap(),
             enabled: true,
             transform_config: None,
             input_format: SourceInputFormat::Json,
         };
-        let metastore = metastore_for_test();
-        let source = FileSourceFactory::typed_create_source(
-            SourceRuntimeArgs::for_test(
-                IndexUid::new_with_random_ulid("test-index"),
-                source_config,
-                metastore,
-                PathBuf::from("./queues"),
-            ),
-            params,
-            checkpoint,
+        let partition_id = PartitionId::from(temp_file_path.to_string_lossy().borrow());
+        let source_checkpoint_delta = SourceCheckpointDelta::from_partition_delta(
+            partition_id,
+            Position::Beginning,
+            Position::offset(4u64),
         )
-        .await
         .unwrap();
+
+        let index_uid = IndexUid::new_with_random_ulid("test-index");
+        let source_runtime = SourceRuntimeBuilder::new(index_uid, source_config)
+            .with_mock_metastore(Some(source_checkpoint_delta))
+            .with_queues_dir(temp_file_path)
+            .build();
+
+        let file_source = FileSourceFactory::typed_create_source(source_runtime, params)
+            .await
+            .unwrap();
         let file_source_actor = SourceActor {
-            source: Box::new(source),
+            source: Box::new(file_source),
             doc_processor_mailbox,
         };
         let (_file_source_mailbox, file_source_handle) =
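Note: the file-source tests now rely on `crate::source::tests::SourceRuntimeBuilder`, a test helper whose definition is not included in this diff. The call shapes below are taken verbatim from the tests; the interpretation in the comments is an assumption, namely that `with_mock_metastore(Some(delta))` wires up a mocked metastore whose index metadata replays the given checkpoint delta, so that `fetch_checkpoint()` observes it:

    // Plain runtime backed by a default test metastore.
    let source_runtime = SourceRuntimeBuilder::new(index_uid, source_config).build();

    // Runtime whose (presumably mocked) metastore serves a pre-existing
    // checkpoint, used by the resume-from-checkpoint test above.
    let source_runtime = SourceRuntimeBuilder::new(index_uid, source_config)
        .with_mock_metastore(Some(source_checkpoint_delta))
        .with_queues_dir(queues_dir_path)
        .build();

This replaces the older pattern of `SourceRuntimeArgs::for_test(...)` plus a hand-built `SourceCheckpoint` argument.
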
diff --git a/quickwit/quickwit-indexing/src/source/gcp_pubsub_source.rs b/quickwit/quickwit-indexing/src/source/gcp_pubsub_source.rs
index bf0ccb72702..7166f1d1fb1 100644
--- a/quickwit/quickwit-indexing/src/source/gcp_pubsub_source.rs
+++ b/quickwit/quickwit-indexing/src/source/gcp_pubsub_source.rs
@@ -17,7 +17,6 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program. If not, see <http://www.gnu.org/licenses/>.
 
-use std::sync::Arc;
 use std::time::{Duration, Instant};
 use std::{fmt, mem};
@@ -40,7 +39,7 @@ use tracing::{debug, info, warn};
 
 use super::{SourceActor, BATCH_NUM_BYTES_LIMIT, EMIT_BATCHES_TIMEOUT};
 use crate::actors::DocProcessor;
-use crate::source::{BatchBuilder, Source, SourceContext, SourceRuntimeArgs, TypedSourceFactory};
+use crate::source::{BatchBuilder, Source, SourceContext, SourceRuntime, TypedSourceFactory};
 
 const DEFAULT_MAX_MESSAGES_PER_PULL: i32 = 1_000;
@@ -52,11 +51,10 @@ impl TypedSourceFactory for GcpPubSubSourceFactory {
     type Params = PubSubSourceParams;
 
     async fn typed_create_source(
-        ctx: Arc<SourceRuntimeArgs>,
-        params: PubSubSourceParams,
-        _checkpoint: SourceCheckpoint, // TODO: Use checkpoint!
+        source_runtime: SourceRuntime,
+        source_params: PubSubSourceParams,
     ) -> anyhow::Result<Self::Source> {
-        GcpPubSubSource::try_new(ctx, params).await
+        GcpPubSubSource::try_new(source_runtime, source_params).await
     }
 }
@@ -75,7 +73,7 @@ pub struct GcpPubSubSourceState {
 }
 
 pub struct GcpPubSubSource {
-    ctx: Arc<SourceRuntimeArgs>,
+    source_runtime: SourceRuntime,
     subscription_name: String,
     subscription: Subscription,
     state: GcpPubSubSourceState,
@@ -88,8 +86,8 @@ impl fmt::Debug for GcpPubSubSource {
     fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
         formatter
             .debug_struct("GcpPubSubSource")
-            .field("index_id", &self.ctx.index_id())
-            .field("source_id", &self.ctx.source_id())
+            .field("index_id", &self.source_runtime.index_id())
+            .field("source_id", &self.source_runtime.source_id())
             .field("subscription", &self.subscription)
             .finish()
     }
@@ -97,16 +95,16 @@ impl GcpPubSubSource {
     pub async fn try_new(
-        ctx: Arc<SourceRuntimeArgs>,
-        params: PubSubSourceParams,
+        source_runtime: SourceRuntime,
+        source_params: PubSubSourceParams,
     ) -> anyhow::Result<Self> {
-        let subscription_name = params.subscription;
-        let backfill_mode_enabled = params.enable_backfill_mode;
-        let max_messages_per_pull = params
+        let subscription_name = source_params.subscription;
+        let backfill_mode_enabled = source_params.enable_backfill_mode;
+        let max_messages_per_pull = source_params
             .max_messages_per_pull
             .unwrap_or(DEFAULT_MAX_MESSAGES_PER_PULL);
 
-        let mut client_config: ClientConfig = match params.credentials_file {
+        let mut client_config: ClientConfig = match source_params.credentials_file {
             Some(credentials_file) => {
                 let credentials = CredentialsFile::new_from_file(credentials_file.clone())
                     .await
@@ -121,8 +119,8 @@ impl GcpPubSubSource {
         }
         .context("failed to create GCP PubSub client config")?;
 
-        if params.project_id.is_some() {
-            client_config.project_id = params.project_id
+        if source_params.project_id.is_some() {
+            client_config.project_id = source_params.project_id
         }
 
         let client = Client::new(client_config)
@@ -134,17 +132,17 @@ impl GcpPubSubSource {
         let partition_id = PartitionId::from(partition_id);
 
         info!(
-            index_id=%ctx.index_id(),
-            source_id=%ctx.source_id(),
+            index_id=%source_runtime.index_id(),
+            source_id=%source_runtime.source_id(),
             subscription=%subscription_name,
             max_messages_per_pull=%max_messages_per_pull,
-            "Starting GCP PubSub source."
+            "starting GCP PubSub source"
         );
         if !subscription.exists(Some(RetrySetting::default())).await? {
             anyhow::bail!("GCP PubSub subscription `{subscription_name}` does not exist");
         }
         Ok(Self {
-            ctx,
+            source_runtime,
             subscription_name,
             subscription,
             state: GcpPubSubSourceState::default(),
@@ -223,13 +221,13 @@
     }
 
     fn name(&self) -> String {
-        format!("GcpPubSubSource{{source_id={}}}", self.ctx.source_id())
+        format!("{:?}", self)
     }
 
     fn observable_state(&self) -> JsonValue {
         json!({
-            "index_id": self.ctx.index_id(),
-            "source_id": self.ctx.source_id(),
+            "index_id": self.source_runtime.index_id(),
+            "source_id": self.source_runtime.source_id(),
             "subscription": self.subscription_name,
             "num_bytes_processed": self.state.num_bytes_processed,
             "num_messages_processed": self.state.num_messages_processed,
@@ -289,20 +287,19 @@ impl GcpPubSubSource {
 mod gcp_pubsub_emulator_tests {
     use std::env::var;
     use std::num::NonZeroUsize;
-    use std::path::PathBuf;
 
     use google_cloud_googleapis::pubsub::v1::PubsubMessage;
     use google_cloud_pubsub::publisher::Publisher;
     use google_cloud_pubsub::subscription::SubscriptionConfig;
     use quickwit_actors::Universe;
     use quickwit_config::{SourceConfig, SourceInputFormat, SourceParams};
-    use quickwit_metastore::metastore_for_test;
     use quickwit_proto::types::IndexUid;
     use serde_json::json;
 
     use super::*;
     use crate::models::RawDocBatch;
     use crate::source::quickwit_supported_sources;
+    use crate::source::tests::SourceRuntimeBuilder;
 
     static GCP_TEST_PROJECT: &str = "quickwit-emulator";
 
         let index_id = append_random_suffix("test-gcp-pubsub-source--invalid-subscription--index");
         let index_uid = IndexUid::new_with_random_ulid(&index_id);
-        let metastore = metastore_for_test();
         let SourceParams::PubSub(params) = source_config.clone().source_params else {
             panic!(
                 "Expected `SourceParams::GcpPubSub` source params, got {:?}",
                 source_config.source_params
             );
         };
-        let ctx = SourceRuntimeArgs::for_test(
-            index_uid,
-            source_config,
-            metastore,
-            PathBuf::from("./queues"),
-        );
-        GcpPubSubSource::try_new(ctx, params).await.unwrap_err();
+        let source_runtime = SourceRuntimeBuilder::new(index_uid, source_config).build();
+        GcpPubSubSource::try_new(source_runtime, params)
+            .await
+            .unwrap_err();
     }
 
     #[ignore]
 
         let source_id = source_config.source_id.clone();
 
         let source_loader = quickwit_supported_sources();
-        let metastore = metastore_for_test();
         let index_id: String = append_random_suffix("test-gcp-pubsub-source--index");
         let index_uid = IndexUid::new_with_random_ulid(&index_id);
 
         for awaiter in awaiters {
             awaiter.get().await.unwrap();
         }
-        let source = source_loader
-            .load_source(
-                SourceRuntimeArgs::for_test(
-                    index_uid,
-                    source_config,
-                    metastore,
-                    PathBuf::from("./queues"),
-                ),
-                SourceCheckpoint::default(),
-            )
-            .await
-            .unwrap();
+        let source_runtime = SourceRuntimeBuilder::new(index_uid, source_config).build();
+        let source = source_loader.load_source(source_runtime).await.unwrap();
 
         let (doc_processor_mailbox, doc_processor_inbox) = universe.create_test_mailbox();
         let source_actor = SourceActor {
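Note: across `FileSource`, `GcpPubSubSource`, and `KafkaSource`, `name()` now reuses the source's `fmt::Debug` implementation instead of a hand-rolled string, so the actor name picks up every field the Debug impl exposes (index, source, subscription or topic). A self-contained sketch of the `debug_struct` output format these impls produce:

    use std::fmt;

    struct Demo {
        id: u32,
    }

    impl fmt::Debug for Demo {
        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
            f.debug_struct("Demo").field("id", &self.id).finish()
        }
    }

    fn main() {
        // Prints: Demo { id: 7 }
        println!("{:?}", Demo { id: 7 });
    }

Dropping the unused `_checkpoint` parameter here loses nothing: the old `// TODO: Use checkpoint!` comment shows it was never consumed by this source.
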
diff --git a/quickwit/quickwit-indexing/src/source/ingest/mod.rs b/quickwit/quickwit-indexing/src/source/ingest/mod.rs
index f912ba48063..e6c53b0fcf7 100644
--- a/quickwit/quickwit-indexing/src/source/ingest/mod.rs
+++ b/quickwit/quickwit-indexing/src/source/ingest/mod.rs
@@ -19,7 +19,6 @@
 use std::collections::BTreeSet;
 use std::fmt;
-use std::sync::Arc;
 use std::time::Duration;
 
 use anyhow::Context;
@@ -51,8 +50,8 @@ use tracing::{debug, error, info, warn};
 use ulid::Ulid;
 
 use super::{
-    BatchBuilder, Source, SourceContext, SourceRuntimeArgs, TypedSourceFactory,
-    BATCH_NUM_BYTES_LIMIT, EMIT_BATCHES_TIMEOUT,
+    BatchBuilder, Source, SourceContext, SourceRuntime, TypedSourceFactory, BATCH_NUM_BYTES_LIMIT,
+    EMIT_BATCHES_TIMEOUT,
 };
 use crate::actors::DocProcessor;
 use crate::models::{LocalShardPositionsUpdate, NewPublishLock, NewPublishToken, PublishLock};
@@ -65,9 +64,8 @@
     type Params = ();
 
     async fn typed_create_source(
-        runtime_args: Arc<SourceRuntimeArgs>,
+        source_runtime: SourceRuntime,
         _params: Self::Params,
-        _checkpoint: SourceCheckpoint,
     ) -> anyhow::Result<Self::Source> {
         // Retry parameters for the fetch stream: retry indefinitely until the shard is complete or
         // unassigned.
@@ -76,7 +74,7 @@
             base_delay: Duration::from_secs(5),
             max_delay: Duration::from_secs(10 * 60), // 10 minutes
         };
-        IngestSource::try_new(runtime_args, retry_params).await
+        IngestSource::try_new(source_runtime, retry_params).await
     }
 }
@@ -161,20 +159,20 @@ impl fmt::Debug for IngestSource {
 
 impl IngestSource {
     pub async fn try_new(
-        runtime_args: Arc<SourceRuntimeArgs>,
+        source_runtime: SourceRuntime,
         retry_params: RetryParams,
     ) -> anyhow::Result<Self> {
-        let self_node_id: NodeId = runtime_args.node_id().into();
+        let self_node_id: NodeId = source_runtime.node_id().into();
         let client_id = ClientId::new(
             self_node_id.clone(),
             SourceUid {
-                index_uid: runtime_args.index_uid().clone(),
-                source_id: runtime_args.source_id().to_string(),
+                index_uid: source_runtime.index_uid().clone(),
+                source_id: source_runtime.source_id().to_string(),
             },
-            runtime_args.pipeline_uid(),
+            source_runtime.pipeline_uid(),
         );
-        let metastore = runtime_args.metastore.clone();
-        let ingester_pool = runtime_args.ingester_pool.clone();
+        let metastore = source_runtime.metastore.clone();
+        let ingester_pool = source_runtime.ingester_pool.clone();
         let assigned_shards = FnvHashMap::default();
         let fetch_stream = MultiFetchStream::new(
             self_node_id,
@@ -195,7 +193,7 @@
             fetch_stream,
             publish_lock,
             publish_token,
-            event_broker: runtime_args.event_broker.clone(),
+            event_broker: source_runtime.event_broker.clone(),
         })
     }
@@ -913,7 +911,7 @@ mod tests {
 
         let event_broker = EventBroker::default();
 
-        let runtime_args: Arc<SourceRuntimeArgs> = Arc::new(SourceRuntimeArgs {
+        let source_runtime = SourceRuntime {
             pipeline_id,
             source_config,
             metastore: MetastoreServiceClient::from_mock(mock_metastore),
             ingester_pool: ingester_pool.clone(),
             queues_dir_path: PathBuf::from("./queues"),
             storage_resolver: StorageResolver::for_test(),
             event_broker,
-        });
+        };
         let retry_params = RetryParams::no_retries();
-        let mut source = IngestSource::try_new(runtime_args, retry_params)
+        let mut source = IngestSource::try_new(source_runtime, retry_params)
             .await
             .unwrap();
@@ -1112,7 +1110,7 @@ mod tests {
             })
             .forever();
 
-        let runtime_args = Arc::new(SourceRuntimeArgs {
+        let source_runtime = SourceRuntime {
             pipeline_id,
             source_config,
             metastore: MetastoreServiceClient::from_mock(mock_metastore),
             ingester_pool: ingester_pool.clone(),
             queues_dir_path: PathBuf::from("./queues"),
             storage_resolver: StorageResolver::for_test(),
             event_broker,
-        });
+        };
         let retry_params = RetryParams::for_test();
-        let mut source = IngestSource::try_new(runtime_args, retry_params)
+        let mut source = IngestSource::try_new(source_runtime, retry_params)
             .await
             .unwrap();
@@ -1272,7 +1270,7 @@ mod tests {
             })
             .forever();
 
-        let runtime_args = Arc::new(SourceRuntimeArgs {
+        let source_runtime = SourceRuntime {
             pipeline_id,
             source_config,
             metastore: MetastoreServiceClient::from_mock(mock_metastore),
             ingester_pool: ingester_pool.clone(),
             queues_dir_path: PathBuf::from("./queues"),
             storage_resolver: StorageResolver::for_test(),
             event_broker,
-        });
+        };
         let retry_params = RetryParams::for_test();
-        let mut source = IngestSource::try_new(runtime_args, retry_params)
+        let mut source = IngestSource::try_new(source_runtime, retry_params)
             .await
             .unwrap();
@@ -1337,7 +1335,7 @@ mod tests {
         let ingester_pool = IngesterPool::default();
         let event_broker = EventBroker::default();
 
-        let runtime_args = Arc::new(SourceRuntimeArgs {
+        let source_runtime = SourceRuntime {
             pipeline_id,
             source_config,
             metastore: MetastoreServiceClient::from_mock(mock_metastore),
             ingester_pool: ingester_pool.clone(),
             queues_dir_path: PathBuf::from("./queues"),
             storage_resolver: StorageResolver::for_test(),
             event_broker,
-        });
+        };
         let retry_params = RetryParams::for_test();
-        let mut source = IngestSource::try_new(runtime_args, retry_params)
+        let mut source = IngestSource::try_new(source_runtime, retry_params)
             .await
             .unwrap();
@@ -1568,7 +1566,7 @@ mod tests {
         ingester_pool.insert("test-ingester-0".into(), ingester_0.clone());
         let event_broker = EventBroker::default();
 
-        let runtime_args = Arc::new(SourceRuntimeArgs {
+        let source_runtime = SourceRuntime {
             pipeline_id,
             source_config,
             metastore: MetastoreServiceClient::from_mock(mock_metastore),
             ingester_pool: ingester_pool.clone(),
             queues_dir_path: PathBuf::from("./queues"),
             storage_resolver: StorageResolver::for_test(),
             event_broker,
-        });
+        };
         let retry_params = RetryParams::for_test();
-        let mut source = IngestSource::try_new(runtime_args, retry_params)
+        let mut source = IngestSource::try_new(source_runtime, retry_params)
             .await
             .unwrap();
@@ -1722,7 +1720,7 @@ mod tests {
             })
             .forever();
 
-        let runtime_args = Arc::new(SourceRuntimeArgs {
+        let source_runtime = SourceRuntime {
             pipeline_id,
             source_config,
             metastore: MetastoreServiceClient::from_mock(mock_metastore),
             ingester_pool: ingester_pool.clone(),
             queues_dir_path: PathBuf::from("./queues"),
             storage_resolver: StorageResolver::for_test(),
             event_broker,
-        });
+        };
         let retry_params = RetryParams::for_test();
-        let mut source = IngestSource::try_new(runtime_args, retry_params)
+        let mut source = IngestSource::try_new(source_runtime, retry_params)
             .await
             .unwrap();
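Note: the fetch-stream retry policy above only pins `base_delay` (5s) and `max_delay` (10 minutes); the backoff curve itself lives in Quickwit's `RetryParams`, which is not shown in this diff. Assuming the usual doubling strategy implied by a base/max delay pair (an assumption, not Quickwit's verified implementation), the delays would grow like this:

    use std::time::Duration;

    // Sketch of a doubling backoff bounded by base_delay/max_delay.
    fn delay_for_attempt(attempt: u32) -> Duration {
        let base_delay = Duration::from_secs(5);
        let max_delay = Duration::from_secs(10 * 60);
        base_delay
            .saturating_mul(2u32.saturating_pow(attempt))
            .min(max_delay)
    }

    fn main() {
        // 5s, 10s, 20s, ..., capped at 600s from the 8th attempt onward.
        for attempt in 0..9 {
            println!("attempt {attempt}: {:?}", delay_for_attempt(attempt));
        }
    }

Dropping the `Arc` around the runtime works here because every field of `SourceRuntime` is a cheaply cloneable handle (clients, pools, paths), as the construction sites above show.
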
diff --git a/quickwit/quickwit-indexing/src/source/ingest_api_source.rs b/quickwit/quickwit-indexing/src/source/ingest_api_source.rs
index 27573384aff..9f1b30b954b 100644
--- a/quickwit/quickwit-indexing/src/source/ingest_api_source.rs
+++ b/quickwit/quickwit-indexing/src/source/ingest_api_source.rs
@@ -18,7 +18,6 @@
 // along with this program. If not, see <http://www.gnu.org/licenses/>.
 
 use std::fmt;
-use std::sync::Arc;
 use std::time::Duration;
 
 use anyhow::bail;
@@ -37,7 +36,7 @@ use tracing::{error, info};
 
 use super::{BatchBuilder, Source, SourceActor, SourceContext, TypedSourceFactory};
 use crate::actors::DocProcessor;
-use crate::source::SourceRuntimeArgs;
+use crate::source::SourceRuntime;
 
 /// Wait time for SourceActor before pooling for new documents.
 /// TODO: Think of better way, maybe increment this (i.e wait longer) as time
@@ -58,7 +57,7 @@
 pub struct IngestApiSourceCounters {
 }
 
 pub struct IngestApiSource {
-    runtime_args: Arc<SourceRuntimeArgs>,
+    source_runtime: SourceRuntime,
     source_id: String,
     partition_id: PartitionId,
     ingest_api_service: Mailbox<IngestApiService>,
@@ -72,23 +71,20 @@ impl fmt::Debug for IngestApiSource {
 }
 
 impl IngestApiSource {
-    pub async fn try_new(
-        runtime_args: Arc<SourceRuntimeArgs>,
-        checkpoint: SourceCheckpoint,
-    ) -> anyhow::Result<Self> {
-        let source_id = runtime_args.source_id().to_string();
-        let queues_dir_path = runtime_args.queues_dir_path.as_path();
+    pub async fn try_new(source_runtime: SourceRuntime) -> anyhow::Result<Self> {
+        let source_id = source_runtime.source_id().to_string();
+        let queues_dir_path = source_runtime.queues_dir_path.as_path();
         let ingest_api_service = get_ingest_api_service(queues_dir_path).await?;
         let partition_id: PartitionId = ingest_api_service.ask(GetPartitionId).await?.into();
 
         // Ensure a queue for this index exists.
         let create_queue_req = CreateQueueIfNotExistsRequest {
-            queue_id: runtime_args.index_id().to_string(),
+            queue_id: source_runtime.index_id().to_string(),
         };
         match ingest_api_service.ask_for_res(create_queue_req).await {
             Ok(response) if response.created => {
                 info!(
-                    index_id = runtime_args.index_id(),
+                    index_id = source_runtime.index_id(),
                     %partition_id,
                     "created queue successfully"
                 );
@@ -96,7 +92,7 @@
             Ok(_) => {}
             Err(error) => {
                 error!(
-                    index_id = runtime_args.index_id(),
+                    index_id = source_runtime.index_id(),
                     %partition_id,
                     %error,
                     "failed to create queue"
@@ -104,12 +100,13 @@
                 bail!(error);
             }
         }
+        let checkpoint = source_runtime.fetch_checkpoint().await?;
         let previous_offset: Option<u64> = checkpoint
             .position_for_partition(&partition_id)
             .map(|position| position.as_u64().expect("offset should be stored as u64"));
         let current_offset = previous_offset;
         let ingest_api_source = IngestApiSource {
-            runtime_args,
+            source_runtime,
             source_id,
             partition_id,
             ingest_api_service,
@@ -128,7 +125,7 @@
         ctx: &ActorContext<SourceActor>,
     ) -> anyhow::Result<()> {
         let suggest_truncate_req = SuggestTruncateRequest {
-            index_id: self.runtime_args.index_id().to_string(),
+            index_id: self.source_runtime.index_id().to_string(),
             up_to_position_included,
         };
         ctx.ask_for_res(&self.ingest_api_service, suggest_truncate_req)
@@ -163,7 +160,7 @@ impl Source for IngestApiSource {
         ctx: &SourceContext,
     ) -> Result<Duration, ActorExitStatus> {
         let fetch_req = FetchRequest {
-            index_id: self.runtime_args.index_id().to_string(),
+            index_id: self.source_runtime.index_id().to_string(),
             start_after: self.counters.current_offset,
             num_bytes_limit: None,
         };
@@ -246,11 +243,10 @@ impl TypedSourceFactory for IngestApiSourceFactory {
     type Params = ();
 
     async fn typed_create_source(
-        ctx: Arc<SourceRuntimeArgs>,
+        source_runtime: SourceRuntime,
         _: (),
-        checkpoint: SourceCheckpoint,
     ) -> anyhow::Result<Self::Source> {
-        IngestApiSource::try_new(ctx, checkpoint).await
+        IngestApiSource::try_new(source_runtime).await
     }
 }
@@ -266,12 +262,12 @@ mod tests {
         IngestApiConfig, SourceConfig, SourceInputFormat, SourceParams, INGEST_API_SOURCE_ID,
     };
     use quickwit_ingest::{init_ingest_api, CommitType, DocBatchBuilder, IngestRequest};
-    use quickwit_metastore::checkpoint::{SourceCheckpoint, SourceCheckpointDelta};
-    use quickwit_metastore::metastore_for_test;
+    use quickwit_metastore::checkpoint::SourceCheckpointDelta;
     use quickwit_proto::types::IndexUid;
 
     use super::*;
     use crate::models::RawDocBatch;
+    use crate::source::tests::SourceRuntimeBuilder;
     use crate::source::SourceActor;
 
     fn make_ingest_request(
 
     #[tokio::test]
     async fn test_ingest_api_source() -> anyhow::Result<()> {
         let universe = Universe::with_accelerated_time();
-        let metastore = metastore_for_test();
         let index_id = append_random_suffix("test-ingest-api-source");
         let index_uid = IndexUid::new_with_random_ulid(&index_id);
         let temp_dir = tempfile::tempdir()?;
 
             init_ingest_api(&universe, queues_dir_path, &IngestApiConfig::default()).await?;
         let (doc_processor_mailbox, doc_processor_inbox) = universe.create_test_mailbox();
         let source_config = make_source_config();
-        let ctx = SourceRuntimeArgs::for_test(
-            index_uid,
-            source_config,
-            metastore,
-            queues_dir_path.to_path_buf(),
-        );
-        let ingest_api_source = IngestApiSource::try_new(ctx, SourceCheckpoint::default()).await?;
+        let source_runtime = SourceRuntimeBuilder::new(index_uid, source_config)
+            .with_queues_dir(queues_dir_path)
+            .build();
+        let ingest_api_source = IngestApiSource::try_new(source_runtime).await?;
         let ingest_api_source_actor = SourceActor {
             source: Box::new(ingest_api_source),
             doc_processor_mailbox,
 
     #[tokio::test]
     async fn test_ingest_api_source_resume_from_checkpoint() -> anyhow::Result<()> {
         let universe = Universe::with_accelerated_time();
-        let metastore = metastore_for_test();
         let index_id = append_random_suffix("test-ingest-api-source");
         let index_uid = IndexUid::new_with_random_ulid(&index_id);
         let temp_dir = tempfile::tempdir()?;
 
             .map_err(|err| anyhow::anyhow!(err.to_string()))?;
         let (doc_processor_mailbox, doc_processor_inbox) = universe.create_test_mailbox();
 
-        let mut checkpoint = SourceCheckpoint::default();
         let partition_id: PartitionId = ingest_api_service.ask(GetPartitionId).await?.into();
         let checkpoint_delta = SourceCheckpointDelta::from_partition_delta(
             partition_id.clone(),
-            Position::offset(0u64),
+            Position::Beginning,
             Position::offset(1200u64),
         )
         .unwrap();
-        checkpoint.try_apply_delta(checkpoint_delta).unwrap();
 
         let source_config = make_source_config();
-        let ctx = SourceRuntimeArgs::for_test(
-            index_uid,
-            source_config,
-            metastore,
-            queues_dir_path.to_path_buf(),
-        );
-        let ingest_api_source = IngestApiSource::try_new(ctx, checkpoint).await?;
+        let source_runtime = SourceRuntimeBuilder::new(index_uid, source_config)
+            .with_mock_metastore(Some(checkpoint_delta))
+            .with_queues_dir(queues_dir_path)
+            .build();
+
+        let ingest_api_source = IngestApiSource::try_new(source_runtime).await?;
         let ingest_api_source_actor = SourceActor {
             source: Box::new(ingest_api_source),
             doc_processor_mailbox,
 
     #[tokio::test]
     async fn test_ingest_api_source_with_one_doc() -> anyhow::Result<()> {
         let universe = Universe::with_accelerated_time();
-        let metastore = metastore_for_test();
         let index_id = append_random_suffix("test-ingest-api-source");
         let index_uid = IndexUid::new_with_random_ulid(&index_id);
         let temp_dir = tempfile::tempdir()?;
 
         let (doc_processor_mailbox, doc_processor_inbox) = universe.create_test_mailbox();
         let source_config = make_source_config();
-        let ctx = SourceRuntimeArgs::for_test(
-            index_uid,
-            source_config,
-            metastore,
-            queues_dir_path.to_path_buf(),
-        );
-        let ingest_api_source = IngestApiSource::try_new(ctx, SourceCheckpoint::default()).await?;
+        let source_runtime = SourceRuntimeBuilder::new(index_uid, source_config)
+            .with_queues_dir(queues_dir_path)
+            .build();
+
+        let ingest_api_source = IngestApiSource::try_new(source_runtime).await?;
         let ingest_api_source_actor = SourceActor {
             source: Box::new(ingest_api_source),
             doc_processor_mailbox,
 
     #[tokio::test]
     async fn test_ingest_api_source_with_force_commit() -> anyhow::Result<()> {
         let universe = Universe::with_accelerated_time();
-        let metastore = metastore_for_test();
         let index_id = append_random_suffix("test-ingest-api-source");
         let index_uid = IndexUid::new_with_random_ulid(&index_id);
         let temp_dir = tempfile::tempdir()?;
 
             init_ingest_api(&universe, queues_dir_path, &IngestApiConfig::default()).await?;
         let (doc_processor_mailbox, doc_processor_inbox) = universe.create_test_mailbox();
         let source_config = make_source_config();
-        let ctx = SourceRuntimeArgs::for_test(
-            index_uid,
-            source_config,
-            metastore,
-            queues_dir_path.to_path_buf(),
-        );
-        let ingest_api_source = IngestApiSource::try_new(ctx, SourceCheckpoint::default()).await?;
+        let source_runtime = SourceRuntimeBuilder::new(index_uid, source_config)
+            .with_queues_dir(queues_dir_path)
+            .build();
+
+        let ingest_api_source = IngestApiSource::try_new(source_runtime).await?;
         let ingest_api_source_actor = SourceActor {
             source: Box::new(ingest_api_source),
             doc_processor_mailbox,
 
     #[tokio::test]
     async fn test_ingest_api_source_with_wait() -> anyhow::Result<()> {
         let universe = Universe::with_accelerated_time();
-        let metastore = metastore_for_test();
         let index_id = append_random_suffix("test-ingest-api-source");
         let index_uid = IndexUid::new_with_random_ulid(&index_id);
         let temp_dir = tempfile::tempdir()?;
 
             init_ingest_api(&universe, queues_dir_path, &IngestApiConfig::default()).await?;
         let (doc_processor_mailbox, doc_processor_inbox) = universe.create_test_mailbox();
         let source_config = make_source_config();
-        let ctx = SourceRuntimeArgs::for_test(
-            index_uid,
-            source_config,
-            metastore,
-            queues_dir_path.to_path_buf(),
-        );
-        let ingest_api_source = IngestApiSource::try_new(ctx, SourceCheckpoint::default()).await?;
+        let source_runtime = SourceRuntimeBuilder::new(index_uid, source_config)
+            .with_queues_dir(queues_dir_path)
+            .build();
+
+        let ingest_api_source = IngestApiSource::try_new(source_runtime).await?;
         let ingest_api_source_actor = SourceActor {
             source: Box::new(ingest_api_source),
             doc_processor_mailbox,
 
     #[tokio::test]
     async fn test_ingest_api_source_truncate_on_initialize() -> anyhow::Result<()> {
         let universe = Universe::with_accelerated_time();
-        let metastore = metastore_for_test();
         let index_id = append_random_suffix("test-ingest-api-source");
         let index_uid = IndexUid::new_with_random_ulid(&index_id);
         let temp_dir = tempfile::tempdir()?;
 
             init_ingest_api(&universe, queues_dir_path, &IngestApiConfig::default()).await?;
         let (doc_processor_mailbox, _doc_processor_inbox) = universe.create_test_mailbox();
         let source_config = make_source_config();
-        let ctx = SourceRuntimeArgs::for_test(
-            index_uid,
-            source_config,
-            metastore,
-            queues_dir_path.to_path_buf(),
-        );
+        let _source_runtime = SourceRuntimeBuilder::new(index_uid.clone(), source_config.clone())
+            .with_queues_dir(queues_dir_path)
+            .build();
 
         let create_queue_req = CreateQueueIfNotExistsRequest {
             queue_id: index_id.clone(),
 
             .unwrap();
         assert_eq!(first_position, Some(0));
 
-        let partition_id = ingest_api_service.ask(GetPartitionId).await?.into();
-        let mut source_checkpoint = SourceCheckpoint::default();
-        source_checkpoint.add_partition(partition_id, Position::offset(10u64));
-        let ingest_api_source = IngestApiSource::try_new(ctx, source_checkpoint).await?;
+        let partition_id: PartitionId = ingest_api_service.ask(GetPartitionId).await?.into();
+        let checkpoint_delta = SourceCheckpointDelta::from_partition_delta(
+            partition_id.clone(),
+            Position::Beginning,
+            Position::offset(10u64),
+        )
+        .unwrap();
+
+        let source_runtime = SourceRuntimeBuilder::new(index_uid, source_config)
+            .with_mock_metastore(Some(checkpoint_delta))
+            .with_queues_dir(queues_dir_path)
+            .build();
+
+        let ingest_api_source = IngestApiSource::try_new(source_runtime).await?;
         let ingest_api_source_actor = SourceActor {
             source: Box::new(ingest_api_source),
             doc_processor_mailbox,
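Note: the ingest-API tests previously seeded a `SourceCheckpoint` by hand (`checkpoint.try_apply_delta(...)` or `add_partition(...)`) and passed it into `try_new`. They now express the same starting state as a `SourceCheckpointDelta` handed to the mocked metastore, because the source reads its checkpoint through `SourceRuntime::fetch_checkpoint()`. The delta form is equivalent to the old hand-built checkpoint:

    // Marks `partition_id` as already indexed up to offset 1200 (starting from
    // Position::Beginning), so the source must resume strictly after it.
    let checkpoint_delta = SourceCheckpointDelta::from_partition_delta(
        partition_id.clone(),
        Position::Beginning,
        Position::offset(1200u64),
    )
    .unwrap();

Using `Position::Beginning` instead of `Position::offset(0u64)` as the lower bound also makes the delta cover offset 0 itself, which the old form excluded.
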
diff --git a/quickwit/quickwit-indexing/src/source/kafka_source.rs b/quickwit/quickwit-indexing/src/source/kafka_source.rs
index b1181da8b6f..b18c1ef9e83 100644
--- a/quickwit/quickwit-indexing/src/source/kafka_source.rs
+++ b/quickwit/quickwit-indexing/src/source/kafka_source.rs
@@ -19,7 +19,6 @@
 use std::collections::HashMap;
 use std::fmt;
-use std::sync::Arc;
 use std::time::{Duration, Instant};
 
 use anyhow::{anyhow, bail, Context};
@@ -30,8 +29,7 @@ use oneshot;
 use quickwit_actors::{ActorExitStatus, Mailbox};
 use quickwit_config::KafkaSourceParams;
 use quickwit_metastore::checkpoint::{PartitionId, SourceCheckpoint};
-use quickwit_metastore::IndexMetadataResponseExt;
-use quickwit_proto::metastore::{IndexMetadataRequest, MetastoreService, SourceType};
+use quickwit_proto::metastore::SourceType;
 use quickwit_proto::types::{IndexUid, Position};
 use rdkafka::config::{ClientConfig, RDKafkaLogLevel};
 use rdkafka::consumer::{
@@ -50,8 +48,8 @@ use tracing::{debug, info, warn};
 use crate::actors::DocProcessor;
 use crate::models::{NewPublishLock, PublishLock};
 use crate::source::{
-    BatchBuilder, Source, SourceContext, SourceRuntimeArgs, TypedSourceFactory,
-    BATCH_NUM_BYTES_LIMIT, EMIT_BATCHES_TIMEOUT,
+    BatchBuilder, Source, SourceContext, SourceRuntime, TypedSourceFactory, BATCH_NUM_BYTES_LIMIT,
+    EMIT_BATCHES_TIMEOUT,
 };
 
 type GroupId = String;
@@ -65,11 +63,10 @@ impl TypedSourceFactory for KafkaSourceFactory {
     type Params = KafkaSourceParams;
 
     async fn typed_create_source(
-        ctx: Arc<SourceRuntimeArgs>,
+        source_runtime: SourceRuntime,
         params: KafkaSourceParams,
-        checkpoint: SourceCheckpoint,
     ) -> anyhow::Result<Self::Source> {
-        KafkaSource::try_new(ctx, params, checkpoint).await
+        KafkaSource::try_new(source_runtime, params).await
     }
 }
@@ -212,7 +209,7 @@ pub struct KafkaSourceState {
 
 /// A `KafkaSource` consumes a topic and forwards its messages to an `Indexer`.
 pub struct KafkaSource {
-    ctx: Arc<SourceRuntimeArgs>,
+    source_runtime: SourceRuntime,
     topic: String,
     group_id: GroupId,
     state: KafkaSourceState,
@@ -227,8 +224,8 @@ impl fmt::Debug for KafkaSource {
     fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
         formatter
             .debug_struct("KafkaSource")
-            .field("index_id", &self.ctx.index_id())
-            .field("source_id", &self.ctx.source_id())
+            .field("index_uid", self.source_runtime.index_uid())
+            .field("source_id", &self.source_runtime.source_id())
             .field("topic", &self.topic)
             .finish()
     }
@@ -237,17 +234,20 @@ impl KafkaSource {
     /// Instantiates a new `KafkaSource`.
     pub async fn try_new(
-        ctx: Arc<SourceRuntimeArgs>,
-        params: KafkaSourceParams,
-        _ignored_checkpoint: SourceCheckpoint,
+        source_runtime: SourceRuntime,
+        source_params: KafkaSourceParams,
     ) -> anyhow::Result<Self> {
-        let topic = params.topic.clone();
-        let backfill_mode_enabled = params.enable_backfill_mode;
+        let topic = source_params.topic.clone();
+        let backfill_mode_enabled = source_params.enable_backfill_mode;
 
         let (events_tx, events_rx) = mpsc::channel(100);
         let (truncate_tx, truncate_rx) = watch::channel(SourceCheckpoint::default());
-        let (client_config, consumer, group_id) =
-            create_consumer(ctx.index_uid(), ctx.source_id(), params, events_tx.clone())?;
+        let (client_config, consumer, group_id) = create_consumer(
+            source_runtime.index_uid(),
+            source_runtime.source_id(),
+            source_params,
+            events_tx.clone(),
+        )?;
         let native_client_config = client_config.create_native_config()?;
         let session_timeout_ms = native_client_config
             .get("session.timeout.ms")?
@@ -261,13 +261,13 @@
         let publish_lock = PublishLock::default();
 
         info!(
-            index_id=%ctx.index_id(),
-            source_id=%ctx.source_id(),
-            topic=%topic,
-            group_id=%group_id,
-            max_poll_interval_ms=%max_poll_interval_ms,
-            session_timeout_ms=%session_timeout_ms,
-            "Starting Kafka source."
+            index_uid=%source_runtime.index_uid(),
+            source_id=%source_runtime.source_id(),
+            topic,
+            group_id,
+            max_poll_interval_ms,
+            session_timeout_ms,
+            "starting Kafka source"
         );
         if max_poll_interval_ms <= 60_000 {
             warn!(
@@ -277,7 +277,7 @@
             );
         }
         Ok(KafkaSource {
-            ctx,
+            source_runtime,
             topic,
             group_id,
             state: KafkaSourceState::default(),
@@ -342,22 +342,9 @@
         partitions: &[i32],
         assignment_tx: oneshot::Sender<Vec<(i32, i64)>>,
     ) -> anyhow::Result<()> {
-        let index_metadata_request =
-            IndexMetadataRequest::for_index_uid(self.ctx.index_uid().clone());
-        let index_metadata = ctx
-            .protect_future(
-                self.ctx
-                    .metastore
-                    .clone()
-                    .index_metadata(index_metadata_request),
-            )
-            .await?
-            .deserialize_index_metadata()?;
-        let checkpoint = index_metadata
-            .checkpoint
-            .source_checkpoint(self.ctx.source_id())
-            .cloned()
-            .unwrap_or_default();
+        let checkpoint = ctx
+            .protect_future(self.source_runtime.fetch_checkpoint())
+            .await?;
 
         self.state.assigned_partitions.clear();
         self.state.current_positions.clear();
@@ -394,12 +381,12 @@
             next_offsets.push((partition, next_offset));
         }
         info!(
-            index_id=%self.ctx.index_id(),
-            source_id=%self.ctx.source_id(),
+            index_id=%self.source_runtime.index_id(),
+            source_id=%self.source_runtime.source_id(),
             topic=%self.topic,
             group_id=%self.group_id,
             partitions=?partitions,
-            "New partition assignment after rebalance.",
+            "new partition assignment after rebalance",
         );
         assignment_tx
             .send(next_offsets)
@@ -537,7 +524,7 @@
     }
 
     fn name(&self) -> String {
-        format!("KafkaSource{{source_id={}}}", self.ctx.source_id())
+        format!("{:?}", self)
     }
 
     fn observable_state(&self) -> JsonValue {
@@ -546,8 +533,8 @@
         let current_positions: Vec<(&i32, &Position)> =
             self.state.current_positions.iter().sorted().collect();
         json!({
-            "index_id": self.ctx.index_id(),
-            "source_id": self.ctx.source_id(),
+            "index_id": self.source_runtime.index_id(),
+            "source_id": self.source_runtime.source_id(),
             "topic": self.topic,
             "assigned_partitions": assigned_partitions,
             "current_positions": current_positions,
@@ -774,7 +761,6 @@ fn message_payload_to_doc(message: &BorrowedMessage) -> Option<Bytes> {
 #[cfg(all(test, feature = "kafka-broker-tests"))]
 mod kafka_broker_tests {
     use std::num::NonZeroUsize;
-    use std::path::PathBuf;
 
     use quickwit_actors::{ActorContext, Universe};
     use quickwit_common::rand::append_random_suffix;
@@ -796,6 +782,7 @@
     use super::*;
     use crate::new_split_id;
+    use crate::source::tests::SourceRuntimeBuilder;
     use crate::source::{quickwit_supported_sources, RawDocBatch, SourceActor};
 
     fn create_base_consumer(group_id: &str) -> BaseConsumer {
@@ -929,13 +916,16 @@
     async fn setup_index(
         mut metastore: MetastoreServiceClient,
         index_id: &str,
-        source_id: &str,
+        source_config: &SourceConfig,
         partition_deltas: &[(u64, i64, i64)],
     ) -> IndexUid {
         let index_uri = format!("ram:///indexes/{index_id}");
         let index_config = IndexConfig::for_test(index_id, &index_uri);
-        let create_index_request =
-            CreateIndexRequest::try_from_index_config(&index_config).unwrap();
+        let create_index_request = CreateIndexRequest::try_from_index_and_source_configs(
+            &index_config,
+            &[source_config.clone()],
+        )
+        .unwrap();
         let index_uid: IndexUid = metastore
             .create_index(create_index_request)
             .await
@@ -970,7 +960,7 @@
                 .unwrap();
         }
         let checkpoint_delta = IndexCheckpointDelta {
-            source_id: source_id.to_string(),
+            source_id: source_config.source_id.to_string(),
             source_delta,
         };
         let checkpoint_delta_json = serde_json::to_string(&checkpoint_delta).unwrap();
@@ -994,7 +984,6 @@
         let topic = append_random_suffix("test-kafka-source--process-message--topic");
         create_topic(&admin_client, &topic, 2).await.unwrap();
 
-        let metastore = metastore_for_test();
         let index_id = append_random_suffix("test-kafka-source--process-message--index");
         let index_uid = IndexUid::new_with_random_ulid(&index_id);
         let (_source_id, source_config) = get_source_config(&topic, "earliest");
         let SourceParams::Kafka(params) = source_config.clone().source_params else {
             panic!(
                 "Expected `SourceParams::Kafka` source params, got {:?}",
                 source_config.source_params
             );
         };
-        let ctx = SourceRuntimeArgs::for_test(
-            index_uid,
-            source_config,
-            metastore,
-            PathBuf::from("./queues"),
-        );
-        let ignored_checkpoint = SourceCheckpoint::default();
-        let mut kafka_source = KafkaSource::try_new(ctx, params, ignored_checkpoint)
-            .await
-            .unwrap();
+        let source_runtime = SourceRuntimeBuilder::new(index_uid, source_config).build();
+        let mut kafka_source = KafkaSource::try_new(source_runtime, params).await.unwrap();
 
         let partition_id_1 = PartitionId::from(1u64);
         let partition_id_2 = PartitionId::from(2u64);
@@ -1125,9 +1106,10 @@
         let metastore = metastore_for_test();
         let index_id = append_random_suffix("test-kafka-source--process-assign-partitions--index");
-        let (source_id, source_config) = get_source_config(&topic, "earliest");
+        let (_source_id, source_config) = get_source_config(&topic, "earliest");
 
-        let index_uid = setup_index(metastore.clone(), &index_id, &source_id, &[(2, -1, 42)]).await;
+        let index_uid =
+            setup_index(metastore.clone(), &index_id, &source_config, &[(2, -1, 42)]).await;
 
         let SourceParams::Kafka(params) = source_config.clone().source_params else {
             panic!(
                 "Expected `SourceParams::Kafka` source params, got {:?}",
                 source_config.source_params
             );
         };
-        let ctx = SourceRuntimeArgs::for_test(
-            index_uid,
-            source_config,
-            metastore,
-            PathBuf::from("./queues"),
-        );
-        let ignored_checkpoint = SourceCheckpoint::default();
-        let mut kafka_source = KafkaSource::try_new(ctx, params, ignored_checkpoint)
-            .await
-            .unwrap();
+        let source_runtime = SourceRuntimeBuilder::new(index_uid, source_config)
+            .with_metastore(metastore)
+            .build();
+        let mut kafka_source = KafkaSource::try_new(source_runtime, params).await.unwrap();
         kafka_source.state.num_inactive_partitions = 1;
 
         let universe = Universe::with_accelerated_time();
@@ -1183,7 +1159,6 @@
         let topic = append_random_suffix("test-kafka-source--process-revoke-partitions--topic");
         create_topic(&admin_client, &topic, 1).await.unwrap();
 
-        let metastore = metastore_for_test();
         let index_id = append_random_suffix("test-kafka-source--process-revoke--partitions--index");
         let index_uid = IndexUid::new_with_random_ulid(&index_id);
         let (_source_id, source_config) = get_source_config(&topic, "earliest");
         let SourceParams::Kafka(params) = source_config.clone().source_params else {
             panic!(
                 "Expected `SourceParams::Kafka` source params, got {:?}",
                 source_config.source_params
             );
         };
-        let ctx = SourceRuntimeArgs::for_test(
-            index_uid,
-            source_config,
-            metastore,
-            PathBuf::from("./queues"),
-        );
-        let ignored_checkpoint = SourceCheckpoint::default();
-        let mut kafka_source = KafkaSource::try_new(ctx, params, ignored_checkpoint)
-            .await
-            .unwrap();
+        let source_runtime = SourceRuntimeBuilder::new(index_uid, source_config).build();
+        let mut kafka_source = KafkaSource::try_new(source_runtime, params).await.unwrap();
 
         let universe = Universe::with_accelerated_time();
         let (source_mailbox, _source_inbox) = universe.create_test_mailbox();
@@ -1241,7 +1208,6 @@
         let topic = append_random_suffix("test-kafka-source--process-partition-eof--topic");
         create_topic(&admin_client, &topic, 1).await.unwrap();
 
-        let metastore = metastore_for_test();
         let index_id = append_random_suffix("test-kafka-source--process-partition-eof--index");
         let index_uid = IndexUid::new_with_random_ulid(&index_id);
         let (_source_id, source_config) = get_source_config(&topic, "earliest");
         let SourceParams::Kafka(params) = source_config.clone().source_params else {
             panic!(
                 "Expected `SourceParams::Kafka` source params, got {:?}",
                 source_config.source_params
             );
         };
-        let ctx = SourceRuntimeArgs::for_test(
-            index_uid,
-            source_config,
-            metastore,
-            PathBuf::from("./queues"),
-        );
-        let ignored_checkpoint = SourceCheckpoint::default();
-        let mut 
kafka_source = KafkaSource::try_new(ctx, params, ignored_checkpoint) - .await - .unwrap(); + let source_runtime = SourceRuntimeBuilder::new(index_uid, source_config).build(); + let mut kafka_source = KafkaSource::try_new(source_runtime, params).await.unwrap(); let partition_id_1 = PartitionId::from(1u64); kafka_source.state.assigned_partitions = HashMap::from_iter([(1, partition_id_1)]); @@ -1282,9 +1240,9 @@ mod kafka_broker_tests { let metastore = metastore_for_test(); let index_id = append_random_suffix("test-kafka-source--suggest-truncate--index"); - let (source_id, source_config) = get_source_config(&topic, "earliest"); - - let index_uid = setup_index(metastore.clone(), &index_id, &source_id, &[(2, -1, 42)]).await; + let (_source_id, source_config) = get_source_config(&topic, "earliest"); + let index_uid = + setup_index(metastore.clone(), &index_id, &source_config, &[(2, -1, 42)]).await; let SourceParams::Kafka(params) = source_config.clone().source_params else { panic!( @@ -1292,16 +1250,10 @@ mod kafka_broker_tests { source_config.source_params ); }; - let ctx = SourceRuntimeArgs::for_test( - index_uid, - source_config, - metastore, - PathBuf::from("./queues"), - ); - let ignored_checkpoint = SourceCheckpoint::default(); - let mut kafka_source = KafkaSource::try_new(ctx, params, ignored_checkpoint) - .await - .unwrap(); + let source_runtime = SourceRuntimeBuilder::new(index_uid, source_config) + .with_metastore(metastore) + .build(); + let mut kafka_source = KafkaSource::try_new(source_runtime, params).await.unwrap(); let universe = Universe::with_accelerated_time(); let (source_mailbox, _source_inbox) = universe.create_test_mailbox(); @@ -1369,18 +1321,11 @@ mod kafka_broker_tests { let metastore = metastore_for_test(); let index_id = append_random_suffix("test-kafka-source--index"); let (source_id, source_config) = get_source_config(&topic, "earliest"); - let index_uid = setup_index(metastore.clone(), &index_id, &source_id, &[]).await; - let source = source_loader - .load_source( - SourceRuntimeArgs::for_test( - index_uid, - source_config, - metastore, - PathBuf::from("./queues"), - ), - SourceCheckpoint::default(), - ) - .await?; + let index_uid = setup_index(metastore.clone(), &index_id, &source_config, &[]).await; + let source_runtime = SourceRuntimeBuilder::new(index_uid, source_config) + .with_metastore(metastore) + .build(); + let source = source_loader.load_source(source_runtime).await?; let (doc_processor_mailbox, doc_processor_inbox) = universe.create_test_mailbox(); let source_actor = SourceActor { source, @@ -1428,19 +1373,12 @@ mod kafka_broker_tests { let metastore = metastore_for_test(); let index_id = append_random_suffix("test-kafka-source--index"); let (source_id, source_config) = get_source_config(&topic, "earliest"); - let index_uid = setup_index(metastore.clone(), &index_id, &source_id, &[]).await; - let source = source_loader - .load_source( - SourceRuntimeArgs::for_test( - index_uid, - source_config, - metastore, - PathBuf::from("./queues"), - ), - SourceCheckpoint::default(), - ) - .await?; + let index_uid = setup_index(metastore.clone(), &index_id, &source_config, &[]).await; + let source_runtime = SourceRuntimeBuilder::new(index_uid, source_config) + .with_metastore(metastore) + .build(); let (doc_processor_mailbox, doc_processor_inbox) = universe.create_test_mailbox(); + let source = source_loader.load_source(source_runtime).await?; let source_actor = SourceActor { source, doc_processor_mailbox: doc_processor_mailbox.clone(), @@ -1495,22 +1433,14 @@ mod 
kafka_broker_tests { let index_uid = setup_index( metastore.clone(), &index_id, - &source_id, + &source_config, &[(0, -1, 0), (1, -1, 2)], ) .await; - let source = source_loader - .load_source( - SourceRuntimeArgs::for_test( - index_uid, - source_config, - metastore, - PathBuf::from("./queues"), - ), - SourceCheckpoint::default(), - ) - .await?; - + let source_runtime = SourceRuntimeBuilder::new(index_uid, source_config) + .with_metastore(metastore) + .build(); + let source = source_loader.load_source(source_runtime).await?; let (doc_processor_mailbox, doc_processor_inbox) = universe.create_test_mailbox(); let source_actor = SourceActor { source, @@ -1559,18 +1489,11 @@ mod kafka_broker_tests { let metastore = metastore_for_test(); let index_id = append_random_suffix("test-kafka-source--index"); let (source_id, source_config) = get_source_config(&topic, "latest"); - let index_uid = setup_index(metastore.clone(), &index_id, &source_id, &[]).await; - let source = source_loader - .load_source( - SourceRuntimeArgs::for_test( - index_uid, - source_config, - metastore, - PathBuf::from("./queues"), - ), - SourceCheckpoint::default(), - ) - .await?; + let index_uid = setup_index(metastore.clone(), &index_id, &source_config, &[]).await; + let source_runtime = SourceRuntimeBuilder::new(index_uid, source_config) + .with_metastore(metastore) + .build(); + let source = source_loader.load_source(source_runtime).await?; let (doc_processor_mailbox, doc_processor_inbox) = universe.create_test_mailbox(); let source_actor = SourceActor { source, diff --git a/quickwit/quickwit-indexing/src/source/kinesis/kinesis_source.rs b/quickwit/quickwit-indexing/src/source/kinesis/kinesis_source.rs index 355dae24335..6152c53f3db 100644 --- a/quickwit/quickwit-indexing/src/source/kinesis/kinesis_source.rs +++ b/quickwit/quickwit-indexing/src/source/kinesis/kinesis_source.rs @@ -19,7 +19,6 @@ use std::collections::HashMap; use std::fmt; -use std::sync::Arc; use std::time::Duration; use anyhow::{bail, Context}; @@ -44,8 +43,8 @@ use super::shard_consumer::{ShardConsumer, ShardConsumerHandle, ShardConsumerMes use crate::actors::DocProcessor; use crate::source::kinesis::helpers::get_kinesis_client; use crate::source::{ - BatchBuilder, Source, SourceContext, SourceRuntimeArgs, TypedSourceFactory, - BATCH_NUM_BYTES_LIMIT, EMIT_BATCHES_TIMEOUT, + BatchBuilder, Source, SourceContext, SourceRuntime, TypedSourceFactory, BATCH_NUM_BYTES_LIMIT, + EMIT_BATCHES_TIMEOUT, }; type ShardId = String; @@ -59,17 +58,16 @@ impl TypedSourceFactory for KinesisSourceFactory { type Params = KinesisSourceParams; async fn typed_create_source( - ctx: Arc, - params: KinesisSourceParams, - checkpoint: SourceCheckpoint, + source_runtime: SourceRuntime, + source_params: KinesisSourceParams, ) -> anyhow::Result { - KinesisSource::try_new(ctx.source_id().to_string(), params, checkpoint).await + KinesisSource::try_new(source_runtime, source_params).await } } struct ShardConsumerState { partition_id: PartitionId, - position: Position, + current_position: Position, lag_millis: Option, _shard_consumer_handle: ShardConsumerHandle, } @@ -87,12 +85,10 @@ pub struct KinesisSourceState { } pub struct KinesisSource { - // Source ID - source_id: String, + // Runtime arguments. + source_runtime: SourceRuntime, // Target stream to consume. stream_name: String, - // Initialization checkpoint. - checkpoint: SourceCheckpoint, kinesis_client: KinesisClient, // Retry parameters (max attempts, max delay, ...). 
retry_params: RetryParams, @@ -106,51 +102,54 @@ pub struct KinesisSource { impl fmt::Debug for KinesisSource { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!( - f, - "KinesisSource {{ source_id: {}, stream_name: {} }}", - self.source_id, self.stream_name - ) + f.debug_struct("KinesisSource") + .field("index_uid", self.source_runtime.index_uid()) + .field("source_id", &self.source_runtime.source_id()) + .field("stream_name", &self.stream_name) + .finish() } } impl KinesisSource { /// Instantiates a new `KinesisSource`. pub async fn try_new( - source_id: String, - params: KinesisSourceParams, - checkpoint: SourceCheckpoint, + source_runtime: SourceRuntime, + source_params: KinesisSourceParams, ) -> anyhow::Result<Self> { - let stream_name = params.stream_name; - let backfill_mode_enabled = params.enable_backfill_mode; - let region = get_region(params.region_or_endpoint).await?; + let stream_name = source_params.stream_name; + let backfill_mode_enabled = source_params.enable_backfill_mode; + let region = get_region(source_params.region_or_endpoint).await?; let kinesis_client = get_kinesis_client(region).await?; let (shard_consumers_tx, shard_consumers_rx) = mpsc::channel(1_000); let state = KinesisSourceState::default(); let retry_params = RetryParams::aggressive(); - Ok(KinesisSource { - source_id, + let kinesis_source = KinesisSource { + source_runtime, stream_name, - checkpoint, kinesis_client, shard_consumers_tx, shard_consumers_rx, state, backfill_mode_enabled, retry_params, - }) + }; + Ok(kinesis_source) } - fn spawn_shard_consumer(&mut self, ctx: &SourceContext, shard_id: ShardId) { + fn spawn_shard_consumer( + &mut self, + ctx: &SourceContext, + shard_id: ShardId, + checkpoint: &SourceCheckpoint, + ) { assert!(!self.state.shard_consumers.contains_key(&shard_id)); let partition_id = PartitionId::from(shard_id.as_str()); - let position = self - .checkpoint + let from_position = checkpoint .position_for_partition(&partition_id) .cloned() .unwrap_or(Position::Beginning); - let from_sequence_number_exclusive = match &position { + let from_sequence_number_exclusive = match &from_position { Position::Beginning => None, Position::Offset(offset) => Some(offset.to_string()), Position::Eof(_) => panic!("position of a Kinesis shard should never be EOF"), @@ -167,7 +166,7 @@ impl KinesisSource { let _shard_consumer_handle = shard_consumer.spawn(ctx); let shard_consumer_state = ShardConsumerState { partition_id, - position, + current_position: from_position, lag_millis: None, _shard_consumer_handle, }; @@ -192,8 +191,14 @@ impl Source for KinesisSource { None, )) .await?; + let checkpoint = self + .source_runtime + .fetch_checkpoint() + .await + .context("failed to fetch checkpoint")?; + for shard in shards { - self.spawn_shard_consumer(ctx, shard.shard_id); + self.spawn_shard_consumer(ctx, shard.shard_id, &checkpoint); } info!( stream_name = %self.stream_name, @@ -218,8 +223,10 @@ impl Source for KinesisSource { // The source always carries a sender for this channel.
match message_opt.expect("Channel unexpectedly closed.") { ShardConsumerMessage::ChildShards(shard_ids) => { + let checkpoint = self.source_runtime.fetch_checkpoint().await.context("failed to fetch checkpoint")?; + for shard_id in shard_ids { - self.spawn_shard_consumer(ctx, shard_id); + self.spawn_shard_consumer(ctx, shard_id, &checkpoint); } } ShardConsumerMessage::Records { shard_id, records, lag_millis } => { @@ -254,7 +261,7 @@ impl Source for KinesisSource { let partition_id = shard_consumer_state.partition_id.clone(); let current_position = Position::from(record.sequence_number); - let previous_position = std::mem::replace(&mut shard_consumer_state.position, current_position.clone()); + let previous_position = std::mem::replace(&mut shard_consumer_state.current_position, current_position.clone()); batch_builder.checkpoint_delta.record_partition_delta( partition_id, @@ -310,7 +317,7 @@ impl Source for KinesisSource { } fn name(&self) -> String { - format!("KinesisSource{{source_id={}}}", self.source_id) + format!("{:?}", self) } fn observable_state(&self) -> JsonValue { @@ -318,7 +325,9 @@ impl Source for KinesisSource { .state .shard_consumers .iter() - .map(|(shard_id, shard_consumer_state)| (shard_id, &shard_consumer_state.position)) + .map(|(shard_id, shard_consumer_state)| { + (shard_id, &shard_consumer_state.current_position) + }) .sorted() .collect(); json!({ @@ -351,14 +360,18 @@ pub(super) async fn get_region( #[cfg(all(test, feature = "kinesis-localstack-tests"))] mod tests { + use quickwit_actors::Universe; + use quickwit_config::{SourceConfig, SourceParams}; use quickwit_metastore::checkpoint::SourceCheckpointDelta; + use quickwit_proto::types::IndexUid; use super::*; use crate::models::RawDocBatch; use crate::source::kinesis::helpers::tests::{ make_shard_id, put_records_into_shards, setup, teardown, }; + use crate::source::tests::SourceRuntimeBuilder; use crate::source::SourceActor; // Sequence number @@ -382,17 +395,21 @@ mod tests { let universe = Universe::with_accelerated_time(); let (doc_processor_mailbox, doc_processor_inbox) = universe.create_test_mailbox(); let (kinesis_client, stream_name) = setup("test-kinesis-source", 3).await.unwrap(); - let params = KinesisSourceParams { + let index_id = "test-kinesis-index"; + let index_uid = IndexUid::new_with_random_ulid(index_id); + let kinesis_params = KinesisSourceParams { stream_name: stream_name.clone(), region_or_endpoint: Some(RegionOrEndpoint::Endpoint( "http://localhost:4566".to_string(), )), enable_backfill_mode: true, }; + let source_params = SourceParams::Kinesis(kinesis_params.clone()); + let source_config = SourceConfig::for_test("test-kinesis-source", source_params); + let source_runtime = SourceRuntimeBuilder::new(index_uid, source_config).build(); { - let checkpoint = SourceCheckpoint::default(); let kinesis_source = - KinesisSource::try_new("my-kinesis-source".to_string(), params.clone(), checkpoint) + KinesisSource::try_new(source_runtime.clone(), kinesis_params.clone()) .await .unwrap(); let actor = SourceActor { @@ -444,9 +461,8 @@ mod tests { .map(|(shard_id, seqno)| (*shard_id, Position::from(seqno.clone()))) .collect(); { - let checkpoint = SourceCheckpoint::default(); let kinesis_source = - KinesisSource::try_new("my-kinesis-source".to_string(), params.clone(), checkpoint) + KinesisSource::try_new(source_runtime.clone(), kinesis_params.clone()) .await .unwrap(); let actor = SourceActor { @@ -503,7 +519,7 @@ mod tests { sequence_numbers.get(&1).unwrap().first().unwrap().clone(); let 
from_sequence_number_exclusive_shard_2 = sequence_numbers.get(&2).unwrap().last().unwrap().clone(); - let checkpoint: SourceCheckpoint = vec![ + let _checkpoint: SourceCheckpoint = vec![ ( make_shard_id(1), from_sequence_number_exclusive_shard_1.clone(), @@ -516,10 +532,9 @@ mod tests { .into_iter() .map(|(partition_id, offset)| (PartitionId::from(partition_id), Position::from(offset))) .collect(); - let kinesis_source = - KinesisSource::try_new("my-kinesis-source".to_string(), params.clone(), checkpoint) - .await - .unwrap(); + let kinesis_source = KinesisSource::try_new(source_runtime, kinesis_params) + .await + .unwrap(); let actor = SourceActor { source: Box::new(kinesis_source), doc_processor_mailbox: doc_processor_mailbox.clone(), diff --git a/quickwit/quickwit-indexing/src/source/mod.rs b/quickwit/quickwit-indexing/src/source/mod.rs index ed01d8041af..f42fbd444a9 100644 --- a/quickwit/quickwit-indexing/src/source/mod.rs +++ b/quickwit/quickwit-indexing/src/source/mod.rs @@ -96,8 +96,12 @@ use quickwit_common::runtimes::RuntimeType; use quickwit_config::{SourceConfig, SourceParams}; use quickwit_ingest::IngesterPool; use quickwit_metastore::checkpoint::{SourceCheckpoint, SourceCheckpointDelta}; +use quickwit_metastore::IndexMetadataResponseExt; use quickwit_proto::indexing::IndexingPipelineId; -use quickwit_proto::metastore::{MetastoreServiceClient, SourceType}; +use quickwit_proto::metastore::{ + IndexMetadataRequest, MetastoreError, MetastoreResult, MetastoreService, + MetastoreServiceClient, SourceType, +}; use quickwit_proto::types::{IndexUid, PipelineUid, ShardId}; use quickwit_storage::StorageResolver; use serde_json::Value as JsonValue; @@ -129,7 +133,8 @@ const BATCH_NUM_BYTES_LIMIT: u64 = ByteSize::mib(5).as_u64(); const EMIT_BATCHES_TIMEOUT: Duration = Duration::from_millis(if cfg!(test) { 100 } else { 1_000 }); /// Runtime configuration used during execution of a source actor. 
-pub struct SourceRuntimeArgs { +#[derive(Clone)] +pub struct SourceRuntime { pub pipeline_id: IndexingPipelineId, pub source_config: SourceConfig, pub metastore: MetastoreServiceClient, @@ -140,7 +145,7 @@ pub struct SourceRuntimeArgs { pub event_broker: EventBroker, } -impl SourceRuntimeArgs { +impl SourceRuntime { pub fn node_id(&self) -> &str { &self.pipeline_id.node_id } @@ -161,28 +166,26 @@ impl SourceRuntimeArgs { self.pipeline_id.pipeline_uid } - #[cfg(test)] - fn for_test( - index_uid: IndexUid, - source_config: SourceConfig, - metastore: MetastoreServiceClient, - queues_dir_path: PathBuf, - ) -> std::sync::Arc<SourceRuntimeArgs> { - use std::sync::Arc; - let pipeline_id = IndexingPipelineId { - node_id: "test-node".to_string(), - index_uid, - source_id: source_config.source_id.clone(), - pipeline_uid: PipelineUid::for_test(0u128), - }; - Arc::new(SourceRuntimeArgs { - pipeline_id, - metastore, - ingester_pool: IngesterPool::default(), - queues_dir_path, - source_config, - storage_resolver: StorageResolver::for_test(), - event_broker: EventBroker::default(), + pub async fn fetch_checkpoint(&self) -> MetastoreResult<SourceCheckpoint> { + let index_uid = self.index_uid().clone(); + let request = IndexMetadataRequest::for_index_uid(index_uid); + let response = self.metastore.clone().index_metadata(request).await?; + let index_metadata = response.deserialize_index_metadata()?; + + if let Some(checkpoint) = index_metadata + .checkpoint + .source_checkpoint(self.source_id()) + .cloned() + { + return Ok(checkpoint); + } + Err(MetastoreError::Internal { + message: format!( + "could not find checkpoint for index `{}` and source `{}`", + self.index_uid(), + self.source_id() + ), + cause: "".to_string(), }) } } @@ -385,19 +388,19 @@ pub fn quickwit_supported_sources() -> &'static SourceLoader { static SOURCE_LOADER: OnceCell<SourceLoader> = OnceCell::new(); SOURCE_LOADER.get_or_init(|| { let mut source_factory = SourceLoader::default(); - source_factory.add_source("file", FileSourceFactory); + source_factory.add_source(SourceType::File, FileSourceFactory); #[cfg(feature = "gcp-pubsub")] - source_factory.add_source("pubsub", GcpPubSubSourceFactory); - source_factory.add_source("ingest-api", IngestApiSourceFactory); - source_factory.add_source("ingest", IngestSourceFactory); + source_factory.add_source(SourceType::PubSub, GcpPubSubSourceFactory); + source_factory.add_source(SourceType::IngestV1, IngestApiSourceFactory); + source_factory.add_source(SourceType::IngestV2, IngestSourceFactory); #[cfg(feature = "kafka")] - source_factory.add_source("kafka", KafkaSourceFactory); + source_factory.add_source(SourceType::Kafka, KafkaSourceFactory); #[cfg(feature = "kinesis")] - source_factory.add_source("kinesis", KinesisSourceFactory); + source_factory.add_source(SourceType::Kinesis, KinesisSourceFactory); #[cfg(feature = "pulsar")] - source_factory.add_source("pulsar", PulsarSourceFactory); - source_factory.add_source("vec", VecSourceFactory); - source_factory.add_source("void", VoidSourceFactory); + source_factory.add_source(SourceType::Pulsar, PulsarSourceFactory); + source_factory.add_source(SourceType::Vec, VecSourceFactory); + source_factory.add_source(SourceType::Void, VoidSourceFactory); source_factory }) } @@ -541,9 +544,108 @@ mod tests { use std::num::NonZeroUsize; use quickwit_config::{SourceInputFormat, VecSourceParams}; + use quickwit_metastore::checkpoint::IndexCheckpointDelta; + use quickwit_metastore::IndexMetadata; + use quickwit_proto::metastore::{IndexMetadataResponse, MockMetastoreService}; use super::*; + pub struct
SourceRuntimeBuilder { + index_uid: IndexUid, + source_config: SourceConfig, + metastore_opt: Option<MetastoreServiceClient>, + queues_dir_path_opt: Option<PathBuf>, + } + + impl SourceRuntimeBuilder { + pub fn new(index_uid: IndexUid, source_config: SourceConfig) -> Self { + SourceRuntimeBuilder { + index_uid, + source_config, + metastore_opt: None, + queues_dir_path_opt: None, + } + } + + pub fn build(mut self) -> SourceRuntime { + let metastore = self + .metastore_opt + .take() + .unwrap_or_else(|| self.setup_mock_metastore(None)); + + let queues_dir_path = self + .queues_dir_path_opt + .unwrap_or_else(|| PathBuf::from("./queues")); + + SourceRuntime { + pipeline_id: IndexingPipelineId { + node_id: "test-node".to_string(), + index_uid: self.index_uid, + source_id: self.source_config.source_id.clone(), + pipeline_uid: PipelineUid::for_test(0u128), + }, + metastore, + ingester_pool: IngesterPool::default(), + queues_dir_path, + source_config: self.source_config, + storage_resolver: StorageResolver::for_test(), + event_broker: EventBroker::default(), + } + } + + #[cfg(feature = "kafka")] + pub fn with_metastore(mut self, metastore: MetastoreServiceClient) -> Self { + self.metastore_opt = Some(metastore); + self + } + + pub fn with_mock_metastore( + mut self, + source_checkpoint_delta_opt: Option<SourceCheckpointDelta>, + ) -> Self { + self.metastore_opt = Some(self.setup_mock_metastore(source_checkpoint_delta_opt)); + self + } + + pub fn with_queues_dir(mut self, queues_dir_path: impl Into<PathBuf>) -> Self { + self.queues_dir_path_opt = Some(queues_dir_path.into()); + self + } + + fn setup_mock_metastore( + &self, + source_checkpoint_delta_opt: Option<SourceCheckpointDelta>, + ) -> MetastoreServiceClient { + let index_uid = self.index_uid.clone(); + let source_config = self.source_config.clone(); + + let mut mock_metastore = MockMetastoreService::new(); + mock_metastore + .expect_index_metadata() + .returning(move |_request| { + let index_uri = format!("ram:///indexes/{}", index_uid.index_id); + let mut index_metadata = + IndexMetadata::for_test(&index_uid.index_id, &index_uri); + index_metadata.index_uid = index_uid.clone(); + + let source_id = source_config.source_id.clone(); + index_metadata.add_source(source_config.clone()).unwrap(); + + if let Some(source_delta) = source_checkpoint_delta_opt.clone() { + let delta = IndexCheckpointDelta { + source_id, + source_delta, + }; + index_metadata.checkpoint.try_apply_delta(delta).unwrap(); + } + let response = + IndexMetadataResponse::try_from_index_metadata(&index_metadata).unwrap(); + Ok(response) + }); + MetastoreServiceClient::from_mock(mock_metastore) + } + } + #[tokio::test] async fn test_check_source_connectivity() -> anyhow::Result<()> { { diff --git a/quickwit/quickwit-indexing/src/source/pulsar_source.rs b/quickwit/quickwit-indexing/src/source/pulsar_source.rs index 0dc6f38b7fc..2e19ddc833d 100644 --- a/quickwit/quickwit-indexing/src/source/pulsar_source.rs +++ b/quickwit/quickwit-indexing/src/source/pulsar_source.rs @@ -18,7 +18,7 @@ // along with this program. If not, see <http://www.gnu.org/licenses/>.
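A minimal sketch of how a test can drive the `SourceRuntimeBuilder` and `SourceRuntime::fetch_checkpoint` introduced above. The index and source names, the `SourceParams::void()` helper, and the resume offset are illustrative assumptions, not taken from this patch:

use quickwit_config::{SourceConfig, SourceParams};
use quickwit_metastore::checkpoint::{PartitionId, SourceCheckpointDelta};
use quickwit_proto::types::{IndexUid, Position};

async fn checkpoint_roundtrip() -> anyhow::Result<()> {
    let index_uid = IndexUid::new_with_random_ulid("test-index");
    // Hypothetical test source; any `SourceParams` variant would do here.
    let source_config = SourceConfig::for_test("test-source", SourceParams::void());
    // Pretend offsets up to 9 were already published for partition "0".
    let checkpoint_delta = SourceCheckpointDelta::from_partition_delta(
        PartitionId::from("0"),
        Position::Beginning,
        Position::offset(9u64),
    )
    .unwrap();
    let source_runtime = SourceRuntimeBuilder::new(index_uid, source_config)
        .with_mock_metastore(Some(checkpoint_delta))
        .build();
    // The mock metastore replays the delta, so the source resumes at offset 9
    // instead of starting over from `Position::Beginning`.
    let checkpoint = source_runtime.fetch_checkpoint().await?;
    assert_eq!(
        checkpoint.position_for_partition(&PartitionId::from("0")),
        Some(&Position::offset(9u64))
    );
    Ok(())
}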
use std::collections::BTreeMap; -use std::sync::Arc; +use std::fmt; use std::time::{Duration, Instant}; use anyhow::{anyhow, Context}; @@ -42,7 +42,7 @@ use tracing::{debug, info, warn}; use crate::actors::DocProcessor; use crate::source::{ - BatchBuilder, Source, SourceActor, SourceContext, SourceRuntimeArgs, TypedSourceFactory, + BatchBuilder, Source, SourceActor, SourceContext, SourceRuntime, TypedSourceFactory, BATCH_NUM_BYTES_LIMIT, EMIT_BATCHES_TIMEOUT, }; @@ -56,11 +56,10 @@ impl TypedSourceFactory for PulsarSourceFactory { type Params = PulsarSourceParams; async fn typed_create_source( - ctx: Arc, - params: PulsarSourceParams, - checkpoint: SourceCheckpoint, + source_runtime: SourceRuntime, + source_params: PulsarSourceParams, ) -> anyhow::Result { - PulsarSource::try_new(ctx, params, checkpoint).await + PulsarSource::try_new(source_runtime, source_params).await } } @@ -78,36 +77,47 @@ pub struct PulsarSourceState { } pub struct PulsarSource { - ctx: Arc, + source_runtime: SourceRuntime, + source_params: PulsarSourceParams, pulsar_consumer: PulsarConsumer, - params: PulsarSourceParams, subscription_name: String, current_positions: BTreeMap, state: PulsarSourceState, } +impl fmt::Debug for PulsarSource { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("PulsarSource") + .field("index_uid", self.source_runtime.index_uid()) + .field("source_id", &self.source_runtime.source_id()) + .field("subscription_name", &self.subscription_name) + .field("topics", &self.source_params.topics.join(", ")) + .finish() + } +} + impl PulsarSource { pub async fn try_new( - ctx: Arc, - params: PulsarSourceParams, - checkpoint: SourceCheckpoint, + source_runtime: SourceRuntime, + source_params: PulsarSourceParams, ) -> anyhow::Result { - let subscription_name = subscription_name(ctx.index_uid(), ctx.source_id()); + let subscription_name = + subscription_name(source_runtime.index_uid(), source_runtime.source_id()); info!( - index_id=%ctx.index_id(), - source_id=%ctx.source_id(), - topics=?params.topics, + index_id=%source_runtime.index_id(), + source_id=%source_runtime.source_id(), + topics=?source_params.topics, subscription_name=%subscription_name, "Create Pulsar source." ); - - let pulsar = connect_pulsar(¶ms).await?; + let pulsar = connect_pulsar(&source_params).await?; + let checkpoint = source_runtime.fetch_checkpoint().await?; // Current positions are built mapping the topic ID to the last-saved // message ID, pulsar ensures these topics (and topic partitions) are // unique so that we don't inadvertently clash. 
let mut current_positions = BTreeMap::new(); - for topic in params.topics.iter() { + for topic in source_params.topics.iter() { let partitions = pulsar.lookup_partitioned_topic(topic).await?; for (partition, _) in partitions { @@ -119,18 +129,17 @@ impl PulsarSource { } } } - let pulsar_consumer = create_pulsar_consumer( subscription_name.clone(), - params.clone(), + source_params.clone(), pulsar, current_positions.clone(), ) .await?; Ok(Self { - ctx, - params, + source_runtime, + source_params, pulsar_consumer, subscription_name, current_positions, @@ -262,16 +271,16 @@ impl Source for PulsarSource { } fn name(&self) -> String { - format!("PulsarSource{{source_id={}}}", self.ctx.source_id()) + format!("{:?}", self) } fn observable_state(&self) -> JsonValue { json!({ - "index_id": self.ctx.index_id(), - "source_id": self.ctx.source_id(), - "topics": self.params.topics, + "index_id": self.source_runtime.index_id(), + "source_id": self.source_runtime.source_id(), + "topics": self.source_params.topics, "subscription_name": self.subscription_name, - "consumer_name": self.params.consumer_name, + "consumer_name": self.source_params.consumer_name, "num_bytes_processed": self.state.num_bytes_processed, "num_messages_processed": self.state.num_messages_processed, "num_invalid_messages": self.state.num_invalid_messages, @@ -432,7 +441,6 @@ mod pulsar_broker_tests { use std::collections::HashSet; use std::num::NonZeroUsize; use std::ops::Range; - use std::path::PathBuf; use futures::future::join_all; use quickwit_actors::{ActorHandle, Inbox, Universe, HEARTBEAT}; @@ -453,6 +461,7 @@ mod pulsar_broker_tests { use super::*; use crate::new_split_id; use crate::source::pulsar_source::{msg_id_from_position, msg_id_to_position}; + use crate::source::tests::SourceRuntimeBuilder; use crate::source::{quickwit_supported_sources, RawDocBatch, SuggestTruncate}; static PULSAR_URI: &str = "pulsar://localhost:6650"; @@ -696,21 +705,14 @@ mod pulsar_broker_tests { async fn create_source( universe: &Universe, - metastore: MetastoreServiceClient, + _metastore: MetastoreServiceClient, index_uid: IndexUid, source_config: SourceConfig, - start_checkpoint: SourceCheckpoint, + _start_checkpoint: SourceCheckpoint, ) -> anyhow::Result<(ActorHandle, Inbox)> { - let ctx = SourceRuntimeArgs::for_test( - index_uid, - source_config, - metastore, - PathBuf::from("./queues"), - ); - let source_loader = quickwit_supported_sources(); - let source = source_loader.load_source(ctx, start_checkpoint).await?; - + let source_runtime = SourceRuntimeBuilder::new(index_uid, source_config).build(); + let source = source_loader.load_source(source_runtime).await?; let (doc_processor_mailbox, doc_processor_inbox) = universe.create_test_mailbox(); let source_actor = SourceActor { source, @@ -812,7 +814,6 @@ mod pulsar_broker_tests { #[tokio::test] async fn test_doc_batching_logic() { - let metastore = metastore_for_test(); let topic = append_random_suffix("test-pulsar-source-topic"); let index_id = append_random_suffix("test-pulsar-source-index"); @@ -824,15 +825,8 @@ mod pulsar_broker_tests { unreachable!() }; - let ctx = SourceRuntimeArgs::for_test( - index_uid, - source_config, - metastore, - PathBuf::from("./queues"), - ); - let start_checkpoint = SourceCheckpoint::default(); - - let mut pulsar_source = PulsarSource::try_new(ctx, params, start_checkpoint) + let source_runtime = SourceRuntimeBuilder::new(index_uid, source_config).build(); + let mut pulsar_source = PulsarSource::try_new(source_runtime, params) .await .expect("Setup pulsar 
source"); diff --git a/quickwit/quickwit-indexing/src/source/source_factory.rs b/quickwit/quickwit-indexing/src/source/source_factory.rs index 4bc6a29ed31..810cb3bfe66 100644 --- a/quickwit/quickwit-indexing/src/source/source_factory.rs +++ b/quickwit/quickwit-indexing/src/source/source_factory.rs @@ -18,33 +18,29 @@ // along with this program. If not, see . use std::collections::HashMap; -use std::sync::Arc; use async_trait::async_trait; use itertools::Itertools; -use quickwit_metastore::checkpoint::SourceCheckpoint; +use quickwit_proto::metastore::SourceType; use thiserror::Error; use super::Source; -use crate::source::SourceRuntimeArgs; +use crate::source::SourceRuntime; #[async_trait] -pub trait SourceFactory: 'static + Send + Sync { - async fn create_source( - &self, - ctx: Arc, - checkpoint: SourceCheckpoint, - ) -> anyhow::Result>; +pub trait SourceFactory: Send + Sync + 'static { + async fn create_source(&self, source_runtime: SourceRuntime) + -> anyhow::Result>; } #[async_trait] pub trait TypedSourceFactory: Send + Sync + 'static { type Source: Source; type Params: serde::de::DeserializeOwned + Send + Sync + 'static; + async fn typed_create_source( - ctx: Arc, - params: Self::Params, - checkpoint: SourceCheckpoint, + source_runtime: SourceRuntime, + source_params: Self::Params, ) -> anyhow::Result; } @@ -52,18 +48,18 @@ pub trait TypedSourceFactory: Send + Sync + 'static { impl SourceFactory for T { async fn create_source( &self, - ctx: Arc, - checkpoint: SourceCheckpoint, + source_runtime: SourceRuntime, ) -> anyhow::Result> { - let typed_params: T::Params = serde_json::from_value(ctx.source_config.params())?; - let file_source = Self::typed_create_source(ctx, typed_params, checkpoint).await?; - Ok(Box::new(file_source)) + let typed_params: T::Params = + serde_json::from_value(source_runtime.source_config.params())?; + let source = Self::typed_create_source(source_runtime, typed_params).await?; + Ok(Box::new(source)) } } #[derive(Default)] pub struct SourceLoader { - type_to_factory: HashMap>, + type_to_factory: HashMap>, } #[derive(Error, Debug)] @@ -73,39 +69,38 @@ pub enum SourceLoaderError { {available_source_types})" )] UnknownSourceType { - requested_source_type: String, + requested_source_type: SourceType, available_source_types: String, //< a comma separated list with the available source_type. }, #[error("failed to create source `{source_id}` of type `{source_type}`. 
Cause: {error:?}")] FailedToCreateSource { source_id: String, - source_type: String, + source_type: SourceType, #[source] error: anyhow::Error, }, } impl SourceLoader { - pub fn add_source(&mut self, source: S, factory: F) { + pub fn add_source(&mut self, source_type: SourceType, source_factory: F) { self.type_to_factory - .insert(source.to_string(), Box::new(factory)); + .insert(source_type, Box::new(source_factory)); } pub async fn load_source( &self, - ctx: Arc, - checkpoint: SourceCheckpoint, + source_runtime: SourceRuntime, ) -> Result, SourceLoaderError> { - let source_type = ctx.source_config.source_type().as_str().to_string(); - let source_id = ctx.source_id().to_string(); + let source_type = source_runtime.source_config.source_type(); + let source_id = source_runtime.source_id().to_string(); let source_factory = self.type_to_factory.get(&source_type).ok_or_else(|| { SourceLoaderError::UnknownSourceType { - requested_source_type: source_type.clone(), + requested_source_type: source_type, available_source_types: self.type_to_factory.keys().join(", "), } })?; source_factory - .create_source(ctx, checkpoint) + .create_source(source_runtime) .await .map_err(|error| SourceLoaderError::FailedToCreateSource { source_type, @@ -119,19 +114,17 @@ impl SourceLoader { mod tests { use std::num::NonZeroUsize; - use std::path::PathBuf; use quickwit_config::{SourceConfig, SourceInputFormat, SourceParams}; - use quickwit_metastore::metastore_for_test; use quickwit_proto::types::IndexUid; - use super::*; use crate::source::quickwit_supported_sources; + use crate::source::tests::SourceRuntimeBuilder; #[tokio::test] async fn test_source_loader_success() -> anyhow::Result<()> { - let metastore = metastore_for_test(); let source_loader = quickwit_supported_sources(); + let index_uid = IndexUid::new_with_random_ulid("test-index"); let source_config = SourceConfig { source_id: "test-source".to_string(), num_pipelines: NonZeroUsize::new(1).unwrap(), @@ -140,17 +133,8 @@ mod tests { transform_config: None, input_format: SourceInputFormat::Json, }; - source_loader - .load_source( - SourceRuntimeArgs::for_test( - IndexUid::new_with_random_ulid("test-index"), - source_config, - metastore, - PathBuf::from("./queues"), - ), - SourceCheckpoint::default(), - ) - .await?; + let source_runtime = SourceRuntimeBuilder::new(index_uid, source_config).build(); + source_loader.load_source(source_runtime).await?; Ok(()) } } diff --git a/quickwit/quickwit-indexing/src/source/vec_source.rs b/quickwit/quickwit-indexing/src/source/vec_source.rs index efbf3c13162..484e4801817 100644 --- a/quickwit/quickwit-indexing/src/source/vec_source.rs +++ b/quickwit/quickwit-indexing/src/source/vec_source.rs @@ -18,13 +18,12 @@ // along with this program. If not, see . 
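For contrast with the string-keyed registry this file used to have, a short sketch of registration and lookup under the new `SourceType` keys; `example_loader` is an illustrative helper, not part of the patch, and the paths assume the re-exports in `crate::source`:

use quickwit_proto::metastore::SourceType;
use crate::source::{SourceLoader, VoidSourceFactory};

fn example_loader() -> SourceLoader {
    let mut source_loader = SourceLoader::default();
    // Keys are now `SourceType` variants, so a misspelled source type becomes a
    // compile error rather than a runtime `UnknownSourceType` lookup failure.
    source_loader.add_source(SourceType::Void, VoidSourceFactory);
    source_loader
}

// The loader resolves the factory from the source config's type and hands it
// the whole `SourceRuntime`; there is no separate checkpoint argument anymore:
//
// let source = example_loader().load_source(source_runtime).await?;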
use std::fmt; -use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; use quickwit_actors::{ActorExitStatus, Mailbox}; use quickwit_config::VecSourceParams; -use quickwit_metastore::checkpoint::{PartitionId, SourceCheckpoint, SourceCheckpointDelta}; +use quickwit_metastore::checkpoint::{PartitionId, SourceCheckpointDelta}; use quickwit_proto::metastore::SourceType; use quickwit_proto::types::Position; use serde_json::Value as JsonValue; @@ -32,12 +31,12 @@ use tracing::info; use super::BatchBuilder; use crate::actors::DocProcessor; -use crate::source::{Source, SourceContext, SourceRuntimeArgs, TypedSourceFactory}; +use crate::source::{Source, SourceContext, SourceRuntime, TypedSourceFactory}; pub struct VecSource { source_id: String, + source_params: VecSourceParams, next_item_idx: usize, - params: VecSourceParams, partition: PartitionId, } @@ -56,11 +55,11 @@ impl TypedSourceFactory for VecSourceFactory { type Source = VecSource; type Params = VecSourceParams; async fn typed_create_source( - ctx: Arc<SourceRuntimeArgs>, - params: VecSourceParams, - checkpoint: SourceCheckpoint, + source_runtime: SourceRuntime, + source_params: VecSourceParams, ) -> anyhow::Result<Self::Source> { - let partition = PartitionId::from(params.partition.as_str()); + let checkpoint = source_runtime.fetch_checkpoint().await?; + let partition = PartitionId::from(source_params.partition.as_str()); let next_item_idx = checkpoint .position_for_partition(&partition) .map(|position| { @@ -71,10 +70,10 @@ impl TypedSourceFactory for VecSourceFactory { }) .unwrap_or(0); Ok(VecSource { - source_id: ctx.source_id().to_string(), - next_item_idx, - params, + source_id: source_runtime.pipeline_id.source_id, + source_params, partition, + next_item_idx, }) } } @@ -95,9 +94,9 @@ impl Source for VecSource { ) -> Result<Duration, ActorExitStatus> { let mut batch_builder = BatchBuilder::new(SourceType::Vec); - for doc in self.params.docs[self.next_item_idx..]
.iter() - .take(self.params.batch_num_docs) + .take(self.source_params.batch_num_docs) .cloned() { batch_builder.add_doc(doc); @@ -123,7 +122,7 @@ impl Source for VecSource { } fn name(&self) -> String { - format!("VecSource {{ source_id={} }}", self.source_id) + format!("{:?}", self) } fn observable_state(&self) -> JsonValue { @@ -136,17 +135,16 @@ impl Source for VecSource { #[cfg(test)] mod tests { use std::num::NonZeroUsize; - use std::path::PathBuf; use bytes::Bytes; use quickwit_actors::{Actor, Command, Universe}; use quickwit_config::{SourceConfig, SourceInputFormat, SourceParams}; - use quickwit_metastore::metastore_for_test; use quickwit_proto::types::IndexUid; use serde_json::json; use super::*; use crate::models::RawDocBatch; + use crate::source::tests::SourceRuntimeBuilder; use crate::source::SourceActor; #[tokio::test] @@ -161,6 +159,7 @@ mod tests { batch_num_docs: 3, partition: "partition".to_string(), }; + let index_uid = IndexUid::new_with_random_ulid("test-index"); let source_config = SourceConfig { source_id: "test-vec-source".to_string(), num_pipelines: NonZeroUsize::new(1).unwrap(), @@ -169,25 +168,15 @@ mod tests { transform_config: None, input_format: SourceInputFormat::Json, }; - let metastore = metastore_for_test(); - let vec_source = VecSourceFactory::typed_create_source( - SourceRuntimeArgs::for_test( - IndexUid::new_with_random_ulid("test-index"), - source_config, - metastore, - PathBuf::from("./queues"), - ), - params, - SourceCheckpoint::default(), - ) - .await?; + let source_runtime = SourceRuntimeBuilder::new(index_uid, source_config).build(); + let vec_source = VecSourceFactory::typed_create_source(source_runtime, params).await?; let vec_source_actor = SourceActor { source: Box::new(vec_source), doc_processor_mailbox, }; assert_eq!( vec_source_actor.name(), - "VecSource { source_id=test-vec-source }" + r#"VecSource { source_id: "test-vec-source" }"# ); let (_vec_source_mailbox, vec_source_handle) = universe.spawn_builder().spawn(vec_source_actor); @@ -218,9 +207,7 @@ mod tests { batch_num_docs: 3, partition: "".to_string(), }; - let mut checkpoint = SourceCheckpoint::default(); - checkpoint.try_apply_delta(SourceCheckpointDelta::from_range(0u64..2u64))?; - + let index_uid = IndexUid::new_with_random_ulid("test-index"); let source_config = SourceConfig { source_id: "test-vec-source".to_string(), num_pipelines: NonZeroUsize::new(1).unwrap(), @@ -229,18 +216,11 @@ mod tests { transform_config: None, input_format: SourceInputFormat::Json, }; - let metastore = metastore_for_test(); - let vec_source = VecSourceFactory::typed_create_source( - SourceRuntimeArgs::for_test( - IndexUid::new_with_random_ulid("test-index"), - source_config, - metastore, - PathBuf::from("./queues"), - ), - params, - checkpoint, - ) - .await?; + let source_delta = SourceCheckpointDelta::from_range(0u64..2u64); + let source_runtime = SourceRuntimeBuilder::new(index_uid, source_config) + .with_mock_metastore(Some(source_delta)) + .build(); + let vec_source = VecSourceFactory::typed_create_source(source_runtime, params).await?; let vec_source_actor = SourceActor { source: Box::new(vec_source), doc_processor_mailbox, diff --git a/quickwit/quickwit-indexing/src/source/void_source.rs b/quickwit/quickwit-indexing/src/source/void_source.rs index 2166986b183..8baa5e8a8e8 100644 --- a/quickwit/quickwit-indexing/src/source/void_source.rs +++ b/quickwit/quickwit-indexing/src/source/void_source.rs @@ -17,17 +17,15 @@ // You should have received a copy of the GNU Affero General Public License // along 
with this program. If not, see . -use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; use quickwit_actors::{ActorExitStatus, Mailbox, HEARTBEAT}; use quickwit_config::VoidSourceParams; -use quickwit_metastore::checkpoint::SourceCheckpoint; use serde_json::Value as JsonValue; use crate::actors::DocProcessor; -use crate::source::{Source, SourceContext, SourceRuntimeArgs, TypedSourceFactory}; +use crate::source::{Source, SourceContext, SourceRuntime, TypedSourceFactory}; pub struct VoidSource; @@ -60,9 +58,8 @@ impl TypedSourceFactory for VoidSourceFactory { type Params = VoidSourceParams; async fn typed_create_source( - _ctx: Arc, + _source_runtime: SourceRuntime, _params: VoidSourceParams, - _checkpoint: SourceCheckpoint, ) -> anyhow::Result { Ok(VoidSource) } @@ -72,20 +69,19 @@ impl TypedSourceFactory for VoidSourceFactory { mod tests { use std::num::NonZeroUsize; - use std::path::PathBuf; use quickwit_actors::{Health, Supervisable, Universe}; use quickwit_config::{SourceInputFormat, SourceParams}; - use quickwit_metastore::checkpoint::SourceCheckpoint; - use quickwit_metastore::metastore_for_test; use quickwit_proto::types::IndexUid; use serde_json::json; use super::*; + use crate::source::tests::SourceRuntimeBuilder; use crate::source::{quickwit_supported_sources, SourceActor, SourceConfig}; #[tokio::test] async fn test_void_source_loading() { + let index_uid = IndexUid::new_with_random_ulid("test-index"); let source_config = SourceConfig { source_id: "test-void-source".to_string(), num_pipelines: NonZeroUsize::new(1).unwrap(), @@ -94,15 +90,9 @@ mod tests { transform_config: None, input_format: SourceInputFormat::Json, }; - let metastore = metastore_for_test(); - let ctx = SourceRuntimeArgs::for_test( - IndexUid::new_with_random_ulid("test-index"), - source_config, - metastore, - PathBuf::from("./queues"), - ); + let source_runtime = SourceRuntimeBuilder::new(index_uid, source_config).build(); let source = quickwit_supported_sources() - .load_source(ctx, SourceCheckpoint::default()) + .load_source(source_runtime) .await .unwrap(); assert_eq!(source.name(), "VoidSource"); @@ -111,6 +101,7 @@ mod tests { #[tokio::test] async fn test_void_source_running() -> anyhow::Result<()> { let universe = Universe::with_accelerated_time(); + let index_uid = IndexUid::new_with_random_ulid("test-index"); let source_config = SourceConfig { source_id: "test-void-source".to_string(), num_pipelines: NonZeroUsize::new(1).unwrap(), @@ -119,18 +110,9 @@ mod tests { transform_config: None, input_format: SourceInputFormat::Json, }; - let metastore = metastore_for_test(); - let void_source = VoidSourceFactory::typed_create_source( - SourceRuntimeArgs::for_test( - IndexUid::new_with_random_ulid("test-index"), - source_config, - metastore, - PathBuf::from("./queues"), - ), - VoidSourceParams, - SourceCheckpoint::default(), - ) - .await?; + let source_runtime = SourceRuntimeBuilder::new(index_uid, source_config).build(); + let void_source = + VoidSourceFactory::typed_create_source(source_runtime, VoidSourceParams).await?; let (doc_processor_mailbox, _) = universe.create_test_mailbox(); let void_source_actor = SourceActor { source: Box::new(void_source), diff --git a/quickwit/quickwit-indexing/src/test_utils.rs b/quickwit/quickwit-indexing/src/test_utils.rs index 4dbe288fd9c..9270d6718e4 100644 --- a/quickwit/quickwit-indexing/src/test_utils.rs +++ b/quickwit/quickwit-indexing/src/test_utils.rs @@ -30,7 +30,7 @@ use quickwit_common::rand::append_random_suffix; use quickwit_common::uri::Uri; 
use quickwit_config::{ build_doc_mapper, ConfigFormat, IndexConfig, IndexerConfig, IngestApiConfig, MetastoreConfigs, - SourceConfig, SourceInputFormat, SourceParams, VecSourceParams, + SourceConfig, SourceInputFormat, SourceParams, VecSourceParams, INGEST_API_SOURCE_ID, }; use quickwit_doc_mapper::DocMapper; use quickwit_ingest::{init_ingest_api, IngesterPool, QUEUES_DIR_NAME}; @@ -90,23 +90,27 @@ impl TestSandbox { .iter() .map(|search_field| search_field.to_string()) .collect(); - let doc_mapper = - build_doc_mapper(&index_config.doc_mapping, &index_config.search_settings)?; - let temp_dir = tempfile::tempdir()?; - let indexer_config = IndexerConfig::for_test()?; - let num_blocking_threads = 1; + let source_config = SourceConfig::ingest_api_default(); let storage_resolver = StorageResolver::for_test(); let metastore_resolver = MetastoreResolver::configured(storage_resolver.clone(), &MetastoreConfigs::default()); let mut metastore = metastore_resolver .resolve(&Uri::for_test(METASTORE_URI)) .await?; - let create_index_request = CreateIndexRequest::try_from_index_config(&index_config)?; + let create_index_request = CreateIndexRequest::try_from_index_and_source_configs( + &index_config, + &[source_config.clone()], + )?; let index_uid: IndexUid = metastore .create_index(create_index_request) .await? .index_uid() .clone(); + let doc_mapper = + build_doc_mapper(&index_config.doc_mapping, &index_config.search_settings)?; + let temp_dir = tempfile::tempdir()?; + let indexer_config = IndexerConfig::for_test()?; + let num_blocking_threads = 1; let storage = storage_resolver.resolve(&index_uri).await?; let universe = Universe::with_accelerated_time(); let merge_scheduler_mailbox = universe.get_or_spawn_one(); @@ -157,7 +161,7 @@ impl TestSandbox { .collect(); let add_docs_id = self.add_docs_id.fetch_add(1, Ordering::SeqCst); let source_config = SourceConfig { - source_id: self.index_uid.index_id.to_string(), + source_id: INGEST_API_SOURCE_ID.to_string(), num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, source_params: SourceParams::Vec(VecSourceParams { diff --git a/quickwit/quickwit-proto/src/metastore/mod.rs b/quickwit/quickwit-proto/src/metastore/mod.rs index 0218c276225..6fbb7f3535e 100644 --- a/quickwit/quickwit-proto/src/metastore/mod.rs +++ b/quickwit/quickwit-proto/src/metastore/mod.rs @@ -241,6 +241,26 @@ impl SourceType { } } +impl fmt::Display for SourceType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let source_type_str = match self { + SourceType::Cli => "CLI ingest", + SourceType::File => "file", + SourceType::IngestV1 => "ingest API v1", + SourceType::IngestV2 => "ingest API v2", + SourceType::Kafka => "Apache Kafka", + SourceType::Kinesis => "Amazon Kinesis", + SourceType::Nats => "NATS", + SourceType::PubSub => "Google Cloud Pub/Sub", + SourceType::Pulsar => "Apache Pulsar", + SourceType::Unspecified => "unspecified", + SourceType::Vec => "vec", + SourceType::Void => "void", + }; + write!(f, "{}", source_type_str) + } +} + impl IndexMetadataRequest { pub fn for_index_id(index_id: IndexId) -> Self { Self {