diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index a96b476948e..d08f2a51a47 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -5534,6 +5534,7 @@ dependencies = [ "futures", "itertools 0.12.1", "once_cell", + "pin-project", "quickwit-common", "quickwit-config", "quickwit-proto", @@ -5684,7 +5685,6 @@ dependencies = [ "prost", "quickwit-actors", "quickwit-cluster", - "quickwit-codegen", "quickwit-common", "quickwit-config", "quickwit-indexing", diff --git a/quickwit/quickwit-cluster/Cargo.toml b/quickwit/quickwit-cluster/Cargo.toml index bb123c81300..2fc26418097 100644 --- a/quickwit/quickwit-cluster/Cargo.toml +++ b/quickwit/quickwit-cluster/Cargo.toml @@ -17,6 +17,7 @@ chitchat = { workspace = true } futures = { workspace = true } itertools = { workspace = true } once_cell = { workspace = true } +pin-project = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } thiserror = { workspace = true } diff --git a/quickwit/quickwit-cluster/src/change.rs b/quickwit/quickwit-cluster/src/change.rs index c06bca93b72..ae08d412187 100644 --- a/quickwit/quickwit-cluster/src/change.rs +++ b/quickwit/quickwit-cluster/src/change.rs @@ -19,17 +19,24 @@ use std::collections::btree_map::Entry; use std::collections::BTreeMap; +use std::pin::Pin; +use std::task::{Context, Poll}; use chitchat::{ChitchatId, NodeState}; +use futures::Stream; +use pin_project::pin_project; use quickwit_common::sorted_iter::{KeyDiff, SortedByKeyIterator}; use quickwit_common::tower::{make_channel, warmup_channel}; use quickwit_proto::types::NodeId; +use tokio::sync::mpsc; +use tokio_stream::wrappers::UnboundedReceiverStream; use tonic::transport::Channel; use tracing::{info, warn}; use crate::member::NodeStateExt; use crate::ClusterNode; +/// Describes a change in the cluster. #[derive(Debug, Clone)] pub enum ClusterChange { Add(ClusterNode), @@ -37,6 +44,31 @@ pub enum ClusterChange { Remove(ClusterNode), } +#[pin_project] +pub struct ClusterChangeStream(#[pin] UnboundedReceiverStream); + +impl ClusterChangeStream { + pub fn new_unbounded() -> (Self, mpsc::UnboundedSender) { + let (change_stream_tx, change_stream_rx) = mpsc::unbounded_channel(); + ( + Self(UnboundedReceiverStream::new(change_stream_rx)), + change_stream_tx, + ) + } +} + +impl Stream for ClusterChangeStream { + type Item = ClusterChange; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().0.poll_next(cx) + } +} + +pub trait ClusterChangeStreamFactory: Clone + Send + 'static { + fn create(&self) -> ClusterChangeStream; +} + /// Compares the digests of the previous and new set of lives nodes, identifies the changes that /// occurred in the cluster, and emits the corresponding events, focusing on ready nodes only. pub(crate) async fn compute_cluster_change_events( @@ -294,6 +326,34 @@ async fn try_new_node( } } +#[cfg(any(test, feature = "testsuite"))] +pub mod for_test { + use std::sync::{Arc, Mutex}; + + use tokio::sync::mpsc; + + use super::*; + + #[derive(Clone, Default)] + pub struct ClusterChangeStreamFactoryForTest { + inner: Arc>>>, + } + + impl ClusterChangeStreamFactoryForTest { + pub fn change_stream_tx(&self) -> mpsc::UnboundedSender { + self.inner.lock().unwrap().take().unwrap() + } + } + + impl ClusterChangeStreamFactory for ClusterChangeStreamFactoryForTest { + fn create(&self) -> ClusterChangeStream { + let (change_stream, change_stream_tx) = ClusterChangeStream::new_unbounded(); + *self.inner.lock().unwrap() = Some(change_stream_tx); + change_stream + } + } +} + #[cfg(test)] mod tests { use std::collections::HashSet; diff --git a/quickwit/quickwit-cluster/src/cluster.rs b/quickwit/quickwit-cluster/src/cluster.rs index 9dc17e3a61f..36691a8e7bb 100644 --- a/quickwit/quickwit-cluster/src/cluster.rs +++ b/quickwit/quickwit-cluster/src/cluster.rs @@ -30,25 +30,24 @@ use chitchat::{ spawn_chitchat, Chitchat, ChitchatConfig, ChitchatHandle, ChitchatId, ClusterStateSnapshot, FailureDetectorConfig, KeyChangeEvent, ListenerHandle, NodeState, }; -use futures::Stream; use itertools::Itertools; use quickwit_proto::indexing::{IndexingPipelineId, IndexingTask, PipelineMetrics}; use quickwit_proto::types::{NodeId, PipelineUid, ShardId}; use serde::{Deserialize, Serialize}; use tokio::sync::{mpsc, watch, Mutex, RwLock}; use tokio::time::timeout; -use tokio_stream::wrappers::{UnboundedReceiverStream, WatchStream}; +use tokio_stream::wrappers::WatchStream; use tokio_stream::StreamExt; use tracing::{info, warn}; -use crate::change::{compute_cluster_change_events, ClusterChange}; +use crate::change::{compute_cluster_change_events, ClusterChange, ClusterChangeStreamFactory}; use crate::member::{ build_cluster_member, ClusterMember, NodeStateExt, ENABLED_SERVICES_KEY, GRPC_ADVERTISE_ADDR_KEY, PIPELINE_METRICS_PREFIX, READINESS_KEY, READINESS_VALUE_NOT_READY, READINESS_VALUE_READY, }; use crate::metrics::spawn_metrics_task; -use crate::ClusterNode; +use crate::{ClusterChangeStream, ClusterNode}; const MARKED_FOR_DELETION_GRACE_PERIOD: usize = if cfg!(any(test, feature = "testsuite")) { 100 // ~ HEARTBEAT * 100 = 2.5 seconds. @@ -197,20 +196,23 @@ impl Cluster { } /// Returns a stream of changes affecting the set of ready nodes in the cluster. - pub async fn ready_nodes_change_stream(&self) -> impl Stream { - // The subscriber channel must be unbounded because we do no want to block when sending the - // events. - let (change_stream_tx, change_stream_rx) = mpsc::unbounded_channel(); - let mut inner = self.inner.write().await; - for node in inner.live_nodes.values() { - if node.is_ready() { - change_stream_tx - .send(ClusterChange::Add(node.clone())) - .expect("The receiver end of the channel should be open."); + pub fn change_stream(&self) -> ClusterChangeStream { + let (change_stream, change_stream_tx) = ClusterChangeStream::new_unbounded(); + let inner = self.inner.clone(); + // We spawn a task so the signature of this function is sync. + let future = async move { + let mut inner = inner.write().await; + for node in inner.live_nodes.values() { + if node.is_ready() { + change_stream_tx + .send(ClusterChange::Add(node.clone())) + .expect("receiver end of the channel should be open"); + } } - } - inner.change_stream_subscribers.push(change_stream_tx); - UnboundedReceiverStream::new(change_stream_rx) + inner.change_stream_subscribers.push(change_stream_tx); + }; + tokio::spawn(future); + change_stream } /// Returns whether the self node is ready. @@ -387,6 +389,12 @@ impl Cluster { } } +impl ClusterChangeStreamFactory for Cluster { + fn create(&self) -> ClusterChangeStream { + self.change_stream() + } +} + /// Deprecated: this is going away soon. fn spawn_ready_members_task( cluster_id: String, @@ -787,7 +795,7 @@ mod tests { async fn test_cluster_multiple_nodes() -> anyhow::Result<()> { let transport = ChannelTransport::default(); let node_1 = create_cluster_for_test(Vec::new(), &[], &transport, true).await?; - let node_1_change_stream = node_1.ready_nodes_change_stream().await; + let node_1_change_stream = node_1.change_stream(); let peer_seeds = vec![node_1.gossip_listen_addr.to_string()]; let node_2 = create_cluster_for_test(peer_seeds, &[], &transport, true).await?; diff --git a/quickwit/quickwit-cluster/src/lib.rs b/quickwit/quickwit-cluster/src/lib.rs index ab98c004f35..47ddee6c07f 100644 --- a/quickwit/quickwit-cluster/src/lib.rs +++ b/quickwit/quickwit-cluster/src/lib.rs @@ -39,7 +39,9 @@ use quickwit_proto::indexing::CpuCapacity; use quickwit_proto::types::NodeId; use time::OffsetDateTime; -pub use crate::change::ClusterChange; +#[cfg(any(test, feature = "testsuite"))] +pub use crate::change::for_test::*; +pub use crate::change::{ClusterChange, ClusterChangeStream, ClusterChangeStreamFactory}; #[cfg(any(test, feature = "testsuite"))] pub use crate::cluster::{ create_cluster_for_test, create_cluster_for_test_with_id, grpc_addr_from_listen_addr_for_test, diff --git a/quickwit/quickwit-config/src/cluster_config/mod.rs b/quickwit/quickwit-config/src/cluster_config/mod.rs index 34e7c1bb9a7..6dda71921ee 100644 --- a/quickwit/quickwit-config/src/cluster_config/mod.rs +++ b/quickwit/quickwit-config/src/cluster_config/mod.rs @@ -20,7 +20,7 @@ use quickwit_common::uri::Uri; /// An embryo of a cluster config. -// TODO: Version object. +// TODO: Move to `quickwit-config` and version object. #[derive(Debug, Clone)] pub struct ClusterConfig { pub cluster_id: String, diff --git a/quickwit/quickwit-control-plane/Cargo.toml b/quickwit/quickwit-control-plane/Cargo.toml index 23effe3a621..0a749ed2091 100644 --- a/quickwit/quickwit-control-plane/Cargo.toml +++ b/quickwit/quickwit-control-plane/Cargo.toml @@ -35,6 +35,7 @@ tracing = { workspace = true } ulid = { workspace = true } quickwit-actors = { workspace = true } +quickwit-cluster = { workspace = true } quickwit-common = { workspace = true } quickwit-config = { workspace = true } quickwit-ingest = { workspace = true } @@ -56,8 +57,5 @@ quickwit-metastore = { workspace = true, features = ["testsuite"] } quickwit-proto = { workspace = true, features = ["testsuite"] } quickwit-storage = { workspace = true, features = ["testsuite"] } -[build-dependencies] -quickwit-codegen = { workspace = true } - [features] testsuite = ["mockall"] diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs index 9c76b95cb30..339db703691 100644 --- a/quickwit/quickwit-control-plane/src/control_plane.rs +++ b/quickwit/quickwit-control-plane/src/control_plane.rs @@ -30,9 +30,13 @@ use quickwit_actors::{ Actor, ActorContext, ActorExitStatus, ActorHandle, Handler, Mailbox, Supervisor, Universe, WeakMailbox, }; +use quickwit_cluster::{ + ClusterChange, ClusterChangeStream, ClusterChangeStreamFactory, ClusterNode, +}; use quickwit_common::pubsub::EventSubscriber; use quickwit_common::uri::Uri; use quickwit_common::Progress; +use quickwit_config::service::QuickwitService; use quickwit_config::{ClusterConfig, IndexConfig, IndexTemplate, SourceConfig}; use quickwit_ingest::{IngesterPool, LocalShardsUpdate}; use quickwit_metastore::IndexMetadata; @@ -51,7 +55,7 @@ use quickwit_proto::metastore::{ }; use quickwit_proto::types::{IndexUid, NodeId, ShardId, SourceUid}; use serde::Serialize; -use tracing::error; +use tracing::{error, info}; use crate::debouncer::Debouncer; use crate::indexing_scheduler::{IndexingScheduler, IndexingSchedulerState}; @@ -77,6 +81,7 @@ struct RebuildPlan; pub struct ControlPlane { cluster_config: ClusterConfig, + cluster_change_stream_opt: Option, // The control plane state is split into to independent functions, that we naturally isolated // code wise and state wise. // @@ -103,23 +108,23 @@ impl ControlPlane { universe: &Universe, cluster_config: ClusterConfig, self_node_id: NodeId, + cluster_change_stream_factory: impl ClusterChangeStreamFactory, indexer_pool: IndexerPool, ingester_pool: IngesterPool, metastore: MetastoreServiceClient, ) -> (Mailbox, ActorHandle>) { universe.spawn_builder().supervise_fn(move || { - let indexing_scheduler = IndexingScheduler::new( - cluster_config.cluster_id.clone(), - self_node_id.clone(), - indexer_pool.clone(), - ); - let ingest_controller = IngestController::new( - metastore.clone(), - ingester_pool.clone(), - cluster_config.replication_factor, - ); + let cluster_id = cluster_config.cluster_id.clone(); + let replication_factor = cluster_config.replication_factor; + + let indexing_scheduler = + IndexingScheduler::new(cluster_id, self_node_id.clone(), indexer_pool.clone()); + let ingest_controller = + IngestController::new(metastore.clone(), ingester_pool.clone(), replication_factor); + ControlPlane { cluster_config: cluster_config.clone(), + cluster_change_stream_opt: Some(cluster_change_stream_factory.create()), indexing_scheduler, ingest_controller, metastore: metastore.clone(), @@ -164,6 +169,12 @@ impl Actor for ControlPlane { ctx.schedule_self_msg(CONTROL_PLAN_LOOP_INTERVAL, ControlPlanLoop); + let weak_mailbox = ctx.mailbox().downgrade(); + let cluster_change_stream = self + .cluster_change_stream_opt + .take() + .expect("`initialize` should be called only once"); + spawn_watch_indexers_task(weak_mailbox, cluster_change_stream); Ok(()) } } @@ -748,10 +759,97 @@ fn apply_index_template_match( Ok(index_config) } +/// The indexer joined the cluster. +#[derive(Debug)] +struct IndexerJoined(ClusterNode); + +#[async_trait] +impl Handler for ControlPlane { + type Reply = (); + + async fn handle( + &mut self, + message: IndexerJoined, + _ctx: &ActorContext, + ) -> Result { + info!( + "indexer joined `{}` the cluster: rebuilding indexing plan", + message.0.node_id() + ); + // TODO: + // 1. Update shard table. + // 2. Rebalance shards if necessary. + self.indexing_scheduler.rebuild_plan(&self.model); + Ok(()) + } +} + +/// The indexer left the cluster. +#[derive(Debug)] +struct IndexerLeft(ClusterNode); + +#[async_trait] +impl Handler for ControlPlane { + type Reply = (); + + async fn handle( + &mut self, + message: IndexerLeft, + _ctx: &ActorContext, + ) -> Result { + info!( + "indexer left `{}` the cluster: rebuilding indexing plan", + message.0.node_id() + ); + // 1. Update shard table. + // 2. Rebalance shards if necessary. + self.indexing_scheduler.rebuild_plan(&self.model); + Ok(()) + } +} + +fn spawn_watch_indexers_task( + weak_mailbox: WeakMailbox, + cluster_change_stream: ClusterChangeStream, +) { + tokio::spawn(watcher_indexers(weak_mailbox, cluster_change_stream)); +} + +async fn watcher_indexers( + weak_mailbox: WeakMailbox, + mut cluster_change_stream: ClusterChangeStream, +) { + while let Some(cluster_change) = cluster_change_stream.next().await { + let Some(mailbox) = weak_mailbox.upgrade() else { + return; + }; + match cluster_change { + ClusterChange::Add(node) => { + if node.enabled_services().contains(&QuickwitService::Indexer) { + if let Err(error) = mailbox.send_message(IndexerJoined(node)).await { + error!(error=%error, "failed to forward `IndexerJoined` event to control plane"); + } + } + } + ClusterChange::Remove(node) => { + if node.enabled_services().contains(&QuickwitService::Indexer) { + if let Err(error) = mailbox.send_message(IndexerLeft(node)).await { + error!(error=%error, "failed to forward `IndexerLeft` event to control plane"); + } + } + } + ClusterChange::Update(_) => { + // We are not interested in updates (yet). + } + } + } +} + #[cfg(test)] mod tests { use mockall::Sequence; use quickwit_actors::{AskError, Observe, SupervisorMetrics}; + use quickwit_cluster::ClusterChangeStreamFactoryForTest; use quickwit_config::{IndexConfig, SourceParams, CLI_SOURCE_ID, INGEST_V2_SOURCE_ID}; use quickwit_indexing::IndexingService; use quickwit_metastore::{ @@ -805,10 +903,12 @@ mod tests { Ok(ListIndexesMetadataResponse::try_from_indexes_metadata(Vec::new()).unwrap()) }); let cluster_config = ClusterConfig::for_test(); + let cluster_change_stream_factory = ClusterChangeStreamFactoryForTest::default(); let (control_plane_mailbox, _control_plane_handle) = ControlPlane::spawn( &universe, cluster_config, self_node_id, + cluster_change_stream_factory, indexer_pool, ingester_pool, MetastoreServiceClient::from(mock_metastore), @@ -848,10 +948,12 @@ mod tests { }); let cluster_config = ClusterConfig::for_test(); + let cluster_change_stream_factory = ClusterChangeStreamFactoryForTest::default(); let (control_plane_mailbox, _control_plane_handle) = ControlPlane::spawn( &universe, cluster_config, self_node_id, + cluster_change_stream_factory, indexer_pool, ingester_pool, MetastoreServiceClient::from(mock_metastore), @@ -900,10 +1002,12 @@ mod tests { }); let cluster_config = ClusterConfig::for_test(); + let cluster_change_stream_factory = ClusterChangeStreamFactoryForTest::default(); let (control_plane_mailbox, _control_plane_handle) = ControlPlane::spawn( &universe, cluster_config, self_node_id, + cluster_change_stream_factory, indexer_pool, ingester_pool, MetastoreServiceClient::from(mock_metastore), @@ -965,10 +1069,12 @@ mod tests { }); let cluster_config = ClusterConfig::for_test(); + let cluster_change_stream_factory = ClusterChangeStreamFactoryForTest::default(); let (control_plane_mailbox, _control_plane_handle) = ControlPlane::spawn( &universe, cluster_config, self_node_id, + cluster_change_stream_factory, indexer_pool, ingester_pool, MetastoreServiceClient::from(mock_metastore), @@ -1021,10 +1127,12 @@ mod tests { }); let cluster_config = ClusterConfig::for_test(); + let cluster_change_stream_factory = ClusterChangeStreamFactoryForTest::default(); let (control_plane_mailbox, _control_plane_handle) = ControlPlane::spawn( &universe, cluster_config, self_node_id, + cluster_change_stream_factory, indexer_pool, ingester_pool, MetastoreServiceClient::from(mock_metastore), @@ -1091,10 +1199,12 @@ mod tests { }); let cluster_config = ClusterConfig::for_test(); + let cluster_change_stream_factory = ClusterChangeStreamFactoryForTest::default(); let (control_plane_mailbox, _control_plane_handle) = ControlPlane::spawn( &universe, cluster_config, self_node_id, + cluster_change_stream_factory, indexer_pool, ingester_pool, MetastoreServiceClient::from(mock_metastore), @@ -1181,10 +1291,12 @@ mod tests { ); let cluster_config = ClusterConfig::for_test(); + let cluster_change_stream_factory = ClusterChangeStreamFactoryForTest::default(); let (control_plane_mailbox, control_plane_handle) = ControlPlane::spawn( &universe, cluster_config, node_id, + cluster_change_stream_factory, indexer_pool, ingester_pool, MetastoreServiceClient::from(mock_metastore), @@ -1319,10 +1431,12 @@ mod tests { ); let cluster_config = ClusterConfig::for_test(); + let cluster_change_stream_factory = ClusterChangeStreamFactoryForTest::default(); let (control_plane_mailbox, _control_plane_handle) = ControlPlane::spawn( &universe, cluster_config, node_id, + cluster_change_stream_factory, indexer_pool, ingester_pool, MetastoreServiceClient::from(mock_metastore), @@ -1445,10 +1559,12 @@ mod tests { ); let cluster_config = ClusterConfig::for_test(); + let cluster_change_stream_factory = ClusterChangeStreamFactoryForTest::default(); let (control_plane_mailbox, _control_plane_handle) = ControlPlane::spawn( &universe, cluster_config, node_id, + cluster_change_stream_factory, indexer_pool, ingester_pool, MetastoreServiceClient::from(mock_metastore), @@ -1560,10 +1676,12 @@ mod tests { ingester_pool.insert("node1".into(), ingester_mock.into()); let cluster_config = ClusterConfig::for_test(); + let cluster_change_stream_factory = ClusterChangeStreamFactoryForTest::default(); let (control_plane_mailbox, _control_plane_handle) = ControlPlane::spawn( &universe, cluster_config, node_id, + cluster_change_stream_factory, indexer_pool, ingester_pool, MetastoreServiceClient::from(mock_metastore), @@ -1652,10 +1770,12 @@ mod tests { }, ); let cluster_config = ClusterConfig::for_test(); + let cluster_change_stream_factory = ClusterChangeStreamFactoryForTest::default(); let (control_plane_mailbox, _control_plane_handle) = ControlPlane::spawn( &universe, cluster_config, node_id, + cluster_change_stream_factory, indexer_pool, ingester_pool, MetastoreServiceClient::from(mock_metastore), @@ -1681,6 +1801,7 @@ mod tests { cluster_config.auto_create_indexes = true; let node_id = NodeId::from("test-node"); + let cluster_change_stream_factory = ClusterChangeStreamFactoryForTest::default(); let indexer_pool = IndexerPool::default(); let ingester_pool = IngesterPool::default(); @@ -1739,6 +1860,7 @@ mod tests { &universe, cluster_config, node_id, + cluster_change_stream_factory, indexer_pool, ingester_pool, MetastoreServiceClient::from(mock_metastore), diff --git a/quickwit/quickwit-control-plane/src/metrics.rs b/quickwit/quickwit-control-plane/src/metrics.rs index 6b2167ccaec..69a1caa5c51 100644 --- a/quickwit/quickwit-control-plane/src/metrics.rs +++ b/quickwit/quickwit-control-plane/src/metrics.rs @@ -38,7 +38,7 @@ impl Default for ControlPlaneMetrics { ), schedule_total: new_counter( "schedule_total", - "Number of control plane `schedule` operation.", + "Number of control plane `schedule` operations.", "control_plane", ), metastore_error_aborted: new_counter( diff --git a/quickwit/quickwit-control-plane/src/tests.rs b/quickwit/quickwit-control-plane/src/tests.rs index 904cbdbb55f..2f050dd7a48 100644 --- a/quickwit/quickwit-control-plane/src/tests.rs +++ b/quickwit/quickwit-control-plane/src/tests.rs @@ -131,7 +131,6 @@ async fn start_control_plane( let indexer_pool = Pool::default(); let ingester_pool = Pool::default(); - let change_stream = cluster.ready_nodes_change_stream().await; let mut indexing_clients = FnvHashMap::default(); for indexer in indexers { @@ -139,7 +138,8 @@ async fn start_control_plane( indexing_clients.insert(indexer.self_node_id().to_string(), indexing_service_mailbox); indexer_inboxes.push(indexing_service_inbox); } - let indexer_change_stream = test_indexer_change_stream(change_stream, indexing_clients); + let indexer_change_stream = + test_indexer_change_stream(cluster.change_stream(), indexing_clients); indexer_pool.listen_for_changes(indexer_change_stream); let mut cluster_config = ClusterConfig::for_test(); @@ -150,6 +150,7 @@ async fn start_control_plane( universe, cluster_config, self_node_id, + cluster, indexer_pool, ingester_pool, MetastoreServiceClient::from(metastore), diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index 01eaf10dae0..2d3c89d1864 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -55,12 +55,13 @@ use std::time::Duration; use anyhow::Context; use bytesize::ByteSize; pub use format::BodyFormat; -use futures::{Stream, StreamExt}; +use futures::StreamExt; use itertools::Itertools; use once_cell::sync::Lazy; use quickwit_actors::{ActorExitStatus, Mailbox, Universe}; use quickwit_cluster::{ - start_cluster_service, Cluster, ClusterChange, ClusterMember, ListenerHandle, + start_cluster_service, Cluster, ClusterChange, ClusterChangeStream, ClusterMember, + ListenerHandle, }; use quickwit_common::pubsub::{EventBroker, EventSubscriptionHandle}; use quickwit_common::rate_limiter::RateLimiterSettings; @@ -205,7 +206,7 @@ async fn balance_channel_for_service( cluster: &Cluster, service: QuickwitService, ) -> BalanceChannel { - let cluster_change_stream = cluster.ready_nodes_change_stream().await; + let cluster_change_stream = cluster.change_stream(); let service_change_stream = cluster_change_stream.filter_map(move |cluster_change| { Box::pin(async move { match cluster_change { @@ -280,19 +281,18 @@ async fn start_control_plane_if_needed( ) .await?; - let cluster_id = cluster.cluster_id().to_string(); let self_node_id: NodeId = cluster.self_node_id().into(); - let replication_factor = node_config .ingest_api_config .replication_factor() .expect("replication factor should have been validated") .get(); + let control_plane_mailbox = setup_control_plane( universe, event_broker, - cluster_id, self_node_id, + cluster.clone(), indexer_pool.clone(), ingester_pool.clone(), metastore_client.clone(), @@ -430,10 +430,9 @@ pub async fn serve_quickwit( }; // Setup indexer pool. - let cluster_change_stream = cluster.ready_nodes_change_stream().await; setup_indexer_pool( &node_config, - cluster_change_stream, + cluster.change_stream(), indexer_pool.clone(), indexing_service_opt.clone(), ); @@ -475,9 +474,6 @@ pub async fn serve_quickwit( } } } - - let cluster_change_stream = cluster.ready_nodes_change_stream().await; - let split_cache_root_directory: PathBuf = node_config.data_dir_path.join("searcher-split-cache"); let split_cache_opt: Option> = @@ -500,7 +496,7 @@ pub async fn serve_quickwit( let (search_job_placer, search_service) = setup_searcher( &node_config, - cluster_change_stream, + cluster.change_stream(), metastore_through_control_plane.clone(), storage_resolver.clone(), searcher_context, @@ -741,9 +737,8 @@ async fn setup_ingest_v2( }; // Setup ingester pool change stream. let ingester_opt_clone = ingester_opt.clone(); - let cluster_change_stream = cluster.ready_nodes_change_stream().await; let max_message_size = node_config.grpc_config.max_message_size; - let ingester_change_stream = cluster_change_stream.filter_map(move |cluster_change| { + let ingester_change_stream = cluster.change_stream().filter_map(move |cluster_change| { let ingester_opt_clone_clone = ingester_opt_clone.clone(); Box::pin(async move { match cluster_change { @@ -794,7 +789,7 @@ async fn setup_ingest_v2( async fn setup_searcher( node_config: &NodeConfig, - cluster_change_stream: impl Stream + Send + 'static, + cluster_change_stream: ClusterChangeStream, metastore: MetastoreServiceClient, storage_resolver: StorageResolver, searcher_context: Arc, @@ -846,14 +841,15 @@ async fn setup_searcher( async fn setup_control_plane( universe: &Universe, event_broker: &EventBroker, - cluster_id: String, self_node_id: NodeId, + cluster: Cluster, indexer_pool: IndexerPool, ingester_pool: IngesterPool, metastore: MetastoreServiceClient, default_index_root_uri: Uri, replication_factor: usize, ) -> anyhow::Result> { + let cluster_id = cluster.cluster_id().to_string(); let cluster_config = ClusterConfig { cluster_id, auto_create_indexes: true, @@ -864,6 +860,7 @@ async fn setup_control_plane( universe, cluster_config, self_node_id, + cluster.clone(), indexer_pool, ingester_pool, metastore, @@ -881,7 +878,7 @@ async fn setup_control_plane( fn setup_indexer_pool( node_config: &NodeConfig, - cluster_change_stream: impl Stream + Send + 'static, + cluster_change_stream: ClusterChangeStream, indexer_pool: IndexerPool, indexing_service_opt: Option>, ) { @@ -1069,8 +1066,7 @@ mod tests { use quickwit_proto::metastore::ListIndexesMetadataResponse; use quickwit_proto::types::{IndexUid, PipelineUid}; use quickwit_search::Job; - use tokio::sync::{mpsc, watch}; - use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream}; + use tokio::sync::watch; use super::*; @@ -1165,21 +1161,20 @@ mod tests { universe.create_test_mailbox::(); let node_config = NodeConfig::for_test(); - let (indexer_change_stream_tx, indexer_change_stream_rx) = mpsc::channel(3); - let indexer_change_stream = ReceiverStream::new(indexer_change_stream_rx); + let (cluster_change_stream, cluster_change_stream_tx) = + ClusterChangeStream::new_unbounded(); let indexer_pool = IndexerPool::default(); setup_indexer_pool( &node_config, - indexer_change_stream, + cluster_change_stream, indexer_pool.clone(), Some(indexing_service_mailbox), ); let new_indexer_node = ClusterNode::for_test("test-indexer-node", 1, true, &["indexer"], &[]).await; - indexer_change_stream_tx + cluster_change_stream_tx .send(ClusterChange::Add(new_indexer_node)) - .await .unwrap(); tokio::time::sleep(Duration::from_millis(1)).await; @@ -1202,9 +1197,8 @@ mod tests { &[new_indexing_task.clone()], ) .await; - indexer_change_stream_tx + cluster_change_stream_tx .send(ClusterChange::Update(updated_indexer_node.clone())) - .await .unwrap(); tokio::time::sleep(Duration::from_millis(1)).await; @@ -1215,9 +1209,8 @@ mod tests { new_indexing_task ); - indexer_change_stream_tx + cluster_change_stream_tx .send(ClusterChange::Remove(updated_indexer_node)) - .await .unwrap(); tokio::time::sleep(Duration::from_millis(1)).await; @@ -1229,8 +1222,7 @@ mod tests { let node_config = NodeConfig::for_test(); let searcher_context = Arc::new(SearcherContext::new(SearcherConfig::default(), None)); let metastore = metastore_for_test(); - let (change_stream_tx, change_stream_rx) = mpsc::unbounded_channel(); - let change_stream = UnboundedReceiverStream::new(change_stream_rx); + let (change_stream, change_stream_tx) = ClusterChangeStream::new_unbounded(); let storage_resolver = StorageResolver::unconfigured(); let (search_job_placer, _searcher_service) = setup_searcher( &node_config,