diff --git a/quickwit/quickwit-actors/src/lib.rs b/quickwit/quickwit-actors/src/lib.rs index 886c775b935..b97a4e037f7 100644 --- a/quickwit/quickwit-actors/src/lib.rs +++ b/quickwit/quickwit-actors/src/lib.rs @@ -66,7 +66,7 @@ pub use universe::Universe; pub use self::actor_context::ActorContext; pub use self::actor_state::ActorState; pub use self::channel_with_priority::{QueueCapacity, RecvError, SendError, TrySendError}; -pub use self::mailbox::{Inbox, Mailbox}; +pub use self::mailbox::{Inbox, Mailbox, WeakMailbox}; pub use self::registry::ActorObservation; pub use self::supervisor::{Supervisor, SupervisorMetrics, SupervisorState}; diff --git a/quickwit/quickwit-actors/src/mailbox.rs b/quickwit/quickwit-actors/src/mailbox.rs index efe404eef02..a6944b5c27c 100644 --- a/quickwit/quickwit-actors/src/mailbox.rs +++ b/quickwit/quickwit-actors/src/mailbox.rs @@ -438,6 +438,15 @@ pub struct WeakMailbox { ref_count: Weak, } +impl Clone for WeakMailbox { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + ref_count: self.ref_count.clone(), + } + } +} + impl WeakMailbox { pub fn upgrade(&self) -> Option> { let inner = self.inner.upgrade()?; diff --git a/quickwit/quickwit-common/src/lib.rs b/quickwit/quickwit-common/src/lib.rs index 3079ef25696..eac5881219b 100644 --- a/quickwit/quickwit-common/src/lib.rs +++ b/quickwit/quickwit-common/src/lib.rs @@ -31,12 +31,12 @@ mod path_hasher; mod progress; pub mod pubsub; pub mod rand; +pub mod rate_limiter; pub mod rendezvous_hasher; pub mod retry; pub mod runtimes; pub mod shared_consts; pub mod sorted_iter; - pub mod stream_utils; pub mod temp_dir; #[cfg(any(test, feature = "testsuite"))] diff --git a/quickwit/quickwit-common/src/pubsub.rs b/quickwit/quickwit-common/src/pubsub.rs index 27cab61ebb2..c22788810b0 100644 --- a/quickwit/quickwit-common/src/pubsub.rs +++ b/quickwit/quickwit-common/src/pubsub.rs @@ -25,6 +25,7 @@ use std::time::Duration; use async_trait::async_trait; use tokio::sync::Mutex as TokioMutex; +use tracing::warn; use crate::type_map::TypeMap; @@ -123,10 +124,17 @@ impl EventBroker { for subscription in typed_subscriptions.values() { let event = event.clone(); let subscriber_clone = subscription.subscriber.clone(); - tokio::spawn(tokio::time::timeout(Duration::from_secs(600), async move { - let mut subscriber_lock = subscriber_clone.lock().await; - subscriber_lock.handle_event(event).await; - })); + let handle_event_fut = async move { + if tokio::time::timeout(Duration::from_secs(1), async { + subscriber_clone.lock().await.handle_event(event).await + }) + .await + .is_err() + { + warn!("`{}` event handler timed out", std::any::type_name::()); + } + }; + tokio::spawn(handle_event_fut); } } } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/rate_limiter.rs b/quickwit/quickwit-common/src/rate_limiter.rs similarity index 57% rename from quickwit/quickwit-ingest/src/ingest_v2/rate_limiter.rs rename to quickwit/quickwit-common/src/rate_limiter.rs index fc6a7c2abce..ddcb3c1baae 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/rate_limiter.rs +++ b/quickwit/quickwit-common/src/rate_limiter.rs @@ -20,7 +20,8 @@ use std::time::{Duration, Instant}; use bytesize::ByteSize; -use quickwit_common::tower::{ConstantRate, Rate}; + +use crate::tower::{ConstantRate, Rate}; #[derive(Debug, Clone, Copy)] pub struct RateLimiterSettings { @@ -28,7 +29,7 @@ pub struct RateLimiterSettings { // up to what we call a `burst_limit`. // // Until these credits are expired, the rate limiter may exceed temporarily its rate limit. 
- pub burst_limit: ByteSize, + pub burst_limit: u64, pub rate_limit: ConstantRate, // The refill period has an effect on the resolution at which the // rate limiting is enforced. @@ -40,7 +41,7 @@ pub struct RateLimiterSettings { impl Default for RateLimiterSettings { fn default() -> Self { // 10 MB burst limit. - let burst_limit = ByteSize::mb(10); + let burst_limit = ByteSize::mb(10).as_u64(); // 5 MB/s rate limit. let rate_limit = ConstantRate::bytes_per_sec(ByteSize::mb(5)); // Refill every 100ms. @@ -56,9 +57,11 @@ impl Default for RateLimiterSettings { /// A bursty token-based rate limiter. #[derive(Debug, Clone)] -pub(super) struct RateLimiter { - capacity: u64, - available: u64, +pub struct RateLimiter { + // Maximum number of permits that can be accumulated. + max_capacity: u64, + // Number of permits available. + available_permits: u64, refill_amount: u64, refill_period: Duration, refill_period_micros: u64, @@ -68,15 +71,14 @@ pub(super) struct RateLimiter { impl RateLimiter { /// Creates a new rate limiter from the given settings. pub fn from_settings(settings: RateLimiterSettings) -> Self { - let capacity = settings.burst_limit.as_u64(); - + let max_capacity = settings.burst_limit; let refill_period = settings.refill_period; let rate_limit = settings.rate_limit.rescale(refill_period); let now = Instant::now(); Self { - capacity, - available: capacity, + max_capacity, + available_permits: max_capacity, refill_amount: rate_limit.work(), refill_period, refill_period_micros: refill_period.as_micros() as u64, @@ -84,19 +86,33 @@ impl RateLimiter { } } - /// Acquires some capacity from the rate limiter. Returns whether the capacity was available. - pub fn acquire(&mut self, capacity: ByteSize) -> bool { - if self.acquire_inner(capacity.as_u64()) { + /// Returns the number of permits available. + pub fn available_permits(&self) -> u64 { + self.available_permits + } + + /// Acquires some permits from the rate limiter. Returns whether the permits were acquired. + pub fn acquire(&mut self, num_permits: u64) -> bool { + if self.acquire_inner(num_permits) { true } else { self.refill(Instant::now()); - self.acquire_inner(capacity.as_u64()) + self.acquire_inner(num_permits) } } - fn acquire_inner(&mut self, capacity: u64) -> bool { - if self.available >= capacity { - self.available -= capacity; + pub fn acquire_bytes(&mut self, bytes: ByteSize) -> bool { + self.acquire(bytes.as_u64()) + } + + /// Gives back some unused permits to the rate limiter. + pub fn release(&mut self, num_permits: u64) { + self.available_permits = self.max_capacity.min(self.available_permits + num_permits); + } + + fn acquire_inner(&mut self, num_permits: u64) -> bool { + if self.available_permits >= num_permits { + self.available_permits -= num_permits; true } else { false @@ -110,7 +126,7 @@ impl RateLimiter { let elapsed = (now - self.refill_at).as_micros() as u64; // More than one refill period may have elapsed so we need to take that into account. 
let refill = self.refill_amount + self.refill_amount * elapsed / self.refill_period_micros; - self.available = std::cmp::min(self.available + refill, self.capacity); + self.available_permits = self.max_capacity.min(self.available_permits + refill); self.refill_at = now + self.refill_period; } } @@ -120,61 +136,79 @@ mod tests { use super::*; #[test] - fn test_rate_limiter() { + fn test_rate_limiter_acquire() { let settings = RateLimiterSettings { - burst_limit: ByteSize::mb(2), + burst_limit: ByteSize::mb(2).as_u64(), rate_limit: ConstantRate::bytes_per_sec(ByteSize::mb(1)), refill_period: Duration::from_millis(100), }; let mut rate_limiter = RateLimiter::from_settings(settings); - assert_eq!(rate_limiter.capacity, ByteSize::mb(2).as_u64()); - assert_eq!(rate_limiter.available, ByteSize::mb(2).as_u64()); + assert_eq!(rate_limiter.max_capacity, ByteSize::mb(2).as_u64()); + assert_eq!(rate_limiter.available_permits, ByteSize::mb(2).as_u64()); assert_eq!(rate_limiter.refill_amount, ByteSize::kb(100).as_u64()); assert_eq!(rate_limiter.refill_period, Duration::from_millis(100)); - assert!(rate_limiter.acquire(ByteSize::mb(1))); - assert!(rate_limiter.acquire(ByteSize::mb(1))); - assert!(!rate_limiter.acquire(ByteSize::kb(1))); + assert!(rate_limiter.acquire_bytes(ByteSize::mb(1))); + assert!(rate_limiter.acquire_bytes(ByteSize::mb(1))); + assert!(!rate_limiter.acquire_bytes(ByteSize::kb(1))); std::thread::sleep(Duration::from_millis(100)); - assert!(rate_limiter.acquire(ByteSize::kb(100))); - assert!(!rate_limiter.acquire(ByteSize::kb(20))); + assert!(rate_limiter.acquire_bytes(ByteSize::kb(100))); + assert!(!rate_limiter.acquire_bytes(ByteSize::kb(20))); std::thread::sleep(Duration::from_millis(250)); - assert!(rate_limiter.acquire(ByteSize::kb(125))); - assert!(rate_limiter.acquire(ByteSize::kb(125))); - assert!(!rate_limiter.acquire(ByteSize::kb(20))); + assert!(rate_limiter.acquire_bytes(ByteSize::kb(125))); + assert!(rate_limiter.acquire_bytes(ByteSize::kb(125))); + assert!(!rate_limiter.acquire_bytes(ByteSize::kb(20))); + } + + #[test] + fn test_rate_limiter_release() { + let settings = RateLimiterSettings { + burst_limit: 1, + rate_limit: ConstantRate::bytes_per_sec(ByteSize::mb(1)), + refill_period: Duration::from_millis(100), + }; + let mut rate_limiter = RateLimiter::from_settings(settings); + rate_limiter.acquire(1); + assert_eq!(rate_limiter.available_permits, 0); + + rate_limiter.release(1); + assert_eq!(rate_limiter.available_permits, 1); + + rate_limiter.release(1); + assert_eq!(rate_limiter.available_permits, 1); } #[test] fn test_rate_limiter_refill() { let settings = RateLimiterSettings { - burst_limit: ByteSize::mb(2), + burst_limit: ByteSize::mb(2).as_u64(), rate_limit: ConstantRate::bytes_per_sec(ByteSize::mb(1)), refill_period: Duration::from_millis(100), }; let mut rate_limiter = RateLimiter::from_settings(settings); - rate_limiter.available = 0; + rate_limiter.available_permits = 0; let now = Instant::now(); rate_limiter.refill(now); - assert_eq!(rate_limiter.available, 0); + assert_eq!(rate_limiter.available_permits, 0); - rate_limiter.available = 0; + rate_limiter.available_permits = 0; let now = now + Duration::from_millis(100); rate_limiter.refill(now); - assert_eq!(rate_limiter.available, ByteSize::kb(100).as_u64()); + assert_eq!(rate_limiter.available_permits, ByteSize::kb(100).as_u64()); - rate_limiter.available = 0; + rate_limiter.available_permits = 0; let now = now + Duration::from_millis(110); rate_limiter.refill(now); - assert_eq!(rate_limiter.available, 
ByteSize::kb(110).as_u64()); + assert_eq!(rate_limiter.available_permits, ByteSize::kb(110).as_u64()); - rate_limiter.available = 0; + rate_limiter.available_permits = 0; let now = now + Duration::from_millis(210); rate_limiter.refill(now); - assert_eq!(rate_limiter.available, ByteSize::kb(210).as_u64()); + assert_eq!(rate_limiter.available_permits, ByteSize::kb(210).as_u64()); } } diff --git a/quickwit/quickwit-common/src/tower/rate.rs b/quickwit/quickwit-common/src/tower/rate.rs index 661359c4179..ab8da1d0ebe 100644 --- a/quickwit/quickwit-common/src/tower/rate.rs +++ b/quickwit/quickwit-common/src/tower/rate.rs @@ -47,18 +47,18 @@ impl ConstantRate { /// # Panics /// /// This function panics if `period` is 0. - pub fn new(work: u64, period: Duration) -> Self { + pub const fn new(work: u64, period: Duration) -> Self { assert!(!period.is_zero()); Self { work, period } } - pub fn bytes_per_period(bytes: ByteSize, period: Duration) -> Self { + pub const fn bytes_per_period(bytes: ByteSize, period: Duration) -> Self { let work = bytes.as_u64(); Self::new(work, period) } - pub fn bytes_per_sec(bytes: ByteSize) -> Self { + pub const fn bytes_per_sec(bytes: ByteSize) -> Self { Self::bytes_per_period(bytes, Duration::from_secs(1)) } diff --git a/quickwit/quickwit-control-plane/Cargo.toml b/quickwit/quickwit-control-plane/Cargo.toml index 7ed4b15a785..263205fd2af 100644 --- a/quickwit/quickwit-control-plane/Cargo.toml +++ b/quickwit/quickwit-control-plane/Cargo.toml @@ -12,8 +12,8 @@ documentation = "https://quickwit.io/docs/" [dependencies] anyhow = { workspace = true } async-trait = { workspace = true } -fnv = { workspace = true } dyn-clone = { workspace = true } +fnv = { workspace = true } http = { workspace = true } hyper = { workspace = true } itertools = { workspace = true } diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs index ef81c644585..3cc934edc46 100644 --- a/quickwit/quickwit-control-plane/src/control_plane.rs +++ b/quickwit/quickwit-control-plane/src/control_plane.rs @@ -24,9 +24,11 @@ use async_trait::async_trait; use fnv::FnvHashSet; use quickwit_actors::{ Actor, ActorContext, ActorExitStatus, ActorHandle, Handler, Mailbox, Supervisor, Universe, + WeakMailbox, }; +use quickwit_common::pubsub::EventSubscriber; use quickwit_config::SourceConfig; -use quickwit_ingest::IngesterPool; +use quickwit_ingest::{IngesterPool, LocalShardsUpdate}; use quickwit_metastore::IndexMetadata; use quickwit_proto::control_plane::{ ControlPlaneError, ControlPlaneResult, GetOrCreateOpenShardsRequest, @@ -43,9 +45,9 @@ use quickwit_proto::types::{IndexUid, NodeId, Position, ShardId, SourceUid}; use serde::Serialize; use tracing::error; -use crate::control_plane_model::{ControlPlaneModel, ControlPlaneModelMetrics}; use crate::indexing_scheduler::{IndexingScheduler, IndexingSchedulerState}; use crate::ingest::IngestController; +use crate::model::{ControlPlaneModel, ControlPlaneModelMetrics}; use crate::IndexerPool; /// Interval between two controls (or checks) of the desired plan VS running plan. 
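The `RateLimiter` moved into `quickwit-common` earlier in this diff now exposes a generic permit-based API (`acquire`, `acquire_bytes`, `release`, `available_permits`) instead of a byte-only `acquire`. A minimal standalone sketch of that API, based only on the types and paths visible in this diff (not code from the PR itself):

```rust
use std::time::Duration;

use bytesize::ByteSize;
use quickwit_common::rate_limiter::{RateLimiter, RateLimiterSettings};
use quickwit_common::tower::ConstantRate;

fn main() {
    // 2 MB of burst capacity, refilled at 1 MB/s in 100 ms increments.
    let settings = RateLimiterSettings {
        burst_limit: ByteSize::mb(2).as_u64(),
        rate_limit: ConstantRate::bytes_per_sec(ByteSize::mb(1)),
        refill_period: Duration::from_millis(100),
    };
    let mut rate_limiter = RateLimiter::from_settings(settings);

    // The whole burst capacity is available up front.
    assert!(rate_limiter.acquire_bytes(ByteSize::mb(2)));
    // Once exhausted, further acquisitions fail until the next refill period elapses.
    assert!(!rate_limiter.acquire(1));

    // Unused permits can be handed back; the total is capped at the burst limit.
    rate_limiter.release(512);
    assert_eq!(rate_limiter.available_permits(), 512);
}
```

The new `release` method is what allows callers to reserve permits up front and return whatever they did not use, which the byte-only API could not express.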
@@ -166,8 +168,7 @@ impl ControlPlane { .delete_shards(delete_shards_request) .await .context("failed to delete shards in metastore")?; - self.model - .delete_shards(&source_uid.index_uid, &source_uid.source_id, shards); + self.model.delete_shards(source_uid, shards); self.indexing_scheduler .schedule_indexing_plan_if_needed(&self.model); Ok(()) @@ -183,10 +184,12 @@ impl Handler for ControlPlane { shard_positions_update: ShardPositionsUpdate, _ctx: &ActorContext, ) -> Result<(), ActorExitStatus> { - let known_shard_ids: FnvHashSet = self - .model - .list_shards(&shard_positions_update.source_uid) - .into_iter() + let Some(shard_entries) = self.model.list_shards(&shard_positions_update.source_uid) else { + // The source no longer exists. + return Ok(()); + }; + let known_shard_ids: FnvHashSet = shard_entries + .map(|shard_entry| shard_entry.shard_id) .collect(); // let's identify the shard that have reached EOF but have not yet been removed. let shard_ids_to_close: Vec = shard_positions_update @@ -200,7 +203,7 @@ impl Handler for ControlPlane { if shard_ids_to_close.is_empty() { return Ok(()); } - self.delete_shards(&shard_positions_update.source_uid, &shard_ids_to_close[..]) + self.delete_shards(&shard_positions_update.source_uid, &shard_ids_to_close) .await?; Ok(()) } @@ -465,6 +468,61 @@ impl Handler for ControlPlane { } } +#[async_trait] +impl Handler for ControlPlane { + type Reply = ControlPlaneResult<()>; + + async fn handle( + &mut self, + local_shards_update: LocalShardsUpdate, + ctx: &ActorContext, + ) -> Result { + self.ingest_controller + .handle_local_shards_update(local_shards_update, &mut self.model, ctx.progress()) + .await; + self.indexing_scheduler + .schedule_indexing_plan_if_needed(&self.model); + Ok(Ok(())) + } +} + +#[derive(Clone)] +pub struct ControlPlaneEventSubscriber(WeakMailbox); + +impl ControlPlaneEventSubscriber { + pub fn new(weak_control_plane_mailbox: WeakMailbox) -> Self { + Self(weak_control_plane_mailbox) + } +} + +#[async_trait] +impl EventSubscriber for ControlPlaneEventSubscriber { + async fn handle_event(&mut self, local_shards_update: LocalShardsUpdate) { + if let Some(control_plane_mailbox) = self.0.upgrade() { + if let Err(error) = control_plane_mailbox + .send_message(local_shards_update) + .await + { + error!(error=%error, "failed to forward local shards update to control plane"); + } + } + } +} + +#[async_trait] +impl EventSubscriber for ControlPlaneEventSubscriber { + async fn handle_event(&mut self, shard_positions_update: ShardPositionsUpdate) { + if let Some(control_plane_mailbox) = self.0.upgrade() { + if let Err(error) = control_plane_mailbox + .send_message(shard_positions_update) + .await + { + error!(error=%error, "failed to forward shard positions update to control plane"); + } + } + } +} + #[cfg(test)] mod tests { use quickwit_actors::{AskError, Observe, SupervisorMetrics}; diff --git a/quickwit/quickwit-control-plane/src/control_plane_model.rs b/quickwit/quickwit-control-plane/src/control_plane_model.rs deleted file mode 100644 index aea7ac4bac7..00000000000 --- a/quickwit/quickwit-control-plane/src/control_plane_model.rs +++ /dev/null @@ -1,945 +0,0 @@ -// Copyright (C) 2023 Quickwit, Inc. -// -// Quickwit is offered under the AGPL v3.0 and as commercial software. -// For commercial licensing, contact us at hello@quickwit.io. 
-// -// AGPL: -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -use std::collections::hash_map::Entry; -use std::time::Instant; - -use anyhow::bail; -use fnv::{FnvHashMap, FnvHashSet}; -#[cfg(test)] -use itertools::Itertools; -use quickwit_common::Progress; -use quickwit_config::SourceConfig; -use quickwit_metastore::{IndexMetadata, ListIndexesMetadataResponseExt}; -use quickwit_proto::control_plane::ControlPlaneResult; -use quickwit_proto::ingest::{Shard, ShardState}; -use quickwit_proto::metastore::{ - self, EntityKind, ListIndexesMetadataRequest, ListShardsSubrequest, MetastoreError, - MetastoreService, MetastoreServiceClient, SourceType, -}; -use quickwit_proto::types::{IndexId, IndexUid, NodeId, NodeIdRef, ShardId, SourceId, SourceUid}; -use serde::Serialize; -use tracing::{error, info, warn}; - -type NextShardId = ShardId; -#[derive(Debug, Eq, PartialEq)] -struct ShardTableEntry { - shards: FnvHashMap, - next_shard_id: NextShardId, -} - -impl Default for ShardTableEntry { - fn default() -> Self { - Self { - shards: Default::default(), - next_shard_id: Self::DEFAULT_NEXT_SHARD_ID, - } - } -} - -impl ShardTableEntry { - const DEFAULT_NEXT_SHARD_ID: NextShardId = 1; // `1` matches the PostgreSQL sequence min value. - - fn is_empty(&self) -> bool { - self.shards.is_empty() - } - - fn is_default(&self) -> bool { - self.is_empty() && self.next_shard_id == Self::DEFAULT_NEXT_SHARD_ID - } - - #[cfg(test)] - fn shards(&self) -> Vec { - self.shards - .values() - .cloned() - .sorted_unstable_by_key(|shard| shard.shard_id) - .collect() - } -} - -/// The control plane maintains a model in sync with the metastore. -/// -/// The model stays consistent with the metastore, because all -/// of the mutations go through the control plane. -/// -/// If a mutation yields an error, the control plane is killed -/// and restarted. -/// -/// Upon starts, it loads its entire state from the metastore. -#[derive(Default, Debug)] -pub(crate) struct ControlPlaneModel { - index_uid_table: FnvHashMap, - index_table: FnvHashMap, - shard_table: ShardTable, -} - -#[derive(Clone, Copy, Debug, Default, Serialize)] -pub struct ControlPlaneModelMetrics { - pub num_shards: usize, -} - -impl ControlPlaneModel { - /// Clears the entire state of the model. - pub fn clear(&mut self) { - *self = Default::default(); - } - - pub fn observable_state(&self) -> ControlPlaneModelMetrics { - ControlPlaneModelMetrics { - num_shards: self.shard_table.table_entries.len(), - } - } - - pub async fn load_from_metastore( - &mut self, - metastore: &mut MetastoreServiceClient, - progress: &Progress, - ) -> ControlPlaneResult<()> { - let now = Instant::now(); - self.clear(); - - let index_metadatas = progress - .protect_future(metastore.list_indexes_metadata(ListIndexesMetadataRequest::all())) - .await? 
- .deserialize_indexes_metadata()?; - - let num_indexes = index_metadatas.len(); - self.index_table.reserve(num_indexes); - - let mut num_sources = 0; - let mut num_shards = 0; - - let mut subrequests = Vec::with_capacity(index_metadatas.len()); - - for index_metadata in index_metadatas { - self.add_index(index_metadata); - } - - for index_metadata in self.index_table.values() { - for source_config in index_metadata.sources.values() { - num_sources += 1; - - if source_config.source_type() != SourceType::IngestV2 || !source_config.enabled { - continue; - } - let request = ListShardsSubrequest { - index_uid: index_metadata.index_uid.clone().into(), - source_id: source_config.source_id.clone(), - shard_state: Some(ShardState::Open as i32), - }; - subrequests.push(request); - } - } - if !subrequests.is_empty() { - let list_shards_request = metastore::ListShardsRequest { subrequests }; - let list_shard_response = progress - .protect_future(metastore.list_shards(list_shards_request)) - .await?; - - self.shard_table - .table_entries - .reserve(list_shard_response.subresponses.len()); - - for list_shards_subresponse in list_shard_response.subresponses { - num_shards += list_shards_subresponse.shards.len(); - - let source_uid = SourceUid { - index_uid: list_shards_subresponse.index_uid.into(), - source_id: list_shards_subresponse.source_id, - }; - let shards: FnvHashMap = list_shards_subresponse - .shards - .into_iter() - .map(|shard| (shard.shard_id, shard)) - .collect(); - let table_entry = ShardTableEntry { - shards, - next_shard_id: list_shards_subresponse.next_shard_id, - }; - self.shard_table - .table_entries - .insert(source_uid, table_entry); - } - } - info!( - "synced internal state with metastore in {} seconds ({} indexes, {} sources, {} \ - shards)", - now.elapsed().as_secs(), - num_indexes, - num_sources, - num_shards, - ); - Ok(()) - } - - pub fn list_shards(&self, source_uid: &SourceUid) -> Vec { - self.shard_table.list_shards(source_uid) - } - - pub(crate) fn get_source_configs( - &self, - ) -> impl Iterator + '_ { - self.index_table.values().flat_map(|index_metadata| { - index_metadata - .sources - .iter() - .map(move |(source_id, source_config)| { - ( - SourceUid { - index_uid: index_metadata.index_uid.clone(), - source_id: source_id.clone(), - }, - source_config, - ) - }) - }) - } - - pub(crate) fn add_index(&mut self, index_metadata: IndexMetadata) { - let index_uid = index_metadata.index_uid.clone(); - self.index_uid_table - .insert(index_metadata.index_id().to_string(), index_uid.clone()); - self.index_table.insert(index_uid, index_metadata); - } - - pub(crate) fn delete_index(&mut self, index_uid: &IndexUid) { - // TODO: We need to let the routers and ingesters know. - self.index_table.remove(index_uid); - self.shard_table.delete_index(index_uid.index_id()); - } - - /// Adds a source to a given index. Returns an error if a source with the same source_id already - /// exists. - pub(crate) fn add_source( - &mut self, - index_uid: &IndexUid, - source_config: SourceConfig, - ) -> ControlPlaneResult<()> { - self.shard_table - .add_source(index_uid, &source_config.source_id); - let index_metadata = self.index_table.get_mut(index_uid).ok_or_else(|| { - MetastoreError::NotFound(EntityKind::Index { - index_id: index_uid.to_string(), - }) - })?; - index_metadata.add_source(source_config)?; - Ok(()) - } - - pub(crate) fn delete_source(&mut self, index_uid: &IndexUid, source_id: &SourceId) { - // Removing shards from shard table. 
- self.shard_table.delete_source(index_uid, source_id); - // Remove source from index config. - let Some(index_model) = self.index_table.get_mut(index_uid) else { - warn!(index_uid=%index_uid, source_id=%source_id, "delete source: index not found"); - return; - }; - if index_model.sources.remove(source_id).is_none() { - warn!(index_uid=%index_uid, source_id=%source_id, "delete source: source not found"); - }; - } - - /// Returns `true` if the source status has changed, `false` otherwise. - /// Returns an error if the source could not be found. - pub(crate) fn toggle_source( - &mut self, - index_uid: &IndexUid, - source_id: &SourceId, - enable: bool, - ) -> anyhow::Result { - let Some(index_model) = self.index_table.get_mut(index_uid) else { - bail!("index `{index_uid}` not found"); - }; - let Some(source_config) = index_model.sources.get_mut(source_id) else { - bail!("source `{source_id}` not found."); - }; - let has_changed = source_config.enabled != enable; - source_config.enabled = enable; - Ok(has_changed) - } - - /// Removes the shards identified by their index UID, source ID, and shard IDs. - pub fn delete_shards( - &mut self, - index_uid: &IndexUid, - source_id: &SourceId, - shard_ids: &[ShardId], - ) { - self.shard_table - .delete_shards(index_uid, source_id, shard_ids); - } - - #[cfg(test)] - pub fn shards(&mut self) -> impl Iterator + '_ { - self.shard_table - .table_entries - .values() - .flat_map(|table_entry| table_entry.shards.values()) - } - - pub fn shards_mut(&mut self) -> impl Iterator + '_ { - self.shard_table - .table_entries - .values_mut() - .flat_map(|table_entry| table_entry.shards.values_mut()) - } - - /// Sets the state of the shards identified by their index UID, source ID, and shard IDs to - /// `Closed`. - pub fn close_shards( - &mut self, - index_uid: &IndexUid, - source_id: &SourceId, - shard_ids: &[ShardId], - ) -> Vec { - self.shard_table - .close_shards(index_uid, source_id, shard_ids) - } - - pub fn index_uid(&self, index_id: &str) -> Option { - self.index_uid_table.get(index_id).cloned() - } - - /// Inserts the shards that have just been opened by calling `open_shards` on the metastore. - pub fn insert_newly_opened_shards( - &mut self, - index_uid: &IndexUid, - source_id: &SourceId, - shards: Vec, - next_shard_id: NextShardId, - ) { - self.shard_table - .insert_newly_opened_shards(index_uid, source_id, shards, next_shard_id); - } - - /// Finds open shards for a given index and source and whose leaders are not in the set of - /// unavailable ingesters. - pub fn find_open_shards( - &self, - index_uid: &IndexUid, - source_id: &SourceId, - unavailable_leaders: &FnvHashSet, - ) -> Option<(Vec, NextShardId)> { - self.shard_table - .find_open_shards(index_uid, source_id, unavailable_leaders) - } -} - -// A table that keeps track of the existing shards for each index and source. -#[derive(Debug, Default)] -struct ShardTable { - table_entries: FnvHashMap, -} - -impl ShardTable { - /// Adds a new empty entry for the given index and source. - /// - /// TODO check and document the behavior on error (if the source was already here). 
- fn add_source(&mut self, index_uid: &IndexUid, source_id: &SourceId) { - let source_uid = SourceUid { - index_uid: index_uid.clone(), - source_id: source_id.clone(), - }; - let table_entry = ShardTableEntry::default(); - let previous_table_entry_opt = self.table_entries.insert(source_uid, table_entry); - if let Some(previous_table_entry) = previous_table_entry_opt { - if !previous_table_entry.is_default() { - error!( - "shard table entry for index `{}` and source `{}` already exists", - index_uid.index_id(), - source_id - ); - } - } - } - - fn delete_source(&mut self, index_uid: &IndexUid, source_id: &SourceId) { - let source_uid = SourceUid { - index_uid: index_uid.clone(), - source_id: source_id.clone(), - }; - self.table_entries.remove(&source_uid); - } - - /// Removes all the entries that match the target index ID. - fn delete_index(&mut self, index_id: &str) { - self.table_entries - .retain(|source_uid, _| source_uid.index_uid.index_id() != index_id); - } - - fn list_shards(&self, source_uid: &SourceUid) -> Vec { - let Some(shard_table_entry) = self.table_entries.get(source_uid) else { - return Vec::new(); - }; - shard_table_entry - .shards - .values() - .map(|shard| shard.shard_id) - .collect() - } - - /// Finds open shards for a given index and source and whose leaders are not in the set of - /// unavailable ingesters. - fn find_open_shards( - &self, - index_uid: &IndexUid, - source_id: &SourceId, - unavailable_leaders: &FnvHashSet, - ) -> Option<(Vec, NextShardId)> { - let source_uid = SourceUid { - index_uid: index_uid.clone(), - source_id: source_id.clone(), - }; - let table_entry = self.table_entries.get(&source_uid)?; - let open_shards: Vec = table_entry - .shards - .values() - .filter(|shard| { - shard.is_open() - && !unavailable_leaders.contains(NodeIdRef::from_str(&shard.leader_id)) - }) - .cloned() - .collect(); - - #[cfg(test)] - let open_shards = open_shards - .into_iter() - .sorted_by_key(|shard| shard.shard_id) - .collect(); - - Some((open_shards, table_entry.next_shard_id)) - } - - /// Updates the shard table. - pub fn insert_newly_opened_shards( - &mut self, - index_uid: &IndexUid, - source_id: &SourceId, - opened_shards: Vec, - next_shard_id: NextShardId, - ) { - let source_uid = SourceUid { - index_uid: index_uid.clone(), - source_id: source_id.clone(), - }; - match self.table_entries.entry(source_uid) { - Entry::Occupied(mut entry) => { - let table_entry = entry.get_mut(); - - for opened_shard in opened_shards { - // We only insert shards that we don't know about because the control plane - // knows more about the state of the shards than the metastore. - table_entry - .shards - .entry(opened_shard.shard_id) - .or_insert(opened_shard); - } - table_entry.next_shard_id = next_shard_id; - } - // This should never happen if the control plane view is consistent with the state of - // the metastore, so should we panic here? Warnings are most likely going to go - // unnoticed. - Entry::Vacant(entry) => { - let shards: FnvHashMap = opened_shards - .into_iter() - .map(|shard| (shard.shard_id, shard)) - .collect(); - let table_entry = ShardTableEntry { - shards, - next_shard_id, - }; - entry.insert(table_entry); - } - } - } - - /// Sets the state of the shards identified by their index UID, source ID, and shard IDs to - /// `Closed`. 
- pub fn close_shards( - &mut self, - index_uid: &IndexUid, - source_id: &SourceId, - shard_ids: &[ShardId], - ) -> Vec { - let mut closed_shard_ids = Vec::new(); - - let source_uid = SourceUid { - index_uid: index_uid.clone(), - source_id: source_id.clone(), - }; - if let Some(table_entry) = self.table_entries.get_mut(&source_uid) { - for shard_id in shard_ids { - if let Some(shard) = table_entry.shards.get_mut(shard_id) { - if !shard.is_closed() { - shard.shard_state = ShardState::Closed as i32; - closed_shard_ids.push(*shard_id); - } - } - } - } - closed_shard_ids - } - - /// Removes the shards identified by their index UID, source ID, and shard IDs. - pub fn delete_shards( - &mut self, - index_uid: &IndexUid, - source_id: &SourceId, - shard_ids: &[ShardId], - ) { - let source_uid = SourceUid { - index_uid: index_uid.clone(), - source_id: source_id.clone(), - }; - if let Some(table_entry) = self.table_entries.get_mut(&source_uid) { - for shard_id in shard_ids { - if table_entry.shards.remove(shard_id).is_none() { - warn!(shard = *shard_id, "deleting a non-existing shard"); - } - } - } - } -} - -#[cfg(test)] -mod tests { - use quickwit_config::{SourceConfig, SourceParams, INGEST_SOURCE_ID}; - use quickwit_metastore::IndexMetadata; - use quickwit_proto::ingest::Shard; - use quickwit_proto::metastore::ListIndexesMetadataResponse; - - use super::*; - - #[test] - fn test_shard_table_add_source() { - let index_uid: IndexUid = "test-index:0".into(); - let source_id = "test-source".to_string(); - let mut shard_table = ShardTable::default(); - shard_table.add_source(&index_uid, &source_id); - assert_eq!(shard_table.table_entries.len(), 1); - let source_uid = SourceUid { - index_uid: index_uid.clone(), - source_id: source_id.clone(), - }; - let table_entry = shard_table.table_entries.get(&source_uid).unwrap(); - assert!(table_entry.shards.is_empty()); - assert_eq!(table_entry.next_shard_id, 1); - } - - #[test] - fn test_shard_table_find_open_shards() { - let index_uid: IndexUid = "test-index:0".into(); - let source_id = "test-source".to_string(); - - let mut shard_table = ShardTable::default(); - shard_table.add_source(&index_uid, &source_id); - - let mut unavailable_ingesters = FnvHashSet::default(); - - let (open_shards, next_shard_id) = shard_table - .find_open_shards(&index_uid, &source_id, &unavailable_ingesters) - .unwrap(); - assert_eq!(open_shards.len(), 0); - assert_eq!(next_shard_id, 1); - - let shard_01 = Shard { - index_uid: index_uid.clone().into(), - source_id: source_id.clone(), - shard_id: 1, - leader_id: "test-leader-0".to_string(), - shard_state: ShardState::Closed as i32, - ..Default::default() - }; - let shard_02 = Shard { - index_uid: index_uid.clone().into(), - source_id: source_id.clone(), - shard_id: 2, - leader_id: "test-leader-0".to_string(), - shard_state: ShardState::Unavailable as i32, - ..Default::default() - }; - let shard_03 = Shard { - index_uid: index_uid.clone().into(), - source_id: source_id.clone(), - shard_id: 3, - leader_id: "test-leader-0".to_string(), - shard_state: ShardState::Open as i32, - ..Default::default() - }; - let shard_04 = Shard { - index_uid: index_uid.clone().into(), - source_id: source_id.clone(), - shard_id: 4, - leader_id: "test-leader-1".to_string(), - shard_state: ShardState::Open as i32, - ..Default::default() - }; - shard_table.insert_newly_opened_shards( - &index_uid, - &source_id, - vec![shard_01, shard_02, shard_03.clone(), shard_04.clone()], - 5, - ); - let (open_shards, next_shard_id) = shard_table - .find_open_shards(&index_uid, 
&source_id, &unavailable_ingesters) - .unwrap(); - assert_eq!(open_shards.len(), 2); - assert_eq!(open_shards[0], shard_03); - assert_eq!(open_shards[1], shard_04); - assert_eq!(next_shard_id, 5); - - unavailable_ingesters.insert("test-leader-0".into()); - - let (open_shards, next_shard_id) = shard_table - .find_open_shards(&index_uid, &source_id, &unavailable_ingesters) - .unwrap(); - assert_eq!(open_shards.len(), 1); - assert_eq!(open_shards[0], shard_04); - assert_eq!(next_shard_id, 5); - } - - #[test] - fn test_shard_table_insert_newly_opened_shards() { - let index_uid_0: IndexUid = "test-index:0".into(); - let source_id = "test-source".to_string(); - - let mut shard_table = ShardTable::default(); - - let shard_01 = Shard { - index_uid: index_uid_0.clone().into(), - source_id: source_id.clone(), - shard_id: 1, - leader_id: "test-leader-0".to_string(), - shard_state: ShardState::Open as i32, - ..Default::default() - }; - shard_table.insert_newly_opened_shards(&index_uid_0, &source_id, vec![shard_01.clone()], 2); - - assert_eq!(shard_table.table_entries.len(), 1); - - let source_uid = SourceUid { - index_uid: index_uid_0.clone(), - source_id: source_id.clone(), - }; - let table_entry = shard_table.table_entries.get(&source_uid).unwrap(); - let shards = table_entry.shards(); - assert_eq!(shards.len(), 1); - assert_eq!(shards[0], shard_01); - assert_eq!(table_entry.next_shard_id, 2); - - shard_table - .table_entries - .get_mut(&source_uid) - .unwrap() - .shards - .get_mut(&1) - .unwrap() - .shard_state = ShardState::Unavailable as i32; - - let shard_02 = Shard { - index_uid: index_uid_0.clone().into(), - source_id: source_id.clone(), - shard_id: 2, - leader_id: "test-leader-0".to_string(), - shard_state: ShardState::Open as i32, - ..Default::default() - }; - - shard_table.insert_newly_opened_shards( - &index_uid_0, - &source_id, - vec![shard_01.clone(), shard_02.clone()], - 3, - ); - - assert_eq!(shard_table.table_entries.len(), 1); - - let source_uid = SourceUid { - index_uid: index_uid_0.clone(), - source_id: source_id.clone(), - }; - let table_entry = shard_table.table_entries.get(&source_uid).unwrap(); - let shards = table_entry.shards(); - assert_eq!(shards.len(), 2); - assert_eq!(shards[0].shard_state(), ShardState::Unavailable); - assert_eq!(shards[1], shard_02); - assert_eq!(table_entry.next_shard_id, 3); - } - - #[test] - fn test_shard_table_close_shards() { - let index_uid_0: IndexUid = "test-index:0".into(); - let index_uid_1: IndexUid = "test-index:1".into(); - let source_id = "test-source".to_string(); - - let mut shard_table = ShardTable::default(); - - let shard_01 = Shard { - index_uid: index_uid_0.clone().into(), - source_id: source_id.clone(), - shard_id: 1, - leader_id: "test-leader-0".to_string(), - shard_state: ShardState::Open as i32, - ..Default::default() - }; - let shard_02 = Shard { - index_uid: index_uid_0.clone().into(), - source_id: source_id.clone(), - shard_id: 2, - leader_id: "test-leader-0".to_string(), - shard_state: ShardState::Closed as i32, - ..Default::default() - }; - let shard_11 = Shard { - index_uid: index_uid_1.clone().into(), - source_id: source_id.clone(), - shard_id: 1, - leader_id: "test-leader-0".to_string(), - shard_state: ShardState::Open as i32, - ..Default::default() - }; - shard_table.insert_newly_opened_shards( - &index_uid_0, - &source_id, - vec![shard_01, shard_02], - 3, - ); - shard_table.insert_newly_opened_shards(&index_uid_0, &source_id, vec![shard_11], 2); - - let closed_shard_ids = shard_table.close_shards(&index_uid_0, 
&source_id, &[1, 2, 3]); - assert_eq!(closed_shard_ids, &[1]); - - let source_uid_0 = SourceUid { - index_uid: index_uid_0, - source_id, - }; - let table_entry = shard_table.table_entries.get(&source_uid_0).unwrap(); - let shards = table_entry.shards(); - assert_eq!(shards[0].shard_state, ShardState::Closed as i32); - } - - #[test] - fn test_shard_table_delete_shards() { - let index_uid_0: IndexUid = "test-index:0".into(); - let index_uid_1: IndexUid = "test-index:1".into(); - let source_id = "test-source".to_string(); - - let mut shard_table = ShardTable::default(); - - let shard_01 = Shard { - index_uid: index_uid_0.clone().into(), - source_id: source_id.clone(), - shard_id: 1, - leader_id: "test-leader-0".to_string(), - shard_state: ShardState::Open as i32, - ..Default::default() - }; - let shard_02 = Shard { - index_uid: index_uid_0.clone().into(), - source_id: source_id.clone(), - shard_id: 2, - leader_id: "test-leader-0".to_string(), - shard_state: ShardState::Open as i32, - ..Default::default() - }; - let shard_11 = Shard { - index_uid: index_uid_1.clone().into(), - source_id: source_id.clone(), - shard_id: 1, - leader_id: "test-leader-0".to_string(), - shard_state: ShardState::Open as i32, - ..Default::default() - }; - shard_table.insert_newly_opened_shards( - &index_uid_0, - &source_id, - vec![shard_01.clone(), shard_02], - 3, - ); - shard_table.insert_newly_opened_shards(&index_uid_1, &source_id, vec![shard_11], 2); - shard_table.delete_shards(&index_uid_0, &source_id, &[2]); - shard_table.delete_shards(&index_uid_1, &source_id, &[1]); - - assert_eq!(shard_table.table_entries.len(), 2); - - let source_uid_0 = SourceUid { - index_uid: index_uid_0.clone(), - source_id: source_id.clone(), - }; - let table_entry = shard_table.table_entries.get(&source_uid_0).unwrap(); - let shards = table_entry.shards(); - assert_eq!(shards.len(), 1); - assert_eq!(shards[0], shard_01); - assert_eq!(table_entry.next_shard_id, 3); - - let source_uid_1 = SourceUid { - index_uid: index_uid_1.clone(), - source_id: source_id.clone(), - }; - let table_entry = shard_table.table_entries.get(&source_uid_1).unwrap(); - assert!(table_entry.is_empty()); - assert_eq!(table_entry.next_shard_id, 2); - } - - #[tokio::test] - async fn test_control_plane_model_load_shard_table() { - let progress = Progress::default(); - - let mut mock_metastore = MetastoreServiceClient::mock(); - mock_metastore - .expect_list_indexes_metadata() - .returning(|request| { - assert_eq!(request, ListIndexesMetadataRequest::all()); - - let mut index_0 = IndexMetadata::for_test("test-index-0", "ram:///test-index-0"); - let mut source_config = SourceConfig::ingest_v2_default(); - source_config.enabled = true; - index_0.add_source(source_config.clone()).unwrap(); - - let mut index_1 = IndexMetadata::for_test("test-index-1", "ram:///test-index-1"); - index_1.add_source(source_config.clone()).unwrap(); - - let mut index_2 = IndexMetadata::for_test("test-index-2", "ram:///test-index-2"); - source_config.enabled = false; - index_2.add_source(source_config.clone()).unwrap(); - - let indexes = vec![index_0, index_1, index_2]; - Ok(ListIndexesMetadataResponse::try_from_indexes_metadata(indexes).unwrap()) - }); - mock_metastore.expect_list_shards().returning(|request| { - assert_eq!(request.subrequests.len(), 2); - - assert_eq!(request.subrequests[0].index_uid, "test-index-0:0"); - assert_eq!(request.subrequests[0].source_id, INGEST_SOURCE_ID); - assert_eq!(request.subrequests[0].shard_state(), ShardState::Open); - - 
assert_eq!(request.subrequests[1].index_uid, "test-index-1:0"); - assert_eq!(request.subrequests[1].source_id, INGEST_SOURCE_ID); - assert_eq!(request.subrequests[1].shard_state(), ShardState::Open); - - let subresponses = vec![ - metastore::ListShardsSubresponse { - index_uid: "test-index-0:0".to_string(), - source_id: INGEST_SOURCE_ID.to_string(), - shards: vec![Shard { - shard_id: 42, - ..Default::default() - }], - next_shard_id: 43, - }, - metastore::ListShardsSubresponse { - index_uid: "test-index-1:0".to_string(), - source_id: INGEST_SOURCE_ID.to_string(), - shards: Vec::new(), - next_shard_id: 1, - }, - ]; - let response = metastore::ListShardsResponse { subresponses }; - Ok(response) - }); - let mut model = ControlPlaneModel::default(); - let mut metastore_client = MetastoreServiceClient::from(mock_metastore); - model - .load_from_metastore(&mut metastore_client, &progress) - .await - .unwrap(); - - assert_eq!(model.index_table.len(), 3); - assert_eq!( - model.index_uid("test-index-0").unwrap().as_str(), - "test-index-0:0" - ); - assert_eq!( - model.index_uid("test-index-1").unwrap().as_str(), - "test-index-1:0" - ); - assert_eq!( - model.index_uid("test-index-2").unwrap().as_str(), - "test-index-2:0" - ); - - assert_eq!(model.shard_table.table_entries.len(), 2); - - let source_uid_0 = SourceUid { - index_uid: "test-index-0:0".into(), - source_id: INGEST_SOURCE_ID.to_string(), - }; - let table_entry = model.shard_table.table_entries.get(&source_uid_0).unwrap(); - let shards = table_entry.shards(); - assert_eq!(shards.len(), 1); - assert_eq!(shards[0].shard_id, 42); - assert_eq!(table_entry.next_shard_id, 43); - - let source_uid_1 = SourceUid { - index_uid: "test-index-1:0".into(), - source_id: INGEST_SOURCE_ID.to_string(), - }; - let table_entry = model.shard_table.table_entries.get(&source_uid_1).unwrap(); - let shards = table_entry.shards(); - assert_eq!(shards.len(), 0); - assert_eq!(table_entry.next_shard_id, 1); - } - - #[test] - fn test_control_plane_model_toggle_source() { - let mut model = ControlPlaneModel::default(); - let index_metadata = IndexMetadata::for_test("test-index", "ram://"); - let index_uid = index_metadata.index_uid.clone(); - model.add_index(index_metadata); - let source_config = SourceConfig::for_test("test-source", SourceParams::void()); - model.add_source(&index_uid, source_config).unwrap(); - { - let has_changed = model - .toggle_source(&index_uid, &"test-source".to_string(), true) - .unwrap(); - assert!(!has_changed); - } - { - let has_changed = model - .toggle_source(&index_uid, &"test-source".to_string(), true) - .unwrap(); - assert!(!has_changed); - } - { - let has_changed = model - .toggle_source(&index_uid, &"test-source".to_string(), false) - .unwrap(); - assert!(has_changed); - } - { - let has_changed = model - .toggle_source(&index_uid, &"test-source".to_string(), false) - .unwrap(); - assert!(!has_changed); - } - { - let has_changed = model - .toggle_source(&index_uid, &"test-source".to_string(), true) - .unwrap(); - assert!(has_changed); - } - { - let has_changed = model - .toggle_source(&index_uid, &"test-source".to_string(), true) - .unwrap(); - assert!(!has_changed); - } - } -} diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs index f52de199ceb..d47f6072806 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs @@ -30,14 +30,14 @@ use quickwit_proto::indexing::{ 
ApplyIndexingPlanRequest, CpuCapacity, IndexingService, IndexingTask, PIPELINE_FULL_CAPACITY, }; use quickwit_proto::metastore::SourceType; -use quickwit_proto::types::NodeId; +use quickwit_proto::types::{NodeId, ShardId}; use scheduling::{SourceToSchedule, SourceToScheduleType}; use serde::Serialize; use tracing::{debug, error, info, warn}; -use crate::control_plane_model::ControlPlaneModel; use crate::indexing_plan::PhysicalIndexingPlan; use crate::indexing_scheduler::scheduling::build_physical_indexing_plan; +use crate::model::ControlPlaneModel; use crate::{IndexerNodeInfo, IndexerPool}; pub(crate) const MIN_DURATION_BETWEEN_SCHEDULING: Duration = @@ -117,6 +117,7 @@ impl fmt::Debug for IndexingScheduler { fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec { let mut sources = Vec::new(); + for (source_uid, source_config) in model.get_source_configs() { if !source_config.enabled { continue; @@ -137,11 +138,17 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec { }); } SourceType::IngestV2 => { - let shards = model.list_shards(&source_uid); + // Expect: the source should exist since we just read it from `get_source_configs`. + let shard_ids: Vec = model + .list_shards(&source_uid) + .expect("source should exist") + .map(|shard| shard.shard_id) + .collect(); + sources.push(SourceToSchedule { source_uid, source_type: SourceToScheduleType::Sharded { - shards, + shard_ids, // FIXME load_per_shard: NonZeroU32::new(250u32).unwrap(), }, @@ -787,8 +794,8 @@ mod tests { match &source_to_schedule.source_type { SourceToScheduleType::IngestV1 => {} SourceToScheduleType::Sharded { - shards: _, load_per_shard, + .. } => { load_in_node += load_per_shard.get() * task.shard_ids.len() as u32; } diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs index 398f55f8251..e1a724d484d 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs @@ -59,10 +59,10 @@ fn populate_problem( None } SourceToScheduleType::Sharded { - shards, + shard_ids, load_per_shard, } => { - let num_shards = shards.len() as u32; + let num_shards = shard_ids.len() as u32; let source_ord = problem.add_source(num_shards, *load_per_shard); Some(source_ord) } @@ -140,7 +140,7 @@ pub struct SourceToSchedule { #[derive(Debug)] pub enum SourceToScheduleType { Sharded { - shards: Vec, + shard_ids: Vec, load_per_shard: NonZeroU32, }, NonSharded { @@ -153,10 +153,7 @@ pub enum SourceToScheduleType { fn compute_max_num_shards_per_pipeline(source_type: &SourceToScheduleType) -> NonZeroU32 { match &source_type { - SourceToScheduleType::Sharded { - shards: _, - load_per_shard, - } => { + SourceToScheduleType::Sharded { load_per_shard, .. } => { NonZeroU32::new(MAX_LOAD_PER_PIPELINE.cpu_millis() / load_per_shard.get()) .unwrap_or_else(|| { // We throttle shard at ingestion to ensure that a shard does not @@ -187,11 +184,11 @@ fn convert_scheduling_solution_to_physical_plan_single_node_single_source( ) -> Vec { match &source.source_type { SourceToScheduleType::Sharded { - shards, + shard_ids, load_per_shard, } => { // For the moment we do something voluntarily suboptimal. 
- let max_num_pipelines = (shards.len() as u32) * load_per_shard.get() + let max_num_pipelines = (shard_ids.len() as u32) * load_per_shard.get() / CPU_PER_PIPELINE_LOAD_THRESHOLD.cpu_millis(); if previous_tasks.len() > max_num_pipelines as usize { previous_tasks = &previous_tasks[..max_num_pipelines as usize]; @@ -208,7 +205,7 @@ fn convert_scheduling_solution_to_physical_plan_single_node_single_source( .shard_ids .iter() .copied() - .filter(|shard_id| shards.contains(shard_id)) + .filter(|shard_id| shard_ids.contains(shard_id)) .take(max_shard_in_pipeline) .collect(); remaining_num_shards_to_schedule_on_node -= shard_ids.len() as u32; @@ -333,11 +330,7 @@ fn convert_scheduling_solution_to_physical_plan( } for source in sources { - let SourceToScheduleType::Sharded { - shards, - load_per_shard: _, - } = &source.source_type - else { + let SourceToScheduleType::Sharded { shard_ids, .. } = &source.source_type else { continue; }; let source_ord = id_to_ord_map.source_ord(&source.source_uid).unwrap(); @@ -369,9 +362,9 @@ fn convert_scheduling_solution_to_physical_plan( } // Missing shards is the list of shards that is not scheduled into a pipeline yet. - let missing_shards: Vec = shards + let missing_shards: Vec = shard_ids .iter() - .filter(|&shard| !scheduled_shards.contains(shard)) + .filter(|shard_id| !scheduled_shards.contains(shard_id)) .copied() .collect(); @@ -536,7 +529,7 @@ mod tests { let source_0 = SourceToSchedule { source_uid: source_uid0.clone(), source_type: SourceToScheduleType::Sharded { - shards: vec![0, 1, 2, 3, 4, 5, 6, 7], + shard_ids: vec![0, 1, 2, 3, 4, 5, 6, 7], load_per_shard: NonZeroU32::new(1_000).unwrap(), }, }; @@ -649,7 +642,7 @@ mod tests { let sources = vec![SourceToSchedule { source_uid: source_uid.clone(), source_type: SourceToScheduleType::Sharded { - shards: vec![0, 1, 3, 4, 5], + shard_ids: vec![0, 1, 3, 4, 5], load_per_shard: NonZeroU32::new(1_000).unwrap(), }, }]; @@ -680,7 +673,7 @@ mod tests { let sources = vec![SourceToSchedule { source_uid: source_uid.clone(), source_type: SourceToScheduleType::Sharded { - shards: shard_ids.to_vec(), + shard_ids: shard_ids.to_vec(), load_per_shard: NonZeroU32::new(load_per_shard.cpu_millis()).unwrap(), }, }]; diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic_model.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic_model.rs index 3ce5816141d..dd0d9dbbfa4 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic_model.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic_model.rs @@ -25,6 +25,7 @@ use quickwit_proto::indexing::CpuCapacity; pub type SourceOrd = u32; pub type IndexerOrd = usize; + #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub struct Source { pub source_ord: SourceOrd, diff --git a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs index 56541ee3d25..59f54535943 100644 --- a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs +++ b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs @@ -17,28 +17,42 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . 
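The scheduler-side change above replaces `shards: Vec<Shard>` with `shard_ids: Vec<ShardId>` in `SourceToScheduleType::Sharded`, so the planner only carries shard IDs plus a per-shard load estimate rather than full `Shard` records. In the style of the crate's own tests, a sharded source is now described roughly as follows (crate-internal sketch; the module paths and the 250 mCPU per-shard load are taken from this diff):

```rust
use std::num::NonZeroU32;

use quickwit_proto::types::SourceUid;

use crate::indexing_scheduler::scheduling::{SourceToSchedule, SourceToScheduleType};

fn sharded_source_for_test(source_uid: SourceUid) -> SourceToSchedule {
    SourceToSchedule {
        source_uid,
        source_type: SourceToScheduleType::Sharded {
            // Only the shard IDs are needed by the scheduling problem.
            shard_ids: vec![0, 1, 2],
            // Placeholder load estimate, matching the FIXME value above.
            load_per_shard: NonZeroU32::new(250).unwrap(),
        },
    }
}
```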
+use std::collections::HashMap; use std::fmt; use std::time::Duration; use fnv::{FnvHashMap, FnvHashSet}; use itertools::Itertools; use quickwit_common::{PrettySample, Progress}; -use quickwit_ingest::IngesterPool; +use quickwit_ingest::{IngesterPool, LocalShardsUpdate}; use quickwit_proto::control_plane::{ ControlPlaneError, ControlPlaneResult, GetOrCreateOpenShardsFailure, GetOrCreateOpenShardsFailureReason, GetOrCreateOpenShardsRequest, GetOrCreateOpenShardsResponse, GetOrCreateOpenShardsSuccess, }; -use quickwit_proto::ingest::ingester::{IngesterService, InitShardsRequest, PingRequest}; +use quickwit_proto::ingest::ingester::{ + CloseShardsRequest, IngesterService, InitShardsRequest, PingRequest, +}; use quickwit_proto::ingest::{IngestV2Error, Shard, ShardIds, ShardState}; use quickwit_proto::metastore; use quickwit_proto::metastore::{MetastoreService, MetastoreServiceClient}; -use quickwit_proto::types::{IndexUid, NodeId}; +use quickwit_proto::types::{IndexUid, NodeId, ShardId, SourceUid}; use rand::seq::SliceRandom; use tokio::time::timeout; use tracing::{info, warn}; -use crate::control_plane_model::ControlPlaneModel; +use crate::metrics::CONTROL_PLANE_METRICS; +use crate::model::{ControlPlaneModel, ScalingMode, ShardEntry, ShardStats}; + +const MAX_SHARD_INGESTION_THROUGHPUT_MIB_PER_SEC: f32 = 5.; + +/// Threshold in MiB/s above which we increase the number of shards. +const SCALE_UP_SHARDS_THRESHOLD_MIB_PER_SEC: f32 = + MAX_SHARD_INGESTION_THROUGHPUT_MIB_PER_SEC * 8. / 10.; + +/// Threshold in MiB/s below which we decrease the number of shards. +const SCALE_DOWN_SHARDS_THRESHOLD_MIB_PER_SEC: f32 = + MAX_SHARD_INGESTION_THROUGHPUT_MIB_PER_SEC * 2. / 10.; const PING_LEADER_TIMEOUT: Duration = if cfg!(test) { Duration::from_millis(50) @@ -176,13 +190,17 @@ impl IngestController { for closed_shard in closed_shards { let index_uid: IndexUid = closed_shard.index_uid.into(); let source_id = closed_shard.source_id; - let closed_shard_ids = - model.close_shards(&index_uid, &source_id, &closed_shard.shard_ids); + + let source_uid = SourceUid { + index_uid, + source_id, + }; + let closed_shard_ids = model.close_shards(&source_uid, &closed_shard.shard_ids); if !closed_shard_ids.is_empty() { info!( - index_id=%index_uid.index_id(), - source_id=%source_id, + index_id=%source_uid.index_uid.index_id(), + source_id=%source_uid.source_id, shard_ids=?PrettySample::new(&closed_shard_ids, 5), "closed {} shard(s) reported by router", closed_shard_ids.len() @@ -191,6 +209,32 @@ impl IngestController { } } + pub(crate) async fn handle_local_shards_update( + &mut self, + local_shards_update: LocalShardsUpdate, + model: &mut ControlPlaneModel, + progress: &Progress, + ) { + let shard_stats = model.update_shards( + &local_shards_update.source_uid, + &local_shards_update.shard_infos, + ); + if shard_stats.avg_ingestion_rate >= SCALE_UP_SHARDS_THRESHOLD_MIB_PER_SEC { + self.try_scale_up_shards(local_shards_update.source_uid, shard_stats, model, progress) + .await; + } else if shard_stats.avg_ingestion_rate <= SCALE_DOWN_SHARDS_THRESHOLD_MIB_PER_SEC + && shard_stats.num_open_shards > 1 + { + self.try_scale_down_shards( + local_shards_update.source_uid, + shard_stats, + model, + progress, + ) + .await; + } + } + fn handle_unavailable_leaders( &self, unavailable_leaders: &FnvHashSet, @@ -207,11 +251,11 @@ impl IngestController { } } if !confirmed_unavailable_leaders.is_empty() { - for shard in model.shards_mut() { - if shard.shard_state().is_open() - && confirmed_unavailable_leaders.contains(&shard.leader_id) + 
for shard_entry in model.all_shards_mut() { + if shard_entry.is_open() + && confirmed_unavailable_leaders.contains(&shard_entry.leader_id) { - shard.shard_state = ShardState::Unavailable as i32; + shard_entry.set_shard_state(ShardState::Unavailable); } } } @@ -253,7 +297,7 @@ impl IngestController { get_or_create_open_shards_failures.push(get_or_create_open_shards_failure); continue; }; - let Some((open_shards, next_shard_id)) = model.find_open_shards( + let Some((open_shard_entries, next_shard_id)) = model.find_open_shards( &index_uid, &get_open_shards_subrequest.source_id, &unavailable_leaders, @@ -267,7 +311,11 @@ impl IngestController { get_or_create_open_shards_failures.push(get_or_create_open_shards_failure); continue; }; - if !open_shards.is_empty() { + if !open_shard_entries.is_empty() { + let open_shards: Vec = open_shard_entries + .into_iter() + .map(|shard_entry| shard_entry.shard) + .collect(); let get_or_create_open_shards_success = GetOrCreateOpenShardsSuccess { subrequest_id: get_open_shards_subrequest.subrequest_id, index_uid: index_uid.into(), @@ -282,7 +330,7 @@ impl IngestController { .find_leader_and_follower(&mut unavailable_leaders, progress) .await .ok_or_else(|| { - ControlPlaneError::Unavailable("no available ingester".to_string()) + ControlPlaneError::Unavailable("no ingester available".to_string()) })?; let open_shards_subrequest = metastore::OpenShardsSubrequest { subrequest_id: get_open_shards_subrequest.subrequest_id, @@ -303,7 +351,8 @@ impl IngestController { .protect_future(self.metastore.open_shards(open_shards_request)) .await?; - self.init_shards(&open_shards_response, progress).await; + // TODO: Handle failures. + let _ = self.init_shards(&open_shards_response, progress).await; for open_shards_subresponse in open_shards_response.subresponses { let index_uid: IndexUid = open_shards_subresponse.index_uid.clone().into(); @@ -315,9 +364,13 @@ impl IngestController { open_shards_subresponse.opened_shards, open_shards_subresponse.next_shard_id, ); - if let Some((open_shards, _next_shard_id)) = + if let Some((open_shard_entries, _next_shard_id)) = model.find_open_shards(&index_uid, &source_id, &unavailable_leaders) { + let open_shards = open_shard_entries + .into_iter() + .map(|shard_entry| shard_entry.shard) + .collect(); let get_or_create_open_shards_success = GetOrCreateOpenShardsSuccess { subrequest_id: open_shards_subresponse.subrequest_id, index_uid: index_uid.into(), @@ -335,14 +388,15 @@ impl IngestController { } /// Calls init shards on the leaders hosting newly opened shards. + // TODO: Return partial failures instead of failing the whole request. async fn init_shards( &self, - open_shard_response: &metastore::OpenShardsResponse, + open_shards_response: &metastore::OpenShardsResponse, progress: &Progress, - ) { + ) -> Result<(), IngestV2Error> { let mut per_leader_opened_shards: FnvHashMap<&String, Vec> = FnvHashMap::default(); - for subresponse in &open_shard_response.subresponses { + for subresponse in &open_shards_response.subresponses { for shard in &subresponse.opened_shards { per_leader_opened_shards .entry(&shard.leader_id) @@ -350,6 +404,7 @@ impl IngestController { .push(shard.clone()); } } + // TODO: Init shards in parallel. 
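The scale-up and scale-down thresholds introduced above work out to 4 MiB/s and 1 MiB/s of average shard ingestion rate (80% and 20% of the 5 MiB/s per-shard maximum). A self-contained sketch of the decision taken in `handle_local_shards_update`; the `ScalingDecision` enum and `scaling_decision` helper are illustrative names rather than PR code, and the real methods are additionally gated by scaling permits, so they may still be no-ops:

```rust
#[derive(Debug, PartialEq)]
enum ScalingDecision {
    ScaleUp,
    ScaleDown,
    Noop,
}

fn scaling_decision(avg_ingestion_rate_mib_per_sec: f32, num_open_shards: usize) -> ScalingDecision {
    // Maximum sustained throughput of a single shard, in MiB/s.
    const MAX_SHARD_THROUGHPUT: f32 = 5.0;
    let scale_up_threshold = MAX_SHARD_THROUGHPUT * 8.0 / 10.0; // 4 MiB/s
    let scale_down_threshold = MAX_SHARD_THROUGHPUT * 2.0 / 10.0; // 1 MiB/s

    if avg_ingestion_rate_mib_per_sec >= scale_up_threshold {
        ScalingDecision::ScaleUp
    } else if avg_ingestion_rate_mib_per_sec <= scale_down_threshold && num_open_shards > 1 {
        ScalingDecision::ScaleDown
    } else {
        ScalingDecision::Noop
    }
}

fn main() {
    // 4.5 MiB/s average across open shards: the controller tries to open one more shard.
    assert_eq!(scaling_decision(4.5, 2), ScalingDecision::ScaleUp);
    // 0.5 MiB/s with several shards open: one shard gets closed.
    assert_eq!(scaling_decision(0.5, 3), ScalingDecision::ScaleDown);
    // The last open shard is never scaled down.
    assert_eq!(scaling_decision(0.5, 1), ScalingDecision::Noop);
}
```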
for (leader_id, shards) in per_leader_opened_shards { let init_shards_request = InitShardsRequest { shards }; @@ -357,16 +412,183 @@ impl IngestController { warn!("failed to init shards: ingester `{leader_id}` is unavailable"); continue; }; - if let Err(error) = progress + progress .protect_future(leader.init_shards(init_shards_request)) - .await - { - warn!("failed to init shards: {error}"); + .await?; + } + Ok(()) + } + + /// Attempts to increase the number of shards. This operation is rate limited to avoid creating + /// too many shards in a short period of time. As a result, this method may not create any + /// shard. + async fn try_scale_up_shards( + &mut self, + source_uid: SourceUid, + shard_stats: ShardStats, + model: &mut ControlPlaneModel, + progress: &Progress, + ) { + const NUM_PERMITS: u64 = 1; + + if !model + .acquire_scaling_permits(&source_uid, ScalingMode::Up, NUM_PERMITS) + .unwrap_or(false) + { + return; + } + let new_num_open_shards = shard_stats.num_open_shards + 1; + + info!( + index_id=%source_uid.index_uid.index_id(), + source_id=%source_uid.source_id, + "scaling up number of shards to {new_num_open_shards}" + ); + // Expect: the source should exist because we just acquired a permit. + let next_shard_id = model + .next_shard_id(&source_uid) + .expect("source should exist"); + + let mut unavailable_leaders: FnvHashSet = FnvHashSet::default(); + + let Some((leader_id, follower_id)) = self + .find_leader_and_follower(&mut unavailable_leaders, progress) + .await + else { + warn!("failed to scale up number of shards: no ingester available"); + model.release_scaling_permits(&source_uid, ScalingMode::Up, NUM_PERMITS); + return; + }; + let open_shards_subrequest = metastore::OpenShardsSubrequest { + subrequest_id: 0, + index_uid: source_uid.index_uid.clone().into(), + source_id: source_uid.source_id.clone(), + leader_id: leader_id.into(), + follower_id: follower_id.map(Into::into), + next_shard_id, + }; + let open_shards_request = metastore::OpenShardsRequest { + subrequests: vec![open_shards_subrequest], + }; + let open_shards_response = match progress + .protect_future(self.metastore.open_shards(open_shards_request)) + .await + { + Ok(open_shards_response) => open_shards_response, + Err(error) => { + warn!("failed to scale up number of shards: {error}"); + model.release_scaling_permits(&source_uid, ScalingMode::Up, NUM_PERMITS); + return; } + }; + if let Err(error) = self.init_shards(&open_shards_response, progress).await { + warn!("failed to scale up number of shards: {error}"); + model.release_scaling_permits(&source_uid, ScalingMode::Up, NUM_PERMITS); + return; } + for open_shards_subresponse in open_shards_response.subresponses { + let index_uid: IndexUid = open_shards_subresponse.index_uid.into(); + let source_id = open_shards_subresponse.source_id; + + model.insert_newly_opened_shards( + &index_uid, + &source_id, + open_shards_subresponse.opened_shards, + open_shards_subresponse.next_shard_id, + ); + } + let label_values = [source_uid.index_uid.index_id(), &source_uid.source_id]; + CONTROL_PLANE_METRICS + .open_shards_total + .with_label_values(label_values) + .set(new_num_open_shards as i64); + } + + /// Attempts to decrease the number of shards. This operation is rate limited to avoid closing + /// shards too aggressively. As a result, this method may not close any shard.
+ async fn try_scale_down_shards( + &self, + source_uid: SourceUid, + shard_stats: ShardStats, + model: &mut ControlPlaneModel, + progress: &Progress, + ) { + const NUM_PERMITS: u64 = 1; + + if !model + .acquire_scaling_permits(&source_uid, ScalingMode::Down, NUM_PERMITS) + .unwrap_or(false) + { + return; + } + let new_num_open_shards = shard_stats.num_open_shards - 1; + + info!( + index_id=%source_uid.index_uid.index_id(), + source_id=%source_uid.source_id, + "scaling down number of shards to {new_num_open_shards}" + ); + let Some((leader_id, shard_id)) = find_scale_down_candidate(&source_uid, model) else { + model.release_scaling_permits(&source_uid, ScalingMode::Down, NUM_PERMITS); + return; + }; + let Some(mut ingester) = self.ingester_pool.get(&leader_id) else { + model.release_scaling_permits(&source_uid, ScalingMode::Down, NUM_PERMITS); + return; + }; + let shards = vec![ShardIds { + index_uid: source_uid.index_uid.clone().into(), + source_id: source_uid.source_id.clone(), + shard_ids: vec![shard_id], + }]; + let close_shards_request = CloseShardsRequest { shards }; + + if let Err(error) = progress + .protect_future(ingester.close_shards(close_shards_request)) + .await + { + warn!("failed to scale down number of shards: {error}"); + model.release_scaling_permits(&source_uid, ScalingMode::Down, NUM_PERMITS); + return; + } + model.close_shards(&source_uid, &[shard_id]); + + let label_values = [source_uid.index_uid.index_id(), &source_uid.source_id]; + CONTROL_PLANE_METRICS + .open_shards_total + .with_label_values(label_values) + .set(new_num_open_shards as i64); } } +/// Finds the shard with the highest ingestion rate on the ingester with the least number of open +/// shards. +fn find_scale_down_candidate( + source_uid: &SourceUid, + model: &ControlPlaneModel, +) -> Option<(NodeId, ShardId)> { + let mut per_leader_candidates: HashMap<&String, (usize, &ShardEntry)> = HashMap::new(); + + for shard in model.list_shards(source_uid)? 
{ + if shard.is_open() { + per_leader_candidates + .entry(&shard.leader_id) + .and_modify(|(num_shards, candidate)| { + *num_shards += 1; + + if candidate.ingestion_rate < shard.ingestion_rate { + *candidate = shard; + } + }) + .or_insert((1, shard)); + } + } + per_leader_candidates + .into_iter() + .min_by_key(|(_leader_id, (num_shards, _shard))| *num_shards) + .map(|(leader_id, (_num_shards, shard))| (leader_id.clone().into(), shard.shard_id)) +} + #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum PingError { LeaderUnavailable, @@ -376,13 +598,18 @@ pub enum PingError { #[cfg(test)] mod tests { - use quickwit_config::{SourceConfig, SourceParams}; + use std::collections::BTreeSet; + + use quickwit_config::{SourceConfig, SourceParams, INGEST_SOURCE_ID}; + use quickwit_ingest::{RateMibPerSec, ShardInfo}; use quickwit_metastore::IndexMetadata; use quickwit_proto::control_plane::GetOrCreateOpenShardsSubrequest; use quickwit_proto::ingest::ingester::{ - IngesterServiceClient, InitShardsResponse, MockIngesterService, PingResponse, + CloseShardsResponse, IngesterServiceClient, InitShardsResponse, MockIngesterService, + PingResponse, }; use quickwit_proto::ingest::{Shard, ShardState}; + use quickwit_proto::metastore::MetastoreError; use quickwit_proto::types::SourceId; use super::*; @@ -821,8 +1048,11 @@ mod tests { .await .unwrap(); - let shard_1 = model.shards().find(|shard| shard.shard_id == 1).unwrap(); - assert!(shard_1.shard_state().is_closed()); + let shard_1 = model + .all_shards_mut() + .find(|shard| shard.shard_id == 1) + .unwrap(); + assert!(shard_1.is_closed()); } #[tokio::test] @@ -876,13 +1106,474 @@ mod tests { .await .unwrap(); - let shard_1 = model.shards().find(|shard| shard.shard_id == 1).unwrap(); - assert!(shard_1.shard_state().is_unavailable()); + let shard_1 = model + .all_shards_mut() + .find(|shard| shard.shard_id == 1) + .unwrap(); + assert!(shard_1.is_unavailable()); + + let shard_2 = model + .all_shards_mut() + .find(|shard| shard.shard_id == 2) + .unwrap(); + assert!(shard_2.is_closed()); + + let shard_3 = model + .all_shards_mut() + .find(|shard| shard.shard_id == 3) + .unwrap(); + assert!(shard_3.is_open()); + } + + #[tokio::test] + async fn test_ingest_controller_handle_local_shards_update() { + let metastore = MetastoreServiceClient::mock().into(); + let ingester_pool = IngesterPool::default(); + let replication_factor = 1; + + let mut ingest_controller = + IngestController::new(metastore, ingester_pool.clone(), replication_factor); + + let index_uid: IndexUid = "test-index:0".into(); + let source_id: SourceId = "test-source".into(); + + let source_uid = SourceUid { + index_uid: index_uid.clone(), + source_id: source_id.clone(), + }; + let mut model = ControlPlaneModel::default(); + let progress = Progress::default(); + + let shards = vec![Shard { + shard_id: 1, + leader_id: "test-ingester".to_string(), + shard_state: ShardState::Open as i32, + ..Default::default() + }]; + model.insert_newly_opened_shards(&index_uid, &source_id, shards, 2); + + let shard_entries: Vec = model + .all_shards_mut() + .map(|shard_entry| shard_entry.clone()) + .collect(); + assert_eq!(shard_entries.len(), 1); + assert_eq!(shard_entries[0].ingestion_rate, 0); + + // Test update shard ingestion rate but no scale down because num open shards is 1. 
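+ // With a single open shard at 1 MiB/s, the average ingestion rate sits right at the
+ // scale-down threshold (0.2 * 5 MiB/s = 1 MiB/s), but `handle_local_shards_update` only
+ // scales down when more than one shard is open, so the shard count stays at 1.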
+ let shard_infos = BTreeSet::from_iter([ShardInfo { + shard_id: 1, + shard_state: ShardState::Open, + ingestion_rate: RateMibPerSec(1), + }]); + let local_shards_update = LocalShardsUpdate { + source_uid: source_uid.clone(), + shard_infos, + }; + ingest_controller + .handle_local_shards_update(local_shards_update, &mut model, &progress) + .await; + + let shard_entries: Vec = model + .all_shards_mut() + .map(|shard_entry| shard_entry.clone()) + .collect(); + assert_eq!(shard_entries.len(), 1); + assert_eq!(shard_entries[0].ingestion_rate, 1); + + // Test update shard ingestion rate with failing scale down. + let shards = vec![Shard { + shard_id: 2, + leader_id: "test-ingester".to_string(), + shard_state: ShardState::Open as i32, + ..Default::default() + }]; + model.insert_newly_opened_shards(&index_uid, &source_id, shards, 2); + + let shard_entries: Vec = model + .all_shards_mut() + .map(|shard_entry| shard_entry.clone()) + .collect(); + assert_eq!(shard_entries.len(), 2); + + let mut ingester_mock = IngesterServiceClient::mock(); + + ingester_mock.expect_close_shards().returning(|request| { + assert_eq!(request.shards.len(), 1); + assert_eq!(request.shards[0].index_uid, "test-index:0"); + assert_eq!(request.shards[0].source_id, "test-source"); + assert_eq!(request.shards[0].shard_ids, vec![1]); + + Err(IngestV2Error::Internal( + "failed to close shards".to_string(), + )) + }); + ingester_mock.expect_ping().returning(|request| { + assert_eq!(request.leader_id, "test-ingester"); + + Err(IngestV2Error::Internal("failed ping ingester".to_string())) + }); + ingester_pool.insert("test-ingester".into(), ingester_mock.into()); + + let shard_infos = BTreeSet::from_iter([ + ShardInfo { + shard_id: 1, + shard_state: ShardState::Open, + ingestion_rate: RateMibPerSec(1), + }, + ShardInfo { + shard_id: 2, + shard_state: ShardState::Open, + ingestion_rate: RateMibPerSec(1), + }, + ]); + let local_shards_update = LocalShardsUpdate { + source_uid: source_uid.clone(), + shard_infos, + }; + ingest_controller + .handle_local_shards_update(local_shards_update, &mut model, &progress) + .await; + + // Test update shard ingestion rate with failing scale up. 
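+ // Two open shards at 4 MiB/s each put the average ingestion rate at the scale-up
+ // threshold (0.8 * 5 MiB/s = 4 MiB/s), so a scale-up is attempted; it fails because the
+ // only ingester in the pool rejects the ping, so no leader can be found for a new shard.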
+ let shard_infos = BTreeSet::from_iter([ + ShardInfo { + shard_id: 1, + shard_state: ShardState::Open, + ingestion_rate: RateMibPerSec(4), + }, + ShardInfo { + shard_id: 2, + shard_state: ShardState::Open, + ingestion_rate: RateMibPerSec(4), + }, + ]); + let local_shards_update = LocalShardsUpdate { + source_uid: source_uid.clone(), + shard_infos, + }; + ingest_controller + .handle_local_shards_update(local_shards_update, &mut model, &progress) + .await; + } + + #[tokio::test] + async fn test_ingest_controller_try_scale_up_shards() { + let mut mock_metastore = MetastoreServiceClient::mock(); + + mock_metastore + .expect_open_shards() + .once() + .returning(|request| { + assert_eq!(request.subrequests.len(), 1); + assert_eq!(request.subrequests[0].index_uid, "test-index:0"); + assert_eq!(request.subrequests[0].source_id, INGEST_SOURCE_ID); + assert_eq!(request.subrequests[0].leader_id, "test-ingester"); + assert_eq!(request.subrequests[0].next_shard_id, 1); + + Err(MetastoreError::InvalidArgument { + message: "failed to open shards".to_string(), + }) + }); + mock_metastore.expect_open_shards().returning(|request| { + assert_eq!(request.subrequests.len(), 1); + assert_eq!(request.subrequests[0].index_uid, "test-index:0"); + assert_eq!(request.subrequests[0].source_id, INGEST_SOURCE_ID); + assert_eq!(request.subrequests[0].leader_id, "test-ingester"); + assert_eq!(request.subrequests[0].next_shard_id, 1); + + let subresponses = vec![metastore::OpenShardsSubresponse { + subrequest_id: 0, + index_uid: "test-index:0".into(), + source_id: INGEST_SOURCE_ID.to_string(), + opened_shards: vec![Shard { + index_uid: "test-index:0".into(), + source_id: INGEST_SOURCE_ID.to_string(), + shard_id: 1, + leader_id: "test-ingester".to_string(), + shard_state: ShardState::Open as i32, + ..Default::default() + }], + next_shard_id: 2, + }]; + let response = metastore::OpenShardsResponse { subresponses }; + Ok(response) + }); + + let ingester_pool = IngesterPool::default(); + let replication_factor = 1; + + let mut ingest_controller = IngestController::new( + mock_metastore.into(), + ingester_pool.clone(), + replication_factor, + ); + + let index_uid: IndexUid = "test-index:0".into(); + let source_id: SourceId = INGEST_SOURCE_ID.to_string(); + + let source_uid = SourceUid { + index_uid: index_uid.clone(), + source_id: source_id.clone(), + }; + let shard_stats = ShardStats { + num_open_shards: 2, + ..Default::default() + }; + let mut model = ControlPlaneModel::default(); + let index_metadata = + IndexMetadata::for_test(index_uid.index_id(), "ram://indexes/test-index:0"); + model.add_index(index_metadata); + + let souce_config = SourceConfig::ingest_v2_default(); + model.add_source(&index_uid, souce_config).unwrap(); + + let progress = Progress::default(); + + // Test could not find leader. 
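+ // The ingester pool is still empty at this point, so `find_leader_and_follower` returns
+ // `None`, the scale-up attempt is abandoned, and the scaling permit is released.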
+ ingest_controller + .try_scale_up_shards(source_uid.clone(), shard_stats, &mut model, &progress) + .await; + + let mut ingester_mock = IngesterServiceClient::mock(); + + ingester_mock.expect_ping().returning(|request| { + assert_eq!(request.leader_id, "test-ingester"); + + Ok(PingResponse {}) + }); + ingester_mock + .expect_init_shards() + .once() + .returning(|request| { + assert_eq!(request.shards.len(), 1); + assert_eq!(request.shards[0].index_uid, "test-index:0"); + assert_eq!(request.shards[0].source_id, INGEST_SOURCE_ID); + assert_eq!(request.shards[0].shard_id, 1); + assert_eq!(request.shards[0].leader_id, "test-ingester"); + + Err(IngestV2Error::Internal("failed to init shards".to_string())) + }); + ingester_mock.expect_init_shards().returning(|request| { + assert_eq!(request.shards.len(), 1); + assert_eq!(request.shards[0].index_uid, "test-index:0"); + assert_eq!(request.shards[0].source_id, INGEST_SOURCE_ID); + assert_eq!(request.shards[0].shard_id, 1); + assert_eq!(request.shards[0].leader_id, "test-ingester"); + + Ok(InitShardsResponse {}) + }); + ingester_pool.insert("test-ingester".into(), ingester_mock.into()); + + // Test failed to open shards. + ingest_controller + .try_scale_up_shards(source_uid.clone(), shard_stats, &mut model, &progress) + .await; + assert_eq!(model.all_shards_mut().count(), 0); + + // Test failed to init shards. + ingest_controller + .try_scale_up_shards(source_uid.clone(), shard_stats, &mut model, &progress) + .await; + assert_eq!(model.all_shards_mut().count(), 0); + + // Test successfully opened shard. + ingest_controller + .try_scale_up_shards(source_uid.clone(), shard_stats, &mut model, &progress) + .await; + assert_eq!( + model + .all_shards_mut() + .filter(|shard| shard.is_open()) + .count(), + 1 + ); + } + + #[tokio::test] + async fn test_ingest_controller_try_scale_down_shards() { + let metastore = MetastoreServiceClient::mock().into(); + let ingester_pool = IngesterPool::default(); + let replication_factor = 1; + + let ingest_controller = + IngestController::new(metastore, ingester_pool.clone(), replication_factor); + + let index_uid: IndexUid = "test-index:0".into(); + let source_id: SourceId = "test-source".into(); + + let source_uid = SourceUid { + index_uid: index_uid.clone(), + source_id: source_id.clone(), + }; + let shard_stats = ShardStats { + num_open_shards: 2, + ..Default::default() + }; + let mut model = ControlPlaneModel::default(); + let progress = Progress::default(); + + // Test could not find a scale down candidate. + ingest_controller + .try_scale_down_shards(source_uid.clone(), shard_stats, &mut model, &progress) + .await; + + let shards = vec![Shard { + shard_id: 1, + leader_id: "test-ingester".to_string(), + shard_state: ShardState::Open as i32, + ..Default::default() + }]; + model.insert_newly_opened_shards(&index_uid, &source_id, shards, 2); + + // Test ingester is unavailable. 
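+ // A scale-down candidate (shard 1 on `test-ingester`) is found, but the ingester pool is
+ // still empty, so the close request cannot be sent and the scaling permit is released.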
+ ingest_controller + .try_scale_down_shards(source_uid.clone(), shard_stats, &mut model, &progress) + .await; + + let mut ingester_mock = IngesterServiceClient::mock(); + + ingester_mock + .expect_close_shards() + .once() + .returning(|request| { + assert_eq!(request.shards.len(), 1); + assert_eq!(request.shards[0].index_uid, "test-index:0"); + assert_eq!(request.shards[0].source_id, "test-source"); + assert_eq!(request.shards[0].shard_ids, vec![1]); + + Err(IngestV2Error::Internal( + "failed to close shards".to_string(), + )) + }); + ingester_mock + .expect_close_shards() + .once() + .returning(|request| { + assert_eq!(request.shards.len(), 1); + assert_eq!(request.shards[0].index_uid, "test-index:0"); + assert_eq!(request.shards[0].source_id, "test-source"); + assert_eq!(request.shards[0].shard_ids, vec![1]); + + Ok(CloseShardsResponse {}) + }); + ingester_pool.insert("test-ingester".into(), ingester_mock.into()); - let shard_2 = model.shards().find(|shard| shard.shard_id == 2).unwrap(); - assert!(shard_2.shard_state().is_closed()); + // Test failed to close shard. + ingest_controller + .try_scale_down_shards(source_uid.clone(), shard_stats, &mut model, &progress) + .await; + assert!(model.all_shards_mut().all(|shard| shard.is_open())); + + // Test successfully closed shard. + ingest_controller + .try_scale_down_shards(source_uid.clone(), shard_stats, &mut model, &progress) + .await; + assert!(model.all_shards_mut().all(|shard| shard.is_closed())); + + let shards = vec![Shard { + shard_id: 2, + leader_id: "test-ingester".to_string(), + shard_state: ShardState::Open as i32, + ..Default::default() + }]; + model.insert_newly_opened_shards(&index_uid, &source_id, shards, 3); + + // Test rate limited. + ingest_controller + .try_scale_down_shards(source_uid.clone(), shard_stats, &mut model, &progress) + .await; + assert!(model.all_shards_mut().any(|shard| shard.is_open())); + } + + #[test] + fn test_find_scale_down_candidate() { + let index_uid: IndexUid = "test-index:0".into(); + let source_id: SourceId = "test-source".into(); + + let source_uid = SourceUid { + index_uid: index_uid.clone(), + source_id: source_id.clone(), + }; + let mut model = ControlPlaneModel::default(); + + assert!(find_scale_down_candidate(&source_uid, &model).is_none()); + + let shards = vec![ + Shard { + shard_id: 1, + leader_id: "test-ingester-0".to_string(), + shard_state: ShardState::Open as i32, + ..Default::default() + }, + Shard { + shard_id: 2, + leader_id: "test-ingester-0".to_string(), + shard_state: ShardState::Open as i32, + ..Default::default() + }, + Shard { + shard_id: 3, + leader_id: "test-ingester-0".to_string(), + shard_state: ShardState::Closed as i32, + ..Default::default() + }, + Shard { + shard_id: 4, + leader_id: "test-ingester-1".to_string(), + shard_state: ShardState::Open as i32, + ..Default::default() + }, + Shard { + shard_id: 5, + leader_id: "test-ingester-1".to_string(), + shard_state: ShardState::Open as i32, + ..Default::default() + }, + Shard { + shard_id: 6, + leader_id: "test-ingester-1".to_string(), + shard_state: ShardState::Open as i32, + ..Default::default() + }, + ]; + model.insert_newly_opened_shards(&index_uid, &source_id, shards, 7); + + let shard_infos = BTreeSet::from_iter([ + ShardInfo { + shard_id: 1, + shard_state: ShardState::Open, + ingestion_rate: quickwit_ingest::RateMibPerSec(1), + }, + ShardInfo { + shard_id: 2, + shard_state: ShardState::Open, + ingestion_rate: quickwit_ingest::RateMibPerSec(2), + }, + ShardInfo { + shard_id: 3, + shard_state: ShardState::Open, + 
ingestion_rate: quickwit_ingest::RateMibPerSec(3), + }, + ShardInfo { + shard_id: 4, + shard_state: ShardState::Open, + ingestion_rate: quickwit_ingest::RateMibPerSec(4), + }, + ShardInfo { + shard_id: 5, + shard_state: ShardState::Open, + ingestion_rate: quickwit_ingest::RateMibPerSec(5), + }, + ShardInfo { + shard_id: 6, + shard_state: ShardState::Open, + ingestion_rate: quickwit_ingest::RateMibPerSec(6), + }, + ]); + model.update_shards(&source_uid, &shard_infos); - let shard_3 = model.shards().find(|shard| shard.shard_id == 3).unwrap(); - assert!(shard_3.shard_state().is_open()); + let (leader_id, shard_id) = find_scale_down_candidate(&source_uid, &model).unwrap(); + assert_eq!(leader_id, "test-ingester-0"); + assert_eq!(shard_id, 2); } } diff --git a/quickwit/quickwit-control-plane/src/lib.rs b/quickwit/quickwit-control-plane/src/lib.rs index cdb6487c352..9134e5f07fa 100644 --- a/quickwit/quickwit-control-plane/src/lib.rs +++ b/quickwit/quickwit-control-plane/src/lib.rs @@ -18,11 +18,11 @@ // along with this program. If not, see . pub mod control_plane; -pub(crate) mod control_plane_model; pub mod indexing_plan; pub mod indexing_scheduler; pub mod ingest; pub(crate) mod metrics; +pub(crate) mod model; use quickwit_common::tower::Pool; use quickwit_proto::indexing::{CpuCapacity, IndexingServiceClient, IndexingTask}; diff --git a/quickwit/quickwit-control-plane/src/metrics.rs b/quickwit/quickwit-control-plane/src/metrics.rs index 76049380632..0ef66458891 100644 --- a/quickwit/quickwit-control-plane/src/metrics.rs +++ b/quickwit/quickwit-control-plane/src/metrics.rs @@ -18,13 +18,14 @@ // along with this program. If not, see . use once_cell::sync::Lazy; -use quickwit_common::metrics::{new_counter, IntCounter}; +use quickwit_common::metrics::{new_counter, new_gauge_vec, IntCounter, IntGaugeVec}; pub struct ControlPlaneMetrics { pub restart_total: IntCounter, pub schedule_total: IntCounter, pub metastore_error_aborted: IntCounter, pub metastore_error_maybe_executed: IntCounter, + pub open_shards_total: IntGaugeVec<2>, } impl Default for ControlPlaneMetrics { @@ -52,6 +53,12 @@ impl Default for ControlPlaneMetrics { control plane restart)", "quickwit_control_plane", ), + open_shards_total: new_gauge_vec( + "open_shards_total", + "Number of open shards per source.", + "quickwit_control_plane", + ["index_id", "source_id"], + ), } } } diff --git a/quickwit/quickwit-control-plane/src/model/mod.rs b/quickwit/quickwit-control-plane/src/model/mod.rs new file mode 100644 index 00000000000..c0b7ee87f3d --- /dev/null +++ b/quickwit/quickwit-control-plane/src/model/mod.rs @@ -0,0 +1,474 @@ +// Copyright (C) 2023 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . 
+ +mod shard_table; + +use std::time::Instant; + +use anyhow::bail; +use fnv::{FnvHashMap, FnvHashSet}; +use quickwit_common::Progress; +use quickwit_config::SourceConfig; +use quickwit_ingest::ShardInfos; +use quickwit_metastore::{IndexMetadata, ListIndexesMetadataResponseExt}; +use quickwit_proto::control_plane::ControlPlaneResult; +use quickwit_proto::ingest::{Shard, ShardState}; +use quickwit_proto::metastore::{ + self, EntityKind, ListIndexesMetadataRequest, ListShardsSubrequest, MetastoreError, + MetastoreService, MetastoreServiceClient, SourceType, +}; +use quickwit_proto::types::{IndexId, IndexUid, NodeId, ShardId, SourceId, SourceUid}; +use serde::Serialize; +pub(super) use shard_table::{ + NextShardId, ScalingMode, ShardEntry, ShardStats, ShardTable, ShardTableEntry, +}; +use tracing::{info, warn}; + +/// The control plane maintains a model in sync with the metastore. +/// +/// The model stays consistent with the metastore, because all +/// of the mutations go through the control plane. +/// +/// If a mutation yields an error, the control plane is killed +/// and restarted. +/// +/// Upon starting, it loads its entire state from the metastore. +#[derive(Default, Debug)] +pub(crate) struct ControlPlaneModel { + index_uid_table: FnvHashMap, + index_table: FnvHashMap, + shard_table: ShardTable, +} + +#[derive(Clone, Copy, Debug, Default, Serialize)] +pub struct ControlPlaneModelMetrics { + pub num_shards: usize, +} + +impl ControlPlaneModel { + /// Clears the entire state of the model. + pub fn clear(&mut self) { + *self = Default::default(); + } + + pub fn observable_state(&self) -> ControlPlaneModelMetrics { + ControlPlaneModelMetrics { + num_shards: self.shard_table.table_entries.len(), + } + } + + pub async fn load_from_metastore( + &mut self, + metastore: &mut MetastoreServiceClient, + progress: &Progress, + ) -> ControlPlaneResult<()> { + let now = Instant::now(); + self.clear(); + + let index_metadatas = progress + .protect_future(metastore.list_indexes_metadata(ListIndexesMetadataRequest::all())) + .await?
+ .deserialize_indexes_metadata()?; + + let num_indexes = index_metadatas.len(); + self.index_table.reserve(num_indexes); + + let mut num_sources = 0; + let mut num_shards = 0; + + let mut subrequests = Vec::with_capacity(index_metadatas.len()); + + for index_metadata in index_metadatas { + self.add_index(index_metadata); + } + + for index_metadata in self.index_table.values() { + for source_config in index_metadata.sources.values() { + num_sources += 1; + + if source_config.source_type() != SourceType::IngestV2 || !source_config.enabled { + continue; + } + let request = ListShardsSubrequest { + index_uid: index_metadata.index_uid.clone().into(), + source_id: source_config.source_id.clone(), + shard_state: Some(ShardState::Open as i32), + }; + subrequests.push(request); + } + } + if !subrequests.is_empty() { + let list_shards_request = metastore::ListShardsRequest { subrequests }; + let list_shard_response = progress + .protect_future(metastore.list_shards(list_shards_request)) + .await?; + + self.shard_table + .table_entries + .reserve(list_shard_response.subresponses.len()); + + for list_shards_subresponse in list_shard_response.subresponses { + num_shards += list_shards_subresponse.shards.len(); + + let source_uid = SourceUid { + index_uid: list_shards_subresponse.index_uid.into(), + source_id: list_shards_subresponse.source_id, + }; + let table_entry = ShardTableEntry::from_shards( + list_shards_subresponse.shards, + list_shards_subresponse.next_shard_id, + ); + self.shard_table + .table_entries + .insert(source_uid, table_entry); + } + } + info!( + "synced internal state with metastore in {} seconds ({} indexes, {} sources, {} \ + shards)", + now.elapsed().as_secs(), + num_indexes, + num_sources, + num_shards, + ); + Ok(()) + } + + pub fn index_uid(&self, index_id: &str) -> Option { + self.index_uid_table.get(index_id).cloned() + } + + pub(crate) fn get_source_configs( + &self, + ) -> impl Iterator + '_ { + self.index_table.values().flat_map(|index_metadata| { + index_metadata + .sources + .iter() + .map(move |(source_id, source_config)| { + ( + SourceUid { + index_uid: index_metadata.index_uid.clone(), + source_id: source_id.clone(), + }, + source_config, + ) + }) + }) + } + + pub(crate) fn add_index(&mut self, index_metadata: IndexMetadata) { + let index_uid = index_metadata.index_uid.clone(); + self.index_uid_table + .insert(index_metadata.index_id().to_string(), index_uid.clone()); + self.index_table.insert(index_uid, index_metadata); + } + + pub(crate) fn delete_index(&mut self, index_uid: &IndexUid) { + // TODO: We need to let the routers and ingesters know. + self.index_table.remove(index_uid); + self.shard_table.delete_index(index_uid.index_id()); + } + + /// Adds a source to a given index. Returns an error if a source with the same source_id already + /// exists. + pub(crate) fn add_source( + &mut self, + index_uid: &IndexUid, + source_config: SourceConfig, + ) -> ControlPlaneResult<()> { + let index_metadata = self.index_table.get_mut(index_uid).ok_or_else(|| { + MetastoreError::NotFound(EntityKind::Index { + index_id: index_uid.to_string(), + }) + })?; + let source_id = source_config.source_id.clone(); + index_metadata.add_source(source_config)?; + self.shard_table.add_source(index_uid, &source_id); + Ok(()) + } + + pub(crate) fn delete_source(&mut self, index_uid: &IndexUid, source_id: &SourceId) { + // Removing shards from shard table. + self.shard_table.delete_source(index_uid, source_id); + // Remove source from index config. 
+ let Some(index_model) = self.index_table.get_mut(index_uid) else { + warn!(index_uid=%index_uid, source_id=%source_id, "delete source: index not found"); + return; + }; + if index_model.sources.remove(source_id).is_none() { + warn!(index_uid=%index_uid, source_id=%source_id, "delete source: source not found"); + }; + } + + /// Returns `true` if the source status has changed, `false` otherwise. + /// Returns an error if the source could not be found. + pub(crate) fn toggle_source( + &mut self, + index_uid: &IndexUid, + source_id: &SourceId, + enable: bool, + ) -> anyhow::Result { + let Some(index_model) = self.index_table.get_mut(index_uid) else { + bail!("index `{}` not found", index_uid.index_id()); + }; + let Some(source_config) = index_model.sources.get_mut(source_id) else { + bail!("source `{source_id}` not found"); + }; + let has_changed = source_config.enabled != enable; + source_config.enabled = enable; + Ok(has_changed) + } + + pub fn all_shards_mut(&mut self) -> impl Iterator + '_ { + self.shard_table.all_shards_mut() + } + + /// Lists the shards of a given source. Returns `None` if the source does not exist. + pub fn list_shards(&self, source_uid: &SourceUid) -> Option> { + self.shard_table.list_shards(source_uid) + } + + pub fn next_shard_id(&self, source_uid: &SourceUid) -> Option { + self.shard_table.next_shard_id(source_uid) + } + + /// Inserts the shards that have just been opened by calling `open_shards` on the metastore. + pub fn insert_newly_opened_shards( + &mut self, + index_uid: &IndexUid, + source_id: &SourceId, + shards: Vec, + next_shard_id: NextShardId, + ) { + self.shard_table + .insert_newly_opened_shards(index_uid, source_id, shards, next_shard_id); + } + + /// Finds open shards for a given index and source and whose leaders are not in the set of + /// unavailable ingesters. + pub fn find_open_shards( + &self, + index_uid: &IndexUid, + source_id: &SourceId, + unavailable_leaders: &FnvHashSet, + ) -> Option<(Vec, NextShardId)> { + self.shard_table + .find_open_shards(index_uid, source_id, unavailable_leaders) + } + + /// Updates the state and ingestion rate of the shards according to the given shard infos. + pub fn update_shards( + &mut self, + source_uid: &SourceUid, + shard_infos: &ShardInfos, + ) -> ShardStats { + self.shard_table.update_shards(source_uid, shard_infos) + } + + /// Sets the state of the shards identified by their index UID, source ID, and shard IDs to + /// `Closed`. + pub fn close_shards(&mut self, source_uid: &SourceUid, shard_ids: &[ShardId]) -> Vec { + self.shard_table.close_shards(source_uid, shard_ids) + } + + /// Removes the shards identified by their index UID, source ID, and shard IDs. 
+ pub fn delete_shards(&mut self, source_uid: &SourceUid, shard_ids: &[ShardId]) { + self.shard_table.delete_shards(source_uid, shard_ids); + } + + pub fn acquire_scaling_permits( + &mut self, + source_uid: &SourceUid, + scaling_mode: ScalingMode, + num_permits: u64, + ) -> Option { + self.shard_table + .acquire_scaling_permits(source_uid, scaling_mode, num_permits) + } + + pub fn release_scaling_permits( + &mut self, + source_uid: &SourceUid, + scaling_mode: ScalingMode, + num_permits: u64, + ) { + self.shard_table + .release_scaling_permits(source_uid, scaling_mode, num_permits) + } +} + +#[cfg(test)] +mod tests { + use quickwit_config::{SourceConfig, SourceParams, INGEST_SOURCE_ID}; + use quickwit_metastore::IndexMetadata; + use quickwit_proto::ingest::Shard; + use quickwit_proto::metastore::ListIndexesMetadataResponse; + + use super::*; + + #[tokio::test] + async fn test_control_plane_model_load_shard_table() { + let progress = Progress::default(); + + let mut mock_metastore = MetastoreServiceClient::mock(); + mock_metastore + .expect_list_indexes_metadata() + .returning(|request| { + assert_eq!(request, ListIndexesMetadataRequest::all()); + + let mut index_0 = IndexMetadata::for_test("test-index-0", "ram:///test-index-0"); + let mut source_config = SourceConfig::ingest_v2_default(); + source_config.enabled = true; + index_0.add_source(source_config.clone()).unwrap(); + + let mut index_1 = IndexMetadata::for_test("test-index-1", "ram:///test-index-1"); + index_1.add_source(source_config.clone()).unwrap(); + + let mut index_2 = IndexMetadata::for_test("test-index-2", "ram:///test-index-2"); + source_config.enabled = false; + index_2.add_source(source_config.clone()).unwrap(); + + let indexes = vec![index_0, index_1, index_2]; + Ok(ListIndexesMetadataResponse::try_from_indexes_metadata(indexes).unwrap()) + }); + mock_metastore.expect_list_shards().returning(|request| { + assert_eq!(request.subrequests.len(), 2); + + assert_eq!(request.subrequests[0].index_uid, "test-index-0:0"); + assert_eq!(request.subrequests[0].source_id, INGEST_SOURCE_ID); + assert_eq!(request.subrequests[0].shard_state(), ShardState::Open); + + assert_eq!(request.subrequests[1].index_uid, "test-index-1:0"); + assert_eq!(request.subrequests[1].source_id, INGEST_SOURCE_ID); + assert_eq!(request.subrequests[1].shard_state(), ShardState::Open); + + let subresponses = vec![ + metastore::ListShardsSubresponse { + index_uid: "test-index-0:0".to_string(), + source_id: INGEST_SOURCE_ID.to_string(), + shards: vec![Shard { + shard_id: 42, + ..Default::default() + }], + next_shard_id: 43, + }, + metastore::ListShardsSubresponse { + index_uid: "test-index-1:0".to_string(), + source_id: INGEST_SOURCE_ID.to_string(), + shards: Vec::new(), + next_shard_id: 1, + }, + ]; + let response = metastore::ListShardsResponse { subresponses }; + Ok(response) + }); + let mut model = ControlPlaneModel::default(); + let mut metastore_client = MetastoreServiceClient::from(mock_metastore); + model + .load_from_metastore(&mut metastore_client, &progress) + .await + .unwrap(); + + assert_eq!(model.index_table.len(), 3); + assert_eq!( + model.index_uid("test-index-0").unwrap().as_str(), + "test-index-0:0" + ); + assert_eq!( + model.index_uid("test-index-1").unwrap().as_str(), + "test-index-1:0" + ); + assert_eq!( + model.index_uid("test-index-2").unwrap().as_str(), + "test-index-2:0" + ); + + assert_eq!(model.shard_table.table_entries.len(), 2); + + let source_uid_0 = SourceUid { + index_uid: "test-index-0:0".into(), + source_id: 
INGEST_SOURCE_ID.to_string(), + }; + let table_entry = model.shard_table.table_entries.get(&source_uid_0).unwrap(); + let shards = table_entry.shards(); + assert_eq!(shards.len(), 1); + assert_eq!(shards[0].shard_id, 42); + + let next_shard_id = model.next_shard_id(&source_uid_0).unwrap(); + assert_eq!(next_shard_id, 43); + + let source_uid_1 = SourceUid { + index_uid: "test-index-1:0".into(), + source_id: INGEST_SOURCE_ID.to_string(), + }; + let table_entry = model.shard_table.table_entries.get(&source_uid_1).unwrap(); + let shards = table_entry.shards(); + assert_eq!(shards.len(), 0); + + let next_shard_id = model.next_shard_id(&source_uid_1).unwrap(); + assert_eq!(next_shard_id, 1); + } + + #[test] + fn test_control_plane_model_toggle_source() { + let mut model = ControlPlaneModel::default(); + let index_metadata = IndexMetadata::for_test("test-index", "ram://"); + let index_uid = index_metadata.index_uid.clone(); + model.add_index(index_metadata); + let source_config = SourceConfig::for_test("test-source", SourceParams::void()); + model.add_source(&index_uid, source_config).unwrap(); + { + let has_changed = model + .toggle_source(&index_uid, &"test-source".to_string(), true) + .unwrap(); + assert!(!has_changed); + } + { + let has_changed = model + .toggle_source(&index_uid, &"test-source".to_string(), true) + .unwrap(); + assert!(!has_changed); + } + { + let has_changed = model + .toggle_source(&index_uid, &"test-source".to_string(), false) + .unwrap(); + assert!(has_changed); + } + { + let has_changed = model + .toggle_source(&index_uid, &"test-source".to_string(), false) + .unwrap(); + assert!(!has_changed); + } + { + let has_changed = model + .toggle_source(&index_uid, &"test-source".to_string(), true) + .unwrap(); + assert!(has_changed); + } + { + let has_changed = model + .toggle_source(&index_uid, &"test-source".to_string(), true) + .unwrap(); + assert!(!has_changed); + } + } +} diff --git a/quickwit/quickwit-control-plane/src/model/shard_table.rs b/quickwit/quickwit-control-plane/src/model/shard_table.rs new file mode 100644 index 00000000000..331a06c7978 --- /dev/null +++ b/quickwit/quickwit-control-plane/src/model/shard_table.rs @@ -0,0 +1,972 @@ +// Copyright (C) 2023 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . 
+ +use std::collections::hash_map::Entry; +use std::ops::{Deref, DerefMut}; +use std::time::Duration; + +use fnv::{FnvHashMap, FnvHashSet}; +use quickwit_common::rate_limiter::{RateLimiter, RateLimiterSettings}; +use quickwit_common::tower::ConstantRate; +use quickwit_ingest::{RateMibPerSec, ShardInfo, ShardInfos}; +use quickwit_proto::ingest::{Shard, ShardState}; +use quickwit_proto::types::{IndexUid, NodeId, ShardId, SourceId, SourceUid}; +use tracing::{error, warn}; + +/// Limits the number of shards that can be opened for scaling up a source to 5 per minute. +const SCALING_UP_RATE_LIMITER_SETTINGS: RateLimiterSettings = RateLimiterSettings { + burst_limit: 5, + rate_limit: ConstantRate::new(5, Duration::from_secs(60)), + refill_period: Duration::from_secs(12), +}; + +/// Limits the number of shards that can be closed for scaling down a source to 1 per minute. +const SCALING_DOWN_RATE_LIMITER_SETTINGS: RateLimiterSettings = RateLimiterSettings { + burst_limit: 1, + rate_limit: ConstantRate::new(1, Duration::from_secs(60)), + refill_period: Duration::from_secs(60), +}; + +#[derive(Debug, Clone, Copy)] +pub(crate) enum ScalingMode { + Up, + Down, +} + +pub(crate) type NextShardId = ShardId; + +#[derive(Debug, Clone)] +pub(crate) struct ShardEntry { + pub shard: Shard, + pub ingestion_rate: RateMibPerSec, +} + +impl Deref for ShardEntry { + type Target = Shard; + + fn deref(&self) -> &Self::Target { + &self.shard + } +} + +impl DerefMut for ShardEntry { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.shard + } +} + +impl From for ShardEntry { + fn from(shard: Shard) -> Self { + Self { + shard, + ingestion_rate: RateMibPerSec::default(), + } + } +} + +#[derive(Debug)] +pub(crate) struct ShardTableEntry { + shard_entries: FnvHashMap, + next_shard_id: NextShardId, + scaling_up_rate_limiter: RateLimiter, + scaling_down_rate_limiter: RateLimiter, +} + +impl Default for ShardTableEntry { + fn default() -> Self { + Self { + shard_entries: Default::default(), + next_shard_id: Self::DEFAULT_NEXT_SHARD_ID, + scaling_up_rate_limiter: RateLimiter::from_settings(SCALING_UP_RATE_LIMITER_SETTINGS), + scaling_down_rate_limiter: RateLimiter::from_settings( + SCALING_DOWN_RATE_LIMITER_SETTINGS, + ), + } + } +} + +impl ShardTableEntry { + const DEFAULT_NEXT_SHARD_ID: NextShardId = 1; // `1` matches the PostgreSQL sequence min value. + + pub fn from_shards(shards: Vec, next_shard_id: NextShardId) -> Self { + let shard_entries = shards + .into_iter() + .map(|shard| (shard.shard_id, shard.into())) + .collect(); + Self { + shard_entries, + next_shard_id, + ..Default::default() + } + } + + fn is_empty(&self) -> bool { + self.shard_entries.is_empty() + } + + fn is_default(&self) -> bool { + self.is_empty() && self.next_shard_id == Self::DEFAULT_NEXT_SHARD_ID + } +} + +// A table that keeps track of the existing shards for each index and source. +#[derive(Debug, Default)] +pub(crate) struct ShardTable { + pub table_entries: FnvHashMap, +} + +impl ShardTable { + /// Removes all the entries that match the target index ID. + pub fn delete_index(&mut self, index_id: &str) { + self.table_entries + .retain(|source_uid, _| source_uid.index_uid.index_id() != index_id); + } + + /// Adds a new empty entry for the given index and source. + /// + /// TODO check and document the behavior on error (if the source was already here). 
+ pub fn add_source(&mut self, index_uid: &IndexUid, source_id: &SourceId) { + let source_uid = SourceUid { + index_uid: index_uid.clone(), + source_id: source_id.clone(), + }; + let table_entry = ShardTableEntry::default(); + let previous_table_entry_opt = self.table_entries.insert(source_uid, table_entry); + if let Some(previous_table_entry) = previous_table_entry_opt { + if !previous_table_entry.is_default() { + error!( + "shard table entry for index `{}` and source `{}` already exists", + index_uid.index_id(), + source_id + ); + } + } + } + + pub fn delete_source(&mut self, index_uid: &IndexUid, source_id: &SourceId) { + let source_uid = SourceUid { + index_uid: index_uid.clone(), + source_id: source_id.clone(), + }; + self.table_entries.remove(&source_uid); + } + + pub fn all_shards_mut(&mut self) -> impl Iterator + '_ { + self.table_entries + .values_mut() + .flat_map(|table_entry| table_entry.shard_entries.values_mut()) + } + + /// Lists the shards of a given source. Returns `None` if the source does not exist. + pub fn list_shards(&self, source_uid: &SourceUid) -> Option> { + self.table_entries + .get(source_uid) + .map(|table_entry| table_entry.shard_entries.values()) + } + + pub fn next_shard_id(&self, source_uid: &SourceUid) -> Option { + self.table_entries + .get(source_uid) + .map(|table_entry| table_entry.next_shard_id) + } + + /// Updates the shard table. + pub fn insert_newly_opened_shards( + &mut self, + index_uid: &IndexUid, + source_id: &SourceId, + opened_shards: Vec, + next_shard_id: NextShardId, + ) { + let source_uid = SourceUid { + index_uid: index_uid.clone(), + source_id: source_id.clone(), + }; + match self.table_entries.entry(source_uid) { + Entry::Occupied(mut entry) => { + let table_entry = entry.get_mut(); + + for opened_shard in opened_shards { + // We only insert shards that we don't know about because the control plane + // knows more about the state of the shards than the metastore. + table_entry + .shard_entries + .entry(opened_shard.shard_id) + .or_insert(opened_shard.into()); + } + table_entry.next_shard_id = next_shard_id; + } + // This should never happen if the control plane view is consistent with the state of + // the metastore, so should we panic here? Warnings are most likely going to go + // unnoticed. + Entry::Vacant(entry) => { + let shard_entries: FnvHashMap = opened_shards + .into_iter() + .map(|shard| (shard.shard_id, shard.into())) + .collect(); + let table_entry = ShardTableEntry { + shard_entries, + next_shard_id, + ..Default::default() + }; + entry.insert(table_entry); + } + } + } + + /// Finds open shards for a given index and source and whose leaders are not in the set of + /// unavailable ingesters. 
+ pub fn find_open_shards( + &self, + index_uid: &IndexUid, + source_id: &SourceId, + unavailable_leaders: &FnvHashSet, + ) -> Option<(Vec, NextShardId)> { + let source_uid = SourceUid { + index_uid: index_uid.clone(), + source_id: source_id.clone(), + }; + let table_entry = self.table_entries.get(&source_uid)?; + let open_shards: Vec = table_entry + .shard_entries + .values() + .filter(|shard_entry| { + shard_entry.shard.is_open() && !unavailable_leaders.contains(&shard_entry.leader_id) + }) + .cloned() + .collect(); + + Some((open_shards, table_entry.next_shard_id)) + } + + pub fn update_shards( + &mut self, + source_uid: &SourceUid, + shard_infos: &ShardInfos, + ) -> ShardStats { + let mut num_open_shards = 0; + let mut ingestion_rate_sum = RateMibPerSec::default(); + + if let Some(table_entry) = self.table_entries.get_mut(source_uid) { + for shard_info in shard_infos { + let ShardInfo { + shard_id, + shard_state, + ingestion_rate, + } = shard_info; + + if let Some(shard_entry) = table_entry.shard_entries.get_mut(shard_id) { + shard_entry.ingestion_rate = *ingestion_rate; + // `ShardInfos` are broadcasted via Chitchat and eventually consistent. As a + // result, we can only trust the `Closed` state, which is final. + if shard_state.is_closed() { + shard_entry.set_shard_state(ShardState::Closed); + } + } + } + for shard_entry in table_entry.shard_entries.values() { + if shard_entry.is_open() { + num_open_shards += 1; + ingestion_rate_sum += shard_entry.ingestion_rate; + } + } + } + let avg_ingestion_rate = if num_open_shards > 0 { + ingestion_rate_sum.0 as f32 / num_open_shards as f32 + } else { + 0.0 + }; + ShardStats { + num_open_shards, + avg_ingestion_rate, + } + } + + /// Sets the state of the shards identified by their index UID, source ID, and shard IDs to + /// `Closed`. + pub fn close_shards(&mut self, source_uid: &SourceUid, shard_ids: &[ShardId]) -> Vec { + let mut closed_shard_ids = Vec::new(); + + if let Some(table_entry) = self.table_entries.get_mut(source_uid) { + for shard_id in shard_ids { + if let Some(shard_entry) = table_entry.shard_entries.get_mut(shard_id) { + if !shard_entry.is_closed() { + shard_entry.set_shard_state(ShardState::Closed); + closed_shard_ids.push(*shard_id); + } + } + } + } + closed_shard_ids + } + + /// Removes the shards identified by their index UID, source ID, and shard IDs. 
+ pub fn delete_shards(&mut self, source_uid: &SourceUid, shard_ids: &[ShardId]) { + if let Some(table_entry) = self.table_entries.get_mut(source_uid) { + for shard_id in shard_ids { + if table_entry.shard_entries.remove(shard_id).is_none() { + warn!(shard = *shard_id, "deleting a non-existing shard"); + } + } + } + } + + pub fn acquire_scaling_permits( + &mut self, + source_uid: &SourceUid, + scaling_mode: ScalingMode, + num_permits: u64, + ) -> Option { + let table_entry = self.table_entries.get_mut(source_uid)?; + let scaling_rate_limiter = match scaling_mode { + ScalingMode::Up => &mut table_entry.scaling_up_rate_limiter, + ScalingMode::Down => &mut table_entry.scaling_down_rate_limiter, + }; + Some(scaling_rate_limiter.acquire(num_permits)) + } + + pub fn release_scaling_permits( + &mut self, + source_uid: &SourceUid, + scaling_mode: ScalingMode, + num_permits: u64, + ) { + if let Some(table_entry) = self.table_entries.get_mut(source_uid) { + let scaling_rate_limiter = match scaling_mode { + ScalingMode::Up => &mut table_entry.scaling_up_rate_limiter, + ScalingMode::Down => &mut table_entry.scaling_down_rate_limiter, + }; + scaling_rate_limiter.release(num_permits); + } + } +} + +#[derive(Clone, Copy, Default)] +pub(crate) struct ShardStats { + pub num_open_shards: usize, + pub avg_ingestion_rate: f32, +} + +#[cfg(test)] +mod tests { + use std::collections::BTreeSet; + + use itertools::Itertools; + use quickwit_proto::ingest::Shard; + + use super::*; + + impl ShardTableEntry { + pub fn shards(&self) -> Vec { + self.shard_entries + .values() + .map(|shard_entry| shard_entry.shard.clone()) + .sorted_unstable_by_key(|shard| shard.shard_id) + .collect() + } + } + + impl ShardTable { + pub fn find_open_shards_sorted( + &self, + index_uid: &IndexUid, + source_id: &SourceId, + unavailable_leaders: &FnvHashSet, + ) -> Option<(Vec, NextShardId)> { + self.find_open_shards(index_uid, source_id, unavailable_leaders) + .map(|(mut shards, next_shard_id)| { + shards.sort_by_key(|shard_entry| shard_entry.shard.shard_id); + (shards, next_shard_id) + }) + } + } + + #[test] + fn test_shard_table_delete_index() { + let mut shard_table = ShardTable::default(); + shard_table.delete_index("test-index"); + + let index_uid_0: IndexUid = "test-index-foo:0".into(); + let source_id_0 = "test-source-0".to_string(); + shard_table.add_source(&index_uid_0, &source_id_0); + + let source_id_1 = "test-source-1".to_string(); + shard_table.add_source(&index_uid_0, &source_id_1); + + let index_uid_1: IndexUid = "test-index-bar:1".into(); + shard_table.add_source(&index_uid_1, &source_id_0); + + shard_table.delete_index("test-index-foo"); + assert_eq!(shard_table.table_entries.len(), 1); + + assert!(shard_table.table_entries.contains_key(&SourceUid { + index_uid: index_uid_1, + source_id: source_id_0 + })); + } + + #[test] + fn test_shard_table_add_source() { + let index_uid: IndexUid = "test-index:0".into(); + let source_id = "test-source".to_string(); + + let mut shard_table = ShardTable::default(); + shard_table.add_source(&index_uid, &source_id); + assert_eq!(shard_table.table_entries.len(), 1); + + let source_uid = SourceUid { + index_uid, + source_id, + }; + let table_entry = shard_table.table_entries.get(&source_uid).unwrap(); + assert!(table_entry.shard_entries.is_empty()); + assert_eq!(table_entry.next_shard_id, 1); + } + + #[test] + fn test_shard_table_list_shards() { + let index_uid: IndexUid = "test-index:0".into(); + let source_id = "test-source".to_string(); + let source_uid = SourceUid { + index_uid: 
index_uid.clone(), + source_id: source_id.clone(), + }; + let mut shard_table = ShardTable::default(); + + assert!(shard_table.list_shards(&source_uid).is_none()); + + shard_table.add_source(&index_uid, &source_id); + let shards = shard_table.list_shards(&source_uid).unwrap(); + assert_eq!(shards.count(), 0); + + let shard_01 = Shard { + index_uid: index_uid.clone().into(), + source_id: source_id.clone(), + shard_id: 1, + leader_id: "test-leader-0".to_string(), + shard_state: ShardState::Closed as i32, + ..Default::default() + }; + shard_table.insert_newly_opened_shards(&index_uid, &source_id, vec![shard_01], 2); + + let shards = shard_table.list_shards(&source_uid).unwrap(); + assert_eq!(shards.count(), 1); + } + + #[test] + fn test_shard_table_insert_newly_opened_shards() { + let index_uid_0: IndexUid = "test-index:0".into(); + let source_id = "test-source".to_string(); + + let mut shard_table = ShardTable::default(); + + let shard_01 = Shard { + index_uid: index_uid_0.clone().into(), + source_id: source_id.clone(), + shard_id: 1, + leader_id: "test-leader-0".to_string(), + shard_state: ShardState::Open as i32, + ..Default::default() + }; + shard_table.insert_newly_opened_shards(&index_uid_0, &source_id, vec![shard_01.clone()], 2); + + assert_eq!(shard_table.table_entries.len(), 1); + + let source_uid = SourceUid { + index_uid: index_uid_0.clone(), + source_id: source_id.clone(), + }; + let table_entry = shard_table.table_entries.get(&source_uid).unwrap(); + let shards = table_entry.shards(); + assert_eq!(shards.len(), 1); + assert_eq!(shards[0], shard_01); + assert_eq!(table_entry.next_shard_id, 2); + + shard_table + .table_entries + .get_mut(&source_uid) + .unwrap() + .shard_entries + .get_mut(&1) + .unwrap() + .set_shard_state(ShardState::Unavailable); + + let shard_02 = Shard { + index_uid: index_uid_0.clone().into(), + source_id: source_id.clone(), + shard_id: 2, + leader_id: "test-leader-0".to_string(), + shard_state: ShardState::Open as i32, + ..Default::default() + }; + + shard_table.insert_newly_opened_shards( + &index_uid_0, + &source_id, + vec![shard_01.clone(), shard_02.clone()], + 3, + ); + + assert_eq!(shard_table.table_entries.len(), 1); + + let source_uid = SourceUid { + index_uid: index_uid_0.clone(), + source_id: source_id.clone(), + }; + let table_entry = shard_table.table_entries.get(&source_uid).unwrap(); + let shards = table_entry.shards(); + assert_eq!(shards.len(), 2); + assert_eq!(shards[0].shard_state(), ShardState::Unavailable); + assert_eq!(shards[1], shard_02); + assert_eq!(table_entry.next_shard_id, 3); + } + + #[test] + fn test_shard_table_find_open_shards() { + let index_uid: IndexUid = "test-index:0".into(); + let source_id = "test-source".to_string(); + + let mut shard_table = ShardTable::default(); + shard_table.add_source(&index_uid, &source_id); + + let mut unavailable_ingesters = FnvHashSet::default(); + + let (open_shards, next_shard_id) = shard_table + .find_open_shards_sorted(&index_uid, &source_id, &unavailable_ingesters) + .unwrap(); + assert_eq!(open_shards.len(), 0); + assert_eq!(next_shard_id, 1); + + let shard_01 = Shard { + index_uid: index_uid.clone().into(), + source_id: source_id.clone(), + shard_id: 1, + leader_id: "test-leader-0".to_string(), + shard_state: ShardState::Closed as i32, + ..Default::default() + }; + let shard_02 = Shard { + index_uid: index_uid.clone().into(), + source_id: source_id.clone(), + shard_id: 2, + leader_id: "test-leader-0".to_string(), + shard_state: ShardState::Unavailable as i32, + ..Default::default() + }; 
+ let shard_03 = Shard { + index_uid: index_uid.clone().into(), + source_id: source_id.clone(), + shard_id: 3, + leader_id: "test-leader-0".to_string(), + shard_state: ShardState::Open as i32, + ..Default::default() + }; + let shard_04 = Shard { + index_uid: index_uid.clone().into(), + source_id: source_id.clone(), + shard_id: 4, + leader_id: "test-leader-1".to_string(), + shard_state: ShardState::Open as i32, + ..Default::default() + }; + shard_table.insert_newly_opened_shards( + &index_uid, + &source_id, + vec![shard_01, shard_02, shard_03.clone(), shard_04.clone()], + 5, + ); + let (open_shards, next_shard_id) = shard_table + .find_open_shards_sorted(&index_uid, &source_id, &unavailable_ingesters) + .unwrap(); + assert_eq!(open_shards.len(), 2); + assert_eq!(open_shards[0].shard, shard_03); + assert_eq!(open_shards[1].shard, shard_04); + assert_eq!(next_shard_id, 5); + + unavailable_ingesters.insert("test-leader-0".into()); + + let (open_shards, next_shard_id) = shard_table + .find_open_shards_sorted(&index_uid, &source_id, &unavailable_ingesters) + .unwrap(); + assert_eq!(open_shards.len(), 1); + assert_eq!(open_shards[0].shard, shard_04); + assert_eq!(next_shard_id, 5); + } + + #[test] + fn test_shard_table_update_shards() { + let index_uid: IndexUid = "test-index:0".into(); + let source_id = "test-source".to_string(); + + let mut shard_table = ShardTable::default(); + + let shard_01 = Shard { + shard_id: 1, + shard_state: ShardState::Open as i32, + ..Default::default() + }; + let shard_02 = Shard { + shard_id: 2, + shard_state: ShardState::Open as i32, + ..Default::default() + }; + let shard_03 = Shard { + shard_id: 3, + shard_state: ShardState::Unavailable as i32, + ..Default::default() + }; + let shard_04 = Shard { + shard_id: 4, + shard_state: ShardState::Open as i32, + ..Default::default() + }; + shard_table.insert_newly_opened_shards( + &index_uid, + &source_id, + vec![shard_01, shard_02, shard_03, shard_04], + 5, + ); + let source_uid = SourceUid { + index_uid, + source_id, + }; + let shard_infos = BTreeSet::from_iter([ + ShardInfo { + shard_id: 1, + shard_state: ShardState::Open, + ingestion_rate: RateMibPerSec(1), + }, + ShardInfo { + shard_id: 2, + shard_state: ShardState::Open, + ingestion_rate: RateMibPerSec(2), + }, + ShardInfo { + shard_id: 3, + shard_state: ShardState::Open, + ingestion_rate: RateMibPerSec(3), + }, + ShardInfo { + shard_id: 4, + shard_state: ShardState::Closed, + ingestion_rate: RateMibPerSec(4), + }, + ShardInfo { + shard_id: 5, + shard_state: ShardState::Open, + ingestion_rate: RateMibPerSec(5), + }, + ]); + let shard_stats = shard_table.update_shards(&source_uid, &shard_infos); + assert_eq!(shard_stats.num_open_shards, 2); + assert_eq!(shard_stats.avg_ingestion_rate, 1.5); + + let shard_entries: Vec = shard_table + .list_shards(&source_uid) + .unwrap() + .cloned() + .sorted_by_key(|shard_entry| shard_entry.shard.shard_id) + .collect(); + assert_eq!(shard_entries.len(), 4); + + assert_eq!(shard_entries[0].shard.shard_id, 1); + assert_eq!(shard_entries[0].shard.shard_state(), ShardState::Open); + assert_eq!(shard_entries[0].ingestion_rate, RateMibPerSec(1)); + + assert_eq!(shard_entries[1].shard.shard_id, 2); + assert_eq!(shard_entries[1].shard.shard_state(), ShardState::Open); + assert_eq!(shard_entries[1].ingestion_rate, RateMibPerSec(2)); + + assert_eq!(shard_entries[2].shard.shard_id, 3); + assert_eq!( + shard_entries[2].shard.shard_state(), + ShardState::Unavailable + ); + assert_eq!(shard_entries[2].ingestion_rate, RateMibPerSec(3)); + + 
assert_eq!(shard_entries[3].shard.shard_id, 4); + assert_eq!(shard_entries[3].shard.shard_state(), ShardState::Closed); + assert_eq!(shard_entries[3].ingestion_rate, RateMibPerSec(4)); + } + + #[test] + fn test_shard_table_close_shards() { + let index_uid_0: IndexUid = "test-index:0".into(); + let index_uid_1: IndexUid = "test-index:1".into(); + let source_id = "test-source".to_string(); + + let mut shard_table = ShardTable::default(); + + let shard_01 = Shard { + index_uid: index_uid_0.clone().into(), + source_id: source_id.clone(), + shard_id: 1, + leader_id: "test-leader-0".to_string(), + shard_state: ShardState::Open as i32, + ..Default::default() + }; + let shard_02 = Shard { + index_uid: index_uid_0.clone().into(), + source_id: source_id.clone(), + shard_id: 2, + leader_id: "test-leader-0".to_string(), + shard_state: ShardState::Closed as i32, + ..Default::default() + }; + let shard_11 = Shard { + index_uid: index_uid_1.clone().into(), + source_id: source_id.clone(), + shard_id: 1, + leader_id: "test-leader-0".to_string(), + shard_state: ShardState::Open as i32, + ..Default::default() + }; + shard_table.insert_newly_opened_shards( + &index_uid_0, + &source_id, + vec![shard_01, shard_02], + 3, + ); + shard_table.insert_newly_opened_shards(&index_uid_1, &source_id, vec![shard_11], 2); + + let source_uid_0 = SourceUid { + index_uid: index_uid_0, + source_id, + }; + let closed_shard_ids = shard_table.close_shards(&source_uid_0, &[1, 2, 3]); + assert_eq!(closed_shard_ids, &[1]); + + let table_entry = shard_table.table_entries.get(&source_uid_0).unwrap(); + let shards = table_entry.shards(); + assert_eq!(shards[0].shard_state(), ShardState::Closed); + } + + #[test] + fn test_shard_table_delete_shards() { + let mut shard_table = ShardTable::default(); + + let index_uid_0: IndexUid = "test-index:0".into(); + let index_uid_1: IndexUid = "test-index:1".into(); + let source_id = "test-source".to_string(); + + let shard_01 = Shard { + index_uid: index_uid_0.clone().into(), + source_id: source_id.clone(), + shard_id: 1, + leader_id: "test-leader-0".to_string(), + shard_state: ShardState::Open as i32, + ..Default::default() + }; + let shard_02 = Shard { + index_uid: index_uid_0.clone().into(), + source_id: source_id.clone(), + shard_id: 2, + leader_id: "test-leader-0".to_string(), + shard_state: ShardState::Open as i32, + ..Default::default() + }; + let shard_11 = Shard { + index_uid: index_uid_1.clone().into(), + source_id: source_id.clone(), + shard_id: 1, + leader_id: "test-leader-0".to_string(), + shard_state: ShardState::Open as i32, + ..Default::default() + }; + shard_table.insert_newly_opened_shards( + &index_uid_0, + &source_id, + vec![shard_01.clone(), shard_02], + 3, + ); + shard_table.insert_newly_opened_shards(&index_uid_1, &source_id, vec![shard_11], 2); + + let source_uid_0 = SourceUid { + index_uid: index_uid_0.clone(), + source_id: source_id.clone(), + }; + shard_table.delete_shards(&source_uid_0, &[2]); + + let source_uid_1 = SourceUid { + index_uid: index_uid_1.clone(), + source_id: source_id.clone(), + }; + shard_table.delete_shards(&source_uid_1, &[1]); + + assert_eq!(shard_table.table_entries.len(), 2); + + let table_entry = shard_table.table_entries.get(&source_uid_0).unwrap(); + let shards = table_entry.shards(); + assert_eq!(shards.len(), 1); + assert_eq!(shards[0], shard_01); + assert_eq!(table_entry.next_shard_id, 3); + + let table_entry = shard_table.table_entries.get(&source_uid_1).unwrap(); + assert!(table_entry.is_empty()); + assert_eq!(table_entry.next_shard_id, 2);
+ } + + #[test] + fn test_shard_table_acquire_scaling_up_permits() { + let mut shard_table = ShardTable::default(); + + let index_uid: IndexUid = "test-index:0".into(); + let source_id = "test-source".to_string(); + + let source_uid = SourceUid { + index_uid: index_uid.clone(), + source_id: source_id.clone(), + }; + assert!(shard_table + .acquire_scaling_permits(&source_uid, ScalingMode::Up, 1) + .is_none()); + + shard_table.add_source(&index_uid, &source_id); + + let previous_available_permits = shard_table + .table_entries + .get(&source_uid) + .unwrap() + .scaling_up_rate_limiter + .available_permits(); + + assert!(shard_table + .acquire_scaling_permits(&source_uid, ScalingMode::Up, 1) + .unwrap()); + + let new_available_permits = shard_table + .table_entries + .get(&source_uid) + .unwrap() + .scaling_up_rate_limiter + .available_permits(); + + assert_eq!(new_available_permits, previous_available_permits - 1); + } + + #[test] + fn test_shard_table_acquire_scaling_down_permits() { + let index_uid: IndexUid = "test-index:0".into(); + let source_id = "test-source".to_string(); + + let mut shard_table = ShardTable::default(); + + let source_uid = SourceUid { + index_uid: index_uid.clone(), + source_id: source_id.clone(), + }; + assert!(shard_table + .acquire_scaling_permits(&source_uid, ScalingMode::Down, 1) + .is_none()); + + shard_table.add_source(&index_uid, &source_id); + + let previous_available_permits = shard_table + .table_entries + .get(&source_uid) + .unwrap() + .scaling_down_rate_limiter + .available_permits(); + + assert!(shard_table + .acquire_scaling_permits(&source_uid, ScalingMode::Down, 1) + .unwrap()); + + let new_available_permits = shard_table + .table_entries + .get(&source_uid) + .unwrap() + .scaling_down_rate_limiter + .available_permits(); + + assert_eq!(new_available_permits, previous_available_permits - 1); + } + + #[test] + fn test_shard_table_release_scaling_up_permits() { + let mut shard_table = ShardTable::default(); + + let index_uid: IndexUid = "test-index:0".into(); + let source_id = "test-source".to_string(); + + shard_table.add_source(&index_uid, &source_id); + + let source_uid = SourceUid { + index_uid: index_uid.clone(), + source_id: source_id.clone(), + }; + let previous_available_permits = shard_table + .table_entries + .get(&source_uid) + .unwrap() + .scaling_up_rate_limiter + .available_permits(); + + assert!(shard_table + .acquire_scaling_permits(&source_uid, ScalingMode::Up, 1) + .unwrap()); + + shard_table.release_scaling_permits(&source_uid, ScalingMode::Up, 1); + + let new_available_permits = shard_table + .table_entries + .get(&source_uid) + .unwrap() + .scaling_up_rate_limiter + .available_permits(); + + assert_eq!(new_available_permits, previous_available_permits); + } + + #[test] + fn test_shard_table_release_scaling_down_permits() { + let mut shard_table = ShardTable::default(); + + let index_uid: IndexUid = "test-index:0".into(); + let source_id = "test-source".to_string(); + + shard_table.add_source(&index_uid, &source_id); + + let source_uid = SourceUid { + index_uid: index_uid.clone(), + source_id: source_id.clone(), + }; + let previous_available_permits = shard_table + .table_entries + .get(&source_uid) + .unwrap() + .scaling_down_rate_limiter + .available_permits(); + + assert!(shard_table + .acquire_scaling_permits(&source_uid, ScalingMode::Down, 1) + .unwrap()); + + shard_table.release_scaling_permits(&source_uid, ScalingMode::Down, 1); + + let new_available_permits = shard_table + .table_entries + .get(&source_uid) + .unwrap() +
.scaling_down_rate_limiter + .available_permits(); + + assert_eq!(new_available_permits, previous_available_permits); + } +} diff --git a/quickwit/quickwit-indexing/Cargo.toml b/quickwit/quickwit-indexing/Cargo.toml index 1588e1115d7..8b07a3ad92b 100644 --- a/quickwit/quickwit-indexing/Cargo.toml +++ b/quickwit/quickwit-indexing/Cargo.toml @@ -11,8 +11,8 @@ documentation = "https://quickwit.io/docs/" [dependencies] aws-config = { workspace = true, optional = true } -aws-smithy-client = { workspace = true, optional = true } aws-sdk-kinesis = { workspace = true, optional = true } +aws-smithy-client = { workspace = true, optional = true } anyhow = { workspace = true } arc-swap = { workspace = true } @@ -97,11 +97,11 @@ quickwit-cluster = { workspace = true, features = ["testsuite"] } quickwit-common = { workspace = true, features = ["testsuite"] } quickwit-config = { workspace = true, features = ["testsuite"] } quickwit-doc-mapper = { workspace = true, features = ["testsuite"] } +quickwit-indexing = { workspace = true, features = ["testsuite"]} quickwit-ingest = { workspace = true, features = ["testsuite"] } quickwit-metastore = { workspace = true, features = ["testsuite"] } quickwit-proto = { workspace = true, features = ["testsuite"] } quickwit-storage = { workspace = true, features = ["testsuite"] } -quickwit-indexing = { workspace = true, features = ["testsuite"]} [[test]] name = "failpoints" diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs index 1ba1e253a01..90935cdc9b1 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs @@ -315,6 +315,7 @@ mod tests { use mrecordlog::MultiRecordLog; use quickwit_cluster::{create_cluster_for_test, ChannelTransport}; + use quickwit_common::rate_limiter::{RateLimiter, RateLimiterSettings}; use quickwit_proto::ingest::ingester::{IngesterStatus, ObservationMessage}; use quickwit_proto::ingest::ShardState; use quickwit_proto::types::{queue_id, Position}; @@ -322,9 +323,7 @@ mod tests { use super::*; use crate::ingest_v2::models::IngesterShard; - use crate::ingest_v2::rate_limiter::RateLimiter; use crate::ingest_v2::rate_meter::RateMeter; - use crate::RateLimiterSettings; #[test] fn test_shard_info_serde() { diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index ebb8ea83ff6..c22fba85af4 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -32,6 +32,7 @@ use futures::StreamExt; use mrecordlog::error::{CreateQueueError, TruncateError}; use mrecordlog::MultiRecordLog; use quickwit_cluster::Cluster; +use quickwit_common::rate_limiter::{RateLimiter, RateLimiterSettings}; use quickwit_common::tower::Pool; use quickwit_common::ServiceStream; use quickwit_proto::ingest::ingester::{ @@ -53,7 +54,6 @@ use super::fetch::FetchStreamTask; use super::models::IngesterShard; use super::mrecord::MRecord; use super::mrecordlog_utils::{append_eof_record_if_necessary, check_enough_capacity}; -use super::rate_limiter::{RateLimiter, RateLimiterSettings}; use super::rate_meter::RateMeter; use super::replication::{ ReplicationClient, ReplicationStreamTask, ReplicationStreamTaskHandle, ReplicationTask, @@ -467,7 +467,7 @@ impl IngesterService for Ingester { .get_mut(&queue_id) .expect("rate limiter should be initialized"); - if !rate_limiter.acquire(requested_capacity) { + if
!rate_limiter.acquire_bytes(requested_capacity) { warn!("failed to persist records to shard `{queue_id}`: rate limited"); let persist_failure = PersistFailure { @@ -1671,7 +1671,7 @@ mod tests { async fn test_ingester_persist_rate_limited() { let (ingester_ctx, mut ingester) = IngesterForTest::default() .with_rate_limiter_settings(RateLimiterSettings { - burst_limit: ByteSize(0), + burst_limit: 0, rate_limit: ConstantRate::bytes_per_sec(ByteSize(0)), refill_period: Duration::from_millis(100), }) diff --git a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs index 2fa2b9bbdb6..3843fd0c4b2 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs @@ -23,7 +23,6 @@ mod ingester; mod models; mod mrecord; mod mrecordlog_utils; -mod rate_limiter; mod rate_meter; mod replication; mod router; @@ -33,9 +32,9 @@ mod test_utils; mod workbench; use std::fmt; -use std::ops::Add; +use std::ops::{Add, AddAssign}; -pub use broadcast::setup_local_shards_update_listener; +pub use broadcast::{setup_local_shards_update_listener, LocalShardsUpdate, ShardInfo, ShardInfos}; use bytesize::ByteSize; use quickwit_common::tower::Pool; use quickwit_proto::ingest::ingester::IngesterServiceClient; @@ -46,7 +45,6 @@ pub use self::fetch::{FetchStreamError, MultiFetchStream}; pub use self::ingester::{wait_for_ingester_decommission, Ingester}; use self::mrecord::MRECORD_HEADER_LEN; pub use self::mrecord::{decoded_mrecords, MRecord}; -pub use self::rate_limiter::RateLimiterSettings; pub use self::router::IngestRouter; pub type IngesterPool = Pool<NodeId, IngesterServiceClient>; @@ -63,7 +61,7 @@ pub(super) fn estimate_size(doc_batch: &DocBatchV2) -> ByteSize { ByteSize(estimate as u64) } -#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)] +#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Ord, PartialOrd)] pub struct RateMibPerSec(pub u16); impl fmt::Display for RateMibPerSec { @@ -87,6 +85,13 @@ impl Add<RateMibPerSec> for RateMibPerSec { } } +impl AddAssign<RateMibPerSec> for RateMibPerSec { + #[inline(always)] + fn add_assign(&mut self, rhs: RateMibPerSec) { + self.0 += rhs.0; + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index 42ec6043e89..cd5c892e27e 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -57,6 +57,7 @@ use quickwit_cluster::{ start_cluster_service, Cluster, ClusterChange, ClusterMember, ListenerHandle, }; use quickwit_common::pubsub::{EventBroker, EventSubscriptionHandle}; +use quickwit_common::rate_limiter::RateLimiterSettings; use quickwit_common::runtimes::RuntimesConfig; use quickwit_common::tower::{ BalanceChannel, BoxFutureInfaillible, BufferLayer, Change, ConstantRate, EstimateRateLayer, }; @@ -64,7 +65,7 @@ use quickwit_config::service::QuickwitService; use quickwit_config::NodeConfig; -use quickwit_control_plane::control_plane::ControlPlane; +use quickwit_control_plane::control_plane::{ControlPlane, ControlPlaneEventSubscriber}; use quickwit_control_plane::{IndexerNodeInfo, IndexerPool}; use quickwit_index_management::{IndexService as IndexManager, IndexServiceError}; use quickwit_indexing::actors::IndexingService; @@ -73,7 +74,7 @@ use quickwit_indexing::start_indexing_service; use quickwit_ingest::{ setup_local_shards_update_listener, start_ingest_api_service, wait_for_ingester_decommission, GetMemoryCapacity, IngestApiService, IngestRequest, IngestRouter, IngestServiceClient, - Ingester,
IngesterPool, RateLimiterSettings, + Ingester, IngesterPool, LocalShardsUpdate, }; use quickwit_janitor::{start_janitor_service, JanitorService}; use quickwit_metastore::{ @@ -211,11 +212,11 @@ async fn start_ingest_client_if_needed( async fn start_control_plane_if_needed( node_config: &NodeConfig, cluster: &Cluster, + event_broker: &EventBroker, metastore_client: &MetastoreServiceClient, universe: &Universe, indexer_pool: &IndexerPool, ingester_pool: &IngesterPool, - event_broker: &EventBroker, ) -> anyhow::Result<ControlPlaneServiceClient> { if node_config.is_service_enabled(QuickwitService::ControlPlane) { check_cluster_configuration( @@ -235,13 +236,13 @@ async fn start_control_plane_if_needed( .get(); let control_plane_mailbox = setup_control_plane( universe, + event_broker, cluster_id, self_node_id, indexer_pool.clone(), ingester_pool.clone(), metastore_client.clone(), replication_factor, - event_broker, ) .await?; Ok(ControlPlaneServiceClient::from_mailbox( @@ -321,11 +322,11 @@ pub async fn serve_quickwit( let control_plane_service: ControlPlaneServiceClient = start_control_plane_if_needed( &node_config, &cluster, + &event_broker, &metastore_client, &universe, &indexer_pool, &ingester_pool, - &event_broker, ) .await?; @@ -605,10 +606,8 @@ async fn setup_ingest_v2( // We compute the burst limit as something a bit larger than the content length limit, because // we actually rewrite the `\n`-delimited format into a tiny bit larger buffer, where the // line length is prefixed. - let burst_limit = ByteSize::b( - (config.ingest_api_config.content_length_limit.as_u64() * 3 / 2) - .clamp(10_000_000, 200_000_000), - ); + let burst_limit = (config.ingest_api_config.content_length_limit.as_u64() * 3 / 2) + .clamp(10_000_000, 200_000_000); let rate_limiter_settings = RateLimiterSettings { burst_limit, ..Default::default() }; @@ -713,13 +712,13 @@ async fn setup_searcher( #[allow(clippy::too_many_arguments)] async fn setup_control_plane( universe: &Universe, + event_broker: &EventBroker, cluster_id: String, self_node_id: NodeId, indexer_pool: IndexerPool, ingester_pool: IngesterPool, metastore: MetastoreServiceClient, replication_factor: usize, - event_broker: &EventBroker, ) -> anyhow::Result<Mailbox<ControlPlane>> { let (control_plane_mailbox, _control_plane_handle) = ControlPlane::spawn( universe, @@ -730,20 +729,15 @@ async fn setup_control_plane( metastore, replication_factor, ); - let weak_control_plane_mailbox = control_plane_mailbox.downgrade(); + let subscriber = ControlPlaneEventSubscriber::new(control_plane_mailbox.downgrade()); + event_broker - .subscribe::<ShardPositionsUpdate>(move |shard_positions_update| { - let Some(control_plane_mailbox) = weak_control_plane_mailbox.upgrade() else { - return; - }; - if control_plane_mailbox - .try_send_message(shard_positions_update) - .is_err() - { - error!("failed to send shard positions update to control plane"); - } - }) + .subscribe::<LocalShardsUpdate>(subscriber.clone()) + .forever(); + event_broker + .subscribe::<ShardPositionsUpdate>(subscriber) .forever(); + Ok(control_plane_mailbox) }
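
Note (not part of the patch above): `ControlPlaneEventSubscriber` is defined in quickwit-control-plane and its body is not shown in this diff. The following is a minimal sketch of how such a subscriber could forward both event types through the downgraded mailbox; the field name, the constructor, the exact `EventSubscriber` impls, and the import path of `ShardPositionsUpdate` are assumptions for illustration only, not the actual implementation.

// Sketch only: the shapes below are assumed, not copied from Quickwit.
use async_trait::async_trait;
use quickwit_actors::WeakMailbox;
use quickwit_cluster::ShardPositionsUpdate; // import path assumed
use quickwit_common::pubsub::EventSubscriber;
use quickwit_control_plane::control_plane::ControlPlane;
use quickwit_ingest::LocalShardsUpdate;
use tracing::error;

#[derive(Clone)]
pub struct ControlPlaneEventSubscriber {
    // Weak mailbox so the event broker never keeps the control plane actor alive.
    weak_control_plane_mailbox: WeakMailbox<ControlPlane>,
}

impl ControlPlaneEventSubscriber {
    pub fn new(weak_control_plane_mailbox: WeakMailbox<ControlPlane>) -> Self {
        Self {
            weak_control_plane_mailbox,
        }
    }
}

#[async_trait]
impl EventSubscriber<ShardPositionsUpdate> for ControlPlaneEventSubscriber {
    async fn handle_event(&mut self, event: ShardPositionsUpdate) {
        // If the control plane has already shut down, drop the event silently.
        let Some(control_plane_mailbox) = self.weak_control_plane_mailbox.upgrade() else {
            return;
        };
        // `try_send_message` avoids blocking the broker if the actor queue is full.
        if control_plane_mailbox.try_send_message(event).is_err() {
            error!("failed to forward shard positions update to the control plane");
        }
    }
}

#[async_trait]
impl EventSubscriber<LocalShardsUpdate> for ControlPlaneEventSubscriber {
    async fn handle_event(&mut self, event: LocalShardsUpdate) {
        let Some(control_plane_mailbox) = self.weak_control_plane_mailbox.upgrade() else {
            return;
        };
        if control_plane_mailbox.try_send_message(event).is_err() {
            error!("failed to forward local shards update to the control plane");
        }
    }
}

Relying on `WeakMailbox::upgrade` plus `try_send_message` keeps the subscription fire-and-forget: a stopped or saturated control plane never blocks the broker's dispatch task, which matches the behavior of the inline closure this patch replaces.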