From 298601963a822901b53512988f9ce9cf02cc6092 Mon Sep 17 00:00:00 2001 From: Adrien Guillo Date: Thu, 23 Nov 2023 11:02:13 -0800 Subject: [PATCH] Broadcast shard ingestion throughput and state via Chitchat (#4189) --- quickwit/Cargo.lock | 6 +- quickwit/Cargo.toml | 2 +- quickwit/quickwit-cluster/src/change.rs | 2 +- quickwit/quickwit-cluster/src/cluster.rs | 51 +- quickwit/quickwit-cluster/src/lib.rs | 5 +- quickwit/quickwit-cluster/src/node.rs | 2 +- quickwit/quickwit-common/src/pubsub.rs | 67 +- quickwit/quickwit-common/src/shared_consts.rs | 7 +- quickwit/quickwit-common/src/tower/rate.rs | 35 +- .../src/models/shard_positions.rs | 8 +- .../src/source/ingest/mod.rs | 6 +- quickwit/quickwit-ingest/Cargo.toml | 2 + .../src/ingest_v2/broadcast.rs | 575 ++++++++++++++++++ .../quickwit-ingest/src/ingest_v2/fetch.rs | 8 +- .../quickwit-ingest/src/ingest_v2/ingester.rs | 124 +++- quickwit/quickwit-ingest/src/ingest_v2/mod.rs | 30 + .../quickwit-ingest/src/ingest_v2/models.rs | 4 + .../src/ingest_v2/rate_limiter.rs | 11 +- .../src/ingest_v2/rate_meter.rs | 87 +++ .../src/ingest_v2/replication.rs | 6 +- .../quickwit-ingest/src/ingest_v2/router.rs | 122 +++- .../src/ingest_v2/shard_table.rs | 3 +- quickwit/quickwit-proto/src/ingest/mod.rs | 45 ++ quickwit/quickwit-proto/src/types/mod.rs | 2 +- quickwit/quickwit-serve/src/lib.rs | 29 +- 25 files changed, 1121 insertions(+), 118 deletions(-) create mode 100644 quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs create mode 100644 quickwit/quickwit-ingest/src/ingest_v2/rate_meter.rs diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index f8716aa1af6..9e8b8dd8a82 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -1208,9 +1208,8 @@ dependencies = [ [[package]] name = "chitchat" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb13c7104e3c77027520e984339ab3e805fd19c05c8555cc6338b2a4f455cbfb" +version = "0.8.0" +source = "git+https://github.com/quickwit-oss/chitchat.git?rev=72a994f#72a994fa0acdc7472d60ec8d1179fff92190855e" dependencies = [ "anyhow", "async-trait", @@ -5550,6 +5549,7 @@ dependencies = [ "once_cell", "prost", "quickwit-actors", + "quickwit-cluster", "quickwit-codegen", "quickwit-common", "quickwit-config", diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index 5a821bd6e5a..871b5d70e0b 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -49,7 +49,7 @@ base64 = "0.21" bytes = { version = "1", features = ["serde"] } bytesize = { version = "1.3.0", features = ["serde"] } bytestring = "1.3.0" -chitchat = "0.7" +chitchat = { git = "https://github.com/quickwit-oss/chitchat.git", rev = "72a994f" } chrono = { version = "0.4.23", default-features = false, features = [ "clock", "std", diff --git a/quickwit/quickwit-cluster/src/change.rs b/quickwit/quickwit-cluster/src/change.rs index 268abe58599..13b4b48bbcf 100644 --- a/quickwit/quickwit-cluster/src/change.rs +++ b/quickwit/quickwit-cluster/src/change.rs @@ -346,7 +346,7 @@ mod tests { } pub(crate) fn build(self) -> NodeState { - let mut node_state = NodeState::default(); + let mut node_state = NodeState::for_test(); node_state.set( ENABLED_SERVICES_KEY, diff --git a/quickwit/quickwit-cluster/src/cluster.rs b/quickwit/quickwit-cluster/src/cluster.rs index 34feef7e78e..a2c9b6f136e 100644 --- a/quickwit/quickwit-cluster/src/cluster.rs +++ b/quickwit/quickwit-cluster/src/cluster.rs @@ -27,7 +27,7 @@ use anyhow::Context; use chitchat::transport::Transport; use chitchat::{ spawn_chitchat, Chitchat, ChitchatConfig, 
ChitchatHandle, ChitchatId, ClusterStateSnapshot, - FailureDetectorConfig, NodeState, + FailureDetectorConfig, ListenerHandle, NodeState, }; use futures::Stream; use itertools::Itertools; @@ -249,9 +249,31 @@ impl Cluster { .await .self_node_state() .get_versioned(key) + .filter(|versioned_value| versioned_value.tombstone.is_none()) .map(|versioned_value| versioned_value.value.clone()) } + pub async fn remove_self_key(&self, key: &str) { + self.chitchat() + .await + .lock() + .await + .self_node_state() + .mark_for_deletion(key) + } + + pub async fn subscribe( + &self, + key_prefix: &str, + callback: impl Fn(&str, &str) + Send + Sync + 'static, + ) -> ListenerHandle { + self.chitchat() + .await + .lock() + .await + .subscribe_event(key_prefix, callback) + } + /// Waits until the predicate holds true for the set of ready members. pub async fn wait_for_ready_members( &self, @@ -643,7 +665,8 @@ pub fn grpc_addr_from_listen_addr_for_test(listen_addr: SocketAddr) -> SocketAdd #[cfg(any(test, feature = "testsuite"))] pub async fn create_cluster_for_test_with_id( - node_id: u16, + node_id: NodeId, + gossip_advertise_port: u16, cluster_id: String, peer_seed_addrs: Vec, enabled_services: &HashSet, @@ -651,8 +674,7 @@ pub async fn create_cluster_for_test_with_id( self_node_readiness: bool, ) -> anyhow::Result { use quickwit_proto::indexing::PIPELINE_FULL_CAPACITY; - let gossip_advertise_addr: SocketAddr = ([127, 0, 0, 1], node_id).into(); - let node_id: NodeId = format!("node_{node_id}").into(); + let gossip_advertise_addr: SocketAddr = ([127, 0, 0, 1], gossip_advertise_port).into(); let self_node = ClusterMember { node_id, generation_id: crate::GenerationId(1), @@ -700,14 +722,17 @@ pub async fn create_cluster_for_test( use quickwit_config::service::QuickwitService; - static NODE_AUTO_INCREMENT: AtomicU16 = AtomicU16::new(1u16); - let node_id = NODE_AUTO_INCREMENT.fetch_add(1, Ordering::Relaxed); + static GOSSIP_ADVERTISE_PORT_SEQUENCE: AtomicU16 = AtomicU16::new(1u16); + let gossip_advertise_port = GOSSIP_ADVERTISE_PORT_SEQUENCE.fetch_add(1, Ordering::Relaxed); + let node_id: NodeId = format!("node-{}", gossip_advertise_port).into(); + let enabled_services = enabled_services .iter() .map(|service_str| QuickwitService::from_str(service_str)) .collect::, _>>()?; let cluster = create_cluster_for_test_with_id( node_id, + gossip_advertise_port, "test-cluster".to_string(), seeds, &enabled_services, @@ -1017,8 +1042,8 @@ mod tests { indexing_tasks_clone.clone(), ) }, - Duration::from_secs(4), - Duration::from_millis(500), + Duration::from_secs(5), + Duration::from_millis(100), ) .await .unwrap(); @@ -1126,7 +1151,8 @@ mod tests { let transport = ChannelTransport::default(); let cluster1a = create_cluster_for_test_with_id( - 11u16, + "node-11".into(), + 11, "cluster1".to_string(), Vec::new(), &HashSet::default(), @@ -1135,7 +1161,8 @@ mod tests { ) .await?; let cluster2a = create_cluster_for_test_with_id( - 21u16, + "node-21".into(), + 21, "cluster2".to_string(), vec![cluster1a.gossip_listen_addr.to_string()], &HashSet::default(), @@ -1144,6 +1171,7 @@ mod tests { ) .await?; let cluster1b = create_cluster_for_test_with_id( + "node-12".into(), 12, "cluster1".to_string(), vec![ @@ -1156,6 +1184,7 @@ mod tests { ) .await?; let cluster2b = create_cluster_for_test_with_id( + "node-22".into(), 22, "cluster2".to_string(), vec![ @@ -1249,7 +1278,7 @@ mod tests { #[test] fn test_serialize_indexing_tasks() { - let mut node_state = NodeState::default(); + let mut node_state = NodeState::for_test(); 
         test_serialize_indexing_tasks_aux(&[], &mut node_state);
         test_serialize_indexing_tasks_aux(
             &[IndexingTask {
diff --git a/quickwit/quickwit-cluster/src/lib.rs b/quickwit/quickwit-cluster/src/lib.rs
index 5821d41c65d..c37e093dab1 100644
--- a/quickwit/quickwit-cluster/src/lib.rs
+++ b/quickwit/quickwit-cluster/src/lib.rs
@@ -28,6 +28,7 @@ mod node;
 pub use chitchat::transport::ChannelTransport;
 use chitchat::transport::UdpTransport;
 use chitchat::FailureDetectorConfig;
+pub use chitchat::ListenerHandle;
 use quickwit_config::service::QuickwitService;
 use quickwit_config::NodeConfig;
 use quickwit_proto::indexing::CpuCapacity;
@@ -36,7 +37,9 @@ use time::OffsetDateTime;
 pub use crate::change::ClusterChange;
 
 #[cfg(any(test, feature = "testsuite"))]
-pub use crate::cluster::{create_cluster_for_test, grpc_addr_from_listen_addr_for_test};
+pub use crate::cluster::{
+    create_cluster_for_test, create_cluster_for_test_with_id, grpc_addr_from_listen_addr_for_test,
+};
 pub use crate::cluster::{Cluster, ClusterSnapshot, NodeIdSchema};
 pub use crate::member::{ClusterMember, INDEXING_CPU_CAPACITY_KEY};
 pub use crate::node::ClusterNode;
diff --git a/quickwit/quickwit-cluster/src/node.rs b/quickwit/quickwit-cluster/src/node.rs
index c3c9c7a2874..7fd58faaafd 100644
--- a/quickwit/quickwit-cluster/src/node.rs
+++ b/quickwit/quickwit-cluster/src/node.rs
@@ -76,7 +76,7 @@ impl ClusterNode {
         let grpc_advertise_addr = ([127, 0, 0, 1], port + 1).into();
         let chitchat_id = ChitchatId::new(node_id.to_string(), 0, gossip_advertise_addr);
         let channel = make_channel(grpc_advertise_addr).await;
-        let mut node_state = NodeState::default();
+        let mut node_state = NodeState::for_test();
         node_state.set(ENABLED_SERVICES_KEY, enabled_services.join(","));
         node_state.set(GRPC_ADVERTISE_ADDR_KEY, grpc_advertise_addr.to_string());
         set_indexing_tasks_in_node_state(indexing_tasks, &mut node_state);
diff --git a/quickwit/quickwit-common/src/pubsub.rs b/quickwit/quickwit-common/src/pubsub.rs
index 1e89191466d..27cab61ebb2 100644
--- a/quickwit/quickwit-common/src/pubsub.rs
+++ b/quickwit/quickwit-common/src/pubsub.rs
@@ -19,8 +19,6 @@
 use std::collections::HashMap;
 use std::fmt;
-use std::fmt::Formatter;
-use std::marker::PhantomData;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex, Weak};
 use std::time::Duration;
 
@@ -37,34 +35,14 @@ pub trait EventSubscriber<E>: Send + Sync + 'static {
     async fn handle_event(&mut self, event: E);
 }
 
-struct ClosureSubscriber<E> {
-    callback: Arc<dyn Fn(E) + Sync + Send>,
-    _phantom: PhantomData<E>,
-}
-
-impl<E> Clone for ClosureSubscriber<E> {
-    fn clone(&self) -> Self {
-        ClosureSubscriber {
-            callback: self.callback.clone(),
-            _phantom: self._phantom,
-        }
-    }
-}
-
-impl<E> fmt::Debug for ClosureSubscriber<E> {
-    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
-        f.debug_struct("ClosureSubscriber")
-            .field("callback", &std::any::type_name::<E>())
-            .finish()
-    }
-}
-
 #[async_trait]
-impl<E: Event> EventSubscriber<E>
-    for ClosureSubscriber<E>
+impl<E, F> EventSubscriber<E> for F
+where
+    E: Event,
+    F: Fn(E) + Send + Sync + 'static,
 {
     async fn handle_event(&mut self, event: E) {
-        (self.callback)(event);
+        (self)(event);
     }
 }
@@ -98,7 +76,7 @@ impl EventBroker {
             .inner
             .subscriptions
             .lock()
-            .expect("the lock should not be poisoned");
+            .expect("lock should not be poisoned");
 
         if !subscriptions.contains::<EventSubscriptions<E>>() {
            subscriptions.insert::<EventSubscriptions<E>>(HashMap::new());
@@ -113,8 +91,9 @@ impl EventBroker {
         };
         let typed_subscriptions = subscriptions
            .get_mut::<EventSubscriptions<E>>()
-            .expect("The subscription map should exist.");
+            .expect("subscription map should exist");
         typed_subscriptions.insert(subscription_id, subscription);
+
         EventSubscriptionHandle {
             subscription_id,
             broker: Arc::downgrade(&self.inner),
@@ -122,7 +101,7 @@ impl EventBroker {
             let mut subscriptions = broker
                 .subscriptions
                 .lock()
-                .expect("the lock should not be poisoned");
+                .expect("lock should not be poisoned");
 
             if let Some(typed_subscriptions) = subscriptions.get_mut::<EventSubscriptions<E>>() {
                 typed_subscriptions.remove(&subscription_id);
@@ -131,21 +110,6 @@ impl EventBroker {
         }
     }
 
-    /// Subscribes to an event with a callback function.
-    #[must_use]
-    pub fn subscribe_fn<E>(
-        &self,
-        callback_fn: impl Fn(E) + Sync + Send + 'static,
-    ) -> EventSubscriptionHandle
-    where
-        E: Event,
-    {
-        self.subscribe(ClosureSubscriber {
-            callback: Arc::new(callback_fn),
-            _phantom: Default::default(),
-        })
-    }
-
     /// Publishes an event.
     pub fn publish<E>(&self, event: E)
     where E: Event {
@@ -153,7 +117,7 @@ impl EventBroker {
             .inner
             .subscriptions
             .lock()
-            .expect("the lock should not be poisoned");
+            .expect("lock should not be poisoned");
 
         if let Some(typed_subscriptions) = subscriptions.get::<EventSubscriptions<E>>() {
             for subscription in typed_subscriptions.values() {
@@ -172,6 +136,7 @@ struct EventSubscription<E> {
     subscriber: Arc<Mutex<Box<dyn EventSubscriber<E>>>>,
 }
 
+#[derive(Clone)]
 pub struct EventSubscriptionHandle {
     subscription_id: usize,
     broker: Weak<InnerEventBroker>,
@@ -181,8 +146,8 @@ impl EventSubscriptionHandle {
     pub fn cancel(self) {}
 
-    /// By default, dropping an event cancels the subscription.
-    /// `forever` consumes the handle and avoid drop
+    /// By default, dropping a subscription handle cancels the subscription.
+    /// `forever` consumes the handle and avoids cancelling the subscription on drop.
     pub fn forever(mut self) {
         self.broker = Weak::new();
     }
@@ -251,7 +216,7 @@ mod tests {
     async fn test_event_broker_handle_drop() {
         let event_broker = EventBroker::default();
         let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
-        drop(event_broker.subscribe_fn::<MyEvent>(move |event| {
+        drop(event_broker.subscribe(move |event: MyEvent| {
             tx.send(event.value).unwrap();
         }));
         event_broker.publish(MyEvent { value: 42 });
@@ -263,7 +228,7 @@ mod tests {
         let event_broker = EventBroker::default();
         let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
         event_broker
-            .subscribe_fn::<MyEvent>(move |event| {
+            .subscribe(move |event: MyEvent| {
                 tx.send(event.value).unwrap();
            })
             .cancel();
@@ -276,7 +241,7 @@ mod tests {
         let event_broker = EventBroker::default();
         let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
         event_broker
-            .subscribe_fn::<MyEvent>(move |event| {
+            .subscribe(move |event: MyEvent| {
                 tx.send(event.value).unwrap();
             })
             .forever();
diff --git a/quickwit/quickwit-common/src/shared_consts.rs b/quickwit/quickwit-common/src/shared_consts.rs
index 5ab1ecc016f..4ee736fa63d 100644
--- a/quickwit/quickwit-common/src/shared_consts.rs
+++ b/quickwit/quickwit-common/src/shared_consts.rs
@@ -36,5 +36,8 @@ pub const DELETION_GRACE_PERIOD: Duration = Duration::from_secs(60 * 32); // 32 min
 /// being requested.
 pub const SCROLL_BATCH_LEN: usize = 1_000;
 
-/// Prefix used in chitchat to publish the shard positions.
-pub const SHARD_POSITIONS_PREFIX: &str = "shard_positions:";
+/// Prefix used in chitchat to broadcast the positions of the shards assigned to an indexer.
+pub const INDEXER_ASSIGNED_SHARDS_POSITIONS_PREFIX: &str = "indexer.assigned_shards_positions:";
+
+/// Prefix used in chitchat to broadcast the list of primary shards hosted by a leader.
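+///
+/// A full key looks like `ingester.primary_shards:<index_uid>:<source_id>`; the
+/// associated value is the JSON-serialized set of shard infos for that source.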
+pub const INGESTER_PRIMARY_SHARDS_PREFIX: &str = "ingester.primary_shards:"; diff --git a/quickwit/quickwit-common/src/tower/rate.rs b/quickwit/quickwit-common/src/tower/rate.rs index 81e7f72fe11..661359c4179 100644 --- a/quickwit/quickwit-common/src/tower/rate.rs +++ b/quickwit/quickwit-common/src/tower/rate.rs @@ -25,6 +25,11 @@ pub trait Rate: Clone { /// Returns the amount of work per time period. fn work(&self) -> u64; + /// Returns the amount of work in bytes per time period. + fn work_bytes(&self) -> ByteSize { + ByteSize(self.work()) + } + /// Returns the duration of a time period. fn period(&self) -> Duration; } @@ -41,9 +46,9 @@ impl ConstantRate { /// /// # Panics /// - /// This function panics if `period` is < 1ms. + /// This function panics if `period` is 0. pub fn new(work: u64, period: Duration) -> Self { - assert!(period.as_millis() > 0); + assert!(!period.is_zero()); Self { work, period } } @@ -56,6 +61,19 @@ impl ConstantRate { pub fn bytes_per_sec(bytes: ByteSize) -> Self { Self::bytes_per_period(bytes, Duration::from_secs(1)) } + + /// Changes the scale of the rate, i.e. the duration of the time period, while keeping the rate + /// constant. + /// + /// # Panics + /// + /// This function panics if `new_period` is 0. + pub fn rescale(&self, new_period: Duration) -> Self { + assert!(!new_period.is_zero()); + + let new_work = self.work() as u128 * new_period.as_nanos() / self.period().as_nanos(); + Self::new(new_work as u64, new_period) + } } impl Rate for ConstantRate { @@ -67,3 +85,16 @@ impl Rate for ConstantRate { self.period } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_rescale() { + let rate = ConstantRate::bytes_per_period(ByteSize::mib(5), Duration::from_secs(5)); + let rescaled_rate = rate.rescale(Duration::from_secs(1)); + assert_eq!(rescaled_rate.work_bytes(), ByteSize::mib(1)); + assert_eq!(rescaled_rate.period(), Duration::from_secs(1)); + } +} diff --git a/quickwit/quickwit-indexing/src/models/shard_positions.rs b/quickwit/quickwit-indexing/src/models/shard_positions.rs index 02d4475fce0..b44741620e7 100644 --- a/quickwit/quickwit-indexing/src/models/shard_positions.rs +++ b/quickwit/quickwit-indexing/src/models/shard_positions.rs @@ -25,7 +25,7 @@ use async_trait::async_trait; use fnv::FnvHashMap; use quickwit_cluster::Cluster; use quickwit_common::pubsub::{Event, EventSubscriber}; -use quickwit_common::shared_consts::SHARD_POSITIONS_PREFIX; +use quickwit_common::shared_consts::INDEXER_ASSIGNED_SHARDS_POSITIONS_PREFIX; use quickwit_proto::types::{Position, ShardId, SourceUid}; use tracing::warn; @@ -76,7 +76,7 @@ impl PublishedShardPositions { index_uid, source_id, } = &source_uid; - let key = format!("{SHARD_POSITIONS_PREFIX}{index_uid}:{source_id}"); + let key = format!("{INDEXER_ASSIGNED_SHARDS_POSITIONS_PREFIX}{index_uid}:{source_id}"); let shard_positions_json = serde_json::to_string(&shard_positions).unwrap(); self.cluster_client .set_self_key_value(key, shard_positions_json) @@ -125,7 +125,7 @@ mod tests { use chitchat::transport::ChannelTransport; use quickwit_cluster::create_cluster_for_test; use quickwit_common::pubsub::EventBroker; - use quickwit_common::shared_consts::SHARD_POSITIONS_PREFIX; + use quickwit_common::shared_consts::INDEXER_ASSIGNED_SHARDS_POSITIONS_PREFIX; use quickwit_proto::types::IndexUid; use super::*; @@ -141,7 +141,7 @@ mod tests { event_broker.subscribe(shard_positions).forever(); let index_uid = IndexUid::new_with_random_ulid("index-test"); let source_id = "test-source".to_string(); - let key = 
format!("{SHARD_POSITIONS_PREFIX}{index_uid}:{source_id}"); + let key = format!("{INDEXER_ASSIGNED_SHARDS_POSITIONS_PREFIX}{index_uid}:{source_id}"); let source_uid = SourceUid { index_uid, source_id, diff --git a/quickwit/quickwit-indexing/src/source/ingest/mod.rs b/quickwit/quickwit-indexing/src/source/ingest/mod.rs index dc373b36069..5f9cf1afecc 100644 --- a/quickwit/quickwit-indexing/src/source/ingest/mod.rs +++ b/quickwit/quickwit-indexing/src/source/ingest/mod.rs @@ -755,7 +755,7 @@ mod tests { let (shard_positions_update_tx, mut shard_positions_update_rx) = tokio::sync::mpsc::unbounded_channel::(); event_broker - .subscribe_fn::(move |update| { + .subscribe::(move |update| { shard_positions_update_tx.send(update).unwrap(); }) .forever(); @@ -908,7 +908,7 @@ mod tests { let (shard_positions_update_tx, mut shard_positions_update_rx) = tokio::sync::mpsc::unbounded_channel::(); event_broker - .subscribe_fn::(move |update| { + .subscribe::(move |update| { shard_positions_update_tx.send(update).unwrap(); }) .forever(); @@ -1205,7 +1205,7 @@ mod tests { let (shard_positions_update_tx, mut shard_positions_update_rx) = tokio::sync::mpsc::unbounded_channel::(); event_broker - .subscribe_fn::(move |update| { + .subscribe::(move |update| { shard_positions_update_tx.send(update).unwrap(); }) .forever(); diff --git a/quickwit/quickwit-ingest/Cargo.toml b/quickwit/quickwit-ingest/Cargo.toml index 86f6bb11973..90aaf4df335 100644 --- a/quickwit/quickwit-ingest/Cargo.toml +++ b/quickwit/quickwit-ingest/Cargo.toml @@ -31,6 +31,7 @@ ulid = {workspace = true } utoipa = { workspace = true } quickwit-actors = { workspace = true } +quickwit-cluster = { workspace = true } quickwit-common = { workspace = true } quickwit-config = { workspace = true } quickwit-proto = { workspace = true } @@ -43,6 +44,7 @@ rand_distr = { workspace = true } tempfile = { workspace = true } quickwit-actors = { workspace = true, features = ["testsuite"] } +quickwit-cluster = { workspace = true, features = ["testsuite"] } quickwit-common = { workspace = true, features = ["testsuite"] } quickwit-proto = { workspace = true, features = ["testsuite"] } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs new file mode 100644 index 00000000000..1ba1e253a01 --- /dev/null +++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs @@ -0,0 +1,575 @@ +// Copyright (C) 2023 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . 
+
+use std::collections::{BTreeMap, BTreeSet};
+use std::sync::Weak;
+use std::time::Duration;
+
+use bytesize::ByteSize;
+use quickwit_cluster::{Cluster, ListenerHandle};
+use quickwit_common::pubsub::{Event, EventBroker};
+use quickwit_common::shared_consts::INGESTER_PRIMARY_SHARDS_PREFIX;
+use quickwit_common::sorted_iter::{KeyDiff, SortedByKeyIterator};
+use quickwit_common::tower::Rate;
+use quickwit_proto::ingest::ShardState;
+use quickwit_proto::types::{split_queue_id, QueueId, ShardId, SourceUid};
+use serde::{Deserialize, Serialize, Serializer};
+use tokio::sync::RwLock;
+use tokio::task::JoinHandle;
+use tracing::{debug, warn};
+
+use super::ingester::IngesterState;
+use crate::RateMibPerSec;
+
+const BROADCAST_INTERVAL_PERIOD: Duration = if cfg!(test) {
+    Duration::from_millis(50)
+} else {
+    Duration::from_secs(5)
+};
+
+const ONE_MIB: ByteSize = ByteSize::mib(1);
+
+/// Broadcasted information about a primary shard.
+#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
+pub struct ShardInfo {
+    pub shard_id: ShardId,
+    pub shard_state: ShardState,
+    /// Shard ingestion rate in MiB/s.
+    pub ingestion_rate: RateMibPerSec,
+}
+
+impl Serialize for ShardInfo {
+    fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
+        serializer.serialize_str(&format!(
+            "{}:{}:{}",
+            self.shard_id,
+            self.shard_state.as_json_str_name(),
+            self.ingestion_rate.0,
+        ))
+    }
+}
+
+impl<'de> Deserialize<'de> for ShardInfo {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where D: serde::Deserializer<'de> {
+        let value = String::deserialize(deserializer)?;
+        let mut parts = value.split(':');
+
+        let shard_id = parts
+            .next()
+            .ok_or_else(|| serde::de::Error::custom("invalid shard info"))?
+            .parse::<ShardId>()
+            .map_err(|_| serde::de::Error::custom("invalid shard ID"))?;
+
+        let shard_state_str = parts
+            .next()
+            .ok_or_else(|| serde::de::Error::custom("invalid shard info"))?;
+        let shard_state = ShardState::from_json_str_name(shard_state_str)
+            .ok_or_else(|| serde::de::Error::custom("invalid shard state"))?;
+
+        let ingestion_rate = parts
+            .next()
+            .ok_or_else(|| serde::de::Error::custom("invalid shard info"))?
+            .parse::<u16>()
+            .map(RateMibPerSec)
+            .map_err(|_| serde::de::Error::custom("invalid shard ingestion rate"))?;
+
+        Ok(Self {
+            shard_id,
+            shard_state,
+            ingestion_rate,
+        })
+    }
+}
+
+/// A set of primary shards belonging to the same source.
+pub type ShardInfos = BTreeSet<ShardInfo>;
+
+/// Lists ALL the primary shards hosted by a SINGLE ingester, grouped by source.
+#[derive(Debug, Default, Eq, PartialEq)]
+struct LocalShardsSnapshot {
+    per_source_shard_infos: BTreeMap<SourceUid, ShardInfos>,
+}
+
+#[derive(Debug)]
+enum ShardInfosChange<'a> {
+    Updated {
+        source_uid: &'a SourceUid,
+        shard_infos: &'a ShardInfos,
+    },
+    Removed {
+        source_uid: &'a SourceUid,
+    },
+}
+
+impl LocalShardsSnapshot {
+    pub fn diff<'a>(&'a self, other: &'a Self) -> impl Iterator<Item = ShardInfosChange<'a>> + '_ {
+        self.per_source_shard_infos
+            .iter()
+            .diff_by_key(other.per_source_shard_infos.iter())
+            .filter_map(|key_diff| match key_diff {
+                KeyDiff::Added(source_uid, shard_infos) => Some(ShardInfosChange::Updated {
+                    source_uid,
+                    shard_infos,
+                }),
+                KeyDiff::Unchanged(source_uid, previous_shard_infos, new_shard_infos) => {
+                    if previous_shard_infos != new_shard_infos {
+                        Some(ShardInfosChange::Updated {
+                            source_uid,
+                            shard_infos: new_shard_infos,
+                        })
+                    } else {
+                        None
+                    }
+                }
+                KeyDiff::Removed(source_uid, _shard_infos) => {
+                    Some(ShardInfosChange::Removed { source_uid })
+                }
+            })
+    }
+}
+
+/// Takes a snapshot of the primary shards hosted by the ingester at regular intervals and
+/// broadcasts it to other nodes via Chitchat.
+pub(super) struct BroadcastLocalShardsTask {
+    cluster: Cluster,
+    weak_state: Weak<RwLock<IngesterState>>,
+}
+
+impl BroadcastLocalShardsTask {
+    pub fn spawn(cluster: Cluster, weak_state: Weak<RwLock<IngesterState>>) -> JoinHandle<()> {
+        let mut broadcaster = Self {
+            cluster,
+            weak_state,
+        };
+        tokio::spawn(async move { broadcaster.run().await })
+    }
+
+    async fn snapshot_local_shards(&self) -> Option<LocalShardsSnapshot> {
+        let state = self.weak_state.upgrade()?;
+        let mut state_guard = state.write().await;
+
+        let mut per_source_shard_infos: BTreeMap<SourceUid, ShardInfos> = BTreeMap::new();
+
+        let queue_ids: Vec<(QueueId, ShardState)> = state_guard
+            .shards
+            .iter()
+            .filter_map(|(queue_id, shard)| {
+                if !shard.is_replica() {
+                    Some((queue_id.clone(), shard.shard_state))
+                } else {
+                    None
+                }
+            })
+            .collect();
+
+        for (queue_id, shard_state) in queue_ids {
+            let Some((_rate_limiter, rate_meter)) = state_guard.rate_trackers.get_mut(&queue_id)
+            else {
+                warn!("rate limiter `{queue_id}` not found");
+                continue;
+            };
+            let Some((index_uid, source_id, shard_id)) = split_queue_id(&queue_id) else {
+                warn!("failed to parse queue ID `{queue_id}`");
+                continue;
+            };
+            let source_uid = SourceUid {
+                index_uid,
+                source_id,
+            };
+            // Shard ingestion rate in MiB/s.
+            let ingestion_rate_u64 =
+                rate_meter.harvest().rescale(Duration::from_secs(1)).work() / ONE_MIB.as_u64();
+            let ingestion_rate = RateMibPerSec(ingestion_rate_u64 as u16);
+
+            let shard_info = ShardInfo {
+                shard_id,
+                shard_state,
+                ingestion_rate,
+            };
+            per_source_shard_infos
+                .entry(source_uid)
+                .or_default()
+                .insert(shard_info);
+        }
+        let snapshot = LocalShardsSnapshot {
+            per_source_shard_infos,
+        };
+        Some(snapshot)
+    }
+
+    async fn broadcast_local_shards(
+        &self,
+        previous_snapshot: &LocalShardsSnapshot,
+        new_snapshot: &LocalShardsSnapshot,
+    ) {
+        for change in previous_snapshot.diff(new_snapshot) {
+            match change {
+                ShardInfosChange::Updated {
+                    source_uid,
+                    shard_infos,
+                } => {
+                    let key = make_key(source_uid);
+                    let value = serde_json::to_string(&shard_infos)
+                        .expect("`ShardInfos` should be JSON serializable");
+                    self.cluster.set_self_key_value(key, value).await;
+                }
+                ShardInfosChange::Removed { source_uid } => {
+                    let key = make_key(source_uid);
+                    self.cluster.remove_self_key(&key).await;
+                }
+            }
+        }
+    }
+
+    async fn run(&mut self) {
+        let mut interval = tokio::time::interval(BROADCAST_INTERVAL_PERIOD);
+        let mut previous_snapshot = LocalShardsSnapshot::default();
+
+        loop {
+            interval.tick().await;
+
+            let Some(new_snapshot) = self.snapshot_local_shards().await else {
+                // The state has been dropped; stop the task.
+                debug!("stopping local shards broadcast task");
+                return;
+            };
+            self.broadcast_local_shards(&previous_snapshot, &new_snapshot)
+                .await;
+
+            previous_snapshot = new_snapshot;
+        }
+    }
+}
+
+fn make_key(source_uid: &SourceUid) -> String {
+    format!(
+        "{INGESTER_PRIMARY_SHARDS_PREFIX}{}:{}",
+        source_uid.index_uid, source_uid.source_id
+    )
+}
+
+fn parse_key(key: &str) -> Option<SourceUid> {
+    let (index_uid_str, source_id_str) = key.rsplit_once(':')?;
+
+    Some(SourceUid {
+        index_uid: index_uid_str.into(),
+        source_id: source_id_str.to_string(),
+    })
+}
+
+#[derive(Debug, Clone)]
+pub struct LocalShardsUpdate {
+    // TODO: add leader ID in order to update routing table.
+ // leader_id: NodeId, + pub source_uid: SourceUid, + pub shard_infos: ShardInfos, +} + +impl Event for LocalShardsUpdate {} + +pub async fn setup_local_shards_update_listener( + cluster: Cluster, + event_broker: EventBroker, +) -> ListenerHandle { + cluster + .subscribe(INGESTER_PRIMARY_SHARDS_PREFIX, move |key, value| { + let Some(source_uid) = parse_key(key) else { + warn!("failed to parse source UID `{key}`"); + return; + }; + let Ok(shard_infos) = serde_json::from_str::(value) else { + warn!("failed to parse shard infos `{value}`"); + return; + }; + let local_shards_update = LocalShardsUpdate { + source_uid, + shard_infos, + }; + event_broker.publish(local_shards_update); + }) + .await +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::Arc; + + use mrecordlog::MultiRecordLog; + use quickwit_cluster::{create_cluster_for_test, ChannelTransport}; + use quickwit_proto::ingest::ingester::{IngesterStatus, ObservationMessage}; + use quickwit_proto::ingest::ShardState; + use quickwit_proto::types::{queue_id, Position}; + use tokio::sync::watch; + + use super::*; + use crate::ingest_v2::models::IngesterShard; + use crate::ingest_v2::rate_limiter::RateLimiter; + use crate::ingest_v2::rate_meter::RateMeter; + use crate::RateLimiterSettings; + + #[test] + fn test_shard_info_serde() { + let shard_info = ShardInfo { + shard_id: 1, + shard_state: ShardState::Open, + ingestion_rate: RateMibPerSec(42), + }; + let serialized = serde_json::to_string(&shard_info).unwrap(); + assert_eq!(serialized, r#""1:open:42""#); + + let deserialized = serde_json::from_str::(&serialized).unwrap(); + assert_eq!(deserialized, shard_info); + } + + #[test] + fn test_local_shards_snapshot_diff() { + let previous_snapshot = LocalShardsSnapshot::default(); + let current_snapshot = LocalShardsSnapshot::default(); + let num_changes = previous_snapshot.diff(¤t_snapshot).count(); + assert_eq!(num_changes, 0); + + let previous_snapshot = LocalShardsSnapshot::default(); + let current_snapshot = LocalShardsSnapshot { + per_source_shard_infos: vec![( + SourceUid { + index_uid: "test-index:0".into(), + source_id: "test-source".to_string(), + }, + vec![ShardInfo { + shard_id: 1, + shard_state: ShardState::Open, + ingestion_rate: RateMibPerSec(42), + }] + .into_iter() + .collect(), + )] + .into_iter() + .collect(), + }; + let changes = previous_snapshot + .diff(¤t_snapshot) + .collect::>(); + assert_eq!(changes.len(), 1); + + let ShardInfosChange::Updated { + source_uid, + shard_infos, + } = &changes[0] + else { + panic!( + "expected `ShardInfosChange::Updated` variant, got {:?}", + changes[0] + ); + }; + assert_eq!(source_uid.index_uid, "test-index:0"); + assert_eq!(source_uid.source_id, "test-source"); + assert_eq!(shard_infos.len(), 1); + + let num_changes = current_snapshot.diff(¤t_snapshot).count(); + assert_eq!(num_changes, 0); + + let previous_snapshot = current_snapshot; + let current_snapshot = LocalShardsSnapshot { + per_source_shard_infos: vec![( + SourceUid { + index_uid: "test-index:0".into(), + source_id: "test-source".to_string(), + }, + vec![ShardInfo { + shard_id: 1, + shard_state: ShardState::Closed, + ingestion_rate: RateMibPerSec(42), + }] + .into_iter() + .collect(), + )] + .into_iter() + .collect(), + }; + let changes = previous_snapshot + .diff(¤t_snapshot) + .collect::>(); + assert_eq!(changes.len(), 1); + + let ShardInfosChange::Updated { + source_uid, + shard_infos, + } = &changes[0] + else { + panic!( + "expected 
`ShardInfosChange::Updated` variant, got {:?}", + changes[0] + ); + }; + assert_eq!(source_uid.index_uid, "test-index:0"); + assert_eq!(source_uid.source_id, "test-source"); + assert_eq!(shard_infos.len(), 1); + + let previous_snapshot = current_snapshot; + let current_snapshot = LocalShardsSnapshot::default(); + + let changes = previous_snapshot + .diff(¤t_snapshot) + .collect::>(); + assert_eq!(changes.len(), 1); + + let ShardInfosChange::Removed { source_uid } = &changes[0] else { + panic!( + "expected `ShardInfosChange::Removed` variant, got {:?}", + changes[0] + ); + }; + assert_eq!(source_uid.index_uid, "test-index:0"); + assert_eq!(source_uid.source_id, "test-source"); + } + + #[tokio::test] + async fn test_broadcast_local_shards_task() { + let transport = ChannelTransport::default(); + let cluster = create_cluster_for_test(Vec::new(), &["indexer"], &transport, true) + .await + .unwrap(); + + let tempdir = tempfile::tempdir().unwrap(); + let mrecordlog = MultiRecordLog::open(tempdir.path()).await.unwrap(); + let (observation_tx, _observation_rx) = watch::channel(Ok(ObservationMessage::default())); + let state = Arc::new(RwLock::new(IngesterState { + mrecordlog, + shards: HashMap::new(), + rate_trackers: HashMap::new(), + replication_streams: HashMap::new(), + replication_tasks: HashMap::new(), + status: IngesterStatus::Ready, + observation_tx, + })); + let weak_state = Arc::downgrade(&state); + let task = BroadcastLocalShardsTask { + cluster, + weak_state, + }; + let previous_snapshot = task.snapshot_local_shards().await.unwrap(); + assert!(previous_snapshot.per_source_shard_infos.is_empty()); + + let mut state_guard = state.write().await; + + let queue_id_01 = queue_id("test-index:0", "test-source", 1); + let shard = IngesterShard::new_solo(ShardState::Open, Position::Beginning, Position::Eof); + state_guard.shards.insert(queue_id_01.clone(), shard); + + let rate_limiter = RateLimiter::from_settings(RateLimiterSettings::default()); + let rate_meter = RateMeter::default(); + + state_guard + .rate_trackers + .insert(queue_id_01.clone(), (rate_limiter, rate_meter)); + + drop(state_guard); + + let new_snapshot = task.snapshot_local_shards().await.unwrap(); + assert_eq!(new_snapshot.per_source_shard_infos.len(), 1); + + task.broadcast_local_shards(&previous_snapshot, &new_snapshot) + .await; + + tokio::time::sleep(Duration::from_millis(100)).await; + + let key = format!( + "{INGESTER_PRIMARY_SHARDS_PREFIX}{}:{}", + "test-index:0", "test-source" + ); + task.cluster.get_self_key_value(&key).await.unwrap(); + + task.broadcast_local_shards(&new_snapshot, &previous_snapshot) + .await; + + tokio::time::sleep(Duration::from_millis(100)).await; + + let value_opt = task.cluster.get_self_key_value(&key).await; + assert!(value_opt.is_none()); + } + + #[test] + fn test_make_key() { + let source_uid = SourceUid { + index_uid: "test-index:0".into(), + source_id: "test-source".to_string(), + }; + let key = make_key(&source_uid); + assert_eq!(key, "ingester.primary_shards:test-index:0:test-source"); + } + + #[test] + fn test_parse_key() { + let key = "test-index:0:test-source"; + let source_uid = parse_key(key).unwrap(); + assert_eq!(source_uid.index_uid, "test-index:0".to_string(),); + assert_eq!(source_uid.source_id, "test-source".to_string(),); + } + + #[tokio::test] + async fn test_local_shards_update_listener() { + let transport = ChannelTransport::default(); + let cluster = create_cluster_for_test(Vec::new(), &["indexer"], &transport, true) + .await + .unwrap(); + let event_broker = 
EventBroker::default(); + + let local_shards_update_counter = Arc::new(AtomicUsize::new(0)); + let local_shards_update_counter_clone = local_shards_update_counter.clone(); + + event_broker + .subscribe(move |event: LocalShardsUpdate| { + local_shards_update_counter_clone.fetch_add(1, Ordering::Release); + + assert_eq!(event.source_uid.index_uid, "test-index:0"); + assert_eq!(event.source_uid.source_id, "test-source"); + assert_eq!(event.shard_infos.len(), 1); + + let shard_info = event.shard_infos.iter().next().unwrap(); + assert_eq!(shard_info.shard_id, 1); + assert_eq!(shard_info.shard_state, ShardState::Open); + assert_eq!(shard_info.ingestion_rate, 42u16); + }) + .forever(); + + setup_local_shards_update_listener(cluster.clone(), event_broker.clone()) + .await + .forever(); + + let source_uid = SourceUid { + index_uid: "test-index:0".into(), + source_id: "test-source".to_string(), + }; + let key = make_key(&source_uid); + let value = serde_json::to_string(&vec![ShardInfo { + shard_id: 1, + shard_state: ShardState::Open, + ingestion_rate: RateMibPerSec(42), + }]) + .unwrap(); + + cluster.set_self_key_value(key, value).await; + tokio::time::sleep(Duration::from_millis(50)).await; + + assert_eq!(local_shards_update_counter.load(Ordering::Acquire), 1); + } +} diff --git a/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs b/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs index ae990f8cfeb..ce2113c1592 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs @@ -582,7 +582,7 @@ mod tests { let state = Arc::new(RwLock::new(IngesterState { mrecordlog, shards: HashMap::new(), - rate_limiters: HashMap::new(), + rate_trackers: HashMap::new(), replication_streams: HashMap::new(), replication_tasks: HashMap::new(), status: IngesterStatus::Ready, @@ -751,7 +751,7 @@ mod tests { let state = Arc::new(RwLock::new(IngesterState { mrecordlog, shards: HashMap::new(), - rate_limiters: HashMap::new(), + rate_trackers: HashMap::new(), replication_streams: HashMap::new(), replication_tasks: HashMap::new(), status: IngesterStatus::Ready, @@ -854,7 +854,7 @@ mod tests { let state = Arc::new(RwLock::new(IngesterState { mrecordlog, shards: HashMap::new(), - rate_limiters: HashMap::new(), + rate_trackers: HashMap::new(), replication_streams: HashMap::new(), replication_tasks: HashMap::new(), status: IngesterStatus::Ready, @@ -895,7 +895,7 @@ mod tests { let state = Arc::new(RwLock::new(IngesterState { mrecordlog, shards: HashMap::new(), - rate_limiters: HashMap::new(), + rate_trackers: HashMap::new(), replication_streams: HashMap::new(), replication_tasks: HashMap::new(), status: IngesterStatus::Ready, diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index c61ad2e6fa7..76786050c38 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -31,6 +31,7 @@ use futures::stream::FuturesUnordered; use futures::StreamExt; use mrecordlog::error::{CreateQueueError, TruncateError}; use mrecordlog::MultiRecordLog; +use quickwit_cluster::Cluster; use quickwit_common::tower::Pool; use quickwit_common::ServiceStream; use quickwit_proto::ingest::ingester::{ @@ -52,11 +53,13 @@ use super::models::IngesterShard; use super::mrecord::MRecord; use super::mrecordlog_utils::{append_eof_record_if_necessary, check_enough_capacity}; use super::rate_limiter::{RateLimiter, RateLimiterSettings}; +use super::rate_meter::RateMeter; use super::replication::{ 
ReplicationStreamTask, ReplicationStreamTaskHandle, ReplicationTask, ReplicationTaskHandle, SYN_REPLICATION_STREAM_CAPACITY, }; use super::IngesterPool; +use crate::ingest_v2::broadcast::BroadcastLocalShardsTask; use crate::ingest_v2::mrecordlog_utils::get_truncation_position; use crate::metrics::INGEST_METRICS; use crate::{estimate_size, FollowerId, LeaderId}; @@ -92,7 +95,7 @@ impl fmt::Debug for Ingester { pub(super) struct IngesterState { pub mrecordlog: MultiRecordLog, pub shards: HashMap, - pub rate_limiters: HashMap, + pub rate_trackers: HashMap, // Replication stream opened with followers. pub replication_streams: HashMap, // Replication tasks running for each replication stream opened with leaders. @@ -103,7 +106,7 @@ pub(super) struct IngesterState { impl Ingester { pub async fn try_new( - self_node_id: NodeId, + cluster: Cluster, ingester_pool: Pool, wal_dir_path: &Path, disk_capacity: ByteSize, @@ -111,6 +114,7 @@ impl Ingester { rate_limiter_settings: RateLimiterSettings, replication_factor: usize, ) -> IngestV2Result { + let self_node_id: NodeId = cluster.self_node_id().clone().into(); let mrecordlog = MultiRecordLog::open_with_prefs( wal_dir_path, mrecordlog::SyncPolicy::OnDelay(Duration::from_secs(5)), @@ -132,7 +136,7 @@ impl Ingester { let inner = IngesterState { mrecordlog, shards: HashMap::new(), - rate_limiters: HashMap::new(), + rate_trackers: HashMap::new(), replication_streams: HashMap::new(), replication_tasks: HashMap::new(), status: IngesterStatus::Ready, @@ -155,6 +159,9 @@ impl Ingester { ); ingester.init().await?; + let weak_state = Arc::downgrade(&ingester.state); + BroadcastLocalShardsTask::spawn(cluster, weak_state); + Ok(ingester) } @@ -227,7 +234,10 @@ impl Ingester { } }; let rate_limiter = RateLimiter::from_settings(self.rate_limiter_settings); - state.rate_limiters.insert(queue_id.clone(), rate_limiter); + let rate_meter = RateMeter::default(); + state + .rate_trackers + .insert(queue_id.clone(), (rate_limiter, rate_meter)); let shard = if let Some(follower_id) = follower_id_opt { self.init_replication_stream(state, leader_id, follower_id) @@ -417,8 +427,8 @@ impl IngesterService for Ingester { persist_failures.push(persist_failure); continue; } - let rate_limiter = state_guard - .rate_limiters + let (rate_limiter, rate_meter) = state_guard + .rate_trackers .get_mut(&queue_id) .expect("rate limiter should be initialized"); @@ -435,6 +445,11 @@ impl IngesterService for Ingester { persist_failures.push(persist_failure); continue; } + let batch_num_bytes = doc_batch.num_bytes() as u64; + let batch_num_docs = doc_batch.num_docs() as u64; + + rate_meter.update(batch_num_bytes); + let current_position_inclusive: Position = if force_commit { let encoded_mrecords = doc_batch .docs() @@ -454,9 +469,6 @@ impl IngesterService for Ingester { .expect("TODO") // TODO: Io error, close shard? 
} .into(); - let batch_num_bytes = doc_batch.num_bytes() as u64; - let batch_num_docs = doc_batch.num_docs() as u64; - INGEST_METRICS.ingested_num_bytes.inc_by(batch_num_bytes); INGEST_METRICS.ingested_num_docs.inc_by(batch_num_docs); @@ -797,10 +809,15 @@ pub async fn wait_for_ingester_decommission(ingester_opt: Option (IngesterContext, Ingester) { + static GOSSIP_ADVERTISE_PORT_SEQUENCE: AtomicU16 = AtomicU16::new(1u16); + let tempdir = tempfile::tempdir().unwrap(); let wal_dir_path = tempdir.path(); - let ingester = Ingester::try_new( + let transport = ChannelTransport::default(); + + let gossip_advertise_port = + GOSSIP_ADVERTISE_PORT_SEQUENCE.fetch_add(1, Ordering::Relaxed); + + let cluster = create_cluster_for_test_with_id( self.node_id.clone(), + gossip_advertise_port, + "test-cluster".to_string(), + Vec::new(), + &HashSet::from_iter([QuickwitService::Indexer]), + &transport, + true, + ) + .await + .unwrap(); + + let ingester = Ingester::try_new( + cluster.clone(), self.ingester_pool.clone(), wal_dir_path, self.disk_capacity, @@ -878,9 +915,12 @@ mod tests { ) .await .unwrap(); + let ingester_env = IngesterContext { _tempdir: tempdir, + _transport: transport, node_id: self.node_id, + cluster, ingester_pool: self.ingester_pool, }; (ingester_env, ingester) @@ -889,7 +929,9 @@ mod tests { pub struct IngesterContext { _tempdir: tempfile::TempDir, + _transport: ChannelTransport, node_id: NodeId, + cluster: Cluster, ingester_pool: IngesterPool, } @@ -994,6 +1036,68 @@ mod tests { .assert_records_eq(&queue_id_03, .., &[(0, "\0\x02")]); } + #[tokio::test] + async fn test_ingester_broadcasts_local_shards() { + let (ingester_ctx, ingester) = IngesterForTest::default().build().await; + + let mut state_guard = ingester.state.write().await; + + let queue_id_01 = queue_id("test-index:0", "test-source", 1); + let shard = IngesterShard::new_solo(ShardState::Open, Position::Beginning, Position::Eof); + state_guard.shards.insert(queue_id_01.clone(), shard); + + let rate_limiter = RateLimiter::from_settings(RateLimiterSettings::default()); + let rate_meter = RateMeter::default(); + state_guard + .rate_trackers + .insert(queue_id_01.clone(), (rate_limiter, rate_meter)); + + drop(state_guard); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let key = format!( + "{INGESTER_PRIMARY_SHARDS_PREFIX}{}:{}", + "test-index:0", "test-source" + ); + let value = ingester_ctx.cluster.get_self_key_value(&key).await.unwrap(); + + let shard_infos: ShardInfos = serde_json::from_str(&value).unwrap(); + assert_eq!(shard_infos.len(), 1); + + let shard_info = shard_infos.iter().next().unwrap(); + assert_eq!(shard_info.shard_id, 1); + assert_eq!(shard_info.shard_state, ShardState::Open); + assert_eq!(shard_info.ingestion_rate, 0); + + let mut state_guard = ingester.state.write().await; + state_guard + .shards + .get_mut(&queue_id_01) + .unwrap() + .shard_state = ShardState::Closed; + drop(state_guard); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let value = ingester_ctx.cluster.get_self_key_value(&key).await.unwrap(); + + let shard_infos: ShardInfos = serde_json::from_str(&value).unwrap(); + assert_eq!(shard_infos.len(), 1); + + let shard_info = shard_infos.iter().next().unwrap(); + assert_eq!(shard_info.shard_state, ShardState::Closed); + + let mut state_guard = ingester.state.write().await; + state_guard.shards.remove(&queue_id_01).unwrap(); + drop(state_guard); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let value_opt = ingester_ctx.cluster.get_self_key_value(&key).await; 
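+        // Removing the shard should have caused the broadcast task to mark the
+        // key for deletion, so it no longer resolves to a value.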
+ assert!(value_opt.is_none()); + } + #[tokio::test] async fn test_ingester_persist() { let (ingester_ctx, mut ingester) = IngesterForTest::default().build().await; diff --git a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs index d808381a6f2..2fa2b9bbdb6 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs @@ -17,12 +17,14 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +mod broadcast; mod fetch; mod ingester; mod models; mod mrecord; mod mrecordlog_utils; mod rate_limiter; +mod rate_meter; mod replication; mod router; mod shard_table; @@ -30,6 +32,10 @@ mod shard_table; mod test_utils; mod workbench; +use std::fmt; +use std::ops::Add; + +pub use broadcast::setup_local_shards_update_listener; use bytesize::ByteSize; use quickwit_common::tower::Pool; use quickwit_proto::ingest::ingester::IngesterServiceClient; @@ -57,6 +63,30 @@ pub(super) fn estimate_size(doc_batch: &DocBatchV2) -> ByteSize { ByteSize(estimate as u64) } +#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)] +pub struct RateMibPerSec(pub u16); + +impl fmt::Display for RateMibPerSec { + fn fmt(&self, f: &mut fmt::Formatter) -> std::fmt::Result { + write!(f, "{}MiB/s", self.0) + } +} + +impl PartialEq for RateMibPerSec { + fn eq(&self, other: &u16) -> bool { + self.0 == *other + } +} + +impl Add for RateMibPerSec { + type Output = RateMibPerSec; + + #[inline(always)] + fn add(self, rhs: RateMibPerSec) -> Self::Output { + RateMibPerSec(self.0 + rhs.0) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/quickwit/quickwit-ingest/src/ingest_v2/models.rs b/quickwit/quickwit-ingest/src/ingest_v2/models.rs index 4f1d3805a8c..578a8470b69 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/models.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/models.rs @@ -94,6 +94,10 @@ impl IngesterShard { } } + pub fn is_replica(&self) -> bool { + matches!(self.shard_type, IngesterShardType::Replica { .. }) + } + pub fn notify_new_records(&mut self) { // `new_records_tx` is guaranteed to be open because `self` also holds a receiver. self.new_records_tx diff --git a/quickwit/quickwit-ingest/src/ingest_v2/rate_limiter.rs b/quickwit/quickwit-ingest/src/ingest_v2/rate_limiter.rs index 1f310eca3b9..fc6a7c2abce 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/rate_limiter.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/rate_limiter.rs @@ -66,24 +66,25 @@ pub(super) struct RateLimiter { } impl RateLimiter { + /// Creates a new rate limiter from the given settings. pub fn from_settings(settings: RateLimiterSettings) -> Self { let capacity = settings.burst_limit.as_u64(); - let work = settings.rate_limit.work() as u128; let refill_period = settings.refill_period; - let rate_limit_period = settings.rate_limit.period(); - let refill_amount = work * refill_period.as_nanos() / rate_limit_period.as_nanos(); + let rate_limit = settings.rate_limit.rescale(refill_period); + let now = Instant::now(); Self { capacity, available: capacity, - refill_amount: refill_amount as u64, + refill_amount: rate_limit.work(), refill_period, refill_period_micros: refill_period.as_micros() as u64, - refill_at: Instant::now() + refill_period, + refill_at: now + refill_period, } } + /// Acquires some capacity from the rate limiter. Returns whether the capacity was available. 
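+    ///
+    /// Callers are expected to treat a `false` return value as a rate-limit
+    /// rejection, e.g. by failing the corresponding persist subrequest.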
pub fn acquire(&mut self, capacity: ByteSize) -> bool { if self.acquire_inner(capacity.as_u64()) { true diff --git a/quickwit/quickwit-ingest/src/ingest_v2/rate_meter.rs b/quickwit/quickwit-ingest/src/ingest_v2/rate_meter.rs new file mode 100644 index 00000000000..a531c0c3674 --- /dev/null +++ b/quickwit/quickwit-ingest/src/ingest_v2/rate_meter.rs @@ -0,0 +1,87 @@ +// Copyright (C) 2023 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::time::Instant; + +use quickwit_common::tower::ConstantRate; + +/// A naive rate meter that tracks how much work was performed during a period of time defined by +/// two successive calls to `harvest`. +#[derive(Debug, Clone)] +pub(super) struct RateMeter { + total_work: u64, + harvested_at: Instant, +} + +impl Default for RateMeter { + fn default() -> Self { + Self { + total_work: 0, + harvested_at: Instant::now(), + } + } +} + +impl RateMeter { + /// Increments the amount of work performed since the last call to `harvest`. + pub fn update(&mut self, work: u64) { + self.total_work += work; + } + + /// Returns the average work rate since the last call to this method and resets the internal + /// state. 
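+    ///
+    /// The returned `ConstantRate` carries both the accumulated work and the
+    /// elapsed period, so callers can `rescale` it to a common time base before
+    /// comparing or reporting rates.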
+ pub fn harvest(&mut self) -> ConstantRate { + self.harvest_inner(Instant::now()) + } + + fn harvest_inner(&mut self, now: Instant) -> ConstantRate { + let elapsed = now.duration_since(self.harvested_at); + let rate = ConstantRate::new(self.total_work, elapsed); + self.total_work = 0; + self.harvested_at = now; + rate + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use quickwit_common::tower::Rate; + + use super::*; + + #[test] + fn test_rate_meter() { + let mut rate_meter = RateMeter::default(); + assert_eq!(rate_meter.total_work, 0); + + let now = Instant::now(); + rate_meter.harvested_at = now; + + let rate = rate_meter.harvest_inner(now + Duration::from_millis(100)); + assert_eq!(rate.work(), 0); + assert_eq!(rate.period(), Duration::from_millis(100)); + + rate_meter.update(1); + let rate = rate_meter.harvest_inner(now + Duration::from_millis(200)); + assert_eq!(rate.work(), 1); + assert_eq!(rate.period(), Duration::from_millis(100)); + } +} diff --git a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs index 8fd602352cf..50755fcab01 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs @@ -709,7 +709,7 @@ mod tests { let state = Arc::new(RwLock::new(IngesterState { mrecordlog, shards: HashMap::new(), - rate_limiters: HashMap::new(), + rate_trackers: HashMap::new(), replication_streams: HashMap::new(), replication_tasks: HashMap::new(), status: IngesterStatus::Ready, @@ -876,7 +876,7 @@ mod tests { let state = Arc::new(RwLock::new(IngesterState { mrecordlog, shards: HashMap::new(), - rate_limiters: HashMap::new(), + rate_trackers: HashMap::new(), replication_streams: HashMap::new(), replication_tasks: HashMap::new(), status: IngesterStatus::Ready, @@ -961,7 +961,7 @@ mod tests { let state = Arc::new(RwLock::new(IngesterState { mrecordlog, shards: HashMap::new(), - rate_limiters: HashMap::new(), + rate_trackers: HashMap::new(), replication_streams: HashMap::new(), replication_tasks: HashMap::new(), status: IngesterStatus::Ready, diff --git a/quickwit/quickwit-ingest/src/ingest_v2/router.rs b/quickwit/quickwit-ingest/src/ingest_v2/router.rs index 832212ad9f3..4da0c39faa3 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/router.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/router.rs @@ -25,6 +25,7 @@ use std::time::Duration; use async_trait::async_trait; use futures::stream::FuturesUnordered; use futures::{Future, StreamExt}; +use quickwit_common::pubsub::{EventBroker, EventSubscriber, EventSubscriptionHandle}; use quickwit_proto::control_plane::{ ControlPlaneService, ControlPlaneServiceClient, GetOrCreateOpenShardsRequest, GetOrCreateOpenShardsSubrequest, @@ -40,6 +41,7 @@ use quickwit_proto::types::{IndexUid, NodeId, ShardId, SourceId, SubrequestId}; use tokio::sync::RwLock; use tracing::{error, info, warn}; +use super::broadcast::LocalShardsUpdate; use super::ingester::PERSIST_REQUEST_TIMEOUT; use super::shard_table::ShardTable; use super::workbench::IngestWorkbench; @@ -65,6 +67,7 @@ pub struct IngestRouter { ingester_pool: IngesterPool, state: Arc>, replication_factor: usize, + _local_shards_update_subscription_handle: EventSubscriptionHandle, } struct RouterState { @@ -83,22 +86,27 @@ impl fmt::Debug for IngestRouter { impl IngestRouter { pub fn new( self_node_id: NodeId, + event_broker: EventBroker, control_plane: ControlPlaneServiceClient, ingester_pool: IngesterPool, replication_factor: usize, ) -> Self { - let state = 
RouterState { + let state = Arc::new(RwLock::new(RouterState { shard_table: ShardTable { self_node_id: self_node_id.clone(), table: HashMap::default(), }, - }; + })); + let _local_shards_update_subscription_handle = + event_broker.subscribe::(state.clone()); + Self { self_node_id, control_plane, ingester_pool, - state: Arc::new(RwLock::new(state)), + state, replication_factor, + _local_shards_update_subscription_handle, } } @@ -176,7 +184,7 @@ impl IngestRouter { let mut state_guard = self.state.write().await; for success in response.successes { - state_guard.shard_table.insert_shards( + state_guard.shard_table.set_shards( success.index_uid, success.source_id, success.open_shards, @@ -382,6 +390,27 @@ impl IngestRouterService for IngestRouter { } } +#[async_trait] +impl EventSubscriber for Arc> { + async fn handle_event(&mut self, local_shards_update: LocalShardsUpdate) { + // TODO: Insert the new shards in the shard table when `LocalShardsUpdate` also carries the + // leader ID. + let index_uid = local_shards_update.source_uid.index_uid; + let source_id = local_shards_update.source_uid.source_id; + let closed_shard_ids: Vec = local_shards_update + .shard_infos + .into_iter() + .filter(|shard_info| shard_info.shard_state.is_closed()) + .map(|shard_info| shard_info.shard_id) + .collect(); + + self.write() + .await + .shard_table + .close_shards(&index_uid, &source_id, &closed_shard_ids); + } +} + struct PersistRequestSummary { leader_id: NodeId, subrequest_ids: Vec, @@ -389,6 +418,7 @@ struct PersistRequestSummary { #[cfg(test)] mod tests { + use std::collections::BTreeSet; use std::iter; use std::sync::atomic::AtomicUsize; @@ -401,20 +431,24 @@ mod tests { }; use quickwit_proto::ingest::router::IngestSubrequest; use quickwit_proto::ingest::{CommitTypeV2, DocBatchV2, Shard, ShardState}; - use quickwit_proto::types::Position; + use quickwit_proto::types::{Position, SourceUid}; use super::*; + use crate::ingest_v2::broadcast::ShardInfo; use crate::ingest_v2::shard_table::ShardTableEntry; use crate::ingest_v2::workbench::SubworkbenchFailure; + use crate::RateMibPerSec; #[tokio::test] async fn test_router_make_get_or_create_open_shard_request() { let self_node_id = "test-router".into(); + let event_broker = EventBroker::default(); let control_plane: ControlPlaneServiceClient = ControlPlaneServiceClient::mock().into(); let ingester_pool = IngesterPool::default(); let replication_factor = 1; let router = IngestRouter::new( self_node_id, + event_broker, control_plane, ingester_pool.clone(), replication_factor, @@ -525,6 +559,7 @@ mod tests { #[tokio::test] async fn test_router_populate_shard_table() { let self_node_id = "test-router".into(); + let event_broker = EventBroker::default(); let mut control_plane_mock = ControlPlaneServiceClient::mock(); control_plane_mock @@ -601,6 +636,7 @@ mod tests { let replication_factor = 1; let mut router = IngestRouter::new( self_node_id, + event_broker, control_plane, ingester_pool.clone(), replication_factor, @@ -708,11 +744,13 @@ mod tests { #[tokio::test] async fn test_router_process_persist_results_record_persist_successes() { let self_node_id = "test-router".into(); + let event_broker = EventBroker::default(); let control_plane = ControlPlaneServiceClient::mock().into(); let ingester_pool = IngesterPool::default(); let replication_factor = 1; let mut router = IngestRouter::new( self_node_id, + event_broker, control_plane, ingester_pool.clone(), replication_factor, @@ -758,11 +796,13 @@ mod tests { #[tokio::test] async fn 
         let self_node_id = "test-router".into();
+        let event_broker = EventBroker::default();
         let control_plane = ControlPlaneServiceClient::mock().into();
         let ingester_pool = IngesterPool::default();
         let replication_factor = 1;
         let mut router = IngestRouter::new(
             self_node_id,
+            event_broker,
             control_plane,
             ingester_pool.clone(),
             replication_factor,
@@ -808,17 +848,19 @@ mod tests {
     #[tokio::test]
     async fn test_router_process_persist_results_closes_shards() {
         let self_node_id = "test-router".into();
+        let event_broker = EventBroker::default();
         let control_plane = ControlPlaneServiceClient::mock().into();
         let ingester_pool = IngesterPool::default();
         let replication_factor = 1;
         let mut router = IngestRouter::new(
             self_node_id,
+            event_broker,
             control_plane,
             ingester_pool.clone(),
             replication_factor,
         );
         let mut state_guard = router.state.write().await;
-        state_guard.shard_table.insert_shards(
+        state_guard.shard_table.set_shards(
             "test-index-0:0",
             "test-source",
             vec![Shard {
@@ -872,6 +914,7 @@ mod tests {
     #[tokio::test]
     async fn test_router_process_persist_results_removes_unavailable_leaders() {
         let self_node_id = "test-router".into();
+        let event_broker = EventBroker::default();
         let control_plane = ControlPlaneServiceClient::mock().into();
 
         let ingester_pool = IngesterPool::default();
@@ -887,6 +930,7 @@ mod tests {
         let replication_factor = 1;
         let mut router = IngestRouter::new(
             self_node_id,
+            event_broker,
             control_plane,
             ingester_pool.clone(),
             replication_factor,
@@ -952,17 +996,19 @@ mod tests {
     #[tokio::test]
     async fn test_router_ingest() {
         let self_node_id = "test-router".into();
+        let event_broker = EventBroker::default();
         let control_plane = ControlPlaneServiceClient::mock().into();
         let ingester_pool = IngesterPool::default();
         let replication_factor = 1;
         let mut router = IngestRouter::new(
             self_node_id,
+            event_broker,
             control_plane,
             ingester_pool.clone(),
             replication_factor,
         );
         let mut state_guard = router.state.write().await;
-        state_guard.shard_table.insert_shards(
+        state_guard.shard_table.set_shards(
             "test-index-0:0",
             "test-source",
             vec![Shard {
                 index_uid: "test-index-0:0".to_string(),
                 shard_id: 1,
                 shard_state: ShardState::Open as i32,
                 leader_id: "test-ingester-0".to_string(),
                 ..Default::default()
             }],
         );
-        state_guard.shard_table.insert_shards(
+        state_guard.shard_table.set_shards(
             "test-index-1:0",
             "test-source",
             vec![
@@ -1163,17 +1209,19 @@ mod tests {
     #[tokio::test]
     async fn test_router_ingest_retry() {
         let self_node_id = "test-router".into();
+        let event_broker = EventBroker::default();
         let control_plane = ControlPlaneServiceClient::mock().into();
         let ingester_pool = IngesterPool::default();
         let replication_factor = 1;
         let mut router = IngestRouter::new(
             self_node_id,
+            event_broker,
             control_plane,
             ingester_pool.clone(),
             replication_factor,
         );
         let mut state_guard = router.state.write().await;
-        state_guard.shard_table.insert_shards(
+        state_guard.shard_table.set_shards(
             "test-index-0:0",
             "test-source",
             vec![Shard {
@@ -1265,4 +1313,60 @@ mod tests {
         };
         router.ingest(ingest_request).await.unwrap();
     }
+
+    #[tokio::test]
+    async fn test_router_closes_shards_on_local_shards_update() {
+        let self_node_id = "test-router".into();
+        let event_broker = EventBroker::default();
+        let control_plane = ControlPlaneServiceClient::mock().into();
+        let ingester_pool = IngesterPool::default();
+        let replication_factor = 1;
+        let router = IngestRouter::new(
+            self_node_id,
+            event_broker.clone(),
+            control_plane,
+            ingester_pool.clone(),
+            replication_factor,
+        );
+
+        let mut state_guard = router.state.write().await;
+        state_guard.shard_table.set_shards(
+            "test-index-0:0",
+            "test-source",
+            vec![Shard {
+                index_uid: "test-index-0:0".to_string(),
+                shard_id: 1,
+                shard_state: ShardState::Open as i32,
+                leader_id: "test-ingester".to_string(),
+                ..Default::default()
+            }],
+        );
+        drop(state_guard);
+
+        let local_shards_update = LocalShardsUpdate {
+            source_uid: SourceUid {
+                index_uid: "test-index-0:0".into(),
+                source_id: "test-source".to_string(),
+            },
+            shard_infos: BTreeSet::from_iter([ShardInfo {
+                shard_id: 1,
+                shard_state: ShardState::Closed,
+                ingestion_rate: RateMibPerSec(0),
+            }]),
+        };
+        event_broker.publish(local_shards_update);
+
+        // Wait for the router to process the event.
+        tokio::time::sleep(Duration::from_millis(50)).await;
+
+        let state_guard = router.state.read().await;
+        let shards = state_guard
+            .shard_table
+            .find_entry("test-index-0", "test-source")
+            .unwrap()
+            .shards();
+        assert_eq!(shards.len(), 1);
+        assert_eq!(shards[0].shard_id, 1);
+        assert_eq!(shards[0].shard_state, ShardState::Closed as i32);
+    }
 }
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/shard_table.rs b/quickwit/quickwit-ingest/src/ingest_v2/shard_table.rs
index 3ea210f80e3..3fad37f5ab6 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/shard_table.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/shard_table.rs
@@ -215,7 +215,8 @@ impl ShardTable {
         }
     }
 
-    pub fn insert_shards(
+    /// Sets the shards for the given index and source.
+    pub fn set_shards(
         &mut self,
         index_uid: impl Into<IndexUid>,
         source_id: impl Into<SourceId>,
diff --git a/quickwit/quickwit-proto/src/ingest/mod.rs b/quickwit/quickwit-proto/src/ingest/mod.rs
index 75b3ff0fab4..ed4cf5f886c 100644
--- a/quickwit/quickwit-proto/src/ingest/mod.rs
+++ b/quickwit/quickwit-proto/src/ingest/mod.rs
@@ -191,6 +191,25 @@ impl ShardState {
     pub fn is_closed(&self) -> bool {
         *self == ShardState::Closed
     }
+
+    pub fn as_json_str_name(&self) -> &'static str {
+        match self {
+            ShardState::Unspecified => "unspecified",
+            ShardState::Open => "open",
+            ShardState::Unavailable => "unavailable",
+            ShardState::Closed => "closed",
+        }
+    }
+
+    pub fn from_json_str_name(shard_state_json_name: &str) -> Option<Self> {
+        match shard_state_json_name {
+            "unspecified" => Some(Self::Unspecified),
+            "open" => Some(Self::Open),
+            "unavailable" => Some(Self::Unavailable),
+            "closed" => Some(Self::Closed),
+            _ => None,
+        }
+    }
 }
 
 impl ShardIds {
@@ -200,3 +219,29 @@ impl ShardIds {
             .map(|shard_id| queue_id(&self.index_uid, &self.source_id, *shard_id))
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_shard_state_json_str_name() {
+        let shard_state_json_name = ShardState::Unspecified.as_json_str_name();
+        let shard_state = ShardState::from_json_str_name(shard_state_json_name).unwrap();
+        assert_eq!(shard_state, ShardState::Unspecified);
+
+        let shard_state_json_name = ShardState::Open.as_json_str_name();
+        let shard_state = ShardState::from_json_str_name(shard_state_json_name).unwrap();
+        assert_eq!(shard_state, ShardState::Open);
+
+        let shard_state_json_name = ShardState::Unavailable.as_json_str_name();
+        let shard_state = ShardState::from_json_str_name(shard_state_json_name).unwrap();
+        assert_eq!(shard_state, ShardState::Unavailable);
+
+        let shard_state_json_name = ShardState::Closed.as_json_str_name();
+        let shard_state = ShardState::from_json_str_name(shard_state_json_name).unwrap();
+        assert_eq!(shard_state, ShardState::Closed);
+
+        assert!(ShardState::from_json_str_name("unknown").is_none());
+    }
+}
diff --git a/quickwit/quickwit-proto/src/types/mod.rs b/quickwit/quickwit-proto/src/types/mod.rs
index c950031ee37..f38714eceff 100644
--- a/quickwit/quickwit-proto/src/types/mod.rs
+++ b/quickwit/quickwit-proto/src/types/mod.rs
@@ -164,7 +164,7 @@ impl From for IndexUid {
             Ok(index_uid) => index_uid,
             Err(invalid_index_uid) => {
                 panic!(
-                    "invalid index uid {}",
+                    "invalid index UID {}",
                     invalid_index_uid.invalid_index_uid_str
                 );
             }
diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs
index bf4c22591e9..7f2c6b27472 100644
--- a/quickwit/quickwit-serve/src/lib.rs
+++ b/quickwit/quickwit-serve/src/lib.rs
@@ -53,7 +53,9 @@ pub use format::BodyFormat;
 use futures::{Stream, StreamExt};
 use itertools::Itertools;
 use quickwit_actors::{ActorExitStatus, Mailbox, Universe};
-use quickwit_cluster::{start_cluster_service, Cluster, ClusterChange, ClusterMember};
+use quickwit_cluster::{
+    start_cluster_service, Cluster, ClusterChange, ClusterMember, ListenerHandle,
+};
 use quickwit_common::pubsub::{EventBroker, EventSubscriptionHandle};
 use quickwit_common::runtimes::RuntimesConfig;
 use quickwit_common::tower::{
@@ -68,8 +70,9 @@ use quickwit_index_management::{IndexService as IndexManager, IndexServiceError};
 use quickwit_indexing::actors::IndexingService;
 use quickwit_indexing::start_indexing_service;
 use quickwit_ingest::{
-    start_ingest_api_service, wait_for_ingester_decommission, GetMemoryCapacity, IngestApiService,
-    IngestRequest, IngestRouter, IngestServiceClient, Ingester, IngesterPool, RateLimiterSettings,
+    setup_local_shards_update_listener, start_ingest_api_service, wait_for_ingester_decommission,
+    GetMemoryCapacity, IngestApiService, IngestRequest, IngestRouter, IngestServiceClient,
+    Ingester, IngesterPool, RateLimiterSettings,
 };
 use quickwit_janitor::{start_janitor_service, JanitorService};
 use quickwit_metastore::{
@@ -130,9 +133,10 @@ struct QuickwitServices {
     /// the root requests.
     pub search_service: Arc<dyn SearchService>,
 
-    /// The control plane listens to metastore events.
+    /// The control plane listens to various events.
     /// We must maintain a reference to the subscription handles to continue receiving
     /// notifications. Otherwise, the subscriptions are dropped.
+    _local_shards_update_listener_handle_opt: Option<ListenerHandle>,
     _report_splits_subscription_handle_opt: Option<EventSubscriptionHandle>,
 }
 
@@ -363,6 +367,7 @@ pub async fn serve_quickwit(
     let (ingest_router_service, ingester_service_opt) = setup_ingest_v2(
         &node_config,
         &cluster,
+        event_broker.clone(),
         control_plane_service.clone(),
         ingester_pool,
     )
@@ -426,6 +431,17 @@ pub async fn serve_quickwit(
     )
     .await?;
 
+    // The control plane listens for local shards updates to learn about each shard's ingestion
+    // throughput. Routers do so to keep their shard tables up to date.
+    let local_shards_update_listener_handle_opt = if node_config
+        .is_service_enabled(QuickwitService::ControlPlane)
+        || node_config.is_service_enabled(QuickwitService::Indexer)
+    {
+        Some(setup_local_shards_update_listener(cluster.clone(), event_broker.clone()).await)
+    } else {
+        None
+    };
+
     let report_splits_subscription_handle_opt =
     // DISCLAIMER: This is quirky here: We base our decision to forward the split report depending
     // on the current searcher configuration.
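(Aside: the subscription handles named above are load-bearing. As the `QuickwitServices` doc comment says, dropping a handle cancels the subscription, which is why both `IngestRouter` and `QuickwitServices` keep them in otherwise-unused fields. The sketch below illustrates the `quickwit_common::pubsub` pattern this patch wires up, using only the API surface visible in the diff; the `Event` marker-trait bound and the `GreetingEvent`/`GreetingLogger` names are assumptions for illustration, not code from the patch.)

use async_trait::async_trait;
use quickwit_common::pubsub::{Event, EventBroker, EventSubscriber};

// Hypothetical event type; real broadcast events such as `LocalShardsUpdate`
// are assumed to implement the same `Event` marker trait.
#[derive(Debug, Clone)]
struct GreetingEvent(String);

impl Event for GreetingEvent {}

struct GreetingLogger;

#[async_trait]
impl EventSubscriber<GreetingEvent> for GreetingLogger {
    async fn handle_event(&mut self, event: GreetingEvent) {
        println!("received: {}", event.0);
    }
}

#[tokio::main]
async fn main() {
    let event_broker = EventBroker::default();
    // Keep the handle alive for as long as events must be received:
    // dropping it cancels the subscription.
    let _subscription_handle = event_broker.subscribe::<GreetingEvent>(GreetingLogger);
    event_broker.publish(GreetingEvent("hello".to_string()));
    // Give the asynchronous subscriber a moment to run, as the router test
    // above does after publishing its `LocalShardsUpdate`.
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}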
@@ -459,6 +475,7 @@
         metastore_server_opt,
         metastore_client: metastore_through_control_plane.clone(),
         control_plane_service,
+        _local_shards_update_listener_handle_opt: local_shards_update_listener_handle_opt,
         _report_splits_subscription_handle_opt: report_splits_subscription_handle_opt,
         index_manager,
         indexing_service_opt,
@@ -550,6 +567,7 @@
 async fn setup_ingest_v2(
     config: &NodeConfig,
     cluster: &Cluster,
+    event_broker: EventBroker,
     control_plane: ControlPlaneServiceClient,
     ingester_pool: IngesterPool,
 ) -> anyhow::Result<(IngestRouterServiceClient, Option<IngesterServiceClient>)> {
@@ -562,6 +580,7 @@
         .get();
     let ingest_router = IngestRouter::new(
         self_node_id.clone(),
+        event_broker,
         control_plane,
         ingester_pool.clone(),
         replication_factor,
@@ -584,7 +603,7 @@
     let wal_dir_path = config.data_dir_path.join("wal");
     fs::create_dir_all(&wal_dir_path)?;
     let ingester = Ingester::try_new(
-        self_node_id.clone(),
+        cluster.clone(),
         ingester_pool.clone(),
         &wal_dir_path,
         config.ingest_api_config.max_queue_disk_usage,
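(Aside: tying the rate-metering back to the broadcast: `RateMeter::harvest` yields a `ConstantRate` whose `work()` is the work recorded since the previous harvest and whose `period()` is the elapsed time, so a MiB/s figure such as the `RateMibPerSec` carried by `ShardInfo` is a single division away. The helper below is an illustrative sketch, not part of the patch; it assumes `work()` counts ingested bytes and relies only on `ConstantRate::new`, `work`, and `period` as exercised by the tests above.)

use std::time::Duration;

use quickwit_common::tower::{ConstantRate, Rate};

// Hypothetical helper: convert a harvested `ConstantRate` into MiB/s,
// assuming its work counter tracks ingested bytes.
fn mib_per_sec(rate: &ConstantRate) -> f64 {
    let elapsed_secs = rate.period().as_secs_f64();
    if elapsed_secs == 0.0 {
        // Nothing meaningful can be derived from a zero-length period.
        return 0.0;
    }
    rate.work() as f64 / (1024.0 * 1024.0) / elapsed_secs
}

fn main() {
    // 10 MiB of work harvested over 2 seconds -> 5 MiB/s.
    let rate = ConstantRate::new(10 * 1024 * 1024, Duration::from_secs(2));
    assert_eq!(mib_per_sec(&rate), 5.0);
}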