diff --git a/quickwit/quickwit-ingest/src/ingest_v2/router.rs b/quickwit/quickwit-ingest/src/ingest_v2/router.rs
index 390abd96bae..b78f781ddcc 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/router.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/router.rs
@@ -292,28 +292,7 @@ impl IngestRouter {
                             persist_summary.leader_id
                         );
                     }
-                    match persist_error {
-                        IngestV2Error::Unavailable(_) => {
-                            workbench
-                                .unavailable_leaders
-                                .insert(persist_summary.leader_id);
-                            for subrequest_id in persist_summary.subrequest_ids {
-                                workbench.record_transport_error(subrequest_id);
-                            }
-                        }
-                        IngestV2Error::IngesterUnavailable { .. }
-                        | IngestV2Error::Internal(_)
-                        | IngestV2Error::ShardNotFound { .. }
-                        | IngestV2Error::Timeout(_)
-                        | IngestV2Error::TooManyRequests => {
-                            for subrequest_id in persist_summary.subrequest_ids {
-                                workbench.record_internal_error(
-                                    subrequest_id,
-                                    persist_error.to_string(),
-                                );
-                            }
-                        }
-                    }
+                    workbench.record_persist_error(persist_error, persist_summary);
                 }
             };
         }
@@ -341,8 +320,8 @@ impl IngestRouter {
         self.populate_routing_table_debounced(workbench, debounced_request)
             .await;
 
-        // List of subrequest IDs for which no shards were available to route the subrequests to.
-        let mut unavailable_subrequest_ids = Vec::new();
+        // List of subrequest IDs for which no shards are available to route the subrequests to.
+        let mut no_shards_available_subrequest_ids = Vec::new();
 
         let mut per_leader_persist_subrequests: HashMap<&LeaderId, Vec<PersistSubrequest>> =
             HashMap::new();
@@ -359,7 +338,7 @@ impl IngestRouter {
                 .find_entry(&subrequest.index_id, &subrequest.source_id)
                 .and_then(|entry| entry.next_open_shard_round_robin(&self.ingester_pool))
             else {
-                unavailable_subrequest_ids.push(subrequest.subrequest_id);
+                no_shards_available_subrequest_ids.push(subrequest.subrequest_id);
                 continue;
             };
             let persist_subrequest = PersistSubrequest {
@@ -383,7 +362,7 @@ impl IngestRouter {
                 .map(|subrequest| subrequest.subrequest_id)
                 .collect();
             let Some(mut ingester) = self.ingester_pool.get(&leader_id) else {
-                unavailable_subrequest_ids.extend(subrequest_ids);
+                no_shards_available_subrequest_ids.extend(subrequest_ids);
                 continue;
             };
             let persist_summary = PersistRequestSummary {
@@ -414,7 +393,7 @@ impl IngestRouter {
         }
         drop(state_guard);
 
-        for subrequest_id in unavailable_subrequest_ids {
+        for subrequest_id in no_shards_available_subrequest_ids {
             workbench.record_no_shards_available(subrequest_id);
         }
         self.process_persist_results(workbench, persist_futures)
@@ -543,9 +522,9 @@ impl EventSubscriber for WeakRouterState {
     }
 }
 
-struct PersistRequestSummary {
-    leader_id: NodeId,
-    subrequest_ids: Vec<SubrequestId>,
+pub(super) struct PersistRequestSummary {
+    pub leader_id: NodeId,
+    pub subrequest_ids: Vec<SubrequestId>,
 }
 
 #[cfg(test)]
@@ -909,6 +888,107 @@ mod tests {
         ));
     }
 
+    #[tokio::test]
+    async fn test_router_batch_persist_records_no_shards_available_empty_routing_table() {
+        let self_node_id = "test-router".into();
+        let mut control_plane_mock = ControlPlaneServiceClient::mock();
+        control_plane_mock
+            .expect_get_or_create_open_shards()
+            .once()
+            .returning(move |request| {
+                assert_eq!(request.subrequests.len(), 1);
+
+                let subrequest = &request.subrequests[0];
+                assert_eq!(subrequest.index_id, "test-index");
+                assert_eq!(subrequest.source_id, "test-source");
+
+                let response = GetOrCreateOpenShardsResponse::default();
+                Ok(response)
+            });
+        let control_plane: ControlPlaneServiceClient = control_plane_mock.into();
+        let ingester_pool = IngesterPool::default();
+        let replication_factor = 1;
+        let mut router = IngestRouter::new(
+            self_node_id,
+            control_plane,
+            ingester_pool.clone(),
+            replication_factor,
+        );
+        let ingest_subrequests = vec![IngestSubrequest {
+            subrequest_id: 0,
+            index_id: "test-index".to_string(),
+            source_id: "test-source".to_string(),
+            ..Default::default()
+        }];
+        let mut workbench = IngestWorkbench::new(ingest_subrequests, 2);
+        let commit_type = CommitTypeV2::Auto;
+        router.batch_persist(&mut workbench, commit_type).await;
+
+        let subworkbench = workbench.subworkbenches.get(&0).unwrap();
+        assert!(matches!(
+            subworkbench.last_failure_opt,
+            Some(SubworkbenchFailure::NoShardsAvailable)
+        ));
+    }
+
+    #[tokio::test]
+    async fn test_router_batch_persist_records_no_shards_available_unavailable_ingester() {
+        let self_node_id = "test-router".into();
+        let mut control_plane_mock = ControlPlaneServiceClient::mock();
+        control_plane_mock
+            .expect_get_or_create_open_shards()
+            .once()
+            .returning(move |request| {
+                assert_eq!(request.subrequests.len(), 1);
+
+                let subrequest = &request.subrequests[0];
+                assert_eq!(subrequest.index_id, "test-index");
+                assert_eq!(subrequest.source_id, "test-source");
+
+                let response = GetOrCreateOpenShardsResponse {
+                    successes: vec![GetOrCreateOpenShardsSuccess {
+                        subrequest_id: 0,
+                        index_uid: Some(IndexUid::for_test("test-index", 0)),
+                        source_id: "test-source".to_string(),
+                        open_shards: vec![Shard {
+                            index_uid: Some(IndexUid::for_test("test-index", 0)),
+                            source_id: "test-source".to_string(),
+                            shard_id: Some(ShardId::from(1)),
+                            shard_state: ShardState::Open as i32,
+                            leader_id: "test-ingester".into(),
+                            ..Default::default()
+                        }],
+                    }],
+                    ..Default::default()
+                };
+                Ok(response)
+            });
+        let control_plane: ControlPlaneServiceClient = control_plane_mock.into();
+        let ingester_pool = IngesterPool::default();
+        let replication_factor = 1;
+        let mut router = IngestRouter::new(
+            self_node_id,
+            control_plane,
+            ingester_pool.clone(),
+            replication_factor,
+        );
+        let ingest_subrequests = vec![IngestSubrequest {
+            subrequest_id: 0,
+            index_id: "test-index".to_string(),
+            source_id: "test-source".to_string(),
+            ..Default::default()
+        }];
+        let mut workbench = IngestWorkbench::new(ingest_subrequests, 2);
+        let commit_type = CommitTypeV2::Auto;
+        router.batch_persist(&mut workbench, commit_type).await;
+
+        let subworkbench = workbench.subworkbenches.get(&0).unwrap();
+        assert!(matches!(
+            subworkbench.last_failure_opt,
+            Some(SubworkbenchFailure::NoShardsAvailable)
+        ));
+    }
+
     #[tokio::test]
     async fn test_router_process_persist_results_record_persist_successes() {
         let self_node_id = "test-router".into();
@@ -1094,7 +1174,7 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn test_router_process_persist_results_does_not_removes_unavailable_leaders() {
+    async fn test_router_process_persist_results_does_not_remove_unavailable_leaders() {
         let self_node_id = "test-router".into();
         let control_plane = ControlPlaneServiceClient::mock().into();
 
@@ -1178,7 +1258,7 @@ mod tests {
         let subworkbench = workbench.subworkbenches.get(&1).unwrap();
         assert!(matches!(
             subworkbench.last_failure_opt,
-            Some(SubworkbenchFailure::Transport)
+            Some(SubworkbenchFailure::Unavailable)
         ));
     }
 
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs b/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs
index 43b3c0ab897..ef711dfa2c9 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs
@@ -26,9 +26,12 @@ use quickwit_proto::ingest::ingester::{PersistFailure, PersistFailureReason, Per
 use quickwit_proto::ingest::router::{
     IngestFailure, IngestFailureReason, IngestResponseV2, IngestSubrequest, IngestSuccess,
 };
+use quickwit_proto::ingest::IngestV2Error;
 use quickwit_proto::types::{NodeId, SubrequestId};
 use tracing::warn;
 
+use super::router::PersistRequestSummary;
+
 /// A helper struct for managing the state of the subrequests of an ingest request during multiple
 /// persist attempts.
 #[derive(Default)]
@@ -114,7 +117,7 @@ impl IngestWorkbench {
                     "failure reason for subrequest `{}` is unspecified",
                     open_shards_failure.subrequest_id
                 );
-                SubworkbenchFailure::Internal("unspecified".to_string())
+                SubworkbenchFailure::Internal("unspecified failure reason".to_string())
             }
         };
         self.record_failure(open_shards_failure.subrequest_id, last_failure);
@@ -133,6 +136,38 @@ impl IngestWorkbench {
         subworkbench.persist_success_opt = Some(persist_success);
     }
 
+    pub fn record_persist_error(
+        &mut self,
+        persist_error: IngestV2Error,
+        persist_summary: PersistRequestSummary,
+    ) {
+        // Persist responses use dedicated failure reasons for `IngesterUnavailable`,
+        // `NotFound`, and `TooManyRequests`: in reality, we should never have to handle these cases
+        // here.
+        match persist_error {
+            IngestV2Error::IngesterUnavailable { .. } | IngestV2Error::Unavailable(_) => {
+                self.unavailable_leaders.insert(persist_summary.leader_id);
+
+                for subrequest_id in persist_summary.subrequest_ids {
+                    self.record_ingester_unavailable(subrequest_id);
+                }
+            }
+            IngestV2Error::Internal(_)
+            | IngestV2Error::ShardNotFound { .. }
+            | IngestV2Error::Timeout(_)
+            | IngestV2Error::TooManyRequests => {
+                for subrequest_id in persist_summary.subrequest_ids {
+                    self.record_internal_error(subrequest_id, persist_error.to_string());
+                }
+            }
+        }
+    }
+
+    pub fn record_persist_failure(&mut self, persist_failure: &PersistFailure) {
+        let failure = SubworkbenchFailure::Persist(persist_failure.reason());
+        self.record_failure(persist_failure.subrequest_id, failure);
+    }
+
     fn record_failure(&mut self, subrequest_id: SubrequestId, failure: SubworkbenchFailure) {
         let Some(subworkbench) = self.subworkbenches.get_mut(&subrequest_id) else {
             warn!("could not find subrequest `{}` in workbench", subrequest_id);
@@ -149,16 +184,11 @@ impl IngestWorkbench {
     /// Marks a node as unavailable for the span of the workbench.
     ///
     /// Remaining attempts will treat the node as if it was not in the ingester pool.
-    pub fn record_transport_error(&mut self, subrequest_id: SubrequestId) {
-        self.record_failure(subrequest_id, SubworkbenchFailure::Transport);
+    pub fn record_ingester_unavailable(&mut self, subrequest_id: SubrequestId) {
+        self.record_failure(subrequest_id, SubworkbenchFailure::Unavailable);
     }
 
-    pub fn record_persist_failure(&mut self, persist_failure: &PersistFailure) {
-        let failure = SubworkbenchFailure::Persist(persist_failure.reason());
-        self.record_failure(persist_failure.subrequest_id, failure);
-    }
-
-    pub fn record_internal_error(&mut self, subrequest_id: SubrequestId, error_message: String) {
+    fn record_internal_error(&mut self, subrequest_id: SubrequestId, error_message: String) {
         self.record_failure(subrequest_id, SubworkbenchFailure::Internal(error_message));
     }
 
@@ -166,6 +196,7 @@ impl IngestWorkbench {
         let num_subworkbenches = self.subworkbenches.len();
         let mut successes = Vec::with_capacity(self.num_successes);
         let mut failures = Vec::with_capacity(num_subworkbenches - self.num_successes);
+
         for subworkbench in self.subworkbenches.into_values() {
             if let Some(persist_success) = subworkbench.persist_success_opt {
                 let success = IngestSuccess {
@@ -187,6 +218,7 @@ impl IngestWorkbench {
             }
         }
         assert_eq!(successes.len() + failures.len(), num_subworkbenches);
+
         IngestResponseV2 {
             successes,
             failures,
@@ -212,17 +244,23 @@ impl IngestWorkbench {
 
 #[derive(Debug)]
 pub(super) enum SubworkbenchFailure {
+    // There is no entry in the routing table for this index.
     IndexNotFound,
+    // There is no entry in the routing table for this source.
     SourceNotFound,
+    // The routing table entry for this source is empty, shards are all closed, or their leaders
+    // are unavailable.
     NoShardsAvailable,
-    // Transport error: we failed to reach the ingester.
-    Transport,
-    // This is an error supplied by the ingester.
+    // This is an error returned by the ingester: e.g. shard not found, shard closed, rate
+    // limited, resource exhausted, etc.
     Persist(PersistFailureReason),
     Internal(String),
+    // The ingester is no longer in the pool or a transport error occurred.
+    Unavailable,
 }
 
 impl SubworkbenchFailure {
+    /// Returns the final `IngestFailureReason` returned to the client.
     fn reason(&self) -> IngestFailureReason {
         match self {
             Self::IndexNotFound => IngestFailureReason::IndexNotFound,
@@ -231,7 +269,7 @@ impl SubworkbenchFailure {
             Self::NoShardsAvailable => IngestFailureReason::NoShardsAvailable,
             // In our last attempt, we did not manage to reach the ingester.
             // We can consider that as a no shards available.
-            Self::Transport => IngestFailureReason::NoShardsAvailable,
+            Self::Unavailable => IngestFailureReason::NoShardsAvailable,
             Self::Persist(persist_failure_reason) => (*persist_failure_reason).into(),
         }
     }
@@ -258,7 +296,7 @@ impl IngestSubworkbench {
         self.persist_success_opt.is_none() && self.last_failure_is_transient()
     }
 
-    /// Returns `false` if and only if the last attempt suggest retrying will fail.
+    /// Returns `false` if and only if the last attempt suggests retrying will fail.
     /// e.g.:
     /// - the index does not exist
     /// - the source does not exist.
@@ -267,10 +305,9 @@ impl IngestSubworkbench {
             Some(SubworkbenchFailure::IndexNotFound) => false,
             Some(SubworkbenchFailure::SourceNotFound) => false,
             Some(SubworkbenchFailure::Internal(_)) => true,
-            // No need to retry no shards were available.
-            Some(SubworkbenchFailure::NoShardsAvailable) => false,
+            Some(SubworkbenchFailure::NoShardsAvailable) => true,
             Some(SubworkbenchFailure::Persist(_)) => true,
-            Some(SubworkbenchFailure::Transport) => true,
+            Some(SubworkbenchFailure::Unavailable) => true,
             None => true,
         }
     }
@@ -292,7 +329,7 @@ mod tests {
         assert!(subworkbench.is_pending());
         assert!(subworkbench.last_failure_is_transient());
 
-        subworkbench.last_failure_opt = Some(SubworkbenchFailure::Transport);
+        subworkbench.last_failure_opt = Some(SubworkbenchFailure::Unavailable);
         assert!(subworkbench.is_pending());
         assert!(subworkbench.last_failure_is_transient());
 
@@ -302,8 +339,8 @@ mod tests {
         assert!(subworkbench.last_failure_is_transient());
 
         subworkbench.last_failure_opt = Some(SubworkbenchFailure::NoShardsAvailable);
-        assert!(!subworkbench.is_pending());
-        assert!(!subworkbench.last_failure_is_transient());
+        assert!(subworkbench.is_pending());
+        assert!(subworkbench.last_failure_is_transient());
 
         subworkbench.last_failure_opt = Some(SubworkbenchFailure::IndexNotFound);
         assert!(!subworkbench.is_pending());
@@ -470,6 +507,59 @@ mod tests {
         assert_eq!(subworkbench.num_attempts, 1);
     }
 
+    #[test]
+    fn test_ingest_workbench_record_persist_error_unavailable() {
+        let ingest_subrequests = vec![IngestSubrequest {
+            subrequest_id: 0,
+            ..Default::default()
+        }];
+        let mut workbench = IngestWorkbench::new(ingest_subrequests, 1);
+
+        let persist_error = IngestV2Error::Unavailable("connection error".to_string());
+        let leader_id = NodeId::from("test-leader");
+        let persist_summary = PersistRequestSummary {
+            leader_id: leader_id.clone(),
+            subrequest_ids: vec![0],
+        };
+        workbench.record_persist_error(persist_error, persist_summary);
+
+        assert!(workbench.unavailable_leaders.contains(&leader_id));
+
+        let subworkbench = workbench.subworkbenches.get(&0).unwrap();
+        assert_eq!(subworkbench.num_attempts, 1);
+
+        assert!(matches!(
+            subworkbench.last_failure_opt,
+            Some(SubworkbenchFailure::Unavailable)
+        ));
+        assert!(subworkbench.persist_success_opt.is_none());
+    }
+
+    #[test]
+    fn test_ingest_workbench_record_persist_error_internal() {
+        let ingest_subrequests = vec![IngestSubrequest {
+            subrequest_id: 0,
+            ..Default::default()
+        }];
+        let mut workbench = IngestWorkbench::new(ingest_subrequests, 1);
+
+        let persist_error = IngestV2Error::Internal("IO error".to_string());
+        let persist_summary = PersistRequestSummary {
+            leader_id: NodeId::from("test-leader"),
+            subrequest_ids: vec![0],
+        };
+        workbench.record_persist_error(persist_error, persist_summary);
+
+        let subworkbench = workbench.subworkbenches.get(&0).unwrap();
+        assert_eq!(subworkbench.num_attempts, 1);
+
+        assert!(matches!(
+            &subworkbench.last_failure_opt,
+            Some(SubworkbenchFailure::Internal(message)) if message.contains("IO error")
+        ));
+        assert!(subworkbench.persist_success_opt.is_none());
+    }
+
     #[test]
     fn test_ingest_workbench_record_persist_failure() {
         let ingest_subrequests = vec![IngestSubrequest {