Skip to content

Commit

Permalink
Making gossip interval configurable (#4551)
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton authored Feb 8, 2024
1 parent 477cf91 commit 1df7855
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 9 deletions.
1 change: 1 addition & 0 deletions quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -949,6 +949,7 @@ async fn create_empty_cluster(config: &NodeConfig) -> anyhow::Result<Cluster> {
self_node,
config.gossip_advertise_addr,
Vec::new(),
config.gossip_interval,
FailureDetectorConfig::default(),
&ChannelTransport::default(),
)
Expand Down
17 changes: 8 additions & 9 deletions quickwit/quickwit-cluster/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,6 @@ use crate::member::{
};
use crate::ClusterNode;

const GOSSIP_INTERVAL: Duration = if cfg!(any(test, feature = "testsuite")) {
Duration::from_millis(25)
} else {
Duration::from_secs(1)
};

const MARKED_FOR_DELETION_GRACE_PERIOD: usize = if cfg!(any(test, feature = "testsuite")) {
100 // ~ HEARTBEAT * 100 = 2.5 seconds.
} else {
Expand All @@ -71,6 +65,7 @@ pub struct Cluster {
self_chitchat_id: ChitchatId,
/// Socket address (UDP) the node listens on for receiving gossip messages.
pub gossip_listen_addr: SocketAddr,
gossip_interval: Duration,
inner: Arc<RwLock<InnerCluster>>,
}

Expand All @@ -85,6 +80,7 @@ impl Debug for Cluster {
"gossip_advertise_addr",
&self.self_chitchat_id.gossip_advertise_addr,
)
.field("gossip_interval", &self.gossip_interval)
.finish()
}
}
Expand Down Expand Up @@ -115,6 +111,7 @@ impl Cluster {
self_node: ClusterMember,
gossip_listen_addr: SocketAddr,
peer_seed_addrs: Vec<String>,
gossip_interval: Duration,
failure_detector_config: FailureDetectorConfig,
transport: &dyn Transport,
) -> anyhow::Result<Self> {
Expand All @@ -134,7 +131,7 @@ impl Cluster {
listen_addr: gossip_listen_addr,
seed_nodes: peer_seed_addrs,
failure_detector_config,
gossip_interval: GOSSIP_INTERVAL,
gossip_interval,
marked_for_deletion_grace_period: MARKED_FOR_DELETION_GRACE_PERIOD,
};
let chitchat_handle = spawn_chitchat(
Expand Down Expand Up @@ -178,6 +175,7 @@ impl Cluster {
cluster_id,
self_chitchat_id: self_node.chitchat_id(),
gossip_listen_addr,
gossip_interval,
inner: Arc::new(RwLock::new(inner)),
};
spawn_ready_nodes_change_stream_task(cluster.clone()).await;
Expand Down Expand Up @@ -335,7 +333,7 @@ impl Cluster {
"Leaving the cluster."
);
self.set_self_node_readiness(false).await;
tokio::time::sleep(GOSSIP_INTERVAL * 2).await;
tokio::time::sleep(self.gossip_interval * 2).await;
}

/// This exposes in chitchat some metrics about the CPU usage of cooperative pipelines.
Expand Down Expand Up @@ -641,6 +639,7 @@ pub async fn create_cluster_for_test_with_id(
self_node,
gossip_advertise_addr,
peer_seed_addrs,
Duration::from_millis(25),
failure_detector_config,
transport,
)
Expand All @@ -654,7 +653,7 @@ pub async fn create_cluster_for_test_with_id(
fn create_failure_detector_config_for_test() -> FailureDetectorConfig {
FailureDetectorConfig {
phi_threshold: 5.0,
initial_interval: GOSSIP_INTERVAL,
initial_interval: Duration::from_millis(25),
..Default::default()
}
}
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-cluster/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ pub async fn start_cluster_service(node_config: &NodeConfig) -> anyhow::Result<C
self_node,
gossip_listen_addr,
peer_seed_addrs,
node_config.gossip_interval,
FailureDetectorConfig::default(),
&UdpTransport,
)
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-config/src/node_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,7 @@ pub struct NodeConfig {
pub grpc_listen_addr: SocketAddr,
pub gossip_advertise_addr: SocketAddr,
pub grpc_advertise_addr: SocketAddr,
pub gossip_interval: Duration,
pub peer_seeds: Vec<String>,
pub data_dir_path: PathBuf,
pub metastore_uri: Uri,
Expand Down
17 changes: 17 additions & 0 deletions quickwit/quickwit-config/src/node_config/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use std::collections::HashMap;
use std::net::{IpAddr, SocketAddr};
use std::str::FromStr;
use std::time::Duration;

use anyhow::{bail, Context};
use http::HeaderMap;
Expand All @@ -44,6 +45,12 @@ pub const DEFAULT_CLUSTER_ID: &str = "quickwit-default-cluster";

pub const DEFAULT_DATA_DIR_PATH: &str = "qwdata";

pub const DEFAULT_GOSSIP_INTERVAL: Duration = if cfg!(any(test, feature = "testsuite")) {
Duration::from_millis(25)
} else {
Duration::from_secs(1)
};

// Default config values in the order they appear in [`NodeConfigBuilder`].
fn default_cluster_id() -> ConfigValue<String, QW_CLUSTER_ID> {
ConfigValue::with_default(DEFAULT_CLUSTER_ID.to_string())
Expand Down Expand Up @@ -175,6 +182,7 @@ struct NodeConfigBuilder {
rest_listen_port: Option<u16>,
gossip_listen_port: ConfigValue<u16, QW_GOSSIP_LISTEN_PORT>,
grpc_listen_port: ConfigValue<u16, QW_GRPC_LISTEN_PORT>,
gossip_interval_ms: ConfigValue<u32, QW_GOSSIP_INTERVAL_MS>,
#[serde(default)]
peer_seeds: ConfigValue<List, QW_PEER_SEEDS>,
#[serde(rename = "data_dir")]
Expand Down Expand Up @@ -290,6 +298,12 @@ impl NodeConfigBuilder {
self.ingest_api_config.validate()?;
self.searcher_config.validate()?;

let gossip_interval = self
.gossip_interval_ms
.resolve_optional(env_vars)?
.map(|gossip_interval_ms| Duration::from_millis(gossip_interval_ms as u64))
.unwrap_or(DEFAULT_GOSSIP_INTERVAL);

let node_config = NodeConfig {
cluster_id: self.cluster_id.resolve(env_vars)?,
node_id: self.node_id.resolve(env_vars)?,
Expand All @@ -298,6 +312,7 @@ impl NodeConfigBuilder {
grpc_listen_addr,
gossip_advertise_addr,
grpc_advertise_addr,
gossip_interval,
peer_seeds: self.peer_seeds.resolve(env_vars)?.0,
data_dir_path,
metastore_uri,
Expand Down Expand Up @@ -341,6 +356,7 @@ impl Default for NodeConfigBuilder {
rest_listen_port: None,
gossip_listen_port: ConfigValue::none(),
grpc_listen_port: ConfigValue::none(),
gossip_interval_ms: ConfigValue::none(),
advertise_address: ConfigValue::none(),
peer_seeds: ConfigValue::with_default(List::default()),
data_dir_uri: default_data_dir_uri(),
Expand Down Expand Up @@ -434,6 +450,7 @@ pub fn node_config_for_test() -> NodeConfig {
grpc_advertise_addr: grpc_listen_addr,
gossip_listen_addr,
grpc_listen_addr,
gossip_interval: Duration::from_millis(25u64),
peer_seeds: Vec::new(),
data_dir_path,
metastore_uri,
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-config/src/qw_env_vars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ qw_env_vars!(
QW_REST_LISTEN_PORT,
QW_GOSSIP_LISTEN_PORT,
QW_GRPC_LISTEN_PORT,
QW_GOSSIP_INTERVAL_MS,
QW_PEER_SEEDS,
QW_DATA_DIR,
QW_METASTORE_URI,
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-lambda/src/indexer/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ async fn create_empty_cluster(config: &NodeConfig) -> anyhow::Result<Cluster> {
self_node,
config.gossip_advertise_addr,
Vec::new(),
config.gossip_interval,
FailureDetectorConfig::default(),
&ChannelTransport::default(),
)
Expand Down

0 comments on commit 1df7855

Please sign in to comment.