diff --git a/quickwit/quickwit-common/src/tower/pool.rs b/quickwit/quickwit-common/src/tower/pool.rs
index b1a0e584f3a..ac2265e24f9 100644
--- a/quickwit/quickwit-common/src/tower/pool.rs
+++ b/quickwit/quickwit-common/src/tower/pool.rs
@@ -178,7 +178,7 @@ where
     }
 
     /// Removes a value from the pool.
-    pub fn remove(&self, key: &K) {
+    fn remove(&self, key: &K) {
         self.pool
             .write()
             .expect("lock should not be poisoned")
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
index 762cff87cff..14f231776de 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
@@ -273,9 +273,7 @@ impl Ingester {
                     "failed to create mrecordlog queue `{}`: {}",
                     queue_id, io_error
                 );
-                return Err(IngestV2Error::IngesterUnavailable {
-                    ingester_id: shard.leader_id.into(),
-                });
+                return Err(IngestV2Error::Internal(format!("Io Error: {io_error}")));
             }
         };
         let rate_limiter = RateLimiter::from_settings(self.rate_limiter_settings);
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/router.rs b/quickwit/quickwit-ingest/src/ingest_v2/router.rs
index a1c938b6b98..9d4de397b20 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/router.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/router.rs
@@ -109,7 +109,6 @@ impl IngestRouter {
 
     pub fn subscribe(&self, event_broker: &EventBroker) {
         let weak_router_state = WeakRouterState(Arc::downgrade(&self.state));
-
         event_broker
             .subscribe::(weak_router_state.clone())
             .forever();
@@ -138,8 +137,8 @@ impl IngestRouter {
             if !state_guard.routing_table.has_open_shards(
                 &subrequest.index_id,
                 &subrequest.source_id,
-                &mut closed_shards,
                 ingester_pool,
+                &mut closed_shards,
                 &mut unavailable_leaders,
             ) {
                 let subrequest = GetOrCreateOpenShardsSubrequest {
@@ -153,16 +152,10 @@ impl IngestRouter {
         drop(state_guard);
 
         if !closed_shards.is_empty() {
-            info!(
-                "reporting {} closed shard(s) to control plane",
-                closed_shards.len()
-            )
+            info!(closed_shards=?closed_shards, "reporting closed shard(s) to control plane");
         }
         if !unavailable_leaders.is_empty() {
-            info!(
-                "reporting {} unavailable leader(s) to control plane",
-                unavailable_leaders.len()
-            );
+            info!(unavailable_leaders=?unavailable_leaders, "reporting unavailable leader(s) to control plane");
         }
         GetOrCreateOpenShardsRequest {
             subrequests: get_open_shards_subrequests,
@@ -267,9 +260,10 @@ impl IngestRouter {
                     for subrequest_id in persist_summary.subrequest_ids {
                         workbench.record_no_shards_available(subrequest_id);
                     }
-                    self.ingester_pool.remove(&persist_summary.leader_id);
                 }
-                _ => {
+                IngestV2Error::TooManyRequests
+                | IngestV2Error::Internal(_)
+                | IngestV2Error::ShardNotFound { .. } => {
                     for subrequest_id in persist_summary.subrequest_ids {
                         workbench.record_internal_error(
                             subrequest_id,
@@ -394,7 +388,6 @@ impl IngestRouter {
     ) -> IngestV2Result {
         let commit_type = ingest_request.commit_type();
         let mut workbench = IngestWorkbench::new(ingest_request.subrequests, max_num_attempts);
-
         while !workbench.is_complete() {
             workbench.new_attempt();
             self.batch_persist(&mut workbench, commit_type).await;
@@ -1015,7 +1008,7 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn test_router_process_persist_results_removes_unavailable_leaders() {
+    async fn test_router_process_persist_results_does_not_remove_unavailable_leaders() {
         let self_node_id = "test-router".into();
 
         let control_plane = ControlPlaneServiceClient::mock().into();
@@ -1091,7 +1084,7 @@ mod tests {
             Some(SubworkbenchFailure::NoShardsAvailable { .. })
         ));
-        assert!(ingester_pool.is_empty());
+        assert!(!ingester_pool.is_empty());
     }
 
     #[tokio::test]
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs b/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs
index 51bffe2c1f9..ca8a9788a70 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs
@@ -111,21 +111,21 @@ impl RoutingTableEntry {
     /// unavailable ingesters encountered along the way.
     pub fn has_open_shards(
         &self,
-        closed_shard_ids: &mut Vec<ShardId>,
         ingester_pool: &IngesterPool,
+        closed_shard_ids: &mut Vec<ShardId>,
         unavailable_leaders: &mut HashSet<NodeId>,
     ) -> bool {
-        for shards in [&self.local_shards, &self.remote_shards] {
-            for shard in shards {
-                if shard.shard_state.is_closed() {
-                    closed_shard_ids.push(shard.shard_id);
-                } else if shard.shard_state.is_open() {
-                    if ingester_pool.contains_key(&shard.leader_id) {
-                        return true;
-                    } else {
-                        let leader_id: NodeId = shard.leader_id.clone();
-                        unavailable_leaders.insert(leader_id);
-                    }
+        let shards = self.local_shards.iter().chain(self.remote_shards.iter());
+        for shard in shards {
+            if shard.shard_state.is_closed() {
+                closed_shard_ids.push(shard.shard_id);
+            } else if shard.shard_state.is_open() {
+                if ingester_pool.contains_key(&shard.leader_id) {
+                    return true;
+                } else {
+                    // Note that we do not change the state of the shard to unavailable here.
+                    let leader_id: NodeId = shard.leader_id.clone();
+                    unavailable_leaders.insert(leader_id);
                 }
             }
         }
@@ -328,31 +328,36 @@ impl RoutingTable {
         self.table.get(&key)
     }
 
+    /// Returns `true` if the router already knows about a shard for the given source that has
+    /// an available leader.
+    ///
+    /// If this function returns `false`, it populates the sets of unavailable leaders and closed
+    /// shards. These are attached to the `GetOrCreateOpenShards` request sent to the control
+    /// plane.
     pub fn has_open_shards(
         &self,
         index_id: impl Into,
         source_id: impl Into,
-        closed_shards: &mut Vec<ShardIds>,
         ingester_pool: &IngesterPool,
+        closed_shards: &mut Vec<ShardIds>,
         unavailable_leaders: &mut HashSet<NodeId>,
     ) -> bool {
-        if let Some(entry) = self.find_entry(index_id, source_id) {
-            let mut closed_shard_ids: Vec<ShardId> = Vec::new();
-
-            let result =
-                entry.has_open_shards(&mut closed_shard_ids, ingester_pool, unavailable_leaders);
-
-            if !closed_shard_ids.is_empty() {
-                closed_shards.push(ShardIds {
-                    index_uid: entry.index_uid.clone().into(),
-                    source_id: entry.source_id.clone(),
-                    shard_ids: closed_shard_ids,
-                });
-            }
-            result
-        } else {
-            false
+        let Some(entry) = self.find_entry(index_id, source_id) else {
+            return false;
+        };
+        let mut closed_shard_ids: Vec<ShardId> = Vec::new();
+
+        let result =
+            entry.has_open_shards(ingester_pool, &mut closed_shard_ids, unavailable_leaders);
+
+        if !closed_shard_ids.is_empty() {
+            closed_shards.push(ShardIds {
+                index_uid: entry.index_uid.clone().into(),
+                source_id: entry.source_id.clone(),
+                shard_ids: closed_shard_ids,
+            });
         }
+        result
     }
 
     /// Replaces the routing table entry for the source with the provided shards.
@@ -530,8 +535,8 @@ mod tests {
         let mut unavailable_leaders = HashSet::new();
 
         assert!(!table_entry.has_open_shards(
-            &mut closed_shard_ids,
             &ingester_pool,
+            &mut closed_shard_ids,
             &mut unavailable_leaders
         ));
         assert!(closed_shard_ids.is_empty());
@@ -570,8 +575,8 @@ mod tests {
             remote_round_robin_idx: AtomicUsize::default(),
         };
         assert!(table_entry.has_open_shards(
-            &mut closed_shard_ids,
             &ingester_pool,
+            &mut closed_shard_ids,
             &mut unavailable_leaders
         ));
         assert_eq!(closed_shard_ids.len(), 1);
@@ -611,8 +616,8 @@ mod tests {
             remote_round_robin_idx: AtomicUsize::default(),
         };
         assert!(table_entry.has_open_shards(
-            &mut closed_shard_ids,
             &ingester_pool,
+            &mut closed_shard_ids,
             &mut unavailable_leaders
         ));
         assert_eq!(closed_shard_ids.len(), 1);
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs b/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs
index 1aa99bb1185..db0b5c80dcd 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs
@@ -27,7 +27,7 @@ use quickwit_proto::ingest::router::{
     IngestFailure, IngestFailureReason, IngestResponseV2, IngestSubrequest, IngestSuccess,
 };
 use quickwit_proto::ingest::IngestV2Result;
-use quickwit_proto::types::{ShardId, SubrequestId};
+use quickwit_proto::types::SubrequestId;
 use tracing::warn;
 
 /// A helper struct for managing the state of the subrequests of an ingest request during multiple
@@ -65,6 +65,8 @@ impl IngestWorkbench {
         self.num_attempts += 1;
     }
 
+    /// Returns `true` if all subrequests were successful or if the maximum number of
+    /// attempts has been reached.
     pub fn is_complete(&self) -> bool {
         self.num_successes >= self.subworkbenches.len()
             || self.num_attempts >= self.max_num_attempts
@@ -146,10 +148,8 @@ impl IngestWorkbench {
             return;
         };
         subworkbench.num_attempts += 1;
-        subworkbench.last_failure_opt = Some(SubworkbenchFailure::Persist((
-            persist_failure.shard_id,
-            persist_failure.reason(),
-        )));
+        subworkbench.last_failure_opt =
+            Some(SubworkbenchFailure::Persist(persist_failure.reason()));
     }
 
     pub fn record_no_shards_available(&mut self, subrequest_id: SubrequestId) {
@@ -225,7 +225,7 @@ pub(super) enum SubworkbenchFailure {
     IndexNotFound,
     SourceNotFound,
     NoShardsAvailable,
-    Persist((ShardId, PersistFailureReason)),
+    Persist(PersistFailureReason),
     Internal(String),
 }
 
@@ -236,7 +236,7 @@ impl SubworkbenchFailure {
             Self::SourceNotFound => IngestFailureReason::SourceNotFound,
             Self::Internal(_) => IngestFailureReason::Internal,
             Self::NoShardsAvailable => IngestFailureReason::NoShardsAvailable,
-            Self::Persist((_shard_id, persist_failure_reason)) => (*persist_failure_reason).into(),
+            Self::Persist(persist_failure_reason) => (*persist_failure_reason).into(),
         }
     }
 }
@@ -262,11 +262,15 @@ impl IngestSubworkbench {
         self.persist_success_opt.is_none() && self.last_failure_is_transient()
     }
 
+    /// Returns `false` if and only if the last attempt suggests that retrying will fail,
+    /// e.g.:
+    /// - the index does not exist
+    /// - the source does not exist.
     fn last_failure_is_transient(&self) -> bool {
         match self.last_failure_opt {
             Some(SubworkbenchFailure::IndexNotFound) => false,
             Some(SubworkbenchFailure::SourceNotFound) => false,
-            Some(SubworkbenchFailure::Internal(_)) => false,
+            Some(SubworkbenchFailure::Internal(_)) => true,
             Some(SubworkbenchFailure::NoShardsAvailable) => true,
             Some(SubworkbenchFailure::Persist(_)) => true,
             None => true,
@@ -477,7 +481,7 @@ mod tests {
         let subworkbench = workbench.subworkbenches.get(&0).unwrap();
         assert!(matches!(
             subworkbench.last_failure_opt,
-            Some(SubworkbenchFailure::Persist((shard_id, reason))) if shard_id == 1 && reason == PersistFailureReason::ResourceExhausted
+            Some(SubworkbenchFailure::Persist(reason)) if reason == PersistFailureReason::ResourceExhausted
         ));
         assert_eq!(subworkbench.num_attempts, 1);
     }
diff --git a/quickwit/quickwit-proto/protos/quickwit/control_plane.proto b/quickwit/quickwit-proto/protos/quickwit/control_plane.proto
index 6dfd55c0da1..15b5b84dd75 100644
--- a/quickwit/quickwit-proto/protos/quickwit/control_plane.proto
+++ b/quickwit/quickwit-proto/protos/quickwit/control_plane.proto
@@ -66,6 +66,10 @@ service ControlPlaneService {
 message GetOrCreateOpenShardsRequest {
   repeated GetOrCreateOpenShardsSubrequest subrequests = 1;
   repeated quickwit.ingest.ShardIds closed_shards = 2;
+  // The control plane should return shards that are not hosted on the supplied leaders.
+  //
+  // The control plane does not change the status of those leaders based on this signal alone;
+  // it checks the status of its own ingester pool.
   repeated string unavailable_leaders = 3;
 }
 
diff --git a/quickwit/quickwit-proto/protos/quickwit/ingest.proto b/quickwit/quickwit-proto/protos/quickwit/ingest.proto
index b0fea66d59e..bdcde3f0eeb 100644
--- a/quickwit/quickwit-proto/protos/quickwit/ingest.proto
+++ b/quickwit/quickwit-proto/protos/quickwit/ingest.proto
@@ -52,7 +52,7 @@ enum ShardState {
   SHARD_STATE_UNSPECIFIED = 0;
   // The shard is open and accepts write requests.
   SHARD_STATE_OPEN = 1;
-  // The ingester hosting the shard is unavailable.
+  // The leader ingester hosting the shard is unavailable.
   SHARD_STATE_UNAVAILABLE = 2;
   // The shard is closed and cannot be written to.
   // It can be safely deleted if the publish position is superior or equal to `~eof`.
diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.rs
index 744131883ab..5686795a413 100644
--- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.rs
+++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.rs
@@ -105,7 +105,7 @@ pub enum ShardState {
     Unspecified = 0,
     /// The shard is open and accepts write requests.
     Open = 1,
-    /// The ingester hosting the shard is unavailable.
+    /// The leader ingester hosting the shard is unavailable.
     Unavailable = 2,
     /// The shard is closed and cannot be written to.
     /// It can be safely deleted if the publish position is superior or equal to `~eof`.
diff --git a/quickwit/quickwit-proto/src/ingest/mod.rs b/quickwit/quickwit-proto/src/ingest/mod.rs
index f074306d423..5c443380a7c 100644
--- a/quickwit/quickwit-proto/src/ingest/mod.rs
+++ b/quickwit/quickwit-proto/src/ingest/mod.rs
@@ -43,6 +43,7 @@ pub enum IngestV2Error {
     ShardNotFound { shard_id: ShardId },
     #[error("request timed out")]
     Timeout,
+    // This error is provoked by the semaphore located on the router.
     #[error("too many requests")]
     TooManyRequests,
     // TODO: Merge `Transport` and `IngesterUnavailable` into a single `Unavailable` error.
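
For readers skimming the patch, the retry behavior changed in `workbench.rs` boils down to the classification sketched below. This is a standalone, illustrative sketch rather than actual Quickwit code: `PersistFailureReason` is a reduced stand-in for the generated protobuf enum, and the free function mirrors `IngestSubworkbench::last_failure_is_transient` after the patch (internal errors become retryable, while a missing index or source still aborts the subrequest).

```rust
// Standalone sketch (not part of the patch above) of the retry classification.
// `PersistFailureReason` is a reduced stand-in for the generated protobuf enum;
// only the shape of the logic matters here.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PersistFailureReason {
    ResourceExhausted,
    RateLimited,
}

#[derive(Debug)]
enum SubworkbenchFailure {
    IndexNotFound,
    SourceNotFound,
    NoShardsAvailable,
    Persist(PersistFailureReason),
    Internal(String),
}

/// Mirrors `IngestSubworkbench::last_failure_is_transient`: returns `false` only
/// when retrying cannot succeed (the index or the source does not exist).
fn last_failure_is_transient(last_failure_opt: &Option<SubworkbenchFailure>) -> bool {
    match last_failure_opt {
        Some(SubworkbenchFailure::IndexNotFound) => false,
        Some(SubworkbenchFailure::SourceNotFound) => false,
        // Internal errors (e.g. an mrecordlog I/O error now surfaced as
        // `IngestV2Error::Internal`) are treated as transient and retried.
        Some(SubworkbenchFailure::Internal(_)) => true,
        Some(SubworkbenchFailure::NoShardsAvailable) => true,
        Some(SubworkbenchFailure::Persist(_)) => true,
        None => true,
    }
}

fn main() {
    let io_failure = Some(SubworkbenchFailure::Internal("io error".to_string()));
    assert!(last_failure_is_transient(&io_failure));

    let missing_index = Some(SubworkbenchFailure::IndexNotFound);
    assert!(!last_failure_is_transient(&missing_index));

    let persist_failure = Some(SubworkbenchFailure::Persist(
        PersistFailureReason::ResourceExhausted,
    ));
    assert!(last_failure_is_transient(&persist_failure));
}
```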