diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs index a081074a531..9642d48a198 100644 --- a/quickwit/quickwit-control-plane/src/control_plane.rs +++ b/quickwit/quickwit-control-plane/src/control_plane.rs @@ -890,7 +890,9 @@ mod tests { use quickwit_metastore::{ CreateIndexRequestExt, IndexMetadata, ListIndexesMetadataResponseExt, }; - use quickwit_proto::control_plane::GetOrCreateOpenShardsSubrequest; + use quickwit_proto::control_plane::{ + GetOrCreateOpenShardsFailureReason, GetOrCreateOpenShardsSubrequest, + }; use quickwit_proto::indexing::{ApplyIndexingPlanRequest, CpuCapacity, IndexingServiceClient}; use quickwit_proto::ingest::ingester::{IngesterServiceClient, RetainShardsResponse}; use quickwit_proto::ingest::{Shard, ShardState}; @@ -1998,7 +2000,7 @@ mod tests { MetastoreServiceClient::from(mock_metastore), ); - let error = control_plane_mailbox + let response = control_plane_mailbox .ask(GetOrCreateOpenShardsRequest { subrequests: vec![GetOrCreateOpenShardsSubrequest { subrequest_id: 0, @@ -2010,8 +2012,13 @@ mod tests { }) .await .unwrap() - .unwrap_err(); - assert!(matches!(error, ControlPlaneError::Unavailable { .. })); + .unwrap(); + assert!(response.successes.is_empty()); + assert_eq!(response.failures.len(), 1); + assert!(matches!( + response.failures[0].reason(), + GetOrCreateOpenShardsFailureReason::NoIngestersAvailable + )); let control_plane_state = control_plane_mailbox.ask(Observe).await.unwrap(); assert_eq!(control_plane_state.num_indexes, 1); diff --git a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs index 2ce268f617a..3798a58ea09 100644 --- a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs +++ b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs @@ -28,21 +28,19 @@ use quickwit_common::pretty::PrettySample; use quickwit_common::Progress; use quickwit_ingest::{IngesterPool, LocalShardsUpdate}; use quickwit_proto::control_plane::{ - AdviseResetShardsRequest, AdviseResetShardsResponse, ControlPlaneError, ControlPlaneResult, + AdviseResetShardsRequest, AdviseResetShardsResponse, ControlPlaneResult, GetOrCreateOpenShardsFailure, GetOrCreateOpenShardsFailureReason, GetOrCreateOpenShardsRequest, GetOrCreateOpenShardsResponse, GetOrCreateOpenShardsSuccess, }; use quickwit_proto::ingest::ingester::{ - CloseShardsRequest, IngesterService, InitShardsRequest, PingRequest, RetainShardsForSource, + CloseShardsRequest, IngesterService, InitShardsRequest, RetainShardsForSource, RetainShardsRequest, }; use quickwit_proto::ingest::{IngestV2Error, Shard, ShardIdPosition, ShardIdPositions, ShardIds}; use quickwit_proto::metastore; use quickwit_proto::metastore::{MetastoreService, MetastoreServiceClient}; use quickwit_proto::types::{IndexUid, NodeId, ShardId, SourceUid}; -use rand::seq::SliceRandom; use serde::{Deserialize, Serialize}; -use tokio::time::timeout; use tracing::{error, info, warn}; use ulid::Ulid; @@ -59,15 +57,9 @@ const SCALE_UP_SHARDS_THRESHOLD_MIB_PER_SEC: f32 = const SCALE_DOWN_SHARDS_THRESHOLD_MIB_PER_SEC: f32 = MAX_SHARD_INGESTION_THROUGHPUT_MIB_PER_SEC * 2. 
/ 10.; -const PING_LEADER_TIMEOUT: Duration = if cfg!(test) { - Duration::from_millis(50) -} else { - Duration::from_secs(2) -}; - const FIRE_AND_FORGET_TIMEOUT: Duration = Duration::from_secs(3); -/// Spawn a new task to execute the given future, +/// Spawns a new task to execute the given future, /// and stops polling it/drops it after a timeout. /// /// All errors are ignored, and not even logged. @@ -178,103 +170,6 @@ impl IngestController { wait_handle } - /// Pings an ingester to determine whether it is available for hosting a shard. If a follower ID - /// is provided, the leader candidate is in charge of pinging the follower candidate as - /// well. - async fn ping_leader_and_follower( - &mut self, - leader_id: &NodeId, - follower_id_opt: Option<&NodeId>, - progress: &Progress, - ) -> Result<(), PingError> { - let mut leader_ingester = self - .ingester_pool - .get(leader_id) - .ok_or(PingError::LeaderUnavailable)?; - - let ping_request = PingRequest { - leader_id: leader_id.clone().into(), - follower_id: follower_id_opt - .cloned() - .map(|follower_id| follower_id.into()), - }; - progress.protect_future(timeout( - PING_LEADER_TIMEOUT, - leader_ingester.ping(ping_request), - )) - .await - .map_err(|_| PingError::LeaderUnavailable)? // The leader timed out. - .map_err(|error| { - if let Some(follower_id) = follower_id_opt { - if matches!(error, IngestV2Error::IngesterUnavailable { ingester_id } if ingester_id == *follower_id) { - return PingError::FollowerUnavailable; - } - } - PingError::LeaderUnavailable - })?; - Ok(()) - } - - /// Finds an available leader-follower pair to host a shard. If the replication factor is set to - /// 1, only a leader is returned. If no nodes are available, `None` is returned. - async fn find_leader_and_follower( - &mut self, - unavailable_ingesters: &mut FnvHashSet, - progress: &Progress, - ) -> Option<(NodeId, Option)> { - let mut candidates: Vec = self - .ingester_pool - .keys() - .into_iter() - .filter(|node_id| !unavailable_ingesters.contains(node_id)) - .collect(); - candidates.shuffle(&mut rand::thread_rng()); - - #[cfg(test)] - candidates.sort(); - - if self.replication_factor == 1 { - for leader_id in candidates { - if unavailable_ingesters.contains(&leader_id) { - continue; - } - if self - .ping_leader_and_follower(&leader_id, None, progress) - .await - .is_ok() - { - return Some((leader_id, None)); - } - } - } else { - for (leader_id, follower_id) in candidates.into_iter().tuple_combinations() { - // We must perform this check here since the `unavailable_ingesters` set can grow as - // we go through the loop. - if unavailable_ingesters.contains(&leader_id) - || unavailable_ingesters.contains(&follower_id) - { - continue; - } - match self - .ping_leader_and_follower(&leader_id, Some(&follower_id), progress) - .await - { - Ok(_) => return Some((leader_id, Some(follower_id))), - Err(PingError::LeaderUnavailable) => { - unavailable_ingesters.insert(leader_id); - } - Err(PingError::FollowerUnavailable) => { - // We do not mark the follower as unavailable here. The issue could be - // specific to the link between the leader and follower. We define - // unavailability as being unavailable from the point of view of the control - // plane. 
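// --- Illustrative sketch, not part of this patch ---------------------------
// With the ping-based `find_leader_and_follower` removed, a request that
// cannot be placed no longer surfaces as a top-level
// `ControlPlaneError::Unavailable`; it comes back as a per-subrequest
// failure instead. The `log_open_shards_failures` helper below is
// hypothetical and only shows how a caller might inspect the new response shape.
use quickwit_proto::control_plane::{
    GetOrCreateOpenShardsFailureReason, GetOrCreateOpenShardsResponse,
};
use tracing::warn;

fn log_open_shards_failures(response: &GetOrCreateOpenShardsResponse) {
    for failure in &response.failures {
        match failure.reason() {
            GetOrCreateOpenShardsFailureReason::NoIngestersAvailable => warn!(
                index_id=%failure.index_id,
                source_id=%failure.source_id,
                "no ingesters available to host new shards"
            ),
            GetOrCreateOpenShardsFailureReason::IndexNotFound
            | GetOrCreateOpenShardsFailureReason::SourceNotFound => warn!(
                index_id=%failure.index_id,
                source_id=%failure.source_id,
                "index or source not found"
            ),
            GetOrCreateOpenShardsFailureReason::Unspecified => {}
        }
    }
}
// ---------------------------------------------------------------------------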
- } - } - } - } - None - } - fn handle_closed_shards(&self, closed_shards: Vec, model: &mut ControlPlaneModel) { for closed_shard in closed_shards { let index_uid: IndexUid = closed_shard.index_uid().clone(); @@ -356,7 +251,7 @@ impl IngestController { ) -> ControlPlaneResult { self.handle_closed_shards(get_open_shards_request.closed_shards, model); - let mut unavailable_leaders: FnvHashSet = get_open_shards_request + let unavailable_leaders: FnvHashSet = get_open_shards_request .unavailable_leaders .into_iter() .map(|ingester_id| ingester_id.into()) @@ -407,66 +302,171 @@ impl IngestController { }; get_or_create_open_shards_successes.push(get_or_create_open_shards_success); } else { - // TODO: Find leaders in batches. - // TODO: Round-robin leader-follower pairs or choose according to load. - let (leader_id, follower_id) = self - .find_leader_and_follower(&mut unavailable_leaders, progress) - .await - .ok_or_else(|| { - ControlPlaneError::Unavailable("no ingester available".to_string()) - })?; let shard_id = ShardId::from(Ulid::new()); let open_shards_subrequest = metastore::OpenShardsSubrequest { subrequest_id: get_open_shards_subrequest.subrequest_id, index_uid: index_uid.into(), source_id: get_open_shards_subrequest.source_id, shard_id: Some(shard_id), - leader_id: leader_id.into(), - follower_id: follower_id.map(|follower_id| follower_id.into()), + // These attributes will be overwritten in the next stage. + leader_id: "".to_string(), + follower_id: None, }; open_shards_subrequests.push(open_shards_subrequest); } } if !open_shards_subrequests.is_empty() { - let open_shards_request = metastore::OpenShardsRequest { - subrequests: open_shards_subrequests, - }; - let open_shards_response = progress - .protect_future(self.metastore.open_shards(open_shards_request)) - .await?; - - // TODO: Handle failures. - let _ = self.init_shards(&open_shards_response, progress).await; - - for open_shards_subresponse in open_shards_response.subresponses { - let index_uid: IndexUid = open_shards_subresponse.index_uid().clone(); - let source_id = open_shards_subresponse.source_id.clone(); - model.insert_shards( - &index_uid, - &source_id, - open_shards_subresponse.opened_shards, - ); - if let Some(open_shard_entries) = - model.find_open_shards(&index_uid, &source_id, &unavailable_leaders) + if let Some(leader_follower_pairs) = + self.allocate_shards(open_shards_subrequests.len(), &unavailable_leaders, model) + { + for (open_shards_subrequest, (leader_id, follower_opt)) in open_shards_subrequests + .iter_mut() + .zip(leader_follower_pairs) { - let open_shards = open_shard_entries - .into_iter() - .map(|shard_entry| shard_entry.shard) - .collect(); - let get_or_create_open_shards_success = GetOrCreateOpenShardsSuccess { - subrequest_id: open_shards_subresponse.subrequest_id, - index_uid: index_uid.into(), - source_id: open_shards_subresponse.source_id, - open_shards, + open_shards_subrequest.leader_id = leader_id.into(); + open_shards_subrequest.follower_id = follower_opt.map(Into::into); + } + let open_shards_request = metastore::OpenShardsRequest { + subrequests: open_shards_subrequests, + }; + let open_shards_response = progress + .protect_future(self.metastore.open_shards(open_shards_request)) + .await?; + + // TODO: Handle failures. 
+ let _ = self.init_shards(&open_shards_response, progress).await; + + for open_shards_subresponse in open_shards_response.subresponses { + let index_uid: IndexUid = open_shards_subresponse.index_uid().clone(); + let source_id = open_shards_subresponse.source_id.clone(); + model.insert_shards( + &index_uid, + &source_id, + open_shards_subresponse.opened_shards, + ); + if let Some(open_shard_entries) = + model.find_open_shards(&index_uid, &source_id, &unavailable_leaders) + { + let open_shards = open_shard_entries + .into_iter() + .map(|shard_entry| shard_entry.shard) + .collect(); + let get_or_create_open_shards_success = GetOrCreateOpenShardsSuccess { + subrequest_id: open_shards_subresponse.subrequest_id, + index_uid: index_uid.into(), + source_id: open_shards_subresponse.source_id, + open_shards, + }; + get_or_create_open_shards_successes.push(get_or_create_open_shards_success); + } + } + } else { + for open_shards_subrequest in open_shards_subrequests { + let get_or_create_open_shards_failure = GetOrCreateOpenShardsFailure { + subrequest_id: open_shards_subrequest.subrequest_id, + index_id: open_shards_subrequest.index_uid().index_id.clone(), + source_id: open_shards_subrequest.source_id, + reason: GetOrCreateOpenShardsFailureReason::NoIngestersAvailable as i32, }; - get_or_create_open_shards_successes.push(get_or_create_open_shards_success); + get_or_create_open_shards_failures.push(get_or_create_open_shards_failure); } } } - Ok(GetOrCreateOpenShardsResponse { + let response = GetOrCreateOpenShardsResponse { successes: get_or_create_open_shards_successes, failures: get_or_create_open_shards_failures, - }) + }; + Ok(response) + } + + /// Allocates and assigns new shards to ingesters. + fn allocate_shards( + &self, + num_shards_to_allocate: usize, + unavailable_leaders: &FnvHashSet, + model: &ControlPlaneModel, + ) -> Option)>> { + let ingesters: Vec = self + .ingester_pool + .keys() + .into_iter() + .filter(|ingester| !unavailable_leaders.contains(ingester)) + .sorted_by(|left, right| left.cmp(right)) + .collect(); + + let num_ingesters = ingesters.len(); + + if num_ingesters == 0 { + warn!("failed to allocate {num_shards_to_allocate} shards: no ingesters available"); + return None; + } else if self.replication_factor > num_ingesters { + warn!( + "failed to allocate {num_shards_to_allocate} shards: replication factor is \ + greater than the number of available ingesters" + ); + return None; + } + let mut leader_follower_pairs = Vec::with_capacity(num_shards_to_allocate); + + let mut num_open_shards: usize = 0; + let mut num_open_shards_per_ingester: HashMap<&str, usize> = + HashMap::with_capacity(num_ingesters); + + for shard in model.all_shards() { + if shard.is_open() && !unavailable_leaders.contains(&shard.leader_id) { + num_open_shards += 1; + + *num_open_shards_per_ingester + .entry(&shard.leader_id) + .or_default() += 1; + } + } + let mut num_remaining_shards_to_allocate = num_shards_to_allocate; + let num_open_shards_target = num_shards_to_allocate + num_open_shards; + let max_num_shards_to_allocate_per_node = num_open_shards_target / num_ingesters; + + // Allocate at most `max_num_shards_to_allocate_per_node` shards to each ingester. 
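// --- Illustrative sketch, not part of this patch ---------------------------
// A self-contained model of the balancing policy implemented by
// `allocate_shards`: sort the available ingesters, top each one up to
// `target / num_ingesters` open shards, then spread any remainder
// round-robin, pairing every leader with the next ingester as follower when
// the replication factor is greater than 1. `allocate_balanced` and its
// plain-string arguments are simplifications, not the real signature.
use std::collections::HashMap;

fn allocate_balanced(
    num_shards: usize,
    ingesters: &[String], // available ingesters, already sorted
    open_shards_per_ingester: &HashMap<String, usize>,
    replication_factor: usize,
) -> Option<Vec<(String, Option<String>)>> {
    if ingesters.is_empty() || replication_factor > ingesters.len() {
        return None;
    }
    let num_open_shards: usize = open_shards_per_ingester.values().sum();
    let max_per_node = (num_shards + num_open_shards) / ingesters.len();
    let mut remaining = num_shards;
    let mut pairs = Vec::with_capacity(num_shards);

    // First pass: top up each ingester to the per-node target.
    for (leader, follower) in ingesters.iter().zip(ingesters.iter().cycle().skip(1)) {
        if remaining == 0 {
            break;
        }
        let current = open_shards_per_ingester.get(leader).copied().unwrap_or(0);
        for _ in 0..max_per_node.saturating_sub(current).min(remaining) {
            remaining -= 1;
            let follower_opt = (replication_factor > 1).then(|| follower.clone());
            pairs.push((leader.clone(), follower_opt));
        }
    }
    // Second pass: spread any remainder, one shard per ingester, round-robin.
    for (leader, follower) in ingesters.iter().zip(ingesters.iter().cycle().skip(1)) {
        if remaining == 0 {
            break;
        }
        remaining -= 1;
        let follower_opt = (replication_factor > 1).then(|| follower.clone());
        pairs.push((leader.clone(), follower_opt));
    }
    Some(pairs)
}
// ---------------------------------------------------------------------------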
+ for (leader_id, follower_id) in ingesters.iter().zip(ingesters.iter().cycle().skip(1)) { + if num_remaining_shards_to_allocate == 0 { + break; + } + let num_open_shards_inner = num_open_shards_per_ingester + .get(leader_id.as_str()) + .copied() + .unwrap_or_default(); + + let num_shards_to_allocate_inner = max_num_shards_to_allocate_per_node + .saturating_sub(num_open_shards_inner) + .min(num_remaining_shards_to_allocate); + + for _ in 0..num_shards_to_allocate_inner { + num_remaining_shards_to_allocate -= 1; + + let leader = leader_id.clone(); + let mut follower_opt = None; + + if self.replication_factor > 1 { + follower_opt = Some(follower_id.clone()); + } + leader_follower_pairs.push((leader, follower_opt)); + } + } + // Allocate remaining shards one by one. + for (leader_id, follower_id) in ingesters.iter().zip(ingesters.iter().cycle().skip(1)) { + if num_remaining_shards_to_allocate == 0 { + break; + } + num_remaining_shards_to_allocate -= 1; + + let leader = leader_id.clone(); + let mut follower_opt = None; + + if self.replication_factor > 1 { + follower_opt = Some(follower_id.clone()); + } + leader_follower_pairs.push((leader, follower_opt)); + } + Some(leader_follower_pairs) } /// Calls init shards on the leaders hosting newly opened shards. @@ -526,13 +526,13 @@ impl IngestController { source_id=%source_uid.source_id, "scaling up number of shards to {new_num_open_shards}" ); - let mut unavailable_leaders: FnvHashSet = FnvHashSet::default(); + let unavailable_leaders: FnvHashSet = FnvHashSet::default(); let Some((leader_id, follower_id)) = self - .find_leader_and_follower(&mut unavailable_leaders, progress) - .await + .allocate_shards(1, &unavailable_leaders, model) + .and_then(|pairs| pairs.into_iter().next()) else { - warn!("failed to scale up number of shards: no ingester available"); + warn!("failed to scale up number of shards: no ingesters available"); model.release_scaling_permits(&source_uid, ScalingMode::Up, NUM_PERMITS); return; }; @@ -542,7 +542,7 @@ impl IngestController { index_uid: source_uid.index_uid.clone().into(), source_id: source_uid.source_id.clone(), shard_id: Some(shard_id), - leader_id: leader_id.into(), + leader_id: leader_id.to_string(), follower_id: follower_id.map(Into::into), }; let open_shards_request = metastore::OpenShardsRequest { @@ -726,12 +726,6 @@ fn find_scale_down_candidate( }) } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum PingError { - LeaderUnavailable, - FollowerUnavailable, -} - #[cfg(test)] mod tests { @@ -746,7 +740,7 @@ mod tests { use quickwit_proto::control_plane::GetOrCreateOpenShardsSubrequest; use quickwit_proto::ingest::ingester::{ CloseShardsResponse, IngesterServiceClient, InitShardsResponse, MockIngesterService, - PingResponse, RetainShardsResponse, + RetainShardsResponse, }; use quickwit_proto::ingest::{Shard, ShardState}; use quickwit_proto::metastore::MetastoreError; @@ -754,207 +748,6 @@ mod tests { use super::*; - #[tokio::test] - async fn test_ingest_controller_ping_leader() { - let progress = Progress::default(); - - let mock_metastore = MetastoreServiceClient::mock(); - let ingester_pool = IngesterPool::default(); - let replication_factor = 1; - let mut ingest_controller = IngestController::new( - MetastoreServiceClient::from(mock_metastore), - ingester_pool.clone(), - replication_factor, - ); - - let leader_id: NodeId = "test-ingester-0".into(); - let error = ingest_controller - .ping_leader_and_follower(&leader_id, None, &progress) - .await - .unwrap_err(); - assert!(matches!(error, 
PingError::LeaderUnavailable)); - - let mut mock_ingester = MockIngesterService::default(); - mock_ingester.expect_ping().once().returning(|request| { - assert_eq!(request.leader_id, "test-ingester-0"); - assert!(request.follower_id.is_none()); - - Ok(PingResponse {}) - }); - let ingester: IngesterServiceClient = mock_ingester.into(); - ingester_pool.insert("test-ingester-0".into(), ingester.clone()); - - ingest_controller - .ping_leader_and_follower(&leader_id, None, &progress) - .await - .unwrap(); - - let mut mock_ingester = MockIngesterService::default(); - mock_ingester.expect_ping().once().returning(|request| { - assert_eq!(request.leader_id, "test-ingester-0"); - assert!(request.follower_id.is_none()); - - let leader_id: NodeId = "test-ingester-0".into(); - Err(IngestV2Error::IngesterUnavailable { - ingester_id: leader_id, - }) - }); - let ingester: IngesterServiceClient = mock_ingester.into(); - ingester_pool.insert("test-ingester-0".into(), ingester.clone()); - - let error = ingest_controller - .ping_leader_and_follower(&leader_id, None, &progress) - .await - .unwrap_err(); - assert!(matches!(error, PingError::LeaderUnavailable)); - - let mut mock_ingester = MockIngesterService::default(); - mock_ingester.expect_ping().once().returning(|request| { - assert_eq!(request.leader_id, "test-ingester-0"); - assert_eq!(request.follower_id.unwrap(), "test-ingester-1"); - - let follower_id: NodeId = "test-ingester-1".into(); - Err(IngestV2Error::IngesterUnavailable { - ingester_id: follower_id, - }) - }); - let ingester: IngesterServiceClient = mock_ingester.into(); - ingester_pool.insert("test-ingester-0".into(), ingester.clone()); - - let follower_id: NodeId = "test-ingester-1".into(); - let error = ingest_controller - .ping_leader_and_follower(&leader_id, Some(&follower_id), &progress) - .await - .unwrap_err(); - assert!(matches!(error, PingError::FollowerUnavailable)); - } - - #[tokio::test] - async fn test_ingest_controller_find_leader_replication_factor_1() { - let progress = Progress::default(); - - let mock_metastore = MetastoreServiceClient::mock(); - let ingester_pool = IngesterPool::default(); - let replication_factor = 1; - let mut ingest_controller = IngestController::new( - MetastoreServiceClient::from(mock_metastore), - ingester_pool.clone(), - replication_factor, - ); - - let leader_follower_pair = ingest_controller - .find_leader_and_follower(&mut FnvHashSet::default(), &progress) - .await; - assert!(leader_follower_pair.is_none()); - - let mut mock_ingester = MockIngesterService::default(); - mock_ingester.expect_ping().times(2).returning(|request| { - assert_eq!(request.leader_id, "test-ingester-0"); - assert!(request.follower_id.is_none()); - - Err(IngestV2Error::Internal("Io error".to_string())) - }); - let ingester: IngesterServiceClient = mock_ingester.into(); - ingester_pool.insert("test-ingester-0".into(), ingester.clone()); - - let leader_follower_pair = ingest_controller - .find_leader_and_follower(&mut FnvHashSet::default(), &progress) - .await; - assert!(leader_follower_pair.is_none()); - - let mut mock_ingester = MockIngesterService::default(); - mock_ingester.expect_ping().once().returning(|request| { - assert_eq!(request.leader_id, "test-ingester-1"); - assert!(request.follower_id.is_none()); - - Ok(PingResponse {}) - }); - let ingester: IngesterServiceClient = mock_ingester.into(); - ingester_pool.insert("test-ingester-1".into(), ingester); - - let (leader_id, follower_id) = ingest_controller - .find_leader_and_follower(&mut FnvHashSet::default(), &progress) 
- .await - .unwrap(); - assert_eq!(leader_id.as_str(), "test-ingester-1"); - assert!(follower_id.is_none()); - } - - #[tokio::test] - async fn test_ingest_controller_find_leader_replication_factor_2() { - let progress = Progress::default(); - - let mock_metastore = MetastoreServiceClient::mock(); - let ingester_pool = IngesterPool::default(); - let replication_factor = 2; - let mut ingest_controller = IngestController::new( - MetastoreServiceClient::from(mock_metastore), - ingester_pool.clone(), - replication_factor, - ); - - let leader_follower_pair = ingest_controller - .find_leader_and_follower(&mut FnvHashSet::default(), &progress) - .await; - assert!(leader_follower_pair.is_none()); - - let mut mock_ingester = MockIngesterService::default(); - mock_ingester.expect_ping().once().returning(|request| { - assert_eq!(request.leader_id, "test-ingester-0"); - assert_eq!(request.follower_id.unwrap(), "test-ingester-1"); - - Err(IngestV2Error::IngesterUnavailable { - ingester_id: "test-ingester-1".into(), - }) - }); - let ingester: IngesterServiceClient = mock_ingester.into(); - ingester_pool.insert("test-ingester-0".into(), ingester.clone()); - - let mut mock_ingester = MockIngesterService::default(); - mock_ingester.expect_ping().returning(|_request| { - panic!("`test-ingester-1` should not be pinged."); - }); - let ingester: IngesterServiceClient = mock_ingester.into(); - ingester_pool.insert("test-ingester-1".into(), ingester.clone()); - - let leader_follower_pair = ingest_controller - .find_leader_and_follower(&mut FnvHashSet::default(), &progress) - .await; - assert!(leader_follower_pair.is_none()); - - let mut mock_ingester = MockIngesterService::default(); - mock_ingester.expect_ping().once().returning(|request| { - assert_eq!(request.leader_id, "test-ingester-0"); - assert_eq!(request.follower_id.unwrap(), "test-ingester-1"); - - Err(IngestV2Error::IngesterUnavailable { - ingester_id: "test-ingester-1".into(), - }) - }); - mock_ingester.expect_ping().once().returning(|request| { - assert_eq!(request.leader_id, "test-ingester-0"); - assert_eq!(request.follower_id.unwrap(), "test-ingester-2"); - - Ok(PingResponse {}) - }); - let ingester: IngesterServiceClient = mock_ingester.into(); - ingester_pool.insert("test-ingester-0".into(), ingester.clone()); - - let mut mock_ingester = MockIngesterService::default(); - mock_ingester.expect_ping().returning(|_request| { - panic!("`test-ingester-2` should not be pinged."); - }); - let ingester: IngesterServiceClient = mock_ingester.into(); - ingester_pool.insert("test-ingester-2".into(), ingester.clone()); - - let (leader_id, follower_id) = ingest_controller - .find_leader_and_follower(&mut FnvHashSet::default(), &progress) - .await - .unwrap(); - assert_eq!(leader_id.as_str(), "test-ingester-0"); - assert_eq!(follower_id.unwrap().as_str(), "test-ingester-2"); - } - #[tokio::test] async fn test_ingest_controller_get_or_create_open_shards() { let source_id: &'static str = "test-source"; @@ -997,13 +790,7 @@ mod tests { }); let ingester_pool = IngesterPool::default(); - let mut mock_ingester = MockIngesterService::default(); - mock_ingester.expect_ping().once().returning(|request| { - assert_eq!(request.leader_id, "test-ingester-1"); - assert_eq!(request.follower_id.unwrap(), "test-ingester-2"); - - Ok(PingResponse {}) - }); + let mock_ingester = MockIngesterService::default(); let ingester: IngesterServiceClient = mock_ingester.into(); ingester_pool.insert("test-ingester-1".into(), ingester.clone()); @@ -1274,9 +1061,205 @@ mod tests { 
assert!(shard_3.is_open()); } + #[test] + fn test_ingest_controller_allocate_shards() { + let metastore = MetastoreServiceClient::mock().into(); + let ingester_pool = IngesterPool::default(); + let replication_factor = 2; + + let ingest_controller = + IngestController::new(metastore, ingester_pool.clone(), replication_factor); + + let mut model = ControlPlaneModel::default(); + + let leader_follower_pairs_opt = + ingest_controller.allocate_shards(0, &FnvHashSet::default(), &model); + assert!(leader_follower_pairs_opt.is_none()); + + ingester_pool.insert( + "test-ingester-1".into(), + IngesterServiceClient::mock().into(), + ); + + let leader_follower_pairs_opt = + ingest_controller.allocate_shards(0, &FnvHashSet::default(), &model); + assert!(leader_follower_pairs_opt.is_none()); + + ingester_pool.insert( + "test-ingester-2".into(), + IngesterServiceClient::mock().into(), + ); + + let leader_follower_pairs = ingest_controller + .allocate_shards(0, &FnvHashSet::default(), &model) + .unwrap(); + assert!(leader_follower_pairs.is_empty()); + + let leader_follower_pairs = ingest_controller + .allocate_shards(1, &FnvHashSet::default(), &model) + .unwrap(); + assert_eq!(leader_follower_pairs.len(), 1); + assert_eq!(leader_follower_pairs[0].0, "test-ingester-1"); + assert_eq!( + leader_follower_pairs[0].1, + Some(NodeId::from("test-ingester-2")) + ); + + let leader_follower_pairs = ingest_controller + .allocate_shards(2, &FnvHashSet::default(), &model) + .unwrap(); + assert_eq!(leader_follower_pairs.len(), 2); + assert_eq!(leader_follower_pairs[0].0, "test-ingester-1"); + assert_eq!( + leader_follower_pairs[0].1, + Some(NodeId::from("test-ingester-2")) + ); + + assert_eq!(leader_follower_pairs[1].0, "test-ingester-2"); + assert_eq!( + leader_follower_pairs[1].1, + Some(NodeId::from("test-ingester-1")) + ); + + let leader_follower_pairs = ingest_controller + .allocate_shards(3, &FnvHashSet::default(), &model) + .unwrap(); + assert_eq!(leader_follower_pairs.len(), 3); + assert_eq!(leader_follower_pairs[0].0, "test-ingester-1"); + assert_eq!( + leader_follower_pairs[0].1, + Some(NodeId::from("test-ingester-2")) + ); + + assert_eq!(leader_follower_pairs[1].0, "test-ingester-2"); + assert_eq!( + leader_follower_pairs[1].1, + Some(NodeId::from("test-ingester-1")) + ); + + assert_eq!(leader_follower_pairs[2].0, "test-ingester-1"); + assert_eq!( + leader_follower_pairs[2].1, + Some(NodeId::from("test-ingester-2")) + ); + + let index_uid = IndexUid::for_test("test-index", 0); + let source_id: SourceId = "test-source".into(); + let open_shards = vec![Shard { + index_uid: Some(index_uid.clone()), + source_id: source_id.clone(), + shard_id: Some(ShardId::from(1)), + shard_state: ShardState::Open as i32, + leader_id: "test-ingester-1".to_string(), + ..Default::default() + }]; + model.insert_shards(&index_uid, &source_id, open_shards); + + let leader_follower_pairs = ingest_controller + .allocate_shards(3, &FnvHashSet::default(), &model) + .unwrap(); + assert_eq!(leader_follower_pairs.len(), 3); + assert_eq!(leader_follower_pairs[0].0, "test-ingester-1"); + assert_eq!( + leader_follower_pairs[0].1, + Some(NodeId::from("test-ingester-2")) + ); + + assert_eq!(leader_follower_pairs[1].0, "test-ingester-2"); + assert_eq!( + leader_follower_pairs[1].1, + Some(NodeId::from("test-ingester-1")) + ); + + assert_eq!(leader_follower_pairs[2].0, "test-ingester-2"); + assert_eq!( + leader_follower_pairs[2].1, + Some(NodeId::from("test-ingester-1")) + ); + + let open_shards = vec![ + Shard { + index_uid: 
Some(index_uid.clone()), + source_id: source_id.clone(), + shard_id: Some(ShardId::from(2)), + shard_state: ShardState::Open as i32, + leader_id: "test-ingester-1".to_string(), + ..Default::default() + }, + Shard { + index_uid: Some(index_uid.clone()), + source_id: source_id.clone(), + shard_id: Some(ShardId::from(3)), + shard_state: ShardState::Open as i32, + leader_id: "test-ingester-1".to_string(), + ..Default::default() + }, + ]; + model.insert_shards(&index_uid, &source_id, open_shards); + + let leader_follower_pairs = ingest_controller + .allocate_shards(1, &FnvHashSet::default(), &model) + .unwrap(); + assert_eq!(leader_follower_pairs.len(), 1); + assert_eq!(leader_follower_pairs[0].0, "test-ingester-2"); + assert_eq!( + leader_follower_pairs[0].1, + Some(NodeId::from("test-ingester-1")) + ); + + ingester_pool.insert( + "test-ingester-3".into(), + IngesterServiceClient::mock().into(), + ); + let unavailable_leaders = FnvHashSet::from_iter([NodeId::from("test-ingester-2")]); + let leader_follower_pairs = ingest_controller + .allocate_shards(4, &unavailable_leaders, &model) + .unwrap(); + assert_eq!(leader_follower_pairs.len(), 4); + assert_eq!(leader_follower_pairs[0].0, "test-ingester-3"); + assert_eq!( + leader_follower_pairs[0].1, + Some(NodeId::from("test-ingester-1")) + ); + + assert_eq!(leader_follower_pairs[1].0, "test-ingester-3"); + assert_eq!( + leader_follower_pairs[1].1, + Some(NodeId::from("test-ingester-1")) + ); + + assert_eq!(leader_follower_pairs[2].0, "test-ingester-3"); + assert_eq!( + leader_follower_pairs[2].1, + Some(NodeId::from("test-ingester-1")) + ); + + assert_eq!(leader_follower_pairs[3].0, "test-ingester-1"); + assert_eq!( + leader_follower_pairs[3].1, + Some(NodeId::from("test-ingester-3")) + ); + } + #[tokio::test] async fn test_ingest_controller_handle_local_shards_update() { - let metastore = MetastoreServiceClient::mock().into(); + let mut mock_metastore = MetastoreServiceClient::mock(); + mock_metastore + .expect_open_shards() + .once() + .returning(|request| { + assert_eq!(request.subrequests.len(), 1); + let subrequest = &request.subrequests[0]; + + assert_eq!(subrequest.index_uid(), &IndexUid::for_test("test-index", 0)); + assert_eq!(subrequest.source_id, "test-source"); + assert_eq!(subrequest.leader_id, "test-ingester"); + + Err(MetastoreError::InvalidArgument { + message: "failed to open shards".to_string(), + }) + }); + let metastore = MetastoreServiceClient::from(mock_metastore); let ingester_pool = IngesterPool::default(); let replication_factor = 1; @@ -1355,11 +1338,6 @@ mod tests { "failed to close shards".to_string(), )) }); - ingester_mock.expect_ping().returning(|request| { - assert_eq!(request.leader_id, "test-ingester"); - - Err(IngestV2Error::Internal("failed ping ingester".to_string())) - }); ingester_pool.insert("test-ingester".into(), ingester_mock.into()); let shard_infos = BTreeSet::from_iter([ @@ -1488,11 +1466,6 @@ mod tests { let mut ingester_mock = IngesterServiceClient::mock(); - ingester_mock.expect_ping().returning(|request| { - assert_eq!(request.leader_id, "test-ingester"); - - Ok(PingResponse {}) - }); let index_uid_clone = index_uid.clone(); ingester_mock .expect_init_shards() diff --git a/quickwit/quickwit-control-plane/src/model/mod.rs b/quickwit/quickwit-control-plane/src/model/mod.rs index 592f09bcccc..80d34c3fb9a 100644 --- a/quickwit/quickwit-control-plane/src/model/mod.rs +++ b/quickwit/quickwit-control-plane/src/model/mod.rs @@ -257,7 +257,6 @@ impl ControlPlaneModel { 
.set_shards_as_unavailable(unavailable_leaders); } - #[cfg(test)] pub(crate) fn all_shards(&self) -> impl Iterator + '_ { self.shard_table.all_shards() } diff --git a/quickwit/quickwit-control-plane/src/model/shard_table.rs b/quickwit/quickwit-control-plane/src/model/shard_table.rs index 158efdf2e2d..42092b82d74 100644 --- a/quickwit/quickwit-control-plane/src/model/shard_table.rs +++ b/quickwit/quickwit-control-plane/src/model/shard_table.rs @@ -263,7 +263,6 @@ impl ShardTable { self.check_invariant(); } - #[cfg(test)] pub(crate) fn all_shards(&self) -> impl Iterator + '_ { self.table_entries .values() diff --git a/quickwit/quickwit-ingest/src/error.rs b/quickwit/quickwit-ingest/src/error.rs index 4eb0479b73c..c81f9ff7744 100644 --- a/quickwit/quickwit-ingest/src/error.rs +++ b/quickwit/quickwit-ingest/src/error.rs @@ -75,9 +75,9 @@ impl From for IngestServiceError { impl From for IngestServiceError { fn from(error: IngestV2Error) -> Self { match error { - IngestV2Error::IngesterUnavailable { .. } - | IngestV2Error::Timeout(_) - | IngestV2Error::Unavailable(_) => IngestServiceError::Unavailable, + IngestV2Error::Timeout(_) | IngestV2Error::Unavailable(_) => { + IngestServiceError::Unavailable + } IngestV2Error::Internal(message) => IngestServiceError::Internal(message), IngestV2Error::ShardNotFound { .. } => { IngestServiceError::Internal("shard not found".to_string()) diff --git a/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs b/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs index 29f9f3565f4..617fda9a93c 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs @@ -448,7 +448,7 @@ async fn fault_tolerant_fetch_stream( index_uid=%index_uid, source_id=%source_id, shard_id=%shard_id, - "ingester `{ingester_id}` is not available: failing over to ingester `{failover_ingester_id}`" + "ingester `{ingester_id}` is unavailable: failing over to ingester `{failover_ingester_id}`" ); } else { error!( @@ -456,11 +456,11 @@ async fn fault_tolerant_fetch_stream( index_uid=%index_uid, source_id=%source_id, shard_id=%shard_id, - "ingester `{ingester_id}` is not available: closing fetch stream" + "ingester `{ingester_id}` is unavailable: closing fetch stream" ); - let ingest_error = IngestV2Error::IngesterUnavailable { - ingester_id: ingester_id.clone(), - }; + let message = + format!("ingester `{ingester_id}` is unavailable: closing fetch stream"); + let ingest_error = IngestV2Error::Unavailable(message); // Attempt to send the error to the consumer in a best-effort manner before // returning. 
let fetch_stream_error = FetchStreamError { diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index 0f96f62b584..89a058a7650 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -47,10 +47,9 @@ use quickwit_proto::ingest::ingester::{ IngesterServiceStream, IngesterStatus, InitShardsRequest, InitShardsResponse, ObservationMessage, OpenFetchStreamRequest, OpenObservationStreamRequest, OpenReplicationStreamRequest, OpenReplicationStreamResponse, PersistFailure, - PersistFailureReason, PersistRequest, PersistResponse, PersistSuccess, PingRequest, - PingResponse, ReplicateFailureReason, ReplicateSubrequest, RetainShardsForSource, - RetainShardsRequest, RetainShardsResponse, SynReplicationMessage, TruncateShardsRequest, - TruncateShardsResponse, + PersistFailureReason, PersistRequest, PersistResponse, PersistSuccess, ReplicateFailureReason, + ReplicateSubrequest, RetainShardsForSource, RetainShardsRequest, RetainShardsResponse, + SynReplicationMessage, TruncateShardsRequest, TruncateShardsResponse, }; use quickwit_proto::ingest::{ CommitTypeV2, IngestV2Error, IngestV2Result, Shard, ShardIds, ShardState, @@ -372,12 +371,10 @@ impl Ingester { .try_send(open_message) .expect("channel should be open and have capacity"); - let mut ingester = - self.ingester_pool - .get(&follower_id) - .ok_or(IngestV2Error::IngesterUnavailable { - ingester_id: follower_id.clone(), - })?; + let mut ingester = self.ingester_pool.get(&follower_id).ok_or_else(|| { + let message = format!("ingester `{follower_id}` is unavailable"); + IngestV2Error::Unavailable(message) + })?; let mut ack_replication_stream = ingester .open_replication_stream(syn_replication_stream) .await?; @@ -948,31 +945,6 @@ impl Ingester { Ok(CloseShardsResponse {}) } - async fn ping_inner(&mut self, ping_request: PingRequest) -> IngestV2Result { - let state_guard = self.state.lock_partially().await?; - - if state_guard.status() != IngesterStatus::Ready { - return Err(IngestV2Error::Internal("node decommissioned".to_string())); - } - if ping_request.leader_id != self.self_node_id { - let ping_response = PingResponse {}; - return Ok(ping_response); - }; - let Some(follower_id) = &ping_request.follower_id else { - let ping_response = PingResponse {}; - return Ok(ping_response); - }; - let follower_id: NodeId = follower_id.clone().into(); - let mut ingester = self.ingester_pool.get(&follower_id).ok_or({ - IngestV2Error::IngesterUnavailable { - ingester_id: follower_id, - } - })?; - ingester.ping(ping_request).await?; - let ping_response = PingResponse {}; - Ok(ping_response) - } - async fn decommission_inner( &mut self, _decommission_request: DecommissionRequest, @@ -1077,10 +1049,6 @@ impl IngesterService for Ingester { self.close_shards_inner(close_shards_request).await } - async fn ping(&mut self, ping_request: PingRequest) -> IngestV2Result { - self.ping_inner(ping_request).await - } - async fn decommission( &mut self, decommission_request: DecommissionRequest, diff --git a/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs b/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs index ef711dfa2c9..29dadcb7222 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs @@ -112,6 +112,9 @@ impl IngestWorkbench { GetOrCreateOpenShardsFailureReason::SourceNotFound => { SubworkbenchFailure::SourceNotFound } + 
GetOrCreateOpenShardsFailureReason::NoIngestersAvailable => { + SubworkbenchFailure::NoShardsAvailable + } GetOrCreateOpenShardsFailureReason::Unspecified => { warn!( "failure reason for subrequest `{}` is unspecified", @@ -145,7 +148,7 @@ impl IngestWorkbench { // `NotFound`, and `TooManyRequests`: in reality, we should never have to handle these cases // here. match persist_error { - IngestV2Error::IngesterUnavailable { .. } | IngestV2Error::Unavailable(_) => { + IngestV2Error::Unavailable(_) => { self.unavailable_leaders.insert(persist_summary.leader_id); for subrequest_id in persist_summary.subrequest_ids { diff --git a/quickwit/quickwit-proto/protos/quickwit/control_plane.proto b/quickwit/quickwit-proto/protos/quickwit/control_plane.proto index b2cff277a49..9748789bd25 100644 --- a/quickwit/quickwit-proto/protos/quickwit/control_plane.proto +++ b/quickwit/quickwit-proto/protos/quickwit/control_plane.proto @@ -103,6 +103,7 @@ enum GetOrCreateOpenShardsFailureReason { GET_OR_CREATE_OPEN_SHARDS_FAILURE_REASON_UNSPECIFIED = 0; GET_OR_CREATE_OPEN_SHARDS_FAILURE_REASON_INDEX_NOT_FOUND = 1; GET_OR_CREATE_OPEN_SHARDS_FAILURE_REASON_SOURCE_NOT_FOUND = 2; + GET_OR_CREATE_OPEN_SHARDS_FAILURE_REASON_NO_INGESTERS_AVAILABLE = 3; } message GetOrCreateOpenShardsFailure { diff --git a/quickwit/quickwit-proto/protos/quickwit/ingester.proto b/quickwit/quickwit-proto/protos/quickwit/ingester.proto index 336617fd7d0..21a407e3f5a 100644 --- a/quickwit/quickwit-proto/protos/quickwit/ingester.proto +++ b/quickwit/quickwit-proto/protos/quickwit/ingester.proto @@ -51,12 +51,8 @@ service IngesterService { // Closes a set of shards. This RPC is called by the control plane. rpc CloseShards(CloseShardsRequest) returns (CloseShardsResponse); - // Pings an ingester to check if it is ready to host shards and serve requests. - rpc Ping(PingRequest) returns (PingResponse); - // Decommissions the ingester. rpc Decommission(DecommissionRequest) returns (DecommissionResponse); - } message RetainShardsForSource { @@ -265,14 +261,6 @@ message CloseShardsRequest { message CloseShardsResponse { } -message PingRequest { - string leader_id = 1; - optional string follower_id = 2; -} - -message PingResponse { -} - message DecommissionRequest { } diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs index b6c4989e976..238470c1a9e 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.control_plane.rs @@ -114,6 +114,7 @@ pub enum GetOrCreateOpenShardsFailureReason { Unspecified = 0, IndexNotFound = 1, SourceNotFound = 2, + NoIngestersAvailable = 3, } impl GetOrCreateOpenShardsFailureReason { /// String value of the enum field names used in the ProtoBuf definition. @@ -131,6 +132,9 @@ impl GetOrCreateOpenShardsFailureReason { GetOrCreateOpenShardsFailureReason::SourceNotFound => { "GET_OR_CREATE_OPEN_SHARDS_FAILURE_REASON_SOURCE_NOT_FOUND" } + GetOrCreateOpenShardsFailureReason::NoIngestersAvailable => { + "GET_OR_CREATE_OPEN_SHARDS_FAILURE_REASON_NO_INGESTERS_AVAILABLE" + } } } /// Creates an enum from field names used in the ProtoBuf definition. 
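// --- Illustrative sketch, not part of the generated code -------------------
// The new `NoIngestersAvailable` variant round-trips through the prost
// string helpers in this file, and the ingest workbench maps it to its
// "no shards available" failure (see workbench.rs earlier in this diff).
#[cfg(test)]
mod no_ingesters_available_round_trip_sketch {
    use quickwit_proto::control_plane::GetOrCreateOpenShardsFailureReason;

    #[test]
    fn test_no_ingesters_available_round_trip() {
        let reason = GetOrCreateOpenShardsFailureReason::NoIngestersAvailable;
        let name = reason.as_str_name();
        assert_eq!(
            name,
            "GET_OR_CREATE_OPEN_SHARDS_FAILURE_REASON_NO_INGESTERS_AVAILABLE"
        );
        assert_eq!(
            GetOrCreateOpenShardsFailureReason::from_str_name(name),
            Some(reason)
        );
    }
}
// ---------------------------------------------------------------------------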
@@ -145,6 +149,9 @@ impl GetOrCreateOpenShardsFailureReason { "GET_OR_CREATE_OPEN_SHARDS_FAILURE_REASON_SOURCE_NOT_FOUND" => { Some(Self::SourceNotFound) } + "GET_OR_CREATE_OPEN_SHARDS_FAILURE_REASON_NO_INGESTERS_AVAILABLE" => { + Some(Self::NoIngestersAvailable) + } _ => None, } } diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs index 6fe0a8c4fd0..61ed763dd0b 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs @@ -364,19 +364,6 @@ pub struct CloseShardsResponse {} #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct PingRequest { - #[prost(string, tag = "1")] - pub leader_id: ::prost::alloc::string::String, - #[prost(string, optional, tag = "2")] - pub follower_id: ::core::option::Option<::prost::alloc::string::String>, -} -#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct PingResponse {} -#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] pub struct DecommissionRequest {} #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)] @@ -571,11 +558,6 @@ impl RpcName for CloseShardsRequest { "close_shards" } } -impl RpcName for PingRequest { - fn rpc_name() -> &'static str { - "ping" - } -} impl RpcName for DecommissionRequest { fn rpc_name() -> &'static str { "decommission" @@ -629,11 +611,6 @@ pub trait IngesterService: std::fmt::Debug + dyn_clone::DynClone + Send + Sync + &mut self, request: CloseShardsRequest, ) -> crate::ingest::IngestV2Result; - /// Pings an ingester to check if it is ready to host shards and serve requests. - async fn ping( - &mut self, - request: PingRequest, - ) -> crate::ingest::IngestV2Result; /// Decommissions the ingester. 
async fn decommission( &mut self, @@ -775,12 +752,6 @@ impl IngesterService for IngesterServiceClient { ) -> crate::ingest::IngestV2Result { self.inner.close_shards(request).await } - async fn ping( - &mut self, - request: PingRequest, - ) -> crate::ingest::IngestV2Result { - self.inner.ping(request).await - } async fn decommission( &mut self, request: DecommissionRequest, @@ -849,12 +820,6 @@ pub mod ingester_service_mock { ) -> crate::ingest::IngestV2Result { self.inner.lock().await.close_shards(request).await } - async fn ping( - &mut self, - request: super::PingRequest, - ) -> crate::ingest::IngestV2Result { - self.inner.lock().await.ping(request).await - } async fn decommission( &mut self, request: super::DecommissionRequest, @@ -1006,22 +971,6 @@ impl tower::Service for Box { Box::pin(fut) } } -impl tower::Service for Box { - type Response = PingResponse; - type Error = crate::ingest::IngestV2Error; - type Future = BoxFuture; - fn poll_ready( - &mut self, - _cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - std::task::Poll::Ready(Ok(())) - } - fn call(&mut self, request: PingRequest) -> Self::Future { - let mut svc = self.clone(); - let fut = async move { svc.ping(request).await }; - Box::pin(fut) - } -} impl tower::Service for Box { type Response = DecommissionResponse; type Error = crate::ingest::IngestV2Error; @@ -1082,11 +1031,6 @@ struct IngesterServiceTowerServiceStack { CloseShardsResponse, crate::ingest::IngestV2Error, >, - ping_svc: quickwit_common::tower::BoxService< - PingRequest, - PingResponse, - crate::ingest::IngestV2Error, - >, decommission_svc: quickwit_common::tower::BoxService< DecommissionRequest, DecommissionResponse, @@ -1105,7 +1049,6 @@ impl Clone for IngesterServiceTowerServiceStack { retain_shards_svc: self.retain_shards_svc.clone(), truncate_shards_svc: self.truncate_shards_svc.clone(), close_shards_svc: self.close_shards_svc.clone(), - ping_svc: self.ping_svc.clone(), decommission_svc: self.decommission_svc.clone(), } } @@ -1160,12 +1103,6 @@ impl IngesterService for IngesterServiceTowerServiceStack { ) -> crate::ingest::IngestV2Result { self.close_shards_svc.ready().await?.call(request).await } - async fn ping( - &mut self, - request: PingRequest, - ) -> crate::ingest::IngestV2Result { - self.ping_svc.ready().await?.call(request).await - } async fn decommission( &mut self, request: DecommissionRequest, @@ -1253,16 +1190,6 @@ type CloseShardsLayer = quickwit_common::tower::BoxLayer< CloseShardsResponse, crate::ingest::IngestV2Error, >; -type PingLayer = quickwit_common::tower::BoxLayer< - quickwit_common::tower::BoxService< - PingRequest, - PingResponse, - crate::ingest::IngestV2Error, - >, - PingRequest, - PingResponse, - crate::ingest::IngestV2Error, ->; type DecommissionLayer = quickwit_common::tower::BoxLayer< quickwit_common::tower::BoxService< DecommissionRequest, @@ -1283,7 +1210,6 @@ pub struct IngesterServiceTowerLayerStack { retain_shards_layers: Vec, truncate_shards_layers: Vec, close_shards_layers: Vec, - ping_layers: Vec, decommission_layers: Vec, } impl IngesterServiceTowerLayerStack { @@ -1493,31 +1419,6 @@ impl IngesterServiceTowerLayerStack { crate::ingest::IngestV2Error, >, >>::Service as tower::Service>::Future: Send + 'static, - L: tower::Layer< - quickwit_common::tower::BoxService< - PingRequest, - PingResponse, - crate::ingest::IngestV2Error, - >, - > + Clone + Send + Sync + 'static, - , - >>::Service: tower::Service< - PingRequest, - Response = PingResponse, - Error = crate::ingest::IngestV2Error, - > + Clone + Send + Sync 
+ 'static, - <, - >>::Service as tower::Service>::Future: Send + 'static, L: tower::Layer< quickwit_common::tower::BoxService< DecommissionRequest, @@ -1559,7 +1460,6 @@ impl IngesterServiceTowerLayerStack { .push(quickwit_common::tower::BoxLayer::new(layer.clone())); self.close_shards_layers .push(quickwit_common::tower::BoxLayer::new(layer.clone())); - self.ping_layers.push(quickwit_common::tower::BoxLayer::new(layer.clone())); self.decommission_layers .push(quickwit_common::tower::BoxLayer::new(layer.clone())); self @@ -1722,25 +1622,6 @@ impl IngesterServiceTowerLayerStack { self.close_shards_layers.push(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn stack_ping_layer(mut self, layer: L) -> Self - where - L: tower::Layer< - quickwit_common::tower::BoxService< - PingRequest, - PingResponse, - crate::ingest::IngestV2Error, - >, - > + Send + Sync + 'static, - L::Service: tower::Service< - PingRequest, - Response = PingResponse, - Error = crate::ingest::IngestV2Error, - > + Clone + Send + Sync + 'static, - >::Future: Send + 'static, - { - self.ping_layers.push(quickwit_common::tower::BoxLayer::new(layer)); - self - } pub fn stack_decommission_layer(mut self, layer: L) -> Self where L: tower::Layer< @@ -1870,14 +1751,6 @@ impl IngesterServiceTowerLayerStack { quickwit_common::tower::BoxService::new(boxed_instance.clone()), |svc, layer| layer.layer(svc), ); - let ping_svc = self - .ping_layers - .into_iter() - .rev() - .fold( - quickwit_common::tower::BoxService::new(boxed_instance.clone()), - |svc, layer| layer.layer(svc), - ); let decommission_svc = self .decommission_layers .into_iter() @@ -1896,7 +1769,6 @@ impl IngesterServiceTowerLayerStack { retain_shards_svc, truncate_shards_svc, close_shards_svc, - ping_svc, decommission_svc, }; IngesterServiceClient::new(tower_svc_stack) @@ -2031,12 +1903,6 @@ where Error = crate::ingest::IngestV2Error, Future = BoxFuture, > - + tower::Service< - PingRequest, - Response = PingResponse, - Error = crate::ingest::IngestV2Error, - Future = BoxFuture, - > + tower::Service< DecommissionRequest, Response = DecommissionResponse, @@ -2092,12 +1958,6 @@ where ) -> crate::ingest::IngestV2Result { self.call(request).await } - async fn ping( - &mut self, - request: PingRequest, - ) -> crate::ingest::IngestV2Result { - self.call(request).await - } async fn decommission( &mut self, request: DecommissionRequest, @@ -2231,16 +2091,6 @@ where .map(|response| response.into_inner()) .map_err(crate::error::grpc_status_to_service_error) } - async fn ping( - &mut self, - request: PingRequest, - ) -> crate::ingest::IngestV2Result { - self.inner - .ping(request) - .await - .map(|response| response.into_inner()) - .map_err(crate::error::grpc_status_to_service_error) - } async fn decommission( &mut self, request: DecommissionRequest, @@ -2373,17 +2223,6 @@ for IngesterServiceGrpcServerAdapter { .map(tonic::Response::new) .map_err(crate::error::grpc_error_to_grpc_status) } - async fn ping( - &self, - request: tonic::Request, - ) -> Result, tonic::Status> { - self.inner - .clone() - .ping(request.into_inner()) - .await - .map(tonic::Response::new) - .map_err(crate::error::grpc_error_to_grpc_status) - } async fn decommission( &self, request: tonic::Request, @@ -2733,31 +2572,6 @@ pub mod ingester_service_grpc_client { ); self.inner.unary(req, path, codec).await } - /// Pings an ingester to check if it is ready to host shards and serve requests. 
- pub async fn ping( - &mut self, - request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/quickwit.ingest.ingester.IngesterService/Ping", - ); - let mut req = request.into_request(); - req.extensions_mut() - .insert( - GrpcMethod::new("quickwit.ingest.ingester.IngesterService", "Ping"), - ); - self.inner.unary(req, path, codec).await - } /// Decommissions the ingester. pub async fn decommission( &mut self, @@ -2879,11 +2693,6 @@ pub mod ingester_service_grpc_server { tonic::Response, tonic::Status, >; - /// Pings an ingester to check if it is ready to host shards and serve requests. - async fn ping( - &self, - request: tonic::Request, - ) -> std::result::Result, tonic::Status>; /// Decommissions the ingester. async fn decommission( &self, @@ -3343,49 +3152,6 @@ pub mod ingester_service_grpc_server { }; Box::pin(fut) } - "/quickwit.ingest.ingester.IngesterService/Ping" => { - #[allow(non_camel_case_types)] - struct PingSvc(pub Arc); - impl< - T: IngesterServiceGrpc, - > tonic::server::UnaryService for PingSvc { - type Response = super::PingResponse; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = Arc::clone(&self.0); - let fut = async move { (*inner).ping(request).await }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let max_decoding_message_size = self.max_decoding_message_size; - let max_encoding_message_size = self.max_encoding_message_size; - let inner = self.inner.clone(); - let fut = async move { - let inner = inner.0; - let method = PingSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ) - .apply_max_message_size_config( - max_decoding_message_size, - max_encoding_message_size, - ); - let res = grpc.unary(method, req).await; - Ok(res) - }; - Box::pin(fut) - } "/quickwit.ingest.ingester.IngesterService/Decommission" => { #[allow(non_camel_case_types)] struct DecommissionSvc(pub Arc); diff --git a/quickwit/quickwit-proto/src/ingest/mod.rs b/quickwit/quickwit-proto/src/ingest/mod.rs index bba1e82500e..398a687b6d6 100644 --- a/quickwit/quickwit-proto/src/ingest/mod.rs +++ b/quickwit/quickwit-proto/src/ingest/mod.rs @@ -36,9 +36,6 @@ pub type IngestV2Result = std::result::Result; #[derive(Debug, thiserror::Error, Eq, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub enum IngestV2Error { - // TODO: Get rid of this variant. - #[error("failed to connect to ingester `{ingester_id}`")] - IngesterUnavailable { ingester_id: NodeId }, #[error("internal error: {0}")] Internal(String), #[error("shard `{shard_id}` not found")] @@ -54,7 +51,6 @@ pub enum IngestV2Error { impl ServiceError for IngestV2Error { fn error_code(&self) -> ServiceErrorCode { match self { - Self::IngesterUnavailable { .. } => ServiceErrorCode::Unavailable, Self::Internal(_) => ServiceErrorCode::Internal, Self::ShardNotFound { .. } => ServiceErrorCode::NotFound, Self::Timeout(_) => ServiceErrorCode::Timeout,
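// --- Illustrative sketch, not part of this patch ---------------------------
// With `IngestV2Error::IngesterUnavailable` removed, unavailability is
// reported through the generic `Unavailable(String)` variant, as fetch.rs
// and ingester.rs now do; error.rs maps it (together with `Timeout`) to
// `IngestServiceError::Unavailable`. The `unavailable_error` helper below is
// hypothetical.
use quickwit_proto::ingest::IngestV2Error;
use quickwit_proto::types::NodeId;

fn unavailable_error(ingester_id: &NodeId) -> IngestV2Error {
    let message = format!("ingester `{ingester_id}` is unavailable");
    IngestV2Error::Unavailable(message)
}
// ---------------------------------------------------------------------------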