From 6999fb212b34eaec2706bf3891e7d4d7fc3855e4 Mon Sep 17 00:00:00 2001 From: Adrien Guillo Date: Mon, 30 Oct 2023 15:32:25 -0400 Subject: [PATCH] Handle index/source not found and timeout/transport errors --- .../example/src/codegen/hello.rs | 10 +- quickwit/quickwit-codegen/src/codegen.rs | 7 +- .../quickwit-config/src/node_config/mod.rs | 2 +- .../src/control_plane.rs | 8 +- .../src/ingest/ingest_controller.rs | 137 +++--- quickwit/quickwit-control-plane/src/tests.rs | 3 - .../src/codegen/ingest_service.rs | 9 +- .../quickwit-ingest/src/ingest_v2/fetch.rs | 8 +- .../quickwit-ingest/src/ingest_v2/ingester.rs | 2 +- .../quickwit-ingest/src/ingest_v2/router.rs | 417 +++++++++++++++--- .../src/ingest_v2/workbench.rs | 298 +++++++++++-- quickwit/quickwit-metastore/src/checkpoint.rs | 3 +- .../file_backed_index/shards.rs | 4 + .../protos/quickwit/control_plane.proto | 31 +- .../protos/quickwit/ingest.proto | 68 +-- .../protos/quickwit/ingester.proto | 194 ++++---- .../protos/quickwit/metastore.proto | 60 +-- .../protos/quickwit/router.proto | 57 +-- .../protos/quickwit/search.proto | 48 +- .../quickwit/quickwit.control_plane.rs | 88 +++- .../src/codegen/quickwit/quickwit.indexing.rs | 11 +- .../quickwit/quickwit.ingest.ingester.rs | 11 +- .../quickwit/quickwit.ingest.router.rs | 32 +- .../codegen/quickwit/quickwit.metastore.rs | 33 +- .../quickwit-proto/src/control_plane/mod.rs | 4 +- quickwit/quickwit-proto/src/ingest/mod.rs | 41 +- 26 files changed, 1137 insertions(+), 449 deletions(-) diff --git a/quickwit/quickwit-codegen/example/src/codegen/hello.rs b/quickwit/quickwit-codegen/example/src/codegen/hello.rs index f217ea5606f..d8cff481d0d 100644 --- a/quickwit/quickwit-codegen/example/src/codegen/hello.rs +++ b/quickwit/quickwit-codegen/example/src/codegen/hello.rs @@ -105,6 +105,8 @@ impl HelloClient { ) -> hello_grpc_server::HelloGrpcServer { let adapter = HelloGrpcServerAdapter::new(self.clone()); hello_grpc_server::HelloGrpcServer::new(adapter) + .max_decoding_message_size(10 * 1024 * 1024) + .max_encoding_message_size(10 * 1024 * 1024) } pub fn from_channel( addr: std::net::SocketAddr, @@ -123,10 +125,10 @@ impl HelloClient { balance_channel: quickwit_common::tower::BalanceChannel, ) -> HelloClient { let connection_keys_watcher = balance_channel.connection_keys_watcher(); - let adapter = HelloGrpcClientAdapter::new( - hello_grpc_client::HelloGrpcClient::new(balance_channel), - connection_keys_watcher, - ); + let client = hello_grpc_client::HelloGrpcClient::new(balance_channel) + .max_decoding_message_size(10 * 1024 * 1024) + .max_encoding_message_size(10 * 1024 * 1024); + let adapter = HelloGrpcClientAdapter::new(client, connection_keys_watcher); Self::new(adapter) } pub fn from_mailbox(mailbox: quickwit_actors::Mailbox) -> Self diff --git a/quickwit/quickwit-codegen/src/codegen.rs b/quickwit/quickwit-codegen/src/codegen.rs index d2f21a43e43..5ff7e6dbbc5 100644 --- a/quickwit/quickwit-codegen/src/codegen.rs +++ b/quickwit/quickwit-codegen/src/codegen.rs @@ -573,6 +573,8 @@ fn generate_client(context: &CodegenContext) -> TokenStream { pub fn as_grpc_service(&self) -> #grpc_server_package_name::#grpc_server_name<#grpc_server_adapter_name> { let adapter = #grpc_server_adapter_name::new(self.clone()); #grpc_server_package_name::#grpc_server_name::new(adapter) + .max_decoding_message_size(10 * 1024 * 1024) + .max_encoding_message_size(10 * 1024 * 1024) } pub fn from_channel(addr: std::net::SocketAddr, channel: tonic::transport::Channel) -> Self @@ -585,7 +587,10 @@ fn 
generate_client(context: &CodegenContext) -> TokenStream { pub fn from_balance_channel(balance_channel: quickwit_common::tower::BalanceChannel) -> #client_name { let connection_keys_watcher = balance_channel.connection_keys_watcher(); - let adapter = #grpc_client_adapter_name::new(#grpc_client_package_name::#grpc_client_name::new(balance_channel), connection_keys_watcher); + let client = #grpc_client_package_name::#grpc_client_name::new(balance_channel) + .max_decoding_message_size(10 * 1024 * 1024) + .max_encoding_message_size(10 * 1024 * 1024); + let adapter = #grpc_client_adapter_name::new(client, connection_keys_watcher); Self::new(adapter) } diff --git a/quickwit/quickwit-config/src/node_config/mod.rs b/quickwit/quickwit-config/src/node_config/mod.rs index cadd04784fe..051208bef00 100644 --- a/quickwit/quickwit-config/src/node_config/mod.rs +++ b/quickwit/quickwit-config/src/node_config/mod.rs @@ -187,7 +187,7 @@ impl Default for IngestApiConfig { max_queue_memory_usage: Byte::from_bytes(2 * 1024 * 1024 * 1024), /* 2 GiB // TODO maybe we want more? */ max_queue_disk_usage: Byte::from_bytes(4 * 1024 * 1024 * 1024), /* 4 GiB // TODO maybe we want more? */ replication_factor: 1, - content_length_limit: 10 * 1024 * 1024, // 10 MB + content_length_limit: 10 * 1024 * 1024, // 10 MiB } } } diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs index 3f366540ca5..15577bb5d7d 100644 --- a/quickwit/quickwit-control-plane/src/control_plane.rs +++ b/quickwit/quickwit-control-plane/src/control_plane.rs @@ -507,7 +507,6 @@ mod tests { #[tokio::test] async fn test_control_plane_add_source() { - quickwit_common::setup_logging_for_tests(); let universe = Universe::with_accelerated_time(); let cluster_id = "test-cluster".to_string(); @@ -564,7 +563,6 @@ mod tests { #[tokio::test] async fn test_control_plane_toggle_source() { - quickwit_common::setup_logging_for_tests(); let universe = Universe::with_accelerated_time(); let cluster_id = "test-cluster".to_string(); @@ -743,6 +741,7 @@ mod tests { ); let get_open_shards_request = GetOrCreateOpenShardsRequest { subrequests: vec![GetOrCreateOpenShardsSubrequest { + subrequest_id: 0, index_id: "test-index".to_string(), source_id: INGEST_SOURCE_ID.to_string(), }], @@ -753,9 +752,10 @@ mod tests { .ask_for_res(get_open_shards_request) .await .unwrap(); - assert_eq!(get_open_shards_response.subresponses.len(), 1); + assert_eq!(get_open_shards_response.successes.len(), 1); + assert_eq!(get_open_shards_response.failures.len(), 0); - let subresponse = &get_open_shards_response.subresponses[0]; + let subresponse = &get_open_shards_response.successes[0]; assert_eq!(subresponse.index_uid, "test-index:0"); assert_eq!(subresponse.source_id, INGEST_SOURCE_ID); assert_eq!(subresponse.open_shards.len(), 1); diff --git a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs index cff37a0274c..b2292ef6a74 100644 --- a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs +++ b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs @@ -25,15 +25,14 @@ use itertools::Itertools; use quickwit_common::{PrettySample, Progress}; use quickwit_ingest::IngesterPool; use quickwit_proto::control_plane::{ - ClosedShards, ControlPlaneError, ControlPlaneResult, GetOpenShardsSubresponse, - GetOrCreateOpenShardsRequest, GetOrCreateOpenShardsResponse, + ClosedShards, ControlPlaneError, ControlPlaneResult, 
GetOrCreateOpenShardsFailure, + GetOrCreateOpenShardsFailureReason, GetOrCreateOpenShardsRequest, + GetOrCreateOpenShardsResponse, GetOrCreateOpenShardsSuccess, }; use quickwit_proto::ingest::ingester::{IngesterService, PingRequest}; use quickwit_proto::ingest::{IngestV2Error, ShardState}; use quickwit_proto::metastore; -use quickwit_proto::metastore::{ - EntityKind, MetastoreError, MetastoreService, MetastoreServiceClient, -}; +use quickwit_proto::metastore::{MetastoreService, MetastoreServiceClient}; use quickwit_proto::types::{IndexUid, NodeId}; use rand::seq::SliceRandom; use tokio::time::timeout; @@ -232,9 +231,6 @@ impl IngestController { model: &mut ControlPlaneModel, progress: &Progress, ) -> ControlPlaneResult { - let mut get_open_shards_subresponses = - Vec::with_capacity(get_open_shards_request.subrequests.len()); - self.handle_closed_shards(get_open_shards_request.closed_shards, model); let mut unavailable_leaders: FnvHashSet = get_open_shards_request @@ -245,37 +241,44 @@ impl IngestController { self.handle_unavailable_leaders(&unavailable_leaders, model); + let num_subrequests = get_open_shards_request.subrequests.len(); + let mut get_or_create_open_shards_successes = Vec::with_capacity(num_subrequests); + let mut get_or_create_open_shards_failures = Vec::new(); let mut open_shards_subrequests = Vec::new(); for get_open_shards_subrequest in get_open_shards_request.subrequests { - let index_uid = model - .index_uid(&get_open_shards_subrequest.index_id) - .ok_or_else(|| { - MetastoreError::NotFound(EntityKind::Index { - index_id: get_open_shards_subrequest.index_id.clone(), - }) - })?; - - let (open_shards, next_shard_id) = model - .find_open_shards( - &index_uid, - &get_open_shards_subrequest.source_id, - &unavailable_leaders, - ) - .ok_or_else(|| { - MetastoreError::NotFound(EntityKind::Source { - index_id: get_open_shards_subrequest.index_id.clone(), - source_id: get_open_shards_subrequest.source_id.clone(), - }) - })?; - + let Some(index_uid) = model.index_uid(&get_open_shards_subrequest.index_id) else { + let get_or_create_open_shards_failure = GetOrCreateOpenShardsFailure { + subrequest_id: get_open_shards_subrequest.subrequest_id, + index_id: get_open_shards_subrequest.index_id, + source_id: get_open_shards_subrequest.source_id, + reason: GetOrCreateOpenShardsFailureReason::IndexNotFound as i32, + }; + get_or_create_open_shards_failures.push(get_or_create_open_shards_failure); + continue; + }; + let Some((open_shards, next_shard_id)) = model.find_open_shards( + &index_uid, + &get_open_shards_subrequest.source_id, + &unavailable_leaders, + ) else { + let get_or_create_open_shards_failure = GetOrCreateOpenShardsFailure { + subrequest_id: get_open_shards_subrequest.subrequest_id, + index_id: get_open_shards_subrequest.index_id, + source_id: get_open_shards_subrequest.source_id, + reason: GetOrCreateOpenShardsFailureReason::SourceNotFound as i32, + }; + get_or_create_open_shards_failures.push(get_or_create_open_shards_failure); + continue; + }; if !open_shards.is_empty() { - let get_open_shards_subresponse = GetOpenShardsSubresponse { + let get_or_create_open_shards_success = GetOrCreateOpenShardsSuccess { + subrequest_id: get_open_shards_subrequest.subrequest_id, index_uid: index_uid.into(), source_id: get_open_shards_subrequest.source_id, open_shards, }; - get_open_shards_subresponses.push(get_open_shards_subresponse); + get_or_create_open_shards_successes.push(get_or_create_open_shards_success); } else { // TODO: Find leaders in batches. 
// TODO: Round-robin leader-follower pairs or choose according to load. @@ -286,6 +289,7 @@ impl IngestController { ControlPlaneError::Unavailable("no available ingester".to_string()) })?; let open_shards_subrequest = metastore::OpenShardsSubrequest { + subrequest_id: get_open_shards_subrequest.subrequest_id, index_uid: index_uid.into(), source_id: get_open_shards_subrequest.source_id, leader_id: leader_id.into(), @@ -315,17 +319,19 @@ impl IngestController { if let Some((open_shards, _next_shard_id)) = model.find_open_shards(&index_uid, &source_id, &unavailable_leaders) { - let get_open_shards_subresponse = GetOpenShardsSubresponse { + let get_or_create_open_shards_success = GetOrCreateOpenShardsSuccess { + subrequest_id: open_shards_subresponse.subrequest_id, index_uid: index_uid.into(), - source_id, + source_id: open_shards_subresponse.source_id, open_shards, }; - get_open_shards_subresponses.push(get_open_shards_subresponse); + get_or_create_open_shards_successes.push(get_or_create_open_shards_success); } } } Ok(GetOrCreateOpenShardsResponse { - subresponses: get_open_shards_subresponses, + successes: get_or_create_open_shards_successes, + failures: get_or_create_open_shards_failures, }) } } @@ -575,6 +581,7 @@ mod tests { assert_eq!(&request.subrequests[0].source_id, source_id); let subresponses = vec![metastore::OpenShardsSubresponse { + subrequest_id: 1, index_uid: index_uid_1.clone().into(), source_id: source_id.to_string(), opened_shards: vec![Shard { @@ -657,17 +664,30 @@ mod tests { .await .unwrap(); - assert_eq!(response.subresponses.len(), 0); + assert_eq!(response.successes.len(), 0); + assert_eq!(response.failures.len(), 0); let subrequests = vec![ GetOrCreateOpenShardsSubrequest { + subrequest_id: 0, index_id: "test-index-0".to_string(), source_id: source_id.to_string(), }, GetOrCreateOpenShardsSubrequest { + subrequest_id: 1, index_id: "test-index-1".to_string(), source_id: source_id.to_string(), }, + GetOrCreateOpenShardsSubrequest { + subrequest_id: 2, + index_id: "index-not-found".to_string(), + source_id: "source-not-found".to_string(), + }, + GetOrCreateOpenShardsSubrequest { + subrequest_id: 3, + index_id: "test-index-0".to_string(), + source_id: "source-not-found".to_string(), + }, ]; let closed_shards = Vec::new(); let unavailable_leaders = vec!["test-ingester-0".to_string()]; @@ -681,24 +701,41 @@ mod tests { .await .unwrap(); - assert_eq!(response.subresponses.len(), 2); - - assert_eq!(response.subresponses[0].index_uid, index_uid_0.as_str()); - assert_eq!(response.subresponses[0].source_id, source_id); - assert_eq!(response.subresponses[0].open_shards.len(), 1); - assert_eq!(response.subresponses[0].open_shards[0].shard_id, 2); + assert_eq!(response.successes.len(), 2); + assert_eq!(response.failures.len(), 2); + + let success = &response.successes[0]; + assert_eq!(success.subrequest_id, 0); + assert_eq!(success.index_uid, index_uid_0.as_str()); + assert_eq!(success.source_id, source_id); + assert_eq!(success.open_shards.len(), 1); + assert_eq!(success.open_shards[0].shard_id, 2); + assert_eq!(success.open_shards[0].leader_id, "test-ingester-1"); + + let success = &response.successes[1]; + assert_eq!(success.subrequest_id, 1); + assert_eq!(success.index_uid, index_uid_1.as_str()); + assert_eq!(success.source_id, source_id); + assert_eq!(success.open_shards.len(), 1); + assert_eq!(success.open_shards[0].shard_id, 1); + assert_eq!(success.open_shards[0].leader_id, "test-ingester-2"); + + let failure = &response.failures[0]; + assert_eq!(failure.subrequest_id, 2); 
+ assert_eq!(failure.index_id, "index-not-found"); + assert_eq!(failure.source_id, "source-not-found"); assert_eq!( - response.subresponses[0].open_shards[0].leader_id, - "test-ingester-1" + failure.reason(), + GetOrCreateOpenShardsFailureReason::IndexNotFound ); - assert_eq!(&response.subresponses[1].index_uid, index_uid_1.as_str()); - assert_eq!(response.subresponses[1].source_id, source_id); - assert_eq!(response.subresponses[1].open_shards.len(), 1); - assert_eq!(response.subresponses[1].open_shards[0].shard_id, 1); + let failure = &response.failures[1]; + assert_eq!(failure.subrequest_id, 3); + assert_eq!(failure.index_id, index_id_0); + assert_eq!(failure.source_id, "source-not-found"); assert_eq!( - response.subresponses[1].open_shards[0].leader_id, - "test-ingester-2" + failure.reason(), + GetOrCreateOpenShardsFailureReason::SourceNotFound ); assert_eq!(model.observable_state().num_shards, 2); diff --git a/quickwit/quickwit-control-plane/src/tests.rs b/quickwit/quickwit-control-plane/src/tests.rs index fcc99f660fc..384b6681316 100644 --- a/quickwit/quickwit-control-plane/src/tests.rs +++ b/quickwit/quickwit-control-plane/src/tests.rs @@ -156,7 +156,6 @@ async fn start_control_plane( #[tokio::test] async fn test_scheduler_scheduling_and_control_loop_apply_plan_again() { - quickwit_common::setup_logging_for_tests(); let transport = ChannelTransport::default(); let cluster = create_cluster_for_test(Vec::new(), &["indexer", "control_plane"], &transport, true) @@ -251,7 +250,6 @@ async fn test_scheduler_scheduling_and_control_loop_apply_plan_again() { #[tokio::test] async fn test_scheduler_scheduling_no_indexer() { - quickwit_common::setup_logging_for_tests(); let transport = ChannelTransport::default(); let cluster = create_cluster_for_test(Vec::new(), &["control_plane"], &transport, true) .await @@ -288,7 +286,6 @@ async fn test_scheduler_scheduling_no_indexer() { #[tokio::test] async fn test_scheduler_scheduling_multiple_indexers() { - quickwit_common::setup_logging_for_tests(); let transport = ChannelTransport::default(); let cluster = create_cluster_for_test(Vec::new(), &["control_plane"], &transport, true) .await diff --git a/quickwit/quickwit-ingest/src/codegen/ingest_service.rs b/quickwit/quickwit-ingest/src/codegen/ingest_service.rs index b2954db920f..7b51c460739 100644 --- a/quickwit/quickwit-ingest/src/codegen/ingest_service.rs +++ b/quickwit/quickwit-ingest/src/codegen/ingest_service.rs @@ -214,6 +214,8 @@ impl IngestServiceClient { > { let adapter = IngestServiceGrpcServerAdapter::new(self.clone()); ingest_service_grpc_server::IngestServiceGrpcServer::new(adapter) + .max_decoding_message_size(10 * 1024 * 1024) + .max_encoding_message_size(10 * 1024 * 1024) } pub fn from_channel( addr: std::net::SocketAddr, @@ -232,8 +234,13 @@ impl IngestServiceClient { balance_channel: quickwit_common::tower::BalanceChannel, ) -> IngestServiceClient { let connection_keys_watcher = balance_channel.connection_keys_watcher(); + let client = ingest_service_grpc_client::IngestServiceGrpcClient::new( + balance_channel, + ) + .max_decoding_message_size(10 * 1024 * 1024) + .max_encoding_message_size(10 * 1024 * 1024); let adapter = IngestServiceGrpcClientAdapter::new( - ingest_service_grpc_client::IngestServiceGrpcClient::new(balance_channel), + client, connection_keys_watcher, ); Self::new(adapter) diff --git a/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs b/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs index 9ace7de58f1..cdf00f0202f 100644 --- 
a/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs @@ -117,7 +117,7 @@ impl FetchTask { fetch_range=?self.fetch_range, "spawning fetch task" ); - let mut has_drained_queue = true; + let mut has_drained_queue = false; let mut has_reached_eof = false; let mut num_records_total = 0; @@ -645,12 +645,6 @@ mod tests { .unwrap(); drop(state_guard); - timeout(Duration::from_millis(50), fetch_stream.next()) - .await - .unwrap_err(); - - new_records_tx.send(()).unwrap(); - let fetch_response = timeout(Duration::from_millis(50), fetch_stream.next()) .await .unwrap() diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index 361b9fb7852..608e2f626d1 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -211,7 +211,7 @@ impl Ingester { .next() .await .expect("TODO") - .expect("") + .expect("TODO") .into_open_response() .expect("first message should be an open response"); diff --git a/quickwit/quickwit-ingest/src/ingest_v2/router.rs b/quickwit/quickwit-ingest/src/ingest_v2/router.rs index 0cd74837ecb..1f1967b3580 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/router.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/router.rs @@ -36,9 +36,9 @@ use quickwit_proto::ingest::router::{ IngestRequestV2, IngestResponseV2, IngestRouterService, IngestSubrequest, }; use quickwit_proto::ingest::{CommitTypeV2, IngestV2Error, IngestV2Result}; -use quickwit_proto::types::{IndexUid, NodeId, ShardId, SourceId}; +use quickwit_proto::types::{IndexUid, NodeId, ShardId, SourceId, SubrequestId}; use tokio::sync::RwLock; -use tracing::warn; +use tracing::{error, warn}; use super::ingester::PERSIST_REQUEST_TIMEOUT; use super::shard_table::ShardTable; @@ -56,6 +56,8 @@ const MAX_PERSIST_ATTEMPTS: usize = 5; type LeaderId = String; +type PersistResult = (PersistRequestSummary, IngestV2Result); + #[derive(Clone)] pub struct IngestRouter { self_node_id: NodeId, @@ -124,6 +126,7 @@ impl IngestRouter { &mut unavailable_leaders, ) { let subrequest = GetOrCreateOpenShardsSubrequest { + subrequest_id: subrequest.subrequest_id, index_id: subrequest.index_id.clone(), source_id: subrequest.source_id.clone(), }; @@ -141,42 +144,49 @@ impl IngestRouter { /// shard table according to the response received. 
async fn populate_shard_table( &mut self, + workbench: &mut IngestWorkbench, request: GetOrCreateOpenShardsRequest, - ) -> IngestV2Result<()> { + ) { if request.subrequests.is_empty() { - return Ok(()); + return; } - let response = self - .control_plane - .get_or_create_open_shards(request) - .await?; - + let response = match self.control_plane.get_or_create_open_shards(request).await { + Ok(response) => response, + Err(control_plane_error) => { + if workbench.is_last_attempt() { + error!("failed to get open shards from control plane: {control_plane_error}"); + } else { + warn!("failed to get open shards from control plane: {control_plane_error}"); + }; + return; + } + }; let mut state_guard = self.state.write().await; - for subresponse in response.subresponses { + for success in response.successes { state_guard.shard_table.insert_shards( - subresponse.index_uid, - subresponse.source_id, - subresponse.open_shards, + success.index_uid, + success.source_id, + success.open_shards, ); } - Ok(()) + for failure in response.failures { + workbench.record_get_or_create_open_shards_failure(failure); + } } async fn process_persist_results( &mut self, workbench: &mut IngestWorkbench, - mut persist_futures: FuturesUnordered< - impl Future>, - >, + mut persist_futures: FuturesUnordered>, ) { let mut closed_shards: HashMap<(IndexUid, SourceId), Vec> = HashMap::new(); - while let Some(persist_result) = persist_futures.next().await { + while let Some((persist_summary, persist_result)) = persist_futures.next().await { match persist_result { Ok(persist_response) => { for persist_success in persist_response.successes { - workbench.record_success(persist_success); + workbench.record_persist_success(persist_success); } for persist_failure in persist_response.failures { if persist_failure.reason() == PersistFailureReason::ShardClosed { @@ -187,11 +197,39 @@ impl IngestRouter { .or_default() .push(persist_failure.shard_id); } - workbench.record_failure(persist_failure); + workbench.record_persist_failure(persist_failure); } } - Err(_persist_error) => { - // TODO + Err(persist_error) => { + if workbench.is_last_attempt() { + error!( + "failed to persist records on ingester `{}`: {persist_error}", + persist_summary.leader_id + ); + } else { + warn!( + "failed to persist records on ingester `{}`: {persist_error}", + persist_summary.leader_id + ); + } + match persist_error { + IngestV2Error::Timeout + | IngestV2Error::Transport { .. } + | IngestV2Error::IngesterUnavailable { .. } => { + for subrequest_id in persist_summary.subrequest_ids { + workbench.record_no_shards_available(subrequest_id); + } + self.ingester_pool.remove(&persist_summary.leader_id); + } + _ => { + for subrequest_id in persist_summary.subrequest_ids { + workbench.record_internal_error( + subrequest_id, + persist_error.to_string(), + ); + } + } + } } }; } @@ -214,15 +252,9 @@ impl IngestRouter { ) .await; - if let Err(error) = self - .populate_shard_table(get_or_create_open_shards_request) - .await - { - warn!( - "failed to obtain open shards from control plane: `{}`", - error - ); - } + self.populate_shard_table(workbench, get_or_create_open_shards_request) + .await; + // List of subrequest IDs for which no shards were available to route the subrequests to. 
let mut unavailable_subrequest_ids = Vec::new(); @@ -261,22 +293,31 @@ impl IngestRouter { for (leader_id, subrequests) in per_leader_persist_subrequests { let leader_id: NodeId = leader_id.clone().into(); + let subrequest_ids: Vec = subrequests + .iter() + .map(|subrequest| subrequest.subrequest_id) + .collect(); let Some(mut ingester) = self.ingester_pool.get(&leader_id) else { - let subrequest_ids = subrequests - .iter() - .map(|subrequest| subrequest.subrequest_id); unavailable_subrequest_ids.extend(subrequest_ids); continue; }; + let persist_summary = PersistRequestSummary { + leader_id: leader_id.clone(), + subrequest_ids, + }; let persist_request = PersistRequest { leader_id: leader_id.into(), subrequests, commit_type: commit_type as i32, }; let persist_future = async move { - tokio::time::timeout(PERSIST_REQUEST_TIMEOUT, ingester.persist(persist_request)) - .await - .map_err(|_| IngestV2Error::Timeout)? + let persist_result = tokio::time::timeout( + PERSIST_REQUEST_TIMEOUT, + ingester.persist(persist_request), + ) + .await + .unwrap_or_else(|_| Err(IngestV2Error::Timeout)); + (persist_summary, persist_result) }; persist_futures.push(persist_future); } @@ -294,14 +335,12 @@ impl IngestRouter { ingest_request: IngestRequestV2, max_num_attempts: usize, ) -> IngestV2Result { - let mut num_attempts = 0; - let commit_type = ingest_request.commit_type(); - let mut workbench = IngestWorkbench::new(ingest_request.subrequests); + let mut workbench = IngestWorkbench::new(ingest_request.subrequests, max_num_attempts); - while !workbench.is_complete() && num_attempts < max_num_attempts { + while !workbench.is_complete() { + workbench.new_attempt(); self.batch_persist(&mut workbench, commit_type).await; - num_attempts += 1; } workbench.into_ingest_response() } @@ -331,12 +370,20 @@ impl IngestRouterService for IngestRouter { } } +struct PersistRequestSummary { + leader_id: NodeId, + subrequest_ids: Vec, +} + #[cfg(test)] mod tests { use std::iter; use std::sync::atomic::AtomicUsize; - use quickwit_proto::control_plane::{GetOpenShardsSubresponse, GetOrCreateOpenShardsResponse}; + use quickwit_proto::control_plane::{ + GetOrCreateOpenShardsFailure, GetOrCreateOpenShardsFailureReason, + GetOrCreateOpenShardsResponse, GetOrCreateOpenShardsSuccess, + }; use quickwit_proto::ingest::ingester::{ IngesterServiceClient, PersistFailure, PersistResponse, PersistSuccess, }; @@ -346,6 +393,7 @@ mod tests { use super::*; use crate::ingest_v2::shard_table::ShardTableEntry; + use crate::ingest_v2::workbench::SubworkbenchFailure; #[tokio::test] async fn test_router_make_get_or_create_open_shard_request() { @@ -471,7 +519,7 @@ mod tests { .expect_get_or_create_open_shards() .once() .returning(|request| { - assert_eq!(request.subrequests.len(), 3); + assert_eq!(request.subrequests.len(), 4); let subrequest_0 = &request.subrequests[0]; assert_eq!(subrequest_0.index_id, "test-index-0"); @@ -482,12 +530,17 @@ mod tests { assert_eq!(subrequest_1.source_id, "test-source"); let subrequest_2 = &request.subrequests[2]; - assert_eq!(subrequest_2.index_id, "test-index-2"); + assert_eq!(subrequest_2.index_id, "index-not-found"); assert_eq!(subrequest_2.source_id, "test-source"); + let subrequest_3 = &request.subrequests[3]; + assert_eq!(subrequest_3.index_id, "test-index-0"); + assert_eq!(subrequest_3.source_id, "source-not-found"); + let response = GetOrCreateOpenShardsResponse { - subresponses: vec![ - GetOpenShardsSubresponse { + successes: vec![ + GetOrCreateOpenShardsSuccess { + subrequest_id: 0, index_uid: 
"test-index-0:0".to_string(), source_id: "test-source".to_string(), open_shards: vec![Shard { @@ -496,7 +549,8 @@ mod tests { ..Default::default() }], }, - GetOpenShardsSubresponse { + GetOrCreateOpenShardsSuccess { + subrequest_id: 1, index_uid: "test-index-1:0".to_string(), source_id: "test-source".to_string(), open_shards: vec![ @@ -513,6 +567,20 @@ mod tests { ], }, ], + failures: vec![ + GetOrCreateOpenShardsFailure { + subrequest_id: 2, + index_id: "index-not-found".to_string(), + source_id: "test-source".to_string(), + reason: GetOrCreateOpenShardsFailureReason::IndexNotFound as i32, + }, + GetOrCreateOpenShardsFailure { + subrequest_id: 3, + index_id: "test-index-0".to_string(), + source_id: "source-not-found".to_string(), + reason: GetOrCreateOpenShardsFailureReason::SourceNotFound as i32, + }, + ], }; Ok(response) }); @@ -530,34 +598,70 @@ mod tests { closed_shards: Vec::new(), unavailable_leaders: Vec::new(), }; + let mut workbench = IngestWorkbench::new(Vec::new(), 2); + router - .populate_shard_table(get_or_create_open_shards_request) - .await - .unwrap(); + .populate_shard_table(&mut workbench, get_or_create_open_shards_request) + .await; assert!(router.state.read().await.shard_table.is_empty()); + let ingest_subrequests = vec![ + IngestSubrequest { + subrequest_id: 0, + index_id: "test-index-0".to_string(), + source_id: "test-source".to_string(), + ..Default::default() + }, + IngestSubrequest { + subrequest_id: 1, + index_id: "test-index-1".to_string(), + source_id: "test-source".to_string(), + ..Default::default() + }, + IngestSubrequest { + subrequest_id: 2, + index_id: "index-not-found".to_string(), + source_id: "test-source".to_string(), + ..Default::default() + }, + IngestSubrequest { + subrequest_id: 3, + index_id: "source-not-found".to_string(), + source_id: "test-source".to_string(), + ..Default::default() + }, + ]; + let mut workbench = IngestWorkbench::new(ingest_subrequests, 2); + let get_or_create_open_shards_request = GetOrCreateOpenShardsRequest { subrequests: vec![ GetOrCreateOpenShardsSubrequest { + subrequest_id: 0, index_id: "test-index-0".to_string(), source_id: "test-source".to_string(), }, GetOrCreateOpenShardsSubrequest { + subrequest_id: 1, index_id: "test-index-1".to_string(), source_id: "test-source".to_string(), }, GetOrCreateOpenShardsSubrequest { - index_id: "test-index-2".to_string(), + subrequest_id: 2, + index_id: "index-not-found".to_string(), source_id: "test-source".to_string(), }, + GetOrCreateOpenShardsSubrequest { + subrequest_id: 3, + index_id: "test-index-0".to_string(), + source_id: "source-not-found".to_string(), + }, ], closed_shards: Vec::new(), unavailable_leaders: Vec::new(), }; router - .populate_shard_table(get_or_create_open_shards_request) - .await - .unwrap(); + .populate_shard_table(&mut workbench, get_or_create_open_shards_request) + .await; let state_guard = router.state.read().await; let shard_table = &state_guard.shard_table; @@ -575,6 +679,118 @@ mod tests { assert_eq!(routing_entry_1.len(), 2); assert_eq!(routing_entry_1.shards()[0].shard_id, 1); assert_eq!(routing_entry_1.shards()[1].shard_id, 2); + + let subworkbench = workbench.subworkbenches.get(&2).unwrap(); + assert!(matches!( + subworkbench.last_failure_opt, + Some(SubworkbenchFailure::IndexNotFound) + )); + + let subworkbench = workbench.subworkbenches.get(&3).unwrap(); + assert!(matches!( + subworkbench.last_failure_opt, + Some(SubworkbenchFailure::SourceNotFound) + )); + } + + #[tokio::test] + async fn 
test_router_process_persist_results_record_persist_successes() { + let self_node_id = "test-router".into(); + let control_plane = ControlPlaneServiceClient::mock().into(); + let ingester_pool = IngesterPool::default(); + let replication_factor = 1; + let mut router = IngestRouter::new( + self_node_id, + control_plane, + ingester_pool.clone(), + replication_factor, + ); + let ingest_subrequests = vec![IngestSubrequest { + subrequest_id: 0, + index_id: "test-index-0".to_string(), + source_id: "test-source".to_string(), + ..Default::default() + }]; + let mut workbench = IngestWorkbench::new(ingest_subrequests, 2); + let persist_futures = FuturesUnordered::new(); + + persist_futures.push(async { + let persist_summary = PersistRequestSummary { + leader_id: "test-ingester-0".into(), + subrequest_ids: vec![0], + }; + let persist_result = Ok::<_, IngestV2Error>(PersistResponse { + leader_id: "test-ingester-0".to_string(), + successes: vec![PersistSuccess { + subrequest_id: 0, + index_uid: "test-index-0:0".to_string(), + source_id: "test-source".to_string(), + shard_id: 1, + ..Default::default() + }], + failures: Vec::new(), + }); + (persist_summary, persist_result) + }); + router + .process_persist_results(&mut workbench, persist_futures) + .await; + + let subworkbench = workbench.subworkbenches.get(&0).unwrap(); + assert!(matches!( + subworkbench.persist_success_opt, + Some(PersistSuccess { .. }) + )); + } + + #[tokio::test] + async fn test_router_process_persist_results_record_persist_failures() { + let self_node_id = "test-router".into(); + let control_plane = ControlPlaneServiceClient::mock().into(); + let ingester_pool = IngesterPool::default(); + let replication_factor = 1; + let mut router = IngestRouter::new( + self_node_id, + control_plane, + ingester_pool.clone(), + replication_factor, + ); + let ingest_subrequests = vec![IngestSubrequest { + subrequest_id: 0, + index_id: "test-index-0".to_string(), + source_id: "test-source".to_string(), + ..Default::default() + }]; + let mut workbench = IngestWorkbench::new(ingest_subrequests, 2); + let persist_futures = FuturesUnordered::new(); + + persist_futures.push(async { + let persist_summary = PersistRequestSummary { + leader_id: "test-ingester-0".into(), + subrequest_ids: vec![0], + }; + let persist_result = Ok::<_, IngestV2Error>(PersistResponse { + leader_id: "test-ingester-0".to_string(), + successes: Vec::new(), + failures: vec![PersistFailure { + subrequest_id: 0, + index_uid: "test-index-0:0".to_string(), + source_id: "test-source".to_string(), + shard_id: 1, + reason: PersistFailureReason::RateLimited as i32, + }], + }); + (persist_summary, persist_result) + }); + router + .process_persist_results(&mut workbench, persist_futures) + .await; + + let subworkbench = workbench.subworkbenches.get(&0).unwrap(); + assert!(matches!( + subworkbench.last_failure_opt, + Some(SubworkbenchFailure::Persist { .. 
}) + )); } #[tokio::test] @@ -603,11 +819,15 @@ mod tests { ); drop(state_guard); - let mut workbench = IngestWorkbench::new(Vec::new()); + let mut workbench = IngestWorkbench::new(Vec::new(), 2); let persist_futures = FuturesUnordered::new(); persist_futures.push(async { - Ok::<_, IngestV2Error>(PersistResponse { + let persist_summary = PersistRequestSummary { + leader_id: "test-ingester-0".into(), + subrequest_ids: vec![0], + }; + let persist_result = Ok::<_, IngestV2Error>(PersistResponse { leader_id: "test-ingester-0".to_string(), successes: Vec::new(), failures: vec![PersistFailure { @@ -617,7 +837,8 @@ mod tests { shard_id: 1, reason: PersistFailureReason::ShardClosed as i32, }], - }) + }); + (persist_summary, persist_result) }); router .process_persist_results(&mut workbench, persist_futures) @@ -636,6 +857,86 @@ mod tests { ); } + #[tokio::test] + async fn test_router_process_persist_results_removes_unavailable_leaders() { + let self_node_id = "test-router".into(); + let control_plane = ControlPlaneServiceClient::mock().into(); + + let ingester_pool = IngesterPool::default(); + ingester_pool.insert( + "test-ingester-0".into(), + IngesterServiceClient::mock().into(), + ); + ingester_pool.insert( + "test-ingester-1".into(), + IngesterServiceClient::mock().into(), + ); + + let replication_factor = 1; + let mut router = IngestRouter::new( + self_node_id, + control_plane, + ingester_pool.clone(), + replication_factor, + ); + let ingest_subrequests = vec![ + IngestSubrequest { + subrequest_id: 0, + index_id: "test-index-0".to_string(), + source_id: "test-source".to_string(), + ..Default::default() + }, + IngestSubrequest { + subrequest_id: 1, + index_id: "test-index-1".to_string(), + source_id: "test-source".to_string(), + ..Default::default() + }, + ]; + let mut workbench = IngestWorkbench::new(ingest_subrequests, 2); + let persist_futures = FuturesUnordered::new(); + + persist_futures.push(async { + let persist_summary = PersistRequestSummary { + leader_id: "test-ingester-0".into(), + subrequest_ids: vec![0], + }; + let persist_result = Err::<_, IngestV2Error>(IngestV2Error::Timeout); + (persist_summary, persist_result) + }); + router + .process_persist_results(&mut workbench, persist_futures) + .await; + + let subworkbench = workbench.subworkbenches.get(&0).unwrap(); + assert!(matches!( + subworkbench.last_failure_opt, + Some(SubworkbenchFailure::NoShardsAvailable { .. }) + )); + + let persist_futures = FuturesUnordered::new(); + + persist_futures.push(async { + let persist_summary = PersistRequestSummary { + leader_id: "test-ingester-1".into(), + subrequest_ids: vec![1], + }; + let persist_result = + Err::<_, IngestV2Error>(IngestV2Error::Transport("transport error".to_string())); + (persist_summary, persist_result) + }); + router + .process_persist_results(&mut workbench, persist_futures) + .await; + let subworkbench = workbench.subworkbenches.get(&1).unwrap(); + assert!(matches!( + subworkbench.last_failure_opt, + Some(SubworkbenchFailure::NoShardsAvailable { .. 
}) + )); + + assert!(ingester_pool.is_empty()); + } + #[tokio::test] async fn test_router_ingest() { let self_node_id = "test-router".into(); diff --git a/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs b/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs index 018cc246f56..64ee8a41d4b 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs @@ -19,7 +19,10 @@ use std::collections::HashMap; -use quickwit_proto::ingest::ingester::{PersistFailure, PersistSuccess}; +use quickwit_proto::control_plane::{ + GetOrCreateOpenShardsFailure, GetOrCreateOpenShardsFailureReason, +}; +use quickwit_proto::ingest::ingester::{PersistFailure, PersistFailureReason, PersistSuccess}; use quickwit_proto::ingest::router::{ IngestFailure, IngestFailureReason, IngestResponseV2, IngestSubrequest, IngestSuccess, }; @@ -29,14 +32,18 @@ use tracing::warn; /// A helper struct for managing the state of the subrequests of an ingest request during multiple /// persist attempts. +#[derive(Default)] pub(super) struct IngestWorkbench { - subworkbenches: HashMap, - num_successes: usize, + pub subworkbenches: HashMap, + pub num_successes: usize, + /// The number of batch persist attempts. This is not sum of the number of attempts for each subrequest. + pub num_attempts: usize, + pub max_num_attempts: usize, } impl IngestWorkbench { - pub fn new(subrequests: Vec) -> Self { - let subworkbenches = subrequests + pub fn new(ingest_subrequests: Vec, max_num_attempts: usize) -> Self { + let subworkbenches: HashMap = ingest_subrequests .into_iter() .map(|subrequest| { ( @@ -48,14 +55,35 @@ impl IngestWorkbench { Self { subworkbenches, - num_successes: 0, + max_num_attempts, + ..Default::default() } } + pub fn new_attempt(&mut self) { + self.num_attempts += 1; + } + + pub fn is_complete(&self) -> bool { + self.num_successes >= self.subworkbenches.len() + || self.num_attempts >= self.max_num_attempts + || self.has_no_pending_subrequests() + } + + pub fn is_last_attempt(&self) -> bool { + self.num_attempts >= self.max_num_attempts + } + + fn has_no_pending_subrequests(&self) -> bool { + self.subworkbenches + .values() + .all(|subworbench| !subworbench.is_pending()) + } + #[cfg(not(test))] pub fn pending_subrequests(&self) -> impl Iterator { self.subworkbenches.values().filter_map(|subworbench| { - if !subworbench.is_success() { + if subworbench.is_pending() { Some(&subworbench.subrequest) } else { None @@ -63,7 +91,41 @@ impl IngestWorkbench { }) } - pub fn record_success(&mut self, persist_success: PersistSuccess) { + pub fn record_get_or_create_open_shards_failure( + &mut self, + open_shards_failure: GetOrCreateOpenShardsFailure, + ) { + let Some(subworkbench) = self + .subworkbenches + .get_mut(&open_shards_failure.subrequest_id) + else { + warn!( + "could not find subrequest `{}` in workbench", + open_shards_failure.subrequest_id + ); + return; + }; + subworkbench.num_attempts += 1; + + let last_failure = match open_shards_failure.reason() { + GetOrCreateOpenShardsFailureReason::IndexNotFound => { + SubworkbenchFailure::IndexNotFound + } + GetOrCreateOpenShardsFailureReason::SourceNotFound => { + SubworkbenchFailure::SourceNotFound + } + GetOrCreateOpenShardsFailureReason::Unspecified => { + warn!( + "failure reason for subrequest `{}` is unspecified", + open_shards_failure.subrequest_id + ); + SubworkbenchFailure::Internal("unspecified".to_string()) + } + }; + subworkbench.last_failure_opt = Some(last_failure); + } + + pub fn record_persist_success(&mut self, 
persist_success: PersistSuccess) { let Some(subworkbench) = self.subworkbenches.get_mut(&persist_success.subrequest_id) else { warn!( "could not find subrequest `{}` in workbench", @@ -76,7 +138,7 @@ impl IngestWorkbench { subworkbench.persist_success_opt = Some(persist_success); } - pub fn record_failure(&mut self, persist_failure: PersistFailure) { + pub fn record_persist_failure(&mut self, persist_failure: PersistFailure) { let Some(subworkbench) = self.subworkbenches.get_mut(&persist_failure.subrequest_id) else { warn!( "could not find subrequest `{}` in workbench", @@ -85,7 +147,7 @@ impl IngestWorkbench { return; }; subworkbench.num_attempts += 1; - subworkbench.last_attempt_failure_opt = Some(SubworkbenchFailure::Persist(persist_failure)); + subworkbench.last_failure_opt = Some(SubworkbenchFailure::Persist(persist_failure)); } pub fn record_no_shards_available(&mut self, subrequest_id: SubrequestId) { @@ -94,11 +156,16 @@ impl IngestWorkbench { return; }; subworkbench.num_attempts += 1; - subworkbench.last_attempt_failure_opt = Some(SubworkbenchFailure::NoShardsAvailable); + subworkbench.last_failure_opt = Some(SubworkbenchFailure::NoShardsAvailable); } - pub fn is_complete(&self) -> bool { - self.num_successes == self.subworkbenches.len() + pub fn record_internal_error(&mut self, subrequest_id: SubrequestId, error_message: String) { + let Some(subworkbench) = self.subworkbenches.get_mut(&subrequest_id) else { + warn!("could not find subrequest `{}` in workbench", subrequest_id); + return; + }; + subworkbench.num_attempts += 1; + subworkbench.last_failure_opt = Some(SubworkbenchFailure::Internal(error_message)); } pub fn into_ingest_response(self) -> IngestV2Result { @@ -116,7 +183,7 @@ impl IngestWorkbench { replication_position_inclusive: persist_success.replication_position_inclusive, }; successes.push(success); - } else if let Some(failure) = subworkbench.last_attempt_failure_opt { + } else if let Some(failure) = subworkbench.last_failure_opt { let failure = IngestFailure { subrequest_id: subworkbench.subrequest.subrequest_id, index_id: subworkbench.subrequest.index_id, @@ -141,7 +208,7 @@ impl IngestWorkbench { self.subworkbenches .values() .filter_map(|subworbench| { - if !subworbench.is_success() { + if subworbench.is_pending() { Some(&subworbench.subrequest) } else { None @@ -152,40 +219,61 @@ impl IngestWorkbench { } #[derive(Debug)] -enum SubworkbenchFailure { +pub(super) enum SubworkbenchFailure { + IndexNotFound, + SourceNotFound, NoShardsAvailable, Persist(PersistFailure), + Internal(String), } impl SubworkbenchFailure { fn reason(&self) -> IngestFailureReason { - // TODO: Return a better failure reason for `Self::Persist`. 
match self { + Self::IndexNotFound => IngestFailureReason::IndexNotFound, + Self::SourceNotFound => IngestFailureReason::SourceNotFound, + Self::Internal(_) => IngestFailureReason::Internal, Self::NoShardsAvailable => IngestFailureReason::NoShardsAvailable, - Self::Persist(_) => IngestFailureReason::Unspecified, + Self::Persist(persist_failure) => match persist_failure.reason() { + PersistFailureReason::RateLimited => IngestFailureReason::RateLimited, + PersistFailureReason::ResourceExhausted => IngestFailureReason::ResourceExhausted, + PersistFailureReason::ShardClosed => IngestFailureReason::NoShardsAvailable, + PersistFailureReason::Unspecified => IngestFailureReason::Unspecified, + }, } } } +#[derive(Debug, Default)] pub(super) struct IngestSubworkbench { - subrequest: IngestSubrequest, - persist_success_opt: Option, - last_attempt_failure_opt: Option, - num_attempts: usize, + pub subrequest: IngestSubrequest, + pub persist_success_opt: Option, + pub last_failure_opt: Option, + /// The number of persist attempts for this subrequest. + pub num_attempts: usize, } impl IngestSubworkbench { pub fn new(subrequest: IngestSubrequest) -> Self { Self { subrequest, - persist_success_opt: None, - last_attempt_failure_opt: None, - num_attempts: 0, + ..Default::default() } } - pub fn is_success(&self) -> bool { - self.persist_success_opt.is_some() + pub fn is_pending(&self) -> bool { + self.persist_success_opt.is_none() && self.last_failure_is_transient() + } + + fn last_failure_is_transient(&self) -> bool { + match self.last_failure_opt { + Some(SubworkbenchFailure::IndexNotFound) => false, + Some(SubworkbenchFailure::SourceNotFound) => false, + Some(SubworkbenchFailure::Internal(_)) => false, + Some(SubworkbenchFailure::NoShardsAvailable) => true, + Some(SubworkbenchFailure::Persist(_)) => true, + None => true, + } } } @@ -199,18 +287,41 @@ mod tests { ..Default::default() }; let mut subworkbench = IngestSubworkbench::new(subrequest); - assert!(!subworkbench.is_success()); + assert!(subworkbench.is_pending()); + assert!(subworkbench.last_failure_is_transient()); + + subworkbench.last_failure_opt = Some(SubworkbenchFailure::NoShardsAvailable); + assert!(subworkbench.is_pending()); + assert!(subworkbench.last_failure_is_transient()); + + subworkbench.last_failure_opt = Some(SubworkbenchFailure::IndexNotFound); + assert!(!subworkbench.is_pending()); + assert!(!subworkbench.last_failure_is_transient()); let persist_success = PersistSuccess { ..Default::default() }; subworkbench.persist_success_opt = Some(persist_success); - assert!(subworkbench.is_success()); + assert!(!subworkbench.is_pending()); } #[test] fn test_ingest_workbench() { - let subrequests = vec![ + let workbench = IngestWorkbench::new(Vec::new(), 1); + assert!(workbench.is_complete()); + + let ingest_subrequests = vec![IngestSubrequest { + ..Default::default() + }]; + let mut workbench = IngestWorkbench::new(ingest_subrequests, 1); + assert!(!workbench.is_last_attempt()); + assert!(!workbench.is_complete()); + + workbench.new_attempt(); + assert!(workbench.is_last_attempt()); + assert!(workbench.is_complete()); + + let ingest_subrequests = vec![ IngestSubrequest { subrequest_id: 0, ..Default::default() @@ -220,7 +331,7 @@ mod tests { ..Default::default() }, ]; - let mut workbench = IngestWorkbench::new(subrequests); + let mut workbench = IngestWorkbench::new(ingest_subrequests, 1); assert_eq!(workbench.pending_subrequests().count(), 2); assert!(!workbench.is_complete()); @@ -228,7 +339,7 @@ mod tests { subrequest_id: 0, 
..Default::default() }; - workbench.record_success(persist_success); + workbench.record_persist_success(persist_success); assert_eq!(workbench.num_successes, 1); assert_eq!(workbench.pending_subrequests().count(), 1); @@ -243,13 +354,13 @@ mod tests { let subworkbench = workbench.subworkbenches.get(&0).unwrap(); assert_eq!(subworkbench.num_attempts, 1); - assert!(subworkbench.is_success()); + assert!(!subworkbench.is_pending()); let persist_failure = PersistFailure { subrequest_id: 1, ..Default::default() }; - workbench.record_failure(persist_failure); + workbench.record_persist_failure(persist_failure); assert_eq!(workbench.num_successes, 1); assert_eq!(workbench.pending_subrequests().count(), 1); @@ -264,16 +375,131 @@ mod tests { let subworkbench = workbench.subworkbenches.get(&1).unwrap(); assert_eq!(subworkbench.num_attempts, 1); - assert!(subworkbench.last_attempt_failure_opt.is_some()); + assert!(subworkbench.last_failure_opt.is_some()); let persist_success = PersistSuccess { subrequest_id: 1, ..Default::default() }; - workbench.record_success(persist_success); + workbench.record_persist_success(persist_success); assert!(workbench.is_complete()); assert_eq!(workbench.num_successes, 2); assert_eq!(workbench.pending_subrequests().count(), 0); } + + #[test] + fn test_ingest_workbench_record_get_or_create_open_shards_failure() { + let ingest_subrequests = vec![IngestSubrequest { + subrequest_id: 0, + ..Default::default() + }]; + let mut workbench = IngestWorkbench::new(ingest_subrequests, 1); + + let get_or_create_open_shards_failure = GetOrCreateOpenShardsFailure { + subrequest_id: 42, + reason: GetOrCreateOpenShardsFailureReason::IndexNotFound as i32, + ..Default::default() + }; + workbench.record_get_or_create_open_shards_failure(get_or_create_open_shards_failure); + + let get_or_create_open_shards_failure = GetOrCreateOpenShardsFailure { + subrequest_id: 0, + reason: GetOrCreateOpenShardsFailureReason::SourceNotFound as i32, + ..Default::default() + }; + workbench.record_get_or_create_open_shards_failure(get_or_create_open_shards_failure); + + assert_eq!(workbench.num_successes, 0); + + let subworkbench = workbench.subworkbenches.get(&0).unwrap(); + assert!(matches!( + subworkbench.last_failure_opt, + Some(SubworkbenchFailure::SourceNotFound) + )); + assert_eq!(subworkbench.num_attempts, 1); + } + + #[test] + fn test_ingest_workbench_record_persist_success() { + let ingest_subrequests = vec![IngestSubrequest { + subrequest_id: 0, + ..Default::default() + }]; + let mut workbench = IngestWorkbench::new(ingest_subrequests, 1); + + let persist_success = PersistSuccess { + subrequest_id: 42, + ..Default::default() + }; + workbench.record_persist_success(persist_success); + + let persist_success = PersistSuccess { + subrequest_id: 0, + ..Default::default() + }; + workbench.record_persist_success(persist_success); + + assert_eq!(workbench.num_successes, 1); + + let subworkbench = workbench.subworkbenches.get(&0).unwrap(); + assert!(matches!( + subworkbench.persist_success_opt, + Some(PersistSuccess { .. 
}) + )); + assert_eq!(subworkbench.num_attempts, 1); + } + + #[test] + fn test_ingest_workbench_record_persist_failure() { + let ingest_subrequests = vec![IngestSubrequest { + subrequest_id: 0, + ..Default::default() + }]; + let mut workbench = IngestWorkbench::new(ingest_subrequests, 1); + + let persist_failure = PersistFailure { + subrequest_id: 42, + reason: PersistFailureReason::RateLimited as i32, + ..Default::default() + }; + workbench.record_persist_failure(persist_failure); + + let persist_failure = PersistFailure { + subrequest_id: 0, + reason: PersistFailureReason::ResourceExhausted as i32, + ..Default::default() + }; + workbench.record_persist_failure(persist_failure); + + assert_eq!(workbench.num_successes, 0); + + let subworkbench = workbench.subworkbenches.get(&0).unwrap(); + assert!(matches!( + subworkbench.last_failure_opt, + Some(SubworkbenchFailure::Persist ( PersistFailure { reason, .. })) if reason == PersistFailureReason::ResourceExhausted as i32 + )); + assert_eq!(subworkbench.num_attempts, 1); + } + + #[test] + fn test_ingest_workbench_record_no_shards_available() { + let ingest_subrequests = vec![IngestSubrequest { + subrequest_id: 0, + ..Default::default() + }]; + let mut workbench = IngestWorkbench::new(ingest_subrequests, 1); + + workbench.record_no_shards_available(42); + workbench.record_no_shards_available(0); + + assert_eq!(workbench.num_successes, 0); + + let subworkbench = workbench.subworkbenches.get(&0).unwrap(); + assert!(matches!( + subworkbench.last_failure_opt, + Some(SubworkbenchFailure::NoShardsAvailable) + )); + assert_eq!(subworkbench.num_attempts, 1); + } } diff --git a/quickwit/quickwit-metastore/src/checkpoint.rs b/quickwit/quickwit-metastore/src/checkpoint.rs index 2ccaee9f710..f9862102b06 100644 --- a/quickwit/quickwit-metastore/src/checkpoint.rs +++ b/quickwit/quickwit-metastore/src/checkpoint.rs @@ -286,7 +286,6 @@ impl SourceCheckpoint { &self, delta: &SourceCheckpointDelta, ) -> Result<(), IncompatibleCheckpointDelta> { - info!(delta=?delta, checkpoint=?self); for (delta_partition, delta_position) in &delta.per_partition { let Some(position) = self.per_partition.get(delta_partition) else { continue; @@ -329,6 +328,8 @@ impl SourceCheckpoint { delta: SourceCheckpointDelta, ) -> Result<(), IncompatibleCheckpointDelta> { self.check_compatibility(&delta)?; + info!(delta=?delta, checkpoint=?self, "applying delta to checkpoint"); + for (partition_id, partition_position) in delta.per_partition { self.per_partition .insert(partition_id, partition_position.to); diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/file_backed_index/shards.rs b/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/file_backed_index/shards.rs index 79c9949814e..175e0db1ec0 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/file_backed_index/shards.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/file_backed_index/shards.rs @@ -158,6 +158,7 @@ impl Shards { let next_shard_id = self.next_shard_id; let response = OpenShardsSubresponse { + subrequest_id: subrequest.subrequest_id, index_uid: subrequest.index_uid, source_id: subrequest.source_id, opened_shards, @@ -335,6 +336,7 @@ mod tests { let mut shards = Shards::empty(index_uid.clone(), source_id.clone()); let subrequest = OpenShardsSubrequest { + subrequest_id: 0, index_uid: index_uid.clone().into(), source_id: source_id.clone(), leader_id: "leader_id".to_string(), @@ -371,6 +373,7 @@ mod tests { assert_eq!(shards.next_shard_id, 2); 
let subrequest = OpenShardsSubrequest { + subrequest_id: 0, index_uid: index_uid.clone().into(), source_id: source_id.clone(), leader_id: "leader_id".to_string(), @@ -397,6 +400,7 @@ mod tests { assert_eq!(shards.next_shard_id, 3); let subrequest = OpenShardsSubrequest { + subrequest_id: 0, index_uid: index_uid.clone().into(), source_id: source_id.clone(), leader_id: "leader_id".to_string(), diff --git a/quickwit/quickwit-proto/protos/quickwit/control_plane.proto b/quickwit/quickwit-proto/protos/quickwit/control_plane.proto index ceeb70b559f..32e9686c7e5 100644 --- a/quickwit/quickwit-proto/protos/quickwit/control_plane.proto +++ b/quickwit/quickwit-proto/protos/quickwit/control_plane.proto @@ -70,8 +70,9 @@ message GetOrCreateOpenShardsRequest { } message GetOrCreateOpenShardsSubrequest { - string index_id = 1; - string source_id = 2; + uint32 subrequest_id = 1; + string index_id = 2; + string source_id = 3; } message ClosedShards { @@ -80,13 +81,27 @@ message ClosedShards { repeated uint64 shard_ids = 3; } -// TODO: Handle partial failures. message GetOrCreateOpenShardsResponse { - repeated GetOpenShardsSubresponse subresponses = 1; + repeated GetOrCreateOpenShardsSuccess successes = 1; + repeated GetOrCreateOpenShardsFailure failures = 2; } -message GetOpenShardsSubresponse { - string index_uid = 1; - string source_id = 2; - repeated quickwit.ingest.Shard open_shards = 3; +message GetOrCreateOpenShardsSuccess { + uint32 subrequest_id = 1; + string index_uid = 2; + string source_id = 3; + repeated quickwit.ingest.Shard open_shards = 4; +} + +enum GetOrCreateOpenShardsFailureReason { + GET_OR_CREATE_OPEN_SHARDS_FAILURE_REASON_UNSPECIFIED = 0; + GET_OR_CREATE_OPEN_SHARDS_FAILURE_REASON_INDEX_NOT_FOUND = 1; + GET_OR_CREATE_OPEN_SHARDS_FAILURE_REASON_SOURCE_NOT_FOUND = 2; +} + +message GetOrCreateOpenShardsFailure { + uint32 subrequest_id = 1; + string index_id = 2; + string source_id = 3; + GetOrCreateOpenShardsFailureReason reason = 4; } diff --git a/quickwit/quickwit-proto/protos/quickwit/ingest.proto b/quickwit/quickwit-proto/protos/quickwit/ingest.proto index deea81a36e2..e6b0288759f 100644 --- a/quickwit/quickwit-proto/protos/quickwit/ingest.proto +++ b/quickwit/quickwit-proto/protos/quickwit/ingest.proto @@ -30,51 +30,51 @@ message Position { } enum CommitTypeV2 { - COMMIT_TYPE_V2_UNSPECIFIED = 0; - COMMIT_TYPE_V2_AUTO = 1; - COMMIT_TYPE_V2_WAIT = 2; - COMMIT_TYPE_V2_FORCE = 3; + COMMIT_TYPE_V2_UNSPECIFIED = 0; + COMMIT_TYPE_V2_AUTO = 1; + COMMIT_TYPE_V2_WAIT = 2; + COMMIT_TYPE_V2_FORCE = 3; } message DocBatchV2 { - bytes doc_buffer = 1; - repeated uint32 doc_lengths = 2; + bytes doc_buffer = 1; + repeated uint32 doc_lengths = 2; } message MRecordBatch { - // Buffer of encoded and then concatenated mrecords. - bytes mrecord_buffer = 1; - // Lengths of the mrecords in the buffer. - repeated uint32 mrecord_lengths = 2; + // Buffer of encoded and then concatenated mrecords. + bytes mrecord_buffer = 1; + // Lengths of the mrecords in the buffer. + repeated uint32 mrecord_lengths = 2; } enum ShardState { - SHARD_STATE_UNSPECIFIED = 0; - // The shard is open and accepts write requests. - SHARD_STATE_OPEN = 1; - // The ingester hosting the shard is unavailable. - SHARD_STATE_UNAVAILABLE = 2; - // The shard is closed and cannot be written to. - // It can be safely deleted if the publish position is superior or equal to the replication position. - SHARD_STATE_CLOSED = 3; + SHARD_STATE_UNSPECIFIED = 0; + // The shard is open and accepts write requests. 
+ SHARD_STATE_OPEN = 1; + // The ingester hosting the shard is unavailable. + SHARD_STATE_UNAVAILABLE = 2; + // The shard is closed and cannot be written to. + // It can be safely deleted if the publish position is superior or equal to the replication position. + SHARD_STATE_CLOSED = 3; } message Shard { - // Immutable fields - string index_uid = 1; - string source_id = 2; - uint64 shard_id = 3; - // The node ID of the ingester to which all the write requests for this shard should be sent to. - string leader_id = 4; - // The node ID of the ingester holding a copy of the data. - optional string follower_id = 5; + // Immutable fields + string index_uid = 1; + string source_id = 2; + uint64 shard_id = 3; + // The node ID of the ingester to which all the write requests for this shard should be sent to. + string leader_id = 4; + // The node ID of the ingester holding a copy of the data. + optional string follower_id = 5; - // Mutable fields - ShardState shard_state = 8; - // Position up to which indexers have indexed and published the records stored in the shard. - // It is updated asynchronously in a best effort manner by the indexers and indicates the position up to which the log can be safely truncated. - Position publish_position_inclusive = 9; - // A publish token that ensures only one indexer works on a given shard at a time. - // For instance, if an indexer goes rogue, eventually the control plane will detect it and assign the shard to another indexer, which will override the publish token. - optional string publish_token = 10; + // Mutable fields + ShardState shard_state = 8; + // Position up to which indexers have indexed and published the records stored in the shard. + // It is updated asynchronously in a best effort manner by the indexers and indicates the position up to which the log can be safely truncated. + Position publish_position_inclusive = 9; + // A publish token that ensures only one indexer works on a given shard at a time. + // For instance, if an indexer goes rogue, eventually the control plane will detect it and assign the shard to another indexer, which will override the publish token. + optional string publish_token = 10; } diff --git a/quickwit/quickwit-proto/protos/quickwit/ingester.proto b/quickwit/quickwit-proto/protos/quickwit/ingester.proto index aca868f29b7..c7dd2e9c6be 100644 --- a/quickwit/quickwit-proto/protos/quickwit/ingester.proto +++ b/quickwit/quickwit-proto/protos/quickwit/ingester.proto @@ -24,171 +24,171 @@ package quickwit.ingest.ingester; import "quickwit/ingest.proto"; service IngesterService { - // Persists batches of documents to primary shards owned by a leader. - rpc Persist(PersistRequest) returns (PersistResponse); + // Persists batches of documents to primary shards owned by a leader. + rpc Persist(PersistRequest) returns (PersistResponse); - // Opens a replication stream from a leader to a follower. - rpc OpenReplicationStream(stream SynReplicationMessage) returns (stream AckReplicationMessage); + // Opens a replication stream from a leader to a follower. + rpc OpenReplicationStream(stream SynReplicationMessage) returns (stream AckReplicationMessage); - // Streams records from a leader or a follower. The client can optionally specify a range of positions to fetch. - rpc OpenFetchStream(OpenFetchStreamRequest) returns (stream FetchResponseV2); + // Streams records from a leader or a follower. The client can optionally specify a range of positions to fetch. 
+ rpc OpenFetchStream(OpenFetchStreamRequest) returns (stream FetchResponseV2); - // rpc OpenWatchStream(OpenWatchStreamRequest) returns (stream WatchMessage); + // rpc OpenWatchStream(OpenWatchStreamRequest) returns (stream WatchMessage); - // Pings an ingester to check if it is ready to host shards and serve requests. - rpc Ping(PingRequest) returns (PingResponse); + // Pings an ingester to check if it is ready to host shards and serve requests. + rpc Ping(PingRequest) returns (PingResponse); - // Truncates the shards at the given positions. Indexers should call this RPC on leaders, which will replicate the request to followers. - rpc Truncate(TruncateRequest) returns (TruncateResponse); + // Truncates the shards at the given positions. Indexers should call this RPC on leaders, which will replicate the request to followers. + rpc Truncate(TruncateRequest) returns (TruncateResponse); } message PersistRequest { - string leader_id = 1; - quickwit.ingest.CommitTypeV2 commit_type = 3; - repeated PersistSubrequest subrequests = 4; + string leader_id = 1; + quickwit.ingest.CommitTypeV2 commit_type = 3; + repeated PersistSubrequest subrequests = 4; } message PersistSubrequest { - uint32 subrequest_id = 1; - string index_uid = 2; - string source_id = 3; - uint64 shard_id = 4; - optional string follower_id = 5; - quickwit.ingest.DocBatchV2 doc_batch = 6; + uint32 subrequest_id = 1; + string index_uid = 2; + string source_id = 3; + uint64 shard_id = 4; + optional string follower_id = 5; + quickwit.ingest.DocBatchV2 doc_batch = 6; } message PersistResponse { - string leader_id = 1; - repeated PersistSuccess successes = 2; - repeated PersistFailure failures = 3; + string leader_id = 1; + repeated PersistSuccess successes = 2; + repeated PersistFailure failures = 3; } message PersistSuccess { - uint32 subrequest_id = 1; - string index_uid = 2; - string source_id = 3; - uint64 shard_id = 4; - quickwit.ingest.Position replication_position_inclusive = 5; + uint32 subrequest_id = 1; + string index_uid = 2; + string source_id = 3; + uint64 shard_id = 4; + quickwit.ingest.Position replication_position_inclusive = 5; } enum PersistFailureReason { - PERSIST_FAILURE_REASON_UNSPECIFIED = 0; - PERSIST_FAILURE_REASON_SHARD_CLOSED = 1; - PERSIST_FAILURE_REASON_RATE_LIMITED = 2; - PERSIST_FAILURE_REASON_RESOURCE_EXHAUSTED = 3; + PERSIST_FAILURE_REASON_UNSPECIFIED = 0; + PERSIST_FAILURE_REASON_SHARD_CLOSED = 1; + PERSIST_FAILURE_REASON_RATE_LIMITED = 2; + PERSIST_FAILURE_REASON_RESOURCE_EXHAUSTED = 3; } message PersistFailure { - uint32 subrequest_id = 1; - string index_uid = 2; - string source_id = 3; - uint64 shard_id = 4; - PersistFailureReason reason = 5; + uint32 subrequest_id = 1; + string index_uid = 2; + string source_id = 3; + uint64 shard_id = 4; + PersistFailureReason reason = 5; } message SynReplicationMessage { - oneof message { - OpenReplicationStreamRequest open_request = 1; - ReplicateRequest replicate_request = 2; - } + oneof message { + OpenReplicationStreamRequest open_request = 1; + ReplicateRequest replicate_request = 2; + } } message AckReplicationMessage { - oneof message { - OpenReplicationStreamResponse open_response = 1; - ReplicateResponse replicate_response = 2; - } + oneof message { + OpenReplicationStreamResponse open_response = 1; + ReplicateResponse replicate_response = 2; + } } message OpenReplicationStreamRequest { - string leader_id = 1; - string follower_id = 2; + string leader_id = 1; + string follower_id = 2; } message OpenReplicationStreamResponse { } message ReplicateRequest { 
- string leader_id = 1; - string follower_id = 2; - quickwit.ingest.CommitTypeV2 commit_type = 3; - repeated ReplicateSubrequest subrequests = 4; - // Position of the request in the replication stream. - uint64 replication_seqno = 5; + string leader_id = 1; + string follower_id = 2; + quickwit.ingest.CommitTypeV2 commit_type = 3; + repeated ReplicateSubrequest subrequests = 4; + // Position of the request in the replication stream. + uint64 replication_seqno = 5; } message ReplicateSubrequest { - uint32 subrequest_id = 1; - string index_uid = 2; - string source_id = 3; - uint64 shard_id = 4; - quickwit.ingest.Position from_position_exclusive = 5; - quickwit.ingest.Position to_position_inclusive = 6; - ingest.DocBatchV2 doc_batch = 7; + uint32 subrequest_id = 1; + string index_uid = 2; + string source_id = 3; + uint64 shard_id = 4; + quickwit.ingest.Position from_position_exclusive = 5; + quickwit.ingest.Position to_position_inclusive = 6; + ingest.DocBatchV2 doc_batch = 7; } message ReplicateResponse { - string follower_id = 1; - repeated ReplicateSuccess successes = 2; - repeated ReplicateFailure failures = 3; - // Position of the response in the replication stream. It should match the position of the request. - uint64 replication_seqno = 4; + string follower_id = 1; + repeated ReplicateSuccess successes = 2; + repeated ReplicateFailure failures = 3; + // Position of the response in the replication stream. It should match the position of the request. + uint64 replication_seqno = 4; } message ReplicateSuccess { - uint32 subrequest_id = 1; - string index_uid = 2; - string source_id = 3; - uint64 shard_id = 4; - quickwit.ingest.Position replication_position_inclusive = 5; + uint32 subrequest_id = 1; + string index_uid = 2; + string source_id = 3; + uint64 shard_id = 4; + quickwit.ingest.Position replication_position_inclusive = 5; } message ReplicateFailure { - uint32 subrequest_id = 1; - string index_uid = 2; - string source_id = 3; - uint64 shard_id = 4; - // ingest.DocBatchV2 doc_batch = 4; - // ingest.IngestError error = 5; + uint32 subrequest_id = 1; + string index_uid = 2; + string source_id = 3; + uint64 shard_id = 4; + // ingest.DocBatchV2 doc_batch = 4; + // ingest.IngestError error = 5; } message TruncateRequest { - string ingester_id = 1; - repeated TruncateSubrequest subrequests = 2; + string ingester_id = 1; + repeated TruncateSubrequest subrequests = 2; } message TruncateSubrequest { - string index_uid = 1; - string source_id = 2; - uint64 shard_id = 3; - quickwit.ingest.Position to_position_inclusive = 4; + string index_uid = 1; + string source_id = 2; + uint64 shard_id = 3; + quickwit.ingest.Position to_position_inclusive = 4; } message TruncateResponse { - // TODO + // TODO } message OpenFetchStreamRequest { - string client_id = 1; - string index_uid = 2; - string source_id = 3; - uint64 shard_id = 4; - quickwit.ingest.Position from_position_exclusive = 5; - quickwit.ingest.Position to_position_inclusive = 6; + string client_id = 1; + string index_uid = 2; + string source_id = 3; + uint64 shard_id = 4; + quickwit.ingest.Position from_position_exclusive = 5; + quickwit.ingest.Position to_position_inclusive = 6; } message FetchResponseV2 { - string index_uid = 1; - string source_id = 2; - uint64 shard_id = 3; - quickwit.ingest.MRecordBatch mrecord_batch = 4; - quickwit.ingest.Position from_position_exclusive = 5; - quickwit.ingest.Position to_position_inclusive = 6; + string index_uid = 1; + string source_id = 2; + uint64 shard_id = 3; + quickwit.ingest.MRecordBatch 
mrecord_batch = 4; + quickwit.ingest.Position from_position_exclusive = 5; + quickwit.ingest.Position to_position_inclusive = 6; } message PingRequest { - string leader_id = 1; - optional string follower_id = 2; + string leader_id = 1; + optional string follower_id = 2; } message PingResponse { diff --git a/quickwit/quickwit-proto/protos/quickwit/metastore.proto b/quickwit/quickwit-proto/protos/quickwit/metastore.proto index 40e20ac5f36..4d74bd65ad0 100644 --- a/quickwit/quickwit-proto/protos/quickwit/metastore.proto +++ b/quickwit/quickwit-proto/protos/quickwit/metastore.proto @@ -125,8 +125,8 @@ message CreateIndexResponse { } message ListIndexesMetadataRequest { - reserved 1; - repeated string index_id_patterns = 2; + reserved 1; + repeated string index_id_patterns = 2; } message ListIndexesMetadataResponse { @@ -263,11 +263,12 @@ message OpenShardsRequest { } message OpenShardsSubrequest { - string index_uid = 1; - string source_id = 2; - string leader_id = 3; - optional string follower_id = 4; - uint64 next_shard_id = 5; + uint32 subrequest_id = 1; + string index_uid = 2; + string source_id = 3; + string leader_id = 4; + optional string follower_id = 5; + uint64 next_shard_id = 6; } message OpenShardsResponse { @@ -275,10 +276,11 @@ message OpenShardsResponse { } message OpenShardsSubresponse { - string index_uid = 1; - string source_id = 2; - repeated quickwit.ingest.Shard opened_shards = 3; - uint64 next_shard_id = 4; + uint32 subrequest_id = 1; + string index_uid = 2; + string source_id = 3; + repeated quickwit.ingest.Shard opened_shards = 4; + uint64 next_shard_id = 5; } message AcquireShardsRequest { @@ -286,10 +288,10 @@ message AcquireShardsRequest { } message AcquireShardsSubrequest { - string index_uid = 1; - string source_id = 2; - repeated uint64 shard_ids = 3; - string publish_token = 4; + string index_uid = 1; + string source_id = 2; + repeated uint64 shard_ids = 3; + string publish_token = 4; } message AcquireShardsResponse { @@ -297,9 +299,9 @@ message AcquireShardsResponse { } message AcquireShardsSubresponse { - string index_uid = 1; - string source_id = 2; - repeated quickwit.ingest.Shard acquired_shards = 3; + string index_uid = 1; + string source_id = 2; + repeated quickwit.ingest.Shard acquired_shards = 3; } message DeleteShardsRequest { @@ -308,9 +310,9 @@ message DeleteShardsRequest { } message DeleteShardsSubrequest { - string index_uid = 1; - string source_id = 2; - repeated uint64 shard_ids = 3; + string index_uid = 1; + string source_id = 2; + repeated uint64 shard_ids = 3; } message DeleteShardsResponse { @@ -321,18 +323,18 @@ message ListShardsRequest { } message ListShardsSubrequest { - string index_uid = 1; - string source_id = 2; - optional quickwit.ingest.ShardState shard_state = 3; + string index_uid = 1; + string source_id = 2; + optional quickwit.ingest.ShardState shard_state = 3; } message ListShardsResponse { - repeated ListShardsSubresponse subresponses = 1; + repeated ListShardsSubresponse subresponses = 1; } message ListShardsSubresponse { - string index_uid = 1; - string source_id = 2; - repeated quickwit.ingest.Shard shards = 3; - uint64 next_shard_id = 4; + string index_uid = 1; + string source_id = 2; + repeated quickwit.ingest.Shard shards = 3; + uint64 next_shard_id = 4; } diff --git a/quickwit/quickwit-proto/protos/quickwit/router.proto b/quickwit/quickwit-proto/protos/quickwit/router.proto index 68bae11746e..796def8acce 100644 --- a/quickwit/quickwit-proto/protos/quickwit/router.proto +++ 
b/quickwit/quickwit-proto/protos/quickwit/router.proto @@ -24,48 +24,53 @@ package quickwit.ingest.router; import "quickwit/ingest.proto"; service IngestRouterService { - // Ingests batches of documents for one or multiple indexes. - // TODO: Describe error cases and how to handle them. - rpc Ingest(IngestRequestV2) returns (IngestResponseV2); + // Ingests batches of documents for one or multiple indexes. + // TODO: Describe error cases and how to handle them. + rpc Ingest(IngestRequestV2) returns (IngestResponseV2); } message IngestRequestV2 { - repeated IngestSubrequest subrequests = 1; - quickwit.ingest.CommitTypeV2 commit_type = 2; + repeated IngestSubrequest subrequests = 1; + quickwit.ingest.CommitTypeV2 commit_type = 2; } message IngestSubrequest { - // The subrequest ID is used to identify the various subrequests and responses - // (ingest, persist, replicate) at play during the ingest and replication - // process. - uint32 subrequest_id = 1; - string index_id = 2; - string source_id = 3; - quickwit.ingest.DocBatchV2 doc_batch = 4; + // The subrequest ID is used to identify the various subrequests and responses + // (ingest, persist, replicate) at play during the ingest and replication + // process. + uint32 subrequest_id = 1; + string index_id = 2; + string source_id = 3; + quickwit.ingest.DocBatchV2 doc_batch = 4; } message IngestResponseV2 { - repeated IngestSuccess successes = 1; - repeated IngestFailure failures = 2; + repeated IngestSuccess successes = 1; + repeated IngestFailure failures = 2; } message IngestSuccess { - uint32 subrequest_id = 1; - string index_uid = 2; - string source_id = 3; - uint64 shard_id = 4; - // Replication position inclusive. - quickwit.ingest.Position replication_position_inclusive = 5; + uint32 subrequest_id = 1; + string index_uid = 2; + string source_id = 3; + uint64 shard_id = 4; + // Replication position inclusive. + quickwit.ingest.Position replication_position_inclusive = 5; } enum IngestFailureReason { - INGEST_FAILURE_REASON_UNSPECIFIED = 0; - INGEST_FAILURE_REASON_NO_SHARDS_AVAILABLE = 1; + INGEST_FAILURE_REASON_UNSPECIFIED = 0; + INGEST_FAILURE_REASON_INDEX_NOT_FOUND = 1; + INGEST_FAILURE_REASON_SOURCE_NOT_FOUND = 2; + INGEST_FAILURE_REASON_INTERNAL = 3; + INGEST_FAILURE_REASON_NO_SHARDS_AVAILABLE = 4; + INGEST_FAILURE_REASON_RATE_LIMITED = 5; + INGEST_FAILURE_REASON_RESOURCE_EXHAUSTED = 6; } message IngestFailure { - uint32 subrequest_id = 1; - string index_id = 2; - string source_id = 3; - IngestFailureReason reason = 5; + uint32 subrequest_id = 1; + string index_id = 2; + string source_id = 3; + IngestFailureReason reason = 5; } diff --git a/quickwit/quickwit-proto/protos/quickwit/search.proto b/quickwit/quickwit-proto/protos/quickwit/search.proto index 9b82a3ec88e..c95eb716eaf 100644 --- a/quickwit/quickwit-proto/protos/quickwit/search.proto +++ b/quickwit/quickwit-proto/protos/quickwit/search.proto @@ -76,9 +76,9 @@ service SearchService { /// Scroll Request message ScrollRequest { - /// The `scroll_id` is the given in the response of a search request including a scroll. - string scroll_id = 1; - optional uint32 scroll_ttl_secs = 2; + /// The `scroll_id` is the given in the response of a search request including a scroll. 
+ string scroll_id = 1; + optional uint32 scroll_ttl_secs = 2; } message PutKVRequest { @@ -90,23 +90,23 @@ message PutKVRequest { message PutKVResponse {} message GetKVRequest { - bytes key = 1; + bytes key = 1; } message GetKVResponse { - optional bytes payload = 1; + optional bytes payload = 1; } message ReportSplit { - // Split id (ULID format `01HAV29D4XY3D462FS3D8K5Q2H`) - string split_id = 2; - // The storage uri. This URI does NOT include the split id. - string storage_uri = 1; + // Split id (ULID format `01HAV29D4XY3D462FS3D8K5Q2H`) + string split_id = 2; + // The storage uri. This URI does NOT include the split id. + string storage_uri = 1; } message ReportSplitsRequest { - repeated ReportSplit report_splits = 1; + repeated ReportSplit report_splits = 1; } message ReportSplitsResponse {} @@ -188,10 +188,10 @@ message SortField { } enum SortOrder { - // Ascending order. - ASC = 0; - // Descending order. - DESC = 1; //< This will be the default value; + // Ascending order. + ASC = 0; + // Descending order. + DESC = 1; //< This will be the default value; } message SearchResponse { @@ -325,10 +325,10 @@ message PartialHit { message SortByValue { oneof sort_value { - uint64 u64 = 1; - int64 i64 = 2; - double f64 = 3; - bool boolean = 4; + uint64 u64 = 1; + int64 i64 = 2; + double f64 = 3; + bool boolean = 4; } // Room for eventual future sorted key types. reserved 5 to 20; @@ -449,12 +449,12 @@ message LeafListTermsResponse { // -- Stream ------------------- enum OutputFormat { - // Comma Separated Values format (https://datatracker.ietf.org/doc/html/rfc4180). - // The delimiter is `,`. - CSV = 0; //< This will be the default value - // Format data by row in ClickHouse binary format. - // https://clickhouse.tech/docs/en/interfaces/formats/#rowbinary - CLICK_HOUSE_ROW_BINARY = 1; + // Comma Separated Values format (https://datatracker.ietf.org/doc/html/rfc4180). + // The delimiter is `,`. + CSV = 0; //< This will be the default value + // Format data by row in ClickHouse binary format. + // https://clickhouse.tech/docs/en/interfaces/formats/#rowbinary + CLICK_HOUSE_ROW_BINARY = 1; } message SearchStreamRequest { diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs index 82535245522..5e7c01caade 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs @@ -13,9 +13,11 @@ pub struct GetOrCreateOpenShardsRequest { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct GetOrCreateOpenShardsSubrequest { - #[prost(string, tag = "1")] - pub index_id: ::prost::alloc::string::String, + #[prost(uint32, tag = "1")] + pub subrequest_id: u32, #[prost(string, tag = "2")] + pub index_id: ::prost::alloc::string::String, + #[prost(string, tag = "3")] pub source_id: ::prost::alloc::string::String, } #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] @@ -29,25 +31,84 @@ pub struct ClosedShards { #[prost(uint64, repeated, tag = "3")] pub shard_ids: ::prost::alloc::vec::Vec, } -/// TODO: Handle partial failures. 
#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct GetOrCreateOpenShardsResponse { #[prost(message, repeated, tag = "1")] - pub subresponses: ::prost::alloc::vec::Vec, + pub successes: ::prost::alloc::vec::Vec, + #[prost(message, repeated, tag = "2")] + pub failures: ::prost::alloc::vec::Vec, } #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct GetOpenShardsSubresponse { - #[prost(string, tag = "1")] - pub index_uid: ::prost::alloc::string::String, +pub struct GetOrCreateOpenShardsSuccess { + #[prost(uint32, tag = "1")] + pub subrequest_id: u32, #[prost(string, tag = "2")] + pub index_uid: ::prost::alloc::string::String, + #[prost(string, tag = "3")] pub source_id: ::prost::alloc::string::String, - #[prost(message, repeated, tag = "3")] + #[prost(message, repeated, tag = "4")] pub open_shards: ::prost::alloc::vec::Vec, } +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetOrCreateOpenShardsFailure { + #[prost(uint32, tag = "1")] + pub subrequest_id: u32, + #[prost(string, tag = "2")] + pub index_id: ::prost::alloc::string::String, + #[prost(string, tag = "3")] + pub source_id: ::prost::alloc::string::String, + #[prost(enumeration = "GetOrCreateOpenShardsFailureReason", tag = "4")] + pub reason: i32, +} +#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] +#[serde(rename_all = "snake_case")] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum GetOrCreateOpenShardsFailureReason { + Unspecified = 0, + IndexNotFound = 1, + SourceNotFound = 2, +} +impl GetOrCreateOpenShardsFailureReason { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + GetOrCreateOpenShardsFailureReason::Unspecified => { + "GET_OR_CREATE_OPEN_SHARDS_FAILURE_REASON_UNSPECIFIED" + } + GetOrCreateOpenShardsFailureReason::IndexNotFound => { + "GET_OR_CREATE_OPEN_SHARDS_FAILURE_REASON_INDEX_NOT_FOUND" + } + GetOrCreateOpenShardsFailureReason::SourceNotFound => { + "GET_OR_CREATE_OPEN_SHARDS_FAILURE_REASON_SOURCE_NOT_FOUND" + } + } + } + /// Creates an enum from field names used in the ProtoBuf definition. 
+ pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "GET_OR_CREATE_OPEN_SHARDS_FAILURE_REASON_UNSPECIFIED" => { + Some(Self::Unspecified) + } + "GET_OR_CREATE_OPEN_SHARDS_FAILURE_REASON_INDEX_NOT_FOUND" => { + Some(Self::IndexNotFound) + } + "GET_OR_CREATE_OPEN_SHARDS_FAILURE_REASON_SOURCE_NOT_FOUND" => { + Some(Self::SourceNotFound) + } + _ => None, + } + } +} /// BEGIN quickwit-codegen #[allow(unused_imports)] use std::str::FromStr; @@ -118,6 +179,8 @@ impl ControlPlaneServiceClient { > { let adapter = ControlPlaneServiceGrpcServerAdapter::new(self.clone()); control_plane_service_grpc_server::ControlPlaneServiceGrpcServer::new(adapter) + .max_decoding_message_size(10 * 1024 * 1024) + .max_encoding_message_size(10 * 1024 * 1024) } pub fn from_channel( addr: std::net::SocketAddr, @@ -138,10 +201,13 @@ impl ControlPlaneServiceClient { balance_channel: quickwit_common::tower::BalanceChannel, ) -> ControlPlaneServiceClient { let connection_keys_watcher = balance_channel.connection_keys_watcher(); - let adapter = ControlPlaneServiceGrpcClientAdapter::new( - control_plane_service_grpc_client::ControlPlaneServiceGrpcClient::new( + let client = control_plane_service_grpc_client::ControlPlaneServiceGrpcClient::new( balance_channel, - ), + ) + .max_decoding_message_size(10 * 1024 * 1024) + .max_encoding_message_size(10 * 1024 * 1024); + let adapter = ControlPlaneServiceGrpcClientAdapter::new( + client, connection_keys_watcher, ); Self::new(adapter) diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.indexing.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.indexing.rs index 7b9c977eac2..deccabf4b09 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.indexing.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.indexing.rs @@ -67,6 +67,8 @@ impl IndexingServiceClient { > { let adapter = IndexingServiceGrpcServerAdapter::new(self.clone()); indexing_service_grpc_server::IndexingServiceGrpcServer::new(adapter) + .max_decoding_message_size(10 * 1024 * 1024) + .max_encoding_message_size(10 * 1024 * 1024) } pub fn from_channel( addr: std::net::SocketAddr, @@ -85,10 +87,13 @@ impl IndexingServiceClient { balance_channel: quickwit_common::tower::BalanceChannel, ) -> IndexingServiceClient { let connection_keys_watcher = balance_channel.connection_keys_watcher(); - let adapter = IndexingServiceGrpcClientAdapter::new( - indexing_service_grpc_client::IndexingServiceGrpcClient::new( + let client = indexing_service_grpc_client::IndexingServiceGrpcClient::new( balance_channel, - ), + ) + .max_decoding_message_size(10 * 1024 * 1024) + .max_encoding_message_size(10 * 1024 * 1024); + let adapter = IndexingServiceGrpcClientAdapter::new( + client, connection_keys_watcher, ); Self::new(adapter) diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs index 70258aa0ecf..8ceb73fc7ee 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs @@ -376,6 +376,8 @@ impl IngesterServiceClient { > { let adapter = IngesterServiceGrpcServerAdapter::new(self.clone()); ingester_service_grpc_server::IngesterServiceGrpcServer::new(adapter) + .max_decoding_message_size(10 * 1024 * 1024) + .max_encoding_message_size(10 * 1024 * 1024) } pub fn from_channel( addr: std::net::SocketAddr, @@ -394,10 +396,13 @@ impl IngesterServiceClient { balance_channel: 
quickwit_common::tower::BalanceChannel, ) -> IngesterServiceClient { let connection_keys_watcher = balance_channel.connection_keys_watcher(); - let adapter = IngesterServiceGrpcClientAdapter::new( - ingester_service_grpc_client::IngesterServiceGrpcClient::new( + let client = ingester_service_grpc_client::IngesterServiceGrpcClient::new( balance_channel, - ), + ) + .max_decoding_message_size(10 * 1024 * 1024) + .max_encoding_message_size(10 * 1024 * 1024); + let adapter = IngesterServiceGrpcClientAdapter::new( + client, connection_keys_watcher, ); Self::new(adapter) diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.router.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.router.rs index 5b26c86fecf..443c9005fa7 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.router.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.router.rs @@ -67,7 +67,12 @@ pub struct IngestFailure { #[repr(i32)] pub enum IngestFailureReason { Unspecified = 0, - NoShardsAvailable = 1, + IndexNotFound = 1, + SourceNotFound = 2, + Internal = 3, + NoShardsAvailable = 4, + RateLimited = 5, + ResourceExhausted = 6, } impl IngestFailureReason { /// String value of the enum field names used in the ProtoBuf definition. @@ -77,16 +82,30 @@ impl IngestFailureReason { pub fn as_str_name(&self) -> &'static str { match self { IngestFailureReason::Unspecified => "INGEST_FAILURE_REASON_UNSPECIFIED", + IngestFailureReason::IndexNotFound => "INGEST_FAILURE_REASON_INDEX_NOT_FOUND", + IngestFailureReason::SourceNotFound => { + "INGEST_FAILURE_REASON_SOURCE_NOT_FOUND" + } + IngestFailureReason::Internal => "INGEST_FAILURE_REASON_INTERNAL", IngestFailureReason::NoShardsAvailable => { "INGEST_FAILURE_REASON_NO_SHARDS_AVAILABLE" } + IngestFailureReason::RateLimited => "INGEST_FAILURE_REASON_RATE_LIMITED", + IngestFailureReason::ResourceExhausted => { + "INGEST_FAILURE_REASON_RESOURCE_EXHAUSTED" + } } } /// Creates an enum from field names used in the ProtoBuf definition. 
pub fn from_str_name(value: &str) -> ::core::option::Option { match value { "INGEST_FAILURE_REASON_UNSPECIFIED" => Some(Self::Unspecified), + "INGEST_FAILURE_REASON_INDEX_NOT_FOUND" => Some(Self::IndexNotFound), + "INGEST_FAILURE_REASON_SOURCE_NOT_FOUND" => Some(Self::SourceNotFound), + "INGEST_FAILURE_REASON_INTERNAL" => Some(Self::Internal), "INGEST_FAILURE_REASON_NO_SHARDS_AVAILABLE" => Some(Self::NoShardsAvailable), + "INGEST_FAILURE_REASON_RATE_LIMITED" => Some(Self::RateLimited), + "INGEST_FAILURE_REASON_RESOURCE_EXHAUSTED" => Some(Self::ResourceExhausted), _ => None, } } @@ -136,6 +155,8 @@ impl IngestRouterServiceClient { > { let adapter = IngestRouterServiceGrpcServerAdapter::new(self.clone()); ingest_router_service_grpc_server::IngestRouterServiceGrpcServer::new(adapter) + .max_decoding_message_size(10 * 1024 * 1024) + .max_encoding_message_size(10 * 1024 * 1024) } pub fn from_channel( addr: std::net::SocketAddr, @@ -156,10 +177,13 @@ impl IngestRouterServiceClient { balance_channel: quickwit_common::tower::BalanceChannel, ) -> IngestRouterServiceClient { let connection_keys_watcher = balance_channel.connection_keys_watcher(); - let adapter = IngestRouterServiceGrpcClientAdapter::new( - ingest_router_service_grpc_client::IngestRouterServiceGrpcClient::new( + let client = ingest_router_service_grpc_client::IngestRouterServiceGrpcClient::new( balance_channel, - ), + ) + .max_decoding_message_size(10 * 1024 * 1024) + .max_encoding_message_size(10 * 1024 * 1024); + let adapter = IngestRouterServiceGrpcClientAdapter::new( + client, connection_keys_watcher, ); Self::new(adapter) diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs index 014f5783f05..3c06ee2e69b 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs @@ -249,15 +249,17 @@ pub struct OpenShardsRequest { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct OpenShardsSubrequest { - #[prost(string, tag = "1")] - pub index_uid: ::prost::alloc::string::String, + #[prost(uint32, tag = "1")] + pub subrequest_id: u32, #[prost(string, tag = "2")] - pub source_id: ::prost::alloc::string::String, + pub index_uid: ::prost::alloc::string::String, #[prost(string, tag = "3")] + pub source_id: ::prost::alloc::string::String, + #[prost(string, tag = "4")] pub leader_id: ::prost::alloc::string::String, - #[prost(string, optional, tag = "4")] + #[prost(string, optional, tag = "5")] pub follower_id: ::core::option::Option<::prost::alloc::string::String>, - #[prost(uint64, tag = "5")] + #[prost(uint64, tag = "6")] pub next_shard_id: u64, } #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] @@ -271,13 +273,15 @@ pub struct OpenShardsResponse { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct OpenShardsSubresponse { - #[prost(string, tag = "1")] - pub index_uid: ::prost::alloc::string::String, + #[prost(uint32, tag = "1")] + pub subrequest_id: u32, #[prost(string, tag = "2")] + pub index_uid: ::prost::alloc::string::String, + #[prost(string, tag = "3")] pub source_id: ::prost::alloc::string::String, - #[prost(message, repeated, tag = "3")] + #[prost(message, repeated, tag = "4")] pub opened_shards: ::prost::alloc::vec::Vec, - #[prost(uint64, tag = "4")] + #[prost(uint64, tag = "5")] pub next_shard_id: u64, } #[derive(serde::Serialize, 
serde::Deserialize, utoipa::ToSchema)] @@ -705,6 +709,8 @@ impl MetastoreServiceClient { > { let adapter = MetastoreServiceGrpcServerAdapter::new(self.clone()); metastore_service_grpc_server::MetastoreServiceGrpcServer::new(adapter) + .max_decoding_message_size(10 * 1024 * 1024) + .max_encoding_message_size(10 * 1024 * 1024) } pub fn from_channel( addr: std::net::SocketAddr, @@ -723,10 +729,13 @@ impl MetastoreServiceClient { balance_channel: quickwit_common::tower::BalanceChannel, ) -> MetastoreServiceClient { let connection_keys_watcher = balance_channel.connection_keys_watcher(); - let adapter = MetastoreServiceGrpcClientAdapter::new( - metastore_service_grpc_client::MetastoreServiceGrpcClient::new( + let client = metastore_service_grpc_client::MetastoreServiceGrpcClient::new( balance_channel, - ), + ) + .max_decoding_message_size(10 * 1024 * 1024) + .max_encoding_message_size(10 * 1024 * 1024); + let adapter = MetastoreServiceGrpcClientAdapter::new( + client, connection_keys_watcher, ); Self::new(adapter) diff --git a/quickwit/quickwit-proto/src/control_plane/mod.rs b/quickwit/quickwit-proto/src/control_plane/mod.rs index e9f3168ba78..ddcade9892a 100644 --- a/quickwit/quickwit-proto/src/control_plane/mod.rs +++ b/quickwit/quickwit-proto/src/control_plane/mod.rs @@ -31,7 +31,7 @@ pub type ControlPlaneResult = std::result::Result; pub enum ControlPlaneError { #[error("an internal error occurred: {0}")] Internal(String), - #[error("an internal error occurred: {0}")] + #[error("a metastore error occurred: {0}")] Metastore(#[from] MetastoreError), #[error("control plane is unavailable: {0}")] Unavailable(String), @@ -41,7 +41,7 @@ impl From for MetastoreError { fn from(error: ControlPlaneError) -> Self { match error { ControlPlaneError::Internal(message) => MetastoreError::Internal { - message: "todo".to_string(), + message: "an internal metastore error occurred".to_string(), cause: message, }, ControlPlaneError::Metastore(error) => error, diff --git a/quickwit/quickwit-proto/src/ingest/mod.rs b/quickwit/quickwit-proto/src/ingest/mod.rs index ee776f5f3c5..ef5f54ec368 100644 --- a/quickwit/quickwit-proto/src/ingest/mod.rs +++ b/quickwit/quickwit-proto/src/ingest/mod.rs @@ -20,10 +20,10 @@ use bytes::Bytes; use self::ingester::FetchResponseV2; -use super::types::{NodeId, ShardId, SourceId}; +use super::types::NodeId; use super::{ServiceError, ServiceErrorCode}; use crate::control_plane::ControlPlaneError; -use crate::types::{queue_id, IndexUid, Position}; +use crate::types::{queue_id, Position}; pub mod ingester; pub mod router; @@ -38,29 +38,11 @@ pub enum IngestV2Error { Internal(String), #[error("failed to connect to ingester `{ingester_id}`")] IngesterUnavailable { ingester_id: NodeId }, - #[error( - "ingest service is currently unavailable with {num_ingesters} in the cluster and a \ - replication factor of {replication_factor}" - )] - ServiceUnavailable { - num_ingesters: usize, - replication_factor: usize, - }, - // #[error("Could not find shard.")] - // ShardNotFound { - // index_uid: IndexUid, - // source_id: SourceId, - // shard_id: ShardId, - // }, - #[error("failed to open or write to shard")] - ShardUnavailable { - leader_id: NodeId, - index_uid: IndexUid, - source_id: SourceId, - shard_id: ShardId, - }, #[error("request timed out")] Timeout, + // TODO: Merge `Transport` and `IngesterUnavailable` into a single `Unavailable` error. 
+ #[error("transport error: {0}")] + Transport(String), } impl From for IngestV2Error { @@ -74,17 +56,19 @@ impl From for tonic::Status { let code = match &error { IngestV2Error::IngesterUnavailable { .. } => tonic::Code::Unavailable, IngestV2Error::Internal(_) => tonic::Code::Internal, - IngestV2Error::ServiceUnavailable { .. } => tonic::Code::Unavailable, - IngestV2Error::ShardUnavailable { .. } => tonic::Code::Unavailable, IngestV2Error::Timeout { .. } => tonic::Code::DeadlineExceeded, + IngestV2Error::Transport { .. } => tonic::Code::Unavailable, }; - let message = error.to_string(); + let message: String = error.to_string(); tonic::Status::new(code, message) } } impl From for IngestV2Error { fn from(status: tonic::Status) -> Self { + if status.code() == tonic::Code::Unavailable { + return IngestV2Error::Transport(status.message().to_string()); + } IngestV2Error::Internal(status.message().to_string()) } } @@ -92,11 +76,10 @@ impl From for IngestV2Error { impl ServiceError for IngestV2Error { fn error_code(&self) -> ServiceErrorCode { match self { - Self::Internal { .. } => ServiceErrorCode::Internal, Self::IngesterUnavailable { .. } => ServiceErrorCode::Unavailable, - Self::ShardUnavailable { .. } => ServiceErrorCode::Unavailable, - Self::ServiceUnavailable { .. } => ServiceErrorCode::Unavailable, + Self::Internal { .. } => ServiceErrorCode::Internal, Self::Timeout { .. } => ServiceErrorCode::Timeout, + Self::Transport { .. } => ServiceErrorCode::Unavailable, } } }
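With `IngestResponseV2` now returning `successes` and `failures` side by side, a caller has to inspect each `IngestFailure` and decide from its `IngestFailureReason` whether retrying the subrequest can possibly help. The sketch below is illustrative only: it mirrors the generated `quickwit.ingest.router` types with local stand-ins instead of importing `quickwit-proto`, and the retry classification is an assumption made for the example, not the policy the router or the REST layer actually implements.

// Local stand-ins mirroring the generated `quickwit.ingest.router` types; only the
// fields needed for the example are kept.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum IngestFailureReason {
    Unspecified,
    IndexNotFound,
    SourceNotFound,
    Internal,
    NoShardsAvailable,
    RateLimited,
    ResourceExhausted,
}

#[derive(Debug)]
struct IngestFailure {
    subrequest_id: u32,
    index_id: String,
    reason: IngestFailureReason,
}

/// Assumed classification: `true` means retrying the same subrequest later may succeed.
fn is_transient(reason: IngestFailureReason) -> bool {
    match reason {
        // The index or source does not exist: retrying the same request cannot help.
        IngestFailureReason::IndexNotFound | IngestFailureReason::SourceNotFound => false,
        // Capacity or availability issues are expected to clear up eventually.
        IngestFailureReason::NoShardsAvailable
        | IngestFailureReason::RateLimited
        | IngestFailureReason::ResourceExhausted => true,
        // Be conservative about anything else.
        IngestFailureReason::Internal | IngestFailureReason::Unspecified => false,
    }
}

fn main() {
    // Placeholder values standing in for a real `IngestResponseV2.failures` vector.
    let failures = vec![
        IngestFailure {
            subrequest_id: 0,
            index_id: "test-index".to_string(),
            reason: IngestFailureReason::IndexNotFound,
        },
        IngestFailure {
            subrequest_id: 1,
            index_id: "test-index".to_string(),
            reason: IngestFailureReason::RateLimited,
        },
    ];
    for failure in &failures {
        let verdict = if is_transient(failure.reason) { "retry later" } else { "give up" };
        println!(
            "subrequest {} on `{}` failed with {:?}: {}",
            failure.subrequest_id, failure.index_id, failure.reason, verdict
        );
    }
}

The same success/failure split appears on the ingester side in `PersistResponse`, where `PersistFailureReason` plays the equivalent role for shard-level failures.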
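The `subrequest_id` field added to `GetOrCreateOpenShardsSubrequest`, `OpenShardsSubrequest`, and their success/failure counterparts exists so that partial responses can be joined back to the originating subrequests without relying on positional order. A rough sketch of that correlation, again with hypothetical local stand-ins for the generated control-plane messages (only the fields used here are kept, and the failure reason is flattened to a string):

use std::collections::HashMap;

// Hypothetical stand-ins for the generated control-plane messages.
struct GetOrCreateOpenShardsSubrequest {
    subrequest_id: u32,
    index_id: String,
    source_id: String,
}

struct GetOrCreateOpenShardsSuccess {
    subrequest_id: u32,
    index_uid: String,
}

struct GetOrCreateOpenShardsFailure {
    subrequest_id: u32,
    reason: String, // simplified: the real message carries an enum.
}

fn correlate(
    subrequests: &[GetOrCreateOpenShardsSubrequest],
    successes: &[GetOrCreateOpenShardsSuccess],
    failures: &[GetOrCreateOpenShardsFailure],
) {
    // Index the subrequests by ID so that successes and failures can be matched
    // back regardless of the order in which the control plane returns them.
    let subrequests_by_id: HashMap<u32, &GetOrCreateOpenShardsSubrequest> = subrequests
        .iter()
        .map(|subrequest| (subrequest.subrequest_id, subrequest))
        .collect();

    for success in successes {
        if let Some(subrequest) = subrequests_by_id.get(&success.subrequest_id) {
            println!(
                "`{}/{}` resolved to index UID `{}`",
                subrequest.index_id, subrequest.source_id, success.index_uid
            );
        }
    }
    for failure in failures {
        if let Some(subrequest) = subrequests_by_id.get(&failure.subrequest_id) {
            println!(
                "`{}/{}` failed: {}",
                subrequest.index_id, subrequest.source_id, failure.reason
            );
        }
    }
}

fn main() {
    let subrequests = vec![GetOrCreateOpenShardsSubrequest {
        subrequest_id: 0,
        index_id: "test-index".to_string(),
        source_id: "test-source".to_string(),
    }];
    let successes = vec![GetOrCreateOpenShardsSuccess {
        subrequest_id: 0,
        index_uid: "test-index:0".to_string(),
    }];
    correlate(&subrequests, &successes, &[]);
}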
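Finally, the new `Timeout` and `Transport` variants of `IngestV2Error` change how errors cross the gRPC boundary: a timeout surfaces as `DeadlineExceeded`, a transport problem as `Unavailable`, and on the receiving side only `Unavailable` is mapped back to a dedicated variant. The toy round trip below uses a reduced, hypothetical stand-in for the error type and assumes `tonic` is available as a dependency; it only reproduces the mapping shown in the diff.

use tonic::{Code, Status};

// Reduced, hypothetical stand-in for `IngestV2Error`, keeping only the variants
// relevant to the gRPC mapping.
#[derive(Clone, Debug, PartialEq)]
enum IngestV2Error {
    Internal(String),
    Timeout,
    Transport(String),
}

fn to_status(error: &IngestV2Error) -> Status {
    match error {
        IngestV2Error::Internal(message) => Status::new(Code::Internal, message.clone()),
        IngestV2Error::Timeout => Status::new(Code::DeadlineExceeded, "request timed out"),
        IngestV2Error::Transport(message) => {
            Status::new(Code::Unavailable, format!("transport error: {message}"))
        }
    }
}

fn from_status(status: Status) -> IngestV2Error {
    // Mirrors the conversion in the diff: only `Unavailable` gets a dedicated
    // variant; everything else collapses into `Internal`.
    if status.code() == Code::Unavailable {
        return IngestV2Error::Transport(status.message().to_string());
    }
    IngestV2Error::Internal(status.message().to_string())
}

fn main() {
    // A transport error keeps its meaning across the wire...
    let transport = IngestV2Error::Transport("connection reset".to_string());
    assert!(matches!(from_status(to_status(&transport)), IngestV2Error::Transport(_)));

    // ...whereas a timeout comes back as `Internal`, since `DeadlineExceeded`
    // is not mapped back to `Timeout` on the client side.
    let timeout = IngestV2Error::Timeout;
    assert!(matches!(from_status(to_status(&timeout)), IngestV2Error::Internal(_)));
}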