Skip to content

Commit

Permalink
Broadcast shard ingestion throughput and state via Chitchat (#4189)
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload authored Nov 23, 2023
1 parent d189324 commit 2986019
Show file tree
Hide file tree
Showing 25 changed files with 1,121 additions and 118 deletions.
6 changes: 3 additions & 3 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ base64 = "0.21"
bytes = { version = "1", features = ["serde"] }
bytesize = { version = "1.3.0", features = ["serde"] }
bytestring = "1.3.0"
chitchat = "0.7"
chitchat = { git = "https://github.com/quickwit-oss/chitchat.git", rev = "72a994f" }
chrono = { version = "0.4.23", default-features = false, features = [
"clock",
"std",
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-cluster/src/change.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ mod tests {
}

pub(crate) fn build(self) -> NodeState {
let mut node_state = NodeState::default();
let mut node_state = NodeState::for_test();

node_state.set(
ENABLED_SERVICES_KEY,
Expand Down
51 changes: 40 additions & 11 deletions quickwit/quickwit-cluster/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use anyhow::Context;
use chitchat::transport::Transport;
use chitchat::{
spawn_chitchat, Chitchat, ChitchatConfig, ChitchatHandle, ChitchatId, ClusterStateSnapshot,
FailureDetectorConfig, NodeState,
FailureDetectorConfig, ListenerHandle, NodeState,
};
use futures::Stream;
use itertools::Itertools;
Expand Down Expand Up @@ -249,9 +249,31 @@ impl Cluster {
.await
.self_node_state()
.get_versioned(key)
.filter(|versioned_value| versioned_value.tombstone.is_none())
.map(|versioned_value| versioned_value.value.clone())
}

/// Marks `key` for deletion in this node's own chitchat state.
///
/// The chitchat handle is locked for the duration of the call.
///
/// NOTE(review): presumably the entry is tombstoned and garbage-collected
/// later rather than removed immediately; confirm against the chitchat
/// `mark_for_deletion` documentation.
pub async fn remove_self_key(&self, key: &str) {
self.chitchat()
.await
.lock()
.await
.self_node_state()
.mark_for_deletion(key)
}

/// Registers `callback` with chitchat for events on keys matching
/// `key_prefix` and returns the resulting [`ListenerHandle`].
///
/// The call is forwarded as-is to chitchat's `subscribe_event` while the
/// chitchat lock is held.
///
/// NOTE(review): the two `&str` arguments passed to `callback` are
/// presumably the key and its value, and dropping the returned handle
/// presumably unregisters the listener; confirm both against the chitchat
/// API at the pinned revision.
pub async fn subscribe(
&self,
key_prefix: &str,
callback: impl Fn(&str, &str) + Send + Sync + 'static,
) -> ListenerHandle {
self.chitchat()
.await
.lock()
.await
.subscribe_event(key_prefix, callback)
}

/// Waits until the predicate holds true for the set of ready members.
pub async fn wait_for_ready_members<F>(
&self,
Expand Down Expand Up @@ -643,16 +665,16 @@ pub fn grpc_addr_from_listen_addr_for_test(listen_addr: SocketAddr) -> SocketAdd

#[cfg(any(test, feature = "testsuite"))]
pub async fn create_cluster_for_test_with_id(
node_id: u16,
node_id: NodeId,
gossip_advertise_port: u16,
cluster_id: String,
peer_seed_addrs: Vec<String>,
enabled_services: &HashSet<quickwit_config::service::QuickwitService>,
transport: &dyn Transport,
self_node_readiness: bool,
) -> anyhow::Result<Cluster> {
use quickwit_proto::indexing::PIPELINE_FULL_CAPACITY;
let gossip_advertise_addr: SocketAddr = ([127, 0, 0, 1], node_id).into();
let node_id: NodeId = format!("node_{node_id}").into();
let gossip_advertise_addr: SocketAddr = ([127, 0, 0, 1], gossip_advertise_port).into();
let self_node = ClusterMember {
node_id,
generation_id: crate::GenerationId(1),
Expand Down Expand Up @@ -700,14 +722,17 @@ pub async fn create_cluster_for_test(

use quickwit_config::service::QuickwitService;

static NODE_AUTO_INCREMENT: AtomicU16 = AtomicU16::new(1u16);
let node_id = NODE_AUTO_INCREMENT.fetch_add(1, Ordering::Relaxed);
static GOSSIP_ADVERTISE_PORT_SEQUENCE: AtomicU16 = AtomicU16::new(1u16);
let gossip_advertise_port = GOSSIP_ADVERTISE_PORT_SEQUENCE.fetch_add(1, Ordering::Relaxed);
let node_id: NodeId = format!("node-{}", gossip_advertise_port).into();

let enabled_services = enabled_services
.iter()
.map(|service_str| QuickwitService::from_str(service_str))
.collect::<Result<HashSet<_>, _>>()?;
let cluster = create_cluster_for_test_with_id(
node_id,
gossip_advertise_port,
"test-cluster".to_string(),
seeds,
&enabled_services,
Expand Down Expand Up @@ -1017,8 +1042,8 @@ mod tests {
indexing_tasks_clone.clone(),
)
},
Duration::from_secs(4),
Duration::from_millis(500),
Duration::from_secs(5),
Duration::from_millis(100),
)
.await
.unwrap();
Expand Down Expand Up @@ -1126,7 +1151,8 @@ mod tests {
let transport = ChannelTransport::default();

let cluster1a = create_cluster_for_test_with_id(
11u16,
"node-11".into(),
11,
"cluster1".to_string(),
Vec::new(),
&HashSet::default(),
Expand All @@ -1135,7 +1161,8 @@ mod tests {
)
.await?;
let cluster2a = create_cluster_for_test_with_id(
21u16,
"node-21".into(),
21,
"cluster2".to_string(),
vec![cluster1a.gossip_listen_addr.to_string()],
&HashSet::default(),
Expand All @@ -1144,6 +1171,7 @@ mod tests {
)
.await?;
let cluster1b = create_cluster_for_test_with_id(
"node-12".into(),
12,
"cluster1".to_string(),
vec![
Expand All @@ -1156,6 +1184,7 @@ mod tests {
)
.await?;
let cluster2b = create_cluster_for_test_with_id(
"node-22".into(),
22,
"cluster2".to_string(),
vec![
Expand Down Expand Up @@ -1249,7 +1278,7 @@ mod tests {

#[test]
fn test_serialize_indexing_tasks() {
let mut node_state = NodeState::default();
let mut node_state = NodeState::for_test();
test_serialize_indexing_tasks_aux(&[], &mut node_state);
test_serialize_indexing_tasks_aux(
&[IndexingTask {
Expand Down
5 changes: 4 additions & 1 deletion quickwit/quickwit-cluster/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ mod node;
pub use chitchat::transport::ChannelTransport;
use chitchat::transport::UdpTransport;
use chitchat::FailureDetectorConfig;
pub use chitchat::ListenerHandle;
use quickwit_config::service::QuickwitService;
use quickwit_config::NodeConfig;
use quickwit_proto::indexing::CpuCapacity;
Expand All @@ -36,7 +37,9 @@ use time::OffsetDateTime;

pub use crate::change::ClusterChange;
#[cfg(any(test, feature = "testsuite"))]
pub use crate::cluster::{create_cluster_for_test, grpc_addr_from_listen_addr_for_test};
pub use crate::cluster::{
create_cluster_for_test, create_cluster_for_test_with_id, grpc_addr_from_listen_addr_for_test,
};
pub use crate::cluster::{Cluster, ClusterSnapshot, NodeIdSchema};
pub use crate::member::{ClusterMember, INDEXING_CPU_CAPACITY_KEY};
pub use crate::node::ClusterNode;
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-cluster/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl ClusterNode {
let grpc_advertise_addr = ([127, 0, 0, 1], port + 1).into();
let chitchat_id = ChitchatId::new(node_id.to_string(), 0, gossip_advertise_addr);
let channel = make_channel(grpc_advertise_addr).await;
let mut node_state = NodeState::default();
let mut node_state = NodeState::for_test();
node_state.set(ENABLED_SERVICES_KEY, enabled_services.join(","));
node_state.set(GRPC_ADVERTISE_ADDR_KEY, grpc_advertise_addr.to_string());
set_indexing_tasks_in_node_state(indexing_tasks, &mut node_state);
Expand Down
67 changes: 16 additions & 51 deletions quickwit/quickwit-common/src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

use std::collections::HashMap;
use std::fmt;
use std::fmt::Formatter;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, Weak};
use std::time::Duration;
Expand All @@ -37,34 +35,14 @@ pub trait EventSubscriber<E>: Send + Sync + 'static {
async fn handle_event(&mut self, event: E);
}

/// Pairs a callback of type `F` with the event type `E` it is meant to
/// receive.
struct ClosureSubscriber<E, F> {
/// Reference-counted handle to the callback.
callback: Arc<F>,
/// Zero-sized marker for the event type `E`, which no field stores.
_phantom: PhantomData<E>,
}

impl<E, F> Clone for ClosureSubscriber<E, F> {
fn clone(&self) -> Self {
ClosureSubscriber {
callback: self.callback.clone(),
_phantom: self._phantom,
}
}
}

/// Closures are not `Debug`, so the callback is rendered as the name of its
/// type instead of its value.
impl<E, F> fmt::Debug for ClosureSubscriber<E, F> {
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        let callback_type_name = std::any::type_name::<F>();
        let mut debug_struct = f.debug_struct("ClosureSubscriber");
        debug_struct.field("callback", &callback_type_name);
        debug_struct.finish()
    }
}

#[async_trait]
impl<E: Sync + Send + 'static, F: Fn(E) + Sync + Send + 'static> EventSubscriber<E>
for ClosureSubscriber<E, F>
impl<E, F> EventSubscriber<E> for F
where
E: Event,
F: Fn(E) + Send + Sync + 'static,
{
async fn handle_event(&mut self, event: E) {
(self.callback)(event);
(self)(event);
}
}

Expand Down Expand Up @@ -98,7 +76,7 @@ impl EventBroker {
.inner
.subscriptions
.lock()
.expect("the lock should not be poisoned");
.expect("lock should not be poisoned");

if !subscriptions.contains::<EventSubscriptions<E>>() {
subscriptions.insert::<EventSubscriptions<E>>(HashMap::new());
Expand All @@ -113,16 +91,17 @@ impl EventBroker {
};
let typed_subscriptions = subscriptions
.get_mut::<EventSubscriptions<E>>()
.expect("The subscription map should exist.");
.expect("subscription map should exist");
typed_subscriptions.insert(subscription_id, subscription);

EventSubscriptionHandle {
subscription_id,
broker: Arc::downgrade(&self.inner),
drop_me: |subscription_id, broker| {
let mut subscriptions = broker
.subscriptions
.lock()
.expect("the lock should not be poisoned");
.expect("lock should not be poisoned");
if let Some(typed_subscriptions) = subscriptions.get_mut::<EventSubscriptions<E>>()
{
typed_subscriptions.remove(&subscription_id);
Expand All @@ -131,29 +110,14 @@ impl EventBroker {
}
}

/// Subscribes to an event with a callback function.
#[must_use]
pub fn subscribe_fn<E>(
&self,
callback_fn: impl Fn(E) + Sync + Send + 'static,
) -> EventSubscriptionHandle
where
E: Event,
{
self.subscribe(ClosureSubscriber {
callback: Arc::new(callback_fn),
_phantom: Default::default(),
})
}

/// Publishes an event.
pub fn publish<E>(&self, event: E)
where E: Event {
let subscriptions = self
.inner
.subscriptions
.lock()
.expect("the lock should not be poisoned");
.expect("lock should not be poisoned");

if let Some(typed_subscriptions) = subscriptions.get::<EventSubscriptions<E>>() {
for subscription in typed_subscriptions.values() {
Expand All @@ -172,6 +136,7 @@ struct EventSubscription<E> {
subscriber: Arc<TokioMutex<Box<dyn EventSubscriber<E>>>>,
}

#[derive(Clone)]
pub struct EventSubscriptionHandle {
subscription_id: usize,
broker: Weak<InnerEventBroker>,
Expand All @@ -181,8 +146,8 @@ pub struct EventSubscriptionHandle {
impl EventSubscriptionHandle {
/// Cancels the subscription by consuming, and thereby dropping, the handle.
pub fn cancel(self) {}

/// By default, dropping an event cancels the subscription.
/// `forever` consumes the handle and avoid drop
/// By default, dropping a subscription handle cancels the subscription.
/// `forever` consumes the handle and avoids cancelling the subscription on drop.
pub fn forever(mut self) {
// Swap in an empty `Weak`, which never upgrades.
// NOTE(review): presumably the `Drop` impl (not visible here) then finds
// no broker to unsubscribe from, so the subscription outlives the handle.
// Confirm against the `Drop` impl.
self.broker = Weak::new();
}
Expand Down Expand Up @@ -251,7 +216,7 @@ mod tests {
async fn test_event_broker_handle_drop() {
let event_broker = EventBroker::default();
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
drop(event_broker.subscribe_fn::<MyEvent>(move |event| {
drop(event_broker.subscribe(move |event: MyEvent| {
tx.send(event.value).unwrap();
}));
event_broker.publish(MyEvent { value: 42 });
Expand All @@ -263,7 +228,7 @@ mod tests {
let event_broker = EventBroker::default();
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
event_broker
.subscribe_fn::<MyEvent>(move |event| {
.subscribe(move |event: MyEvent| {
tx.send(event.value).unwrap();
})
.cancel();
Expand All @@ -276,7 +241,7 @@ mod tests {
let event_broker = EventBroker::default();
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
event_broker
.subscribe_fn::<MyEvent>(move |event| {
.subscribe(move |event: MyEvent| {
tx.send(event.value).unwrap();
})
.forever();
Expand Down
7 changes: 5 additions & 2 deletions quickwit/quickwit-common/src/shared_consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,8 @@ pub const DELETION_GRACE_PERIOD: Duration = Duration::from_secs(60 * 32); // 32
/// being requested.
pub const SCROLL_BATCH_LEN: usize = 1_000;

/// Prefix used in chitchat to publish the shard positions.
pub const SHARD_POSITIONS_PREFIX: &str = "shard_positions:";
/// Prefix used in chitchat to broadcast the positions of the shards assigned to an indexer.
pub const INDEXER_ASSIGNED_SHARDS_POSITIONS_PREFIX: &str = "indexer.assigned_shards_positions:";

/// Prefix used in chitchat to broadcast the list of primary shards hosted by a leader.
pub const INGESTER_PRIMARY_SHARDS_PREFIX: &str = "ingester.primary_shards:";
Loading

0 comments on commit 2986019

Please sign in to comment.