Skip to content

Commit

Permalink
add request_timeout config
Browse files Browse the repository at this point in the history
On very large datasets the fixed timeouts are too low for some queries.
This PR adds a setting to configure the timeout.

Two settings are introduced:
- `request_timeout` on the node config
- `QW_REQUEST_TIMEOUT` env parameter

Currently there are two timeouts when doing a distributed search request, one from quickwit cluster when opening a
channel and one from the search client.
The timeout is applied to both (That means all cluster connections have
the same request_timeout applied, not only search nodes)

Related: #5241
  • Loading branch information
PSeitz committed Sep 16, 2024
1 parent bef4143 commit 48eeb8c
Show file tree
Hide file tree
Showing 13 changed files with 74 additions and 12 deletions.
1 change: 1 addition & 0 deletions quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,7 @@ async fn create_empty_cluster(config: &NodeConfig) -> anyhow::Result<Cluster> {
config.gossip_interval,
FailureDetectorConfig::default(),
&ChannelTransport::default(),
config.request_timeout,
)
.await?;

Expand Down
29 changes: 26 additions & 3 deletions quickwit/quickwit-cluster/src/change.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

use chitchat::{ChitchatId, NodeState};
use futures::Stream;
Expand Down Expand Up @@ -79,6 +80,7 @@ pub(crate) async fn compute_cluster_change_events(
previous_nodes: &mut BTreeMap<NodeId, ClusterNode>,
previous_node_states: &BTreeMap<ChitchatId, NodeState>,
new_node_states: &BTreeMap<ChitchatId, NodeState>,
request_timeout: Duration,
) -> Vec<ClusterChange> {
let mut cluster_events = Vec::new();

Expand All @@ -95,6 +97,7 @@ pub(crate) async fn compute_cluster_change_events(
chitchat_id,
node_state,
previous_nodes,
request_timeout,
)
.await;

Expand Down Expand Up @@ -139,6 +142,7 @@ async fn compute_cluster_change_events_on_added(
new_chitchat_id: &ChitchatId,
new_node_state: &NodeState,
previous_nodes: &mut BTreeMap<NodeId, ClusterNode>,
request_timeout: Duration,
) -> Vec<ClusterChange> {
let is_self_node = self_chitchat_id == new_chitchat_id;
let new_node_id: NodeId = new_chitchat_id.node_id.clone().into();
Expand Down Expand Up @@ -166,8 +170,14 @@ async fn compute_cluster_change_events_on_added(
events.push(ClusterChange::Remove(previous_node));
}
}
let Some(new_node) =
try_new_node(cluster_id, new_chitchat_id, new_node_state, is_self_node).await
let Some(new_node) = try_new_node(
cluster_id,
new_chitchat_id,
new_node_state,
is_self_node,
request_timeout,
)
.await
else {
return events;
};
Expand Down Expand Up @@ -300,10 +310,11 @@ async fn try_new_node(
chitchat_id: &ChitchatId,
node_state: &NodeState,
is_self_node: bool,
request_timeout: Duration,
) -> Option<ClusterNode> {
match node_state.grpc_advertise_addr() {
Ok(socket_addr) => {
let channel = make_channel(socket_addr).await;
let channel = make_channel(socket_addr, request_timeout).await;
try_new_node_with_channel(cluster_id, chitchat_id, node_state, channel, is_self_node)
}
Err(error) => {
Expand Down Expand Up @@ -443,6 +454,7 @@ pub(crate) mod tests {
&new_chitchat_id,
&new_node_state,
&mut previous_nodes,
Duration::from_secs(30),
)
.await;
assert!(events.is_empty());
Expand All @@ -465,6 +477,7 @@ pub(crate) mod tests {
&new_chitchat_id,
&new_node_state,
&mut previous_nodes,
Duration::from_secs(30),
)
.await;
assert!(events.is_empty());
Expand Down Expand Up @@ -493,6 +506,7 @@ pub(crate) mod tests {
&new_chitchat_id,
&new_node_state,
&mut previous_nodes,
Duration::from_secs(30),
)
.await;

Expand All @@ -515,6 +529,7 @@ pub(crate) mod tests {
&rejoined_chitchat_id,
&new_node_state,
&mut previous_nodes,
Duration::from_secs(30),
)
.await;
assert_eq!(events.len(), 2);
Expand Down Expand Up @@ -543,6 +558,7 @@ pub(crate) mod tests {
&new_chitchat_id,
&new_node_state,
&mut previous_nodes,
Duration::from_secs(30),
)
.await;
assert!(events.is_empty());
Expand All @@ -567,6 +583,7 @@ pub(crate) mod tests {
&new_chitchat_id,
&new_node_state,
&mut previous_nodes,
Duration::from_secs(30),
)
.await;
assert_eq!(events.len(), 1);
Expand Down Expand Up @@ -897,6 +914,7 @@ pub(crate) mod tests {
&mut previous_nodes,
&previous_node_states,
&new_node_states,
Duration::from_secs(30),
)
.await;
assert!(events.is_empty());
Expand Down Expand Up @@ -926,6 +944,7 @@ pub(crate) mod tests {
&mut previous_nodes,
&previous_node_states,
&new_node_states,
Duration::from_secs(30),
)
.await;
assert!(events.is_empty());
Expand All @@ -943,6 +962,7 @@ pub(crate) mod tests {
&mut previous_nodes,
&previous_node_states,
&new_node_states,
Duration::from_secs(30),
)
.await;
assert_eq!(events.len(), 1);
Expand All @@ -957,6 +977,7 @@ pub(crate) mod tests {
&mut previous_nodes,
&new_node_states,
&new_node_states,
Duration::from_secs(30),
)
.await;
assert_eq!(events.len(), 0);
Expand Down Expand Up @@ -989,6 +1010,7 @@ pub(crate) mod tests {
&mut previous_nodes,
&previous_node_states,
&new_node_states,
Duration::from_secs(30),
)
.await;
assert_eq!(events.len(), 1);
Expand All @@ -1008,6 +1030,7 @@ pub(crate) mod tests {
&mut previous_nodes,
&previous_node_states,
&new_node_states,
Duration::from_secs(30),
)
.await;
assert_eq!(events.len(), 1);
Expand Down
11 changes: 11 additions & 0 deletions quickwit/quickwit-cluster/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ pub struct Cluster {
/// Socket address (UDP) the node listens on for receiving gossip messages.
pub gossip_listen_addr: SocketAddr,
gossip_interval: Duration,
timeout: Duration,
inner: Arc<RwLock<InnerCluster>>,
}

Expand All @@ -90,6 +91,9 @@ impl Cluster {
pub fn cluster_id(&self) -> &str {
&self.cluster_id
}
pub fn timeout(&self) -> Duration {
self.timeout
}

pub fn self_chitchat_id(&self) -> &ChitchatId {
&self.self_chitchat_id
Expand All @@ -107,6 +111,7 @@ impl Cluster {
self.self_chitchat_id.gossip_advertise_addr
}

#[allow(clippy::too_many_arguments)]
pub async fn join(
cluster_id: String,
self_node: ClusterMember,
Expand All @@ -115,6 +120,7 @@ impl Cluster {
gossip_interval: Duration,
failure_detector_config: FailureDetectorConfig,
transport: &dyn Transport,
request_timeout: Duration,
) -> anyhow::Result<Self> {
info!(
cluster_id=%cluster_id,
Expand Down Expand Up @@ -185,6 +191,7 @@ impl Cluster {
weak_chitchat,
live_nodes_rx,
catchup_callback_rx.clone(),
request_timeout,
)
.await;

Expand All @@ -201,6 +208,7 @@ impl Cluster {
self_chitchat_id: self_node.chitchat_id(),
gossip_listen_addr,
gossip_interval,
timeout: request_timeout,
inner: Arc::new(RwLock::new(inner)),
};
spawn_change_stream_task(cluster.clone()).await;
Expand Down Expand Up @@ -550,6 +558,7 @@ fn chitchat_kv_to_indexing_task(key: &str, value: &str) -> Option<IndexingTask>
}

async fn spawn_change_stream_task(cluster: Cluster) {
let request_timeout = cluster.timeout();
let cluster_guard = cluster.inner.read().await;
let cluster_id = cluster_guard.cluster_id.clone();
let self_chitchat_id = cluster_guard.self_chitchat_id.clone();
Expand All @@ -575,6 +584,7 @@ async fn spawn_change_stream_task(cluster: Cluster) {
previous_live_nodes,
&previous_live_node_states,
&new_live_node_states,
request_timeout,
)
.await;
if !events.is_empty() {
Expand Down Expand Up @@ -691,6 +701,7 @@ pub async fn create_cluster_for_test_with_id(
Duration::from_millis(25),
failure_detector_config,
transport,
Duration::from_secs(30),
)
.await?;
cluster.set_self_node_readiness(self_node_readiness).await;
Expand Down
10 changes: 7 additions & 3 deletions quickwit/quickwit-cluster/src/grpc_gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub(crate) async fn spawn_catchup_callback_task(
weak_chitchat: Weak<Mutex<Chitchat>>,
live_nodes_rx: watch::Receiver<BTreeMap<ChitchatId, NodeState>>,
mut catchup_callback_rx: watch::Receiver<()>,
request_timeout: Duration,
) {
let catchup_callback_future = async move {
let mut interval = tokio::time::interval(Duration::from_secs(60));
Expand All @@ -61,6 +62,7 @@ pub(crate) async fn spawn_catchup_callback_task(
chitchat,
live_nodes_rx.clone(),
cluster_grpc_client,
request_timeout,
)
.await;

Expand All @@ -80,8 +82,9 @@ async fn perform_grpc_gossip_rounds<Factory, Fut>(
chitchat: Arc<Mutex<Chitchat>>,
live_nodes_rx: watch::Receiver<BTreeMap<ChitchatId, NodeState>>,
grpc_client_factory: Factory,
request_timeout: Duration,
) where
Factory: Fn(SocketAddr) -> Fut,
Factory: Fn(SocketAddr, Duration) -> Fut,
Fut: Future<Output = ClusterServiceClient>,
{
wait_for_gossip_candidates(
Expand All @@ -102,7 +105,7 @@ async fn perform_grpc_gossip_rounds<Factory, Fut>(
info!("pulling cluster state from node(s): {node_ids:?}");

for (node_id, grpc_advertise_addr) in zip(node_ids, grpc_advertise_addrs) {
let cluster_client = grpc_client_factory(grpc_advertise_addr).await;
let cluster_client = grpc_client_factory(grpc_advertise_addr, request_timeout).await;

let request = FetchClusterStateRequest {
cluster_id: cluster_id.clone(),
Expand Down Expand Up @@ -272,7 +275,7 @@ mod tests {
let self_chitchat_id = cluster.self_chitchat_id();
let chitchat = cluster.chitchat().await;

let grpc_client_factory = |_: SocketAddr| {
let grpc_client_factory = |_: SocketAddr, _: Duration| {
Box::pin(async {
let mut mock_cluster_service = MockClusterService::new();
mock_cluster_service
Expand Down Expand Up @@ -336,6 +339,7 @@ mod tests {
chitchat.clone(),
live_nodes_rx,
grpc_client_factory,
Duration::from_secs(30),
)
.await;

Expand Down
8 changes: 6 additions & 2 deletions quickwit/quickwit-cluster/src/grpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::net::SocketAddr;
use std::time::Duration;

use bytesize::ByteSize;
use itertools::Itertools;
Expand All @@ -40,8 +41,11 @@ static CLUSTER_GRPC_CLIENT_METRICS_LAYER: Lazy<GrpcMetricsLayer> =
static CLUSTER_GRPC_SERVER_METRICS_LAYER: Lazy<GrpcMetricsLayer> =
Lazy::new(|| GrpcMetricsLayer::new("cluster", "server"));

pub(crate) async fn cluster_grpc_client(socket_addr: SocketAddr) -> ClusterServiceClient {
let channel = make_channel(socket_addr).await;
pub(crate) async fn cluster_grpc_client(
socket_addr: SocketAddr,
request_timeout: Duration,
) -> ClusterServiceClient {
let channel = make_channel(socket_addr, request_timeout).await;

ClusterServiceClient::tower()
.stack_layer(CLUSTER_GRPC_CLIENT_METRICS_LAYER.clone())
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-cluster/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ pub async fn start_cluster_service(node_config: &NodeConfig) -> anyhow::Result<C
node_config.gossip_interval,
failure_detector_config,
&CountingUdpTransport,
node_config.request_timeout,
)
.await?;
if node_config
Expand Down
4 changes: 3 additions & 1 deletion quickwit/quickwit-cluster/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ impl ClusterNode {
enabled_services: &[&str],
indexing_tasks: &[IndexingTask],
) -> Self {
use std::time::Duration;

use quickwit_common::tower::make_channel;

use crate::cluster::set_indexing_tasks_in_node_state;
Expand All @@ -76,7 +78,7 @@ impl ClusterNode {
let gossip_advertise_addr = ([127, 0, 0, 1], port).into();
let grpc_advertise_addr = ([127, 0, 0, 1], port + 1).into();
let chitchat_id = ChitchatId::new(node_id.to_string(), 0, gossip_advertise_addr);
let channel = make_channel(grpc_advertise_addr).await;
let channel = make_channel(grpc_advertise_addr, Duration::from_secs(30)).await;
let mut node_state = NodeState::for_test();
node_state.set(ENABLED_SERVICES_KEY, enabled_services.join(","));
node_state.set(GRPC_ADVERTISE_ADDR_KEY, grpc_advertise_addr.to_string());
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-common/src/tower/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ where K: Hash + Eq + Clone + Send + Sync + 'static
/// Creates a channel from a socket address.
///
/// The function is marked as `async` because it requires an executor (`connect_lazy`).
pub async fn make_channel(socket_addr: SocketAddr) -> Channel {
pub async fn make_channel(socket_addr: SocketAddr, request_timeout: Duration) -> Channel {
let uri = Uri::builder()
.scheme("http")
.authority(socket_addr.to_string())
Expand All @@ -209,7 +209,7 @@ pub async fn make_channel(socket_addr: SocketAddr) -> Channel {
.expect("provided arguments should be valid");
Endpoint::from(uri)
.connect_timeout(Duration::from_secs(5))
.timeout(Duration::from_secs(30))
.timeout(request_timeout)
.connect_lazy()
}

Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-config/src/node_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ pub struct NodeConfig {
pub gossip_advertise_addr: SocketAddr,
pub grpc_advertise_addr: SocketAddr,
pub gossip_interval: Duration,
pub request_timeout: Duration,
pub peer_seeds: Vec<String>,
pub data_dir_path: PathBuf,
pub metastore_uri: Uri,
Expand Down
Loading

0 comments on commit 48eeb8c

Please sign in to comment.