diff --git a/quickwit/quickwit-cli/src/tool.rs b/quickwit/quickwit-cli/src/tool.rs index 408543d8aaa..e0dca7e2bc1 100644 --- a/quickwit/quickwit-cli/src/tool.rs +++ b/quickwit/quickwit-cli/src/tool.rs @@ -949,6 +949,7 @@ async fn create_empty_cluster(config: &NodeConfig) -> anyhow::Result { self_node, config.gossip_advertise_addr, Vec::new(), + config.gossip_interval, FailureDetectorConfig::default(), &ChannelTransport::default(), ) diff --git a/quickwit/quickwit-cluster/src/cluster.rs b/quickwit/quickwit-cluster/src/cluster.rs index ae36cc6ad6a..8ee6fe30ac8 100644 --- a/quickwit/quickwit-cluster/src/cluster.rs +++ b/quickwit/quickwit-cluster/src/cluster.rs @@ -49,12 +49,6 @@ use crate::member::{ }; use crate::ClusterNode; -const GOSSIP_INTERVAL: Duration = if cfg!(any(test, feature = "testsuite")) { - Duration::from_millis(25) -} else { - Duration::from_secs(1) -}; - const MARKED_FOR_DELETION_GRACE_PERIOD: usize = if cfg!(any(test, feature = "testsuite")) { 100 // ~ HEARTBEAT * 100 = 2.5 seconds. } else { @@ -71,6 +65,7 @@ pub struct Cluster { self_chitchat_id: ChitchatId, /// Socket address (UDP) the node listens on for receiving gossip messages. pub gossip_listen_addr: SocketAddr, + gossip_interval: Duration, inner: Arc>, } @@ -85,6 +80,7 @@ impl Debug for Cluster { "gossip_advertise_addr", &self.self_chitchat_id.gossip_advertise_addr, ) + .field("gossip_interval", &self.gossip_interval) .finish() } } @@ -115,6 +111,7 @@ impl Cluster { self_node: ClusterMember, gossip_listen_addr: SocketAddr, peer_seed_addrs: Vec, + gossip_interval: Duration, failure_detector_config: FailureDetectorConfig, transport: &dyn Transport, ) -> anyhow::Result { @@ -134,7 +131,7 @@ impl Cluster { listen_addr: gossip_listen_addr, seed_nodes: peer_seed_addrs, failure_detector_config, - gossip_interval: GOSSIP_INTERVAL, + gossip_interval, marked_for_deletion_grace_period: MARKED_FOR_DELETION_GRACE_PERIOD, }; let chitchat_handle = spawn_chitchat( @@ -178,6 +175,7 @@ impl Cluster { cluster_id, self_chitchat_id: self_node.chitchat_id(), gossip_listen_addr, + gossip_interval, inner: Arc::new(RwLock::new(inner)), }; spawn_ready_nodes_change_stream_task(cluster.clone()).await; @@ -335,7 +333,7 @@ impl Cluster { "Leaving the cluster." ); self.set_self_node_readiness(false).await; - tokio::time::sleep(GOSSIP_INTERVAL * 2).await; + tokio::time::sleep(self.gossip_interval * 2).await; } /// This exposes in chitchat some metrics about the CPU usage of cooperative pipelines. @@ -641,6 +639,7 @@ pub async fn create_cluster_for_test_with_id( self_node, gossip_advertise_addr, peer_seed_addrs, + Duration::from_millis(25), failure_detector_config, transport, ) @@ -654,7 +653,7 @@ pub async fn create_cluster_for_test_with_id( fn create_failure_detector_config_for_test() -> FailureDetectorConfig { FailureDetectorConfig { phi_threshold: 5.0, - initial_interval: GOSSIP_INTERVAL, + initial_interval: Duration::from_millis(25), ..Default::default() } } diff --git a/quickwit/quickwit-cluster/src/lib.rs b/quickwit/quickwit-cluster/src/lib.rs index 1b76ba175c7..800abb46001 100644 --- a/quickwit/quickwit-cluster/src/lib.rs +++ b/quickwit/quickwit-cluster/src/lib.rs @@ -90,6 +90,7 @@ pub async fn start_cluster_service(node_config: &NodeConfig) -> anyhow::Result, pub data_dir_path: PathBuf, pub metastore_uri: Uri, diff --git a/quickwit/quickwit-config/src/node_config/serialize.rs b/quickwit/quickwit-config/src/node_config/serialize.rs index 200fe49c91b..a15be97a3c6 100644 --- a/quickwit/quickwit-config/src/node_config/serialize.rs +++ b/quickwit/quickwit-config/src/node_config/serialize.rs @@ -20,6 +20,7 @@ use std::collections::HashMap; use std::net::{IpAddr, SocketAddr}; use std::str::FromStr; +use std::time::Duration; use anyhow::{bail, Context}; use http::HeaderMap; @@ -44,6 +45,12 @@ pub const DEFAULT_CLUSTER_ID: &str = "quickwit-default-cluster"; pub const DEFAULT_DATA_DIR_PATH: &str = "qwdata"; +pub const DEFAULT_GOSSIP_INTERVAL: Duration = if cfg!(any(test, feature = "testsuite")) { + Duration::from_millis(25) +} else { + Duration::from_secs(1) +}; + // Default config values in the order they appear in [`NodeConfigBuilder`]. fn default_cluster_id() -> ConfigValue { ConfigValue::with_default(DEFAULT_CLUSTER_ID.to_string()) @@ -175,6 +182,7 @@ struct NodeConfigBuilder { rest_listen_port: Option, gossip_listen_port: ConfigValue, grpc_listen_port: ConfigValue, + gossip_interval_ms: ConfigValue, #[serde(default)] peer_seeds: ConfigValue, #[serde(rename = "data_dir")] @@ -290,6 +298,12 @@ impl NodeConfigBuilder { self.ingest_api_config.validate()?; self.searcher_config.validate()?; + let gossip_interval = self + .gossip_interval_ms + .resolve_optional(env_vars)? + .map(|gossip_interval_ms| Duration::from_millis(gossip_interval_ms as u64)) + .unwrap_or(DEFAULT_GOSSIP_INTERVAL); + let node_config = NodeConfig { cluster_id: self.cluster_id.resolve(env_vars)?, node_id: self.node_id.resolve(env_vars)?, @@ -298,6 +312,7 @@ impl NodeConfigBuilder { grpc_listen_addr, gossip_advertise_addr, grpc_advertise_addr, + gossip_interval, peer_seeds: self.peer_seeds.resolve(env_vars)?.0, data_dir_path, metastore_uri, @@ -341,6 +356,7 @@ impl Default for NodeConfigBuilder { rest_listen_port: None, gossip_listen_port: ConfigValue::none(), grpc_listen_port: ConfigValue::none(), + gossip_interval_ms: ConfigValue::none(), advertise_address: ConfigValue::none(), peer_seeds: ConfigValue::with_default(List::default()), data_dir_uri: default_data_dir_uri(), @@ -434,6 +450,7 @@ pub fn node_config_for_test() -> NodeConfig { grpc_advertise_addr: grpc_listen_addr, gossip_listen_addr, grpc_listen_addr, + gossip_interval: Duration::from_millis(25u64), peer_seeds: Vec::new(), data_dir_path, metastore_uri, diff --git a/quickwit/quickwit-config/src/qw_env_vars.rs b/quickwit/quickwit-config/src/qw_env_vars.rs index 07b1dd2dce3..a8e07a407bf 100644 --- a/quickwit/quickwit-config/src/qw_env_vars.rs +++ b/quickwit/quickwit-config/src/qw_env_vars.rs @@ -56,6 +56,7 @@ qw_env_vars!( QW_REST_LISTEN_PORT, QW_GOSSIP_LISTEN_PORT, QW_GRPC_LISTEN_PORT, + QW_GOSSIP_INTERVAL_MS, QW_PEER_SEEDS, QW_DATA_DIR, QW_METASTORE_URI, diff --git a/quickwit/quickwit-lambda/src/indexer/ingest.rs b/quickwit/quickwit-lambda/src/indexer/ingest.rs index d6987cfdc7b..b449c81b84f 100644 --- a/quickwit/quickwit-lambda/src/indexer/ingest.rs +++ b/quickwit/quickwit-lambda/src/indexer/ingest.rs @@ -79,6 +79,7 @@ async fn create_empty_cluster(config: &NodeConfig) -> anyhow::Result { self_node, config.gossip_advertise_addr, Vec::new(), + config.gossip_interval, FailureDetectorConfig::default(), &ChannelTransport::default(), )