From 48eeb8c9983032a9db9eb20a48e393402823be0b Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Mon, 9 Sep 2024 15:56:43 +0800 Subject: [PATCH] add request_timeout config On very large datasets the fixed timeouts are too low for some queries. This PR adds a setting to configure the timeout. Two settings are introduced: - `request_timeout` on the node config - `QW_REQUEST_TIMEOUT` env parameter Currently there are two timeouts when doing a distributed search request, one from quickwit cluster when opening a channel and one from the search client. The timeout is applied to both (That means all cluster connections have the same request_timeout applied, not only search nodes) Related: https://github.com/quickwit-oss/quickwit/issues/5241 --- quickwit/quickwit-cli/src/tool.rs | 1 + quickwit/quickwit-cluster/src/change.rs | 29 +++++++++++++++++-- quickwit/quickwit-cluster/src/cluster.rs | 11 +++++++ quickwit/quickwit-cluster/src/grpc_gossip.rs | 10 +++++-- quickwit/quickwit-cluster/src/grpc_service.rs | 8 +++-- quickwit/quickwit-cluster/src/lib.rs | 1 + quickwit/quickwit-cluster/src/node.rs | 4 ++- .../quickwit-common/src/tower/transport.rs | 4 +-- .../quickwit-config/src/node_config/mod.rs | 1 + .../src/node_config/serialize.rs | 12 ++++++++ quickwit/quickwit-config/src/qw_env_vars.rs | 1 + .../src/indexer/ingest/helpers.rs | 1 + quickwit/quickwit-serve/src/lib.rs | 3 +- 13 files changed, 74 insertions(+), 12 deletions(-) diff --git a/quickwit/quickwit-cli/src/tool.rs b/quickwit/quickwit-cli/src/tool.rs index c7ab1911205..159819e069c 100644 --- a/quickwit/quickwit-cli/src/tool.rs +++ b/quickwit/quickwit-cli/src/tool.rs @@ -945,6 +945,7 @@ async fn create_empty_cluster(config: &NodeConfig) -> anyhow::Result { config.gossip_interval, FailureDetectorConfig::default(), &ChannelTransport::default(), + config.request_timeout, ) .await?; diff --git a/quickwit/quickwit-cluster/src/change.rs b/quickwit/quickwit-cluster/src/change.rs index 363c92573f3..f3f47e04659 100644 --- a/quickwit/quickwit-cluster/src/change.rs +++ b/quickwit/quickwit-cluster/src/change.rs @@ -21,6 +21,7 @@ use std::collections::btree_map::Entry; use std::collections::BTreeMap; use std::pin::Pin; use std::task::{Context, Poll}; +use std::time::Duration; use chitchat::{ChitchatId, NodeState}; use futures::Stream; @@ -79,6 +80,7 @@ pub(crate) async fn compute_cluster_change_events( previous_nodes: &mut BTreeMap, previous_node_states: &BTreeMap, new_node_states: &BTreeMap, + request_timeout: Duration, ) -> Vec { let mut cluster_events = Vec::new(); @@ -95,6 +97,7 @@ pub(crate) async fn compute_cluster_change_events( chitchat_id, node_state, previous_nodes, + request_timeout, ) .await; @@ -139,6 +142,7 @@ async fn compute_cluster_change_events_on_added( new_chitchat_id: &ChitchatId, new_node_state: &NodeState, previous_nodes: &mut BTreeMap, + request_timeout: Duration, ) -> Vec { let is_self_node = self_chitchat_id == new_chitchat_id; let new_node_id: NodeId = new_chitchat_id.node_id.clone().into(); @@ -166,8 +170,14 @@ async fn compute_cluster_change_events_on_added( events.push(ClusterChange::Remove(previous_node)); } } - let Some(new_node) = - try_new_node(cluster_id, new_chitchat_id, new_node_state, is_self_node).await + let Some(new_node) = try_new_node( + cluster_id, + new_chitchat_id, + new_node_state, + is_self_node, + request_timeout, + ) + .await else { return events; }; @@ -300,10 +310,11 @@ async fn try_new_node( chitchat_id: &ChitchatId, node_state: &NodeState, is_self_node: bool, + request_timeout: Duration, ) -> Option { match node_state.grpc_advertise_addr() { Ok(socket_addr) => { - let channel = make_channel(socket_addr).await; + let channel = make_channel(socket_addr, request_timeout).await; try_new_node_with_channel(cluster_id, chitchat_id, node_state, channel, is_self_node) } Err(error) => { @@ -443,6 +454,7 @@ pub(crate) mod tests { &new_chitchat_id, &new_node_state, &mut previous_nodes, + Duration::from_secs(30), ) .await; assert!(events.is_empty()); @@ -465,6 +477,7 @@ pub(crate) mod tests { &new_chitchat_id, &new_node_state, &mut previous_nodes, + Duration::from_secs(30), ) .await; assert!(events.is_empty()); @@ -493,6 +506,7 @@ pub(crate) mod tests { &new_chitchat_id, &new_node_state, &mut previous_nodes, + Duration::from_secs(30), ) .await; @@ -515,6 +529,7 @@ pub(crate) mod tests { &rejoined_chitchat_id, &new_node_state, &mut previous_nodes, + Duration::from_secs(30), ) .await; assert_eq!(events.len(), 2); @@ -543,6 +558,7 @@ pub(crate) mod tests { &new_chitchat_id, &new_node_state, &mut previous_nodes, + Duration::from_secs(30), ) .await; assert!(events.is_empty()); @@ -567,6 +583,7 @@ pub(crate) mod tests { &new_chitchat_id, &new_node_state, &mut previous_nodes, + Duration::from_secs(30), ) .await; assert_eq!(events.len(), 1); @@ -897,6 +914,7 @@ pub(crate) mod tests { &mut previous_nodes, &previous_node_states, &new_node_states, + Duration::from_secs(30), ) .await; assert!(events.is_empty()); @@ -926,6 +944,7 @@ pub(crate) mod tests { &mut previous_nodes, &previous_node_states, &new_node_states, + Duration::from_secs(30), ) .await; assert!(events.is_empty()); @@ -943,6 +962,7 @@ pub(crate) mod tests { &mut previous_nodes, &previous_node_states, &new_node_states, + Duration::from_secs(30), ) .await; assert_eq!(events.len(), 1); @@ -957,6 +977,7 @@ pub(crate) mod tests { &mut previous_nodes, &new_node_states, &new_node_states, + Duration::from_secs(30), ) .await; assert_eq!(events.len(), 0); @@ -989,6 +1010,7 @@ pub(crate) mod tests { &mut previous_nodes, &previous_node_states, &new_node_states, + Duration::from_secs(30), ) .await; assert_eq!(events.len(), 1); @@ -1008,6 +1030,7 @@ pub(crate) mod tests { &mut previous_nodes, &previous_node_states, &new_node_states, + Duration::from_secs(30), ) .await; assert_eq!(events.len(), 1); diff --git a/quickwit/quickwit-cluster/src/cluster.rs b/quickwit/quickwit-cluster/src/cluster.rs index 9305d6485d7..28b04dc8956 100644 --- a/quickwit/quickwit-cluster/src/cluster.rs +++ b/quickwit/quickwit-cluster/src/cluster.rs @@ -67,6 +67,7 @@ pub struct Cluster { /// Socket address (UDP) the node listens on for receiving gossip messages. pub gossip_listen_addr: SocketAddr, gossip_interval: Duration, + timeout: Duration, inner: Arc>, } @@ -90,6 +91,9 @@ impl Cluster { pub fn cluster_id(&self) -> &str { &self.cluster_id } + pub fn timeout(&self) -> Duration { + self.timeout + } pub fn self_chitchat_id(&self) -> &ChitchatId { &self.self_chitchat_id @@ -107,6 +111,7 @@ impl Cluster { self.self_chitchat_id.gossip_advertise_addr } + #[allow(clippy::too_many_arguments)] pub async fn join( cluster_id: String, self_node: ClusterMember, @@ -115,6 +120,7 @@ impl Cluster { gossip_interval: Duration, failure_detector_config: FailureDetectorConfig, transport: &dyn Transport, + request_timeout: Duration, ) -> anyhow::Result { info!( cluster_id=%cluster_id, @@ -185,6 +191,7 @@ impl Cluster { weak_chitchat, live_nodes_rx, catchup_callback_rx.clone(), + request_timeout, ) .await; @@ -201,6 +208,7 @@ impl Cluster { self_chitchat_id: self_node.chitchat_id(), gossip_listen_addr, gossip_interval, + timeout: request_timeout, inner: Arc::new(RwLock::new(inner)), }; spawn_change_stream_task(cluster.clone()).await; @@ -550,6 +558,7 @@ fn chitchat_kv_to_indexing_task(key: &str, value: &str) -> Option } async fn spawn_change_stream_task(cluster: Cluster) { + let request_timeout = cluster.timeout(); let cluster_guard = cluster.inner.read().await; let cluster_id = cluster_guard.cluster_id.clone(); let self_chitchat_id = cluster_guard.self_chitchat_id.clone(); @@ -575,6 +584,7 @@ async fn spawn_change_stream_task(cluster: Cluster) { previous_live_nodes, &previous_live_node_states, &new_live_node_states, + request_timeout, ) .await; if !events.is_empty() { @@ -691,6 +701,7 @@ pub async fn create_cluster_for_test_with_id( Duration::from_millis(25), failure_detector_config, transport, + Duration::from_secs(30), ) .await?; cluster.set_self_node_readiness(self_node_readiness).await; diff --git a/quickwit/quickwit-cluster/src/grpc_gossip.rs b/quickwit/quickwit-cluster/src/grpc_gossip.rs index 44b816b88e4..8a8480715d5 100644 --- a/quickwit/quickwit-cluster/src/grpc_gossip.rs +++ b/quickwit/quickwit-cluster/src/grpc_gossip.rs @@ -46,6 +46,7 @@ pub(crate) async fn spawn_catchup_callback_task( weak_chitchat: Weak>, live_nodes_rx: watch::Receiver>, mut catchup_callback_rx: watch::Receiver<()>, + request_timeout: Duration, ) { let catchup_callback_future = async move { let mut interval = tokio::time::interval(Duration::from_secs(60)); @@ -61,6 +62,7 @@ pub(crate) async fn spawn_catchup_callback_task( chitchat, live_nodes_rx.clone(), cluster_grpc_client, + request_timeout, ) .await; @@ -80,8 +82,9 @@ async fn perform_grpc_gossip_rounds( chitchat: Arc>, live_nodes_rx: watch::Receiver>, grpc_client_factory: Factory, + request_timeout: Duration, ) where - Factory: Fn(SocketAddr) -> Fut, + Factory: Fn(SocketAddr, Duration) -> Fut, Fut: Future, { wait_for_gossip_candidates( @@ -102,7 +105,7 @@ async fn perform_grpc_gossip_rounds( info!("pulling cluster state from node(s): {node_ids:?}"); for (node_id, grpc_advertise_addr) in zip(node_ids, grpc_advertise_addrs) { - let cluster_client = grpc_client_factory(grpc_advertise_addr).await; + let cluster_client = grpc_client_factory(grpc_advertise_addr, request_timeout).await; let request = FetchClusterStateRequest { cluster_id: cluster_id.clone(), @@ -272,7 +275,7 @@ mod tests { let self_chitchat_id = cluster.self_chitchat_id(); let chitchat = cluster.chitchat().await; - let grpc_client_factory = |_: SocketAddr| { + let grpc_client_factory = |_: SocketAddr, _: Duration| { Box::pin(async { let mut mock_cluster_service = MockClusterService::new(); mock_cluster_service @@ -336,6 +339,7 @@ mod tests { chitchat.clone(), live_nodes_rx, grpc_client_factory, + Duration::from_secs(30), ) .await; diff --git a/quickwit/quickwit-cluster/src/grpc_service.rs b/quickwit/quickwit-cluster/src/grpc_service.rs index a9c3ec4f7f2..d4aa5e00aef 100644 --- a/quickwit/quickwit-cluster/src/grpc_service.rs +++ b/quickwit/quickwit-cluster/src/grpc_service.rs @@ -18,6 +18,7 @@ // along with this program. If not, see . use std::net::SocketAddr; +use std::time::Duration; use bytesize::ByteSize; use itertools::Itertools; @@ -40,8 +41,11 @@ static CLUSTER_GRPC_CLIENT_METRICS_LAYER: Lazy = static CLUSTER_GRPC_SERVER_METRICS_LAYER: Lazy = Lazy::new(|| GrpcMetricsLayer::new("cluster", "server")); -pub(crate) async fn cluster_grpc_client(socket_addr: SocketAddr) -> ClusterServiceClient { - let channel = make_channel(socket_addr).await; +pub(crate) async fn cluster_grpc_client( + socket_addr: SocketAddr, + request_timeout: Duration, +) -> ClusterServiceClient { + let channel = make_channel(socket_addr, request_timeout).await; ClusterServiceClient::tower() .stack_layer(CLUSTER_GRPC_CLIENT_METRICS_LAYER.clone()) diff --git a/quickwit/quickwit-cluster/src/lib.rs b/quickwit/quickwit-cluster/src/lib.rs index 8077d0a229a..c9ffa182e37 100644 --- a/quickwit/quickwit-cluster/src/lib.rs +++ b/quickwit/quickwit-cluster/src/lib.rs @@ -159,6 +159,7 @@ pub async fn start_cluster_service(node_config: &NodeConfig) -> anyhow::Result Self { + use std::time::Duration; + use quickwit_common::tower::make_channel; use crate::cluster::set_indexing_tasks_in_node_state; @@ -76,7 +78,7 @@ impl ClusterNode { let gossip_advertise_addr = ([127, 0, 0, 1], port).into(); let grpc_advertise_addr = ([127, 0, 0, 1], port + 1).into(); let chitchat_id = ChitchatId::new(node_id.to_string(), 0, gossip_advertise_addr); - let channel = make_channel(grpc_advertise_addr).await; + let channel = make_channel(grpc_advertise_addr, Duration::from_secs(30)).await; let mut node_state = NodeState::for_test(); node_state.set(ENABLED_SERVICES_KEY, enabled_services.join(",")); node_state.set(GRPC_ADVERTISE_ADDR_KEY, grpc_advertise_addr.to_string()); diff --git a/quickwit/quickwit-common/src/tower/transport.rs b/quickwit/quickwit-common/src/tower/transport.rs index 7cbe70fb192..7434951386e 100644 --- a/quickwit/quickwit-common/src/tower/transport.rs +++ b/quickwit/quickwit-common/src/tower/transport.rs @@ -200,7 +200,7 @@ where K: Hash + Eq + Clone + Send + Sync + 'static /// Creates a channel from a socket address. /// /// The function is marked as `async` because it requires an executor (`connect_lazy`). -pub async fn make_channel(socket_addr: SocketAddr) -> Channel { +pub async fn make_channel(socket_addr: SocketAddr, request_timeout: Duration) -> Channel { let uri = Uri::builder() .scheme("http") .authority(socket_addr.to_string()) @@ -209,7 +209,7 @@ pub async fn make_channel(socket_addr: SocketAddr) -> Channel { .expect("provided arguments should be valid"); Endpoint::from(uri) .connect_timeout(Duration::from_secs(5)) - .timeout(Duration::from_secs(30)) + .timeout(request_timeout) .connect_lazy() } diff --git a/quickwit/quickwit-config/src/node_config/mod.rs b/quickwit/quickwit-config/src/node_config/mod.rs index 2321bd0399a..7329cf85785 100644 --- a/quickwit/quickwit-config/src/node_config/mod.rs +++ b/quickwit/quickwit-config/src/node_config/mod.rs @@ -418,6 +418,7 @@ pub struct NodeConfig { pub gossip_advertise_addr: SocketAddr, pub grpc_advertise_addr: SocketAddr, pub gossip_interval: Duration, + pub request_timeout: Duration, pub peer_seeds: Vec, pub data_dir_path: PathBuf, pub metastore_uri: Uri, diff --git a/quickwit/quickwit-config/src/node_config/serialize.rs b/quickwit/quickwit-config/src/node_config/serialize.rs index 81b9260f01d..f8af3a2a307 100644 --- a/quickwit/quickwit-config/src/node_config/serialize.rs +++ b/quickwit/quickwit-config/src/node_config/serialize.rs @@ -52,6 +52,8 @@ pub const DEFAULT_GOSSIP_INTERVAL: Duration = if cfg!(any(test, feature = "tests Duration::from_secs(1) }; +pub const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(30); + // Default config values in the order they appear in [`NodeConfigBuilder`]. fn default_cluster_id() -> ConfigValue { ConfigValue::with_default(DEFAULT_CLUSTER_ID.to_string()) @@ -182,6 +184,7 @@ struct NodeConfigBuilder { gossip_listen_port: ConfigValue, grpc_listen_port: ConfigValue, gossip_interval_ms: ConfigValue, + request_timeout_sec: ConfigValue, #[serde(default)] peer_seeds: ConfigValue, #[serde(rename = "data_dir")] @@ -305,6 +308,12 @@ impl NodeConfigBuilder { .map(|gossip_interval_ms| Duration::from_millis(gossip_interval_ms as u64)) .unwrap_or(DEFAULT_GOSSIP_INTERVAL); + let request_timeout = self + .request_timeout_sec + .resolve_optional(env_vars)? + .map(|request_timeout_sec| Duration::from_secs(request_timeout_sec as u64)) + .unwrap_or(DEFAULT_REQUEST_TIMEOUT); + let node_config = NodeConfig { cluster_id: self.cluster_id.resolve(env_vars)?, node_id, @@ -314,6 +323,7 @@ impl NodeConfigBuilder { gossip_advertise_addr, grpc_advertise_addr, gossip_interval, + request_timeout, peer_seeds: self.peer_seeds.resolve(env_vars)?.0, data_dir_path, metastore_uri, @@ -358,6 +368,7 @@ impl Default for NodeConfigBuilder { gossip_listen_port: ConfigValue::none(), grpc_listen_port: ConfigValue::none(), gossip_interval_ms: ConfigValue::none(), + request_timeout_sec: ConfigValue::none(), advertise_address: ConfigValue::none(), peer_seeds: ConfigValue::with_default(List::default()), data_dir_uri: default_data_dir_uri(), @@ -452,6 +463,7 @@ pub fn node_config_for_tests_from_ports( gossip_listen_addr, grpc_listen_addr, gossip_interval: Duration::from_millis(25u64), + request_timeout: Duration::from_secs(30), peer_seeds: Vec::new(), data_dir_path, metastore_uri, diff --git a/quickwit/quickwit-config/src/qw_env_vars.rs b/quickwit/quickwit-config/src/qw_env_vars.rs index a8e07a407bf..92943c4020e 100644 --- a/quickwit/quickwit-config/src/qw_env_vars.rs +++ b/quickwit/quickwit-config/src/qw_env_vars.rs @@ -57,6 +57,7 @@ qw_env_vars!( QW_GOSSIP_LISTEN_PORT, QW_GRPC_LISTEN_PORT, QW_GOSSIP_INTERVAL_MS, + QW_REQUEST_TIMEOUT, QW_PEER_SEEDS, QW_DATA_DIR, QW_METASTORE_URI, diff --git a/quickwit/quickwit-lambda/src/indexer/ingest/helpers.rs b/quickwit/quickwit-lambda/src/indexer/ingest/helpers.rs index 6471d636ba1..d3bbc3ccd8d 100644 --- a/quickwit/quickwit-lambda/src/indexer/ingest/helpers.rs +++ b/quickwit/quickwit-lambda/src/indexer/ingest/helpers.rs @@ -87,6 +87,7 @@ pub(super) async fn create_empty_cluster( config.gossip_interval, FailureDetectorConfig::default(), &ChannelTransport::default(), + config.request_timeout, ) .await?; Ok(cluster) diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index ab244c9297a..98359d148c6 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -990,6 +990,7 @@ async fn setup_searcher( .await?; let search_service_clone = search_service.clone(); let max_message_size = node_config.grpc_config.max_message_size; + let request_timeout = node_config.request_timeout; let searcher_change_stream = cluster_change_stream.filter_map(move |cluster_change| { let search_service_clone = search_service_clone.clone(); Box::pin(async move { @@ -1009,7 +1010,7 @@ async fn setup_searcher( SearchServiceClient::from_service(search_service_clone, grpc_addr); Some(Change::Insert(grpc_addr, search_client)) } else { - let timeout_channel = Timeout::new(node.channel(), Duration::from_secs(30)); + let timeout_channel = Timeout::new(node.channel(), request_timeout); let search_client = create_search_client_from_channel( grpc_addr, timeout_channel,