Skip to content

Commit

Permalink
Rebuild indexing plan when indexer joins or leaves the cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload committed Feb 21, 2024
1 parent 1891932 commit d6a2dfc
Show file tree
Hide file tree
Showing 11 changed files with 252 additions and 68 deletions.
2 changes: 1 addition & 1 deletion quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions quickwit/quickwit-cluster/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ chitchat = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
once_cell = { workspace = true }
pin-project = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
Expand Down
60 changes: 60 additions & 0 deletions quickwit/quickwit-cluster/src/change.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,56 @@

use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::pin::Pin;
use std::task::{Context, Poll};

use chitchat::{ChitchatId, NodeState};
use futures::Stream;
use pin_project::pin_project;
use quickwit_common::sorted_iter::{KeyDiff, SortedByKeyIterator};
use quickwit_common::tower::{make_channel, warmup_channel};
use quickwit_proto::types::NodeId;
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::transport::Channel;
use tracing::{info, warn};

use crate::member::NodeStateExt;
use crate::ClusterNode;

/// Describes a change in the cluster.
#[derive(Debug, Clone)]
pub enum ClusterChange {
Add(ClusterNode),
Update(ClusterNode),
Remove(ClusterNode),
}

#[pin_project]
pub struct ClusterChangeStream(#[pin] UnboundedReceiverStream<ClusterChange>);

impl ClusterChangeStream {
pub fn new_unbounded() -> (Self, mpsc::UnboundedSender<ClusterChange>) {
let (change_stream_tx, change_stream_rx) = mpsc::unbounded_channel();
(
Self(UnboundedReceiverStream::new(change_stream_rx)),
change_stream_tx,
)
}
}

impl Stream for ClusterChangeStream {
type Item = ClusterChange;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().0.poll_next(cx)
}
}

pub trait ClusterChangeStreamFactory: Clone + Send + 'static {
fn create(&self) -> ClusterChangeStream;
}

/// Compares the digests of the previous and new set of lives nodes, identifies the changes that
/// occurred in the cluster, and emits the corresponding events, focusing on ready nodes only.
pub(crate) async fn compute_cluster_change_events(
Expand Down Expand Up @@ -294,6 +326,34 @@ async fn try_new_node(
}
}

#[cfg(any(test, feature = "testsuite"))]
pub mod for_test {
use std::sync::{Arc, Mutex};

use tokio::sync::mpsc;

use super::*;

#[derive(Clone, Default)]
pub struct ClusterChangeStreamFactoryForTest {
inner: Arc<Mutex<Option<mpsc::UnboundedSender<ClusterChange>>>>,
}

impl ClusterChangeStreamFactoryForTest {
pub fn change_stream_tx(&self) -> mpsc::UnboundedSender<ClusterChange> {
self.inner.lock().unwrap().take().unwrap()
}
}

impl ClusterChangeStreamFactory for ClusterChangeStreamFactoryForTest {
fn create(&self) -> ClusterChangeStream {
let (change_stream, change_stream_tx) = ClusterChangeStream::new_unbounded();
*self.inner.lock().unwrap() = Some(change_stream_tx);
change_stream
}
}
}

#[cfg(test)]
mod tests {
use std::collections::HashSet;
Expand Down
44 changes: 26 additions & 18 deletions quickwit/quickwit-cluster/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,24 @@ use chitchat::{
spawn_chitchat, Chitchat, ChitchatConfig, ChitchatHandle, ChitchatId, ClusterStateSnapshot,
FailureDetectorConfig, KeyChangeEvent, ListenerHandle, NodeState,
};
use futures::Stream;
use itertools::Itertools;
use quickwit_proto::indexing::{IndexingPipelineId, IndexingTask, PipelineMetrics};
use quickwit_proto::types::{NodeId, PipelineUid, ShardId};
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, watch, Mutex, RwLock};
use tokio::time::timeout;
use tokio_stream::wrappers::{UnboundedReceiverStream, WatchStream};
use tokio_stream::wrappers::WatchStream;
use tokio_stream::StreamExt;
use tracing::{info, warn};

use crate::change::{compute_cluster_change_events, ClusterChange};
use crate::change::{compute_cluster_change_events, ClusterChange, ClusterChangeStreamFactory};
use crate::member::{
build_cluster_member, ClusterMember, NodeStateExt, ENABLED_SERVICES_KEY,
GRPC_ADVERTISE_ADDR_KEY, PIPELINE_METRICS_PREFIX, READINESS_KEY, READINESS_VALUE_NOT_READY,
READINESS_VALUE_READY,
};
use crate::metrics::spawn_metrics_task;
use crate::ClusterNode;
use crate::{ClusterChangeStream, ClusterNode};

const MARKED_FOR_DELETION_GRACE_PERIOD: usize = if cfg!(any(test, feature = "testsuite")) {
100 // ~ HEARTBEAT * 100 = 2.5 seconds.
Expand Down Expand Up @@ -197,20 +196,23 @@ impl Cluster {
}

/// Returns a stream of changes affecting the set of ready nodes in the cluster.
pub async fn ready_nodes_change_stream(&self) -> impl Stream<Item = ClusterChange> {
// The subscriber channel must be unbounded because we do no want to block when sending the
// events.
let (change_stream_tx, change_stream_rx) = mpsc::unbounded_channel();
let mut inner = self.inner.write().await;
for node in inner.live_nodes.values() {
if node.is_ready() {
change_stream_tx
.send(ClusterChange::Add(node.clone()))
.expect("The receiver end of the channel should be open.");
pub fn change_stream(&self) -> ClusterChangeStream {
let (change_stream, change_stream_tx) = ClusterChangeStream::new_unbounded();
let inner = self.inner.clone();
// We spawn a task so the signature of this function is sync.
let future = async move {
let mut inner = inner.write().await;
for node in inner.live_nodes.values() {
if node.is_ready() {
change_stream_tx
.send(ClusterChange::Add(node.clone()))
.expect("receiver end of the channel should be open");
}
}
}
inner.change_stream_subscribers.push(change_stream_tx);
UnboundedReceiverStream::new(change_stream_rx)
inner.change_stream_subscribers.push(change_stream_tx);
};
tokio::spawn(future);
change_stream
}

/// Returns whether the self node is ready.
Expand Down Expand Up @@ -387,6 +389,12 @@ impl Cluster {
}
}

impl ClusterChangeStreamFactory for Cluster {
fn create(&self) -> ClusterChangeStream {
self.change_stream()
}
}

/// Deprecated: this is going away soon.
fn spawn_ready_members_task(
cluster_id: String,
Expand Down Expand Up @@ -787,7 +795,7 @@ mod tests {
async fn test_cluster_multiple_nodes() -> anyhow::Result<()> {
let transport = ChannelTransport::default();
let node_1 = create_cluster_for_test(Vec::new(), &[], &transport, true).await?;
let node_1_change_stream = node_1.ready_nodes_change_stream().await;
let node_1_change_stream = node_1.change_stream();

let peer_seeds = vec![node_1.gossip_listen_addr.to_string()];
let node_2 = create_cluster_for_test(peer_seeds, &[], &transport, true).await?;
Expand Down
4 changes: 3 additions & 1 deletion quickwit/quickwit-cluster/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ use quickwit_proto::indexing::CpuCapacity;
use quickwit_proto::types::NodeId;
use time::OffsetDateTime;

pub use crate::change::ClusterChange;
#[cfg(any(test, feature = "testsuite"))]
pub use crate::change::for_test::*;
pub use crate::change::{ClusterChange, ClusterChangeStream, ClusterChangeStreamFactory};
#[cfg(any(test, feature = "testsuite"))]
pub use crate::cluster::{
create_cluster_for_test, create_cluster_for_test_with_id, grpc_addr_from_listen_addr_for_test,
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-config/src/cluster_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use quickwit_common::uri::Uri;

/// An embryo of a cluster config.
// TODO: Version object.
// TODO: Move to `quickwit-config` and version object.
#[derive(Debug, Clone)]
pub struct ClusterConfig {
pub cluster_id: String,
Expand Down
4 changes: 1 addition & 3 deletions quickwit/quickwit-control-plane/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ tracing = { workspace = true }
ulid = { workspace = true }

quickwit-actors = { workspace = true }
quickwit-cluster = { workspace = true }
quickwit-common = { workspace = true }
quickwit-config = { workspace = true }
quickwit-ingest = { workspace = true }
Expand All @@ -56,8 +57,5 @@ quickwit-metastore = { workspace = true, features = ["testsuite"] }
quickwit-proto = { workspace = true, features = ["testsuite"] }
quickwit-storage = { workspace = true, features = ["testsuite"] }

[build-dependencies]
quickwit-codegen = { workspace = true }

[features]
testsuite = ["mockall"]
Loading

0 comments on commit d6a2dfc

Please sign in to comment.