From 7d357fa54f92a84f59f47ba04e1a36be5d0eef2d Mon Sep 17 00:00:00 2001 From: PSeitz Date: Tue, 17 Sep 2024 14:25:25 +0800 Subject: [PATCH] add request_timeout_secs config to searcher config (#5402) * add request_timeout config On very large datasets the fixed timeouts are too low for some queries. This PR adds a setting to configure the timeout. Two settings are introduced: - `request_timeout` on the node config - `QW_REQUEST_TIMEOUT` env parameter Currently there are two timeouts when doing a distributed search request, one from quickwit cluster when opening a channel and one from the search client. The timeout is applied to both (That means all cluster connections have the same request_timeout applied, not only search nodes) Related: https://github.com/quickwit-oss/quickwit/issues/5241 * move timeout to search config, add timeout tower layer * cancel search after timeout * use tokio::timeout * use global timeoutlayer --- quickwit/quickwit-common/src/tower/mod.rs | 2 + quickwit/quickwit-common/src/tower/timeout.rs | 144 ++++++++++++++++++ .../quickwit-common/src/tower/transport.rs | 1 - .../quickwit-config/src/node_config/mod.rs | 10 ++ .../src/node_config/serialize.rs | 1 + .../quickwit-proto/src/control_plane/mod.rs | 8 +- quickwit/quickwit-proto/src/indexing/mod.rs | 7 +- quickwit/quickwit-proto/src/ingest/mod.rs | 6 + quickwit/quickwit-search/src/error.rs | 6 + quickwit/quickwit-search/src/leaf.rs | 7 +- quickwit/quickwit-search/src/retry/search.rs | 1 + quickwit/quickwit-serve/src/lib.rs | 10 +- 12 files changed, 197 insertions(+), 6 deletions(-) create mode 100644 quickwit/quickwit-common/src/tower/timeout.rs diff --git a/quickwit/quickwit-common/src/tower/mod.rs b/quickwit/quickwit-common/src/tower/mod.rs index 0d761e0bcfc..ca055379bd5 100644 --- a/quickwit/quickwit-common/src/tower/mod.rs +++ b/quickwit/quickwit-common/src/tower/mod.rs @@ -33,6 +33,7 @@ mod rate; mod rate_estimator; mod rate_limit; mod retry; +mod timeout; mod transport; use std::error; @@ 
-55,6 +56,7 @@ pub use rate::{ConstantRate, Rate}; pub use rate_estimator::{RateEstimator, SmaRateEstimator}; pub use rate_limit::{RateLimit, RateLimitLayer}; pub use retry::{RetryLayer, RetryPolicy}; +pub use timeout::{Timeout, TimeoutExceeded, TimeoutLayer}; pub use transport::{make_channel, warmup_channel, BalanceChannel}; pub type BoxError = Box; diff --git a/quickwit/quickwit-common/src/tower/timeout.rs b/quickwit/quickwit-common/src/tower/timeout.rs new file mode 100644 index 00000000000..631be52e091 --- /dev/null +++ b/quickwit/quickwit-common/src/tower/timeout.rs @@ -0,0 +1,144 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::Duration; + +use pin_project::pin_project; +use tokio::time::Sleep; +use tower::{Layer, Service}; + +#[derive(Debug, Clone)] +pub struct Timeout { + service: S, + timeout: Duration, +} +impl Timeout { + /// Creates a new [`Timeout`] + pub fn new(service: S, timeout: Duration) -> Self { + Timeout { service, timeout } + } +} + +impl Service for Timeout +where + S: Service, + S::Error: From, +{ + type Response = S::Response; + type Error = S::Error; + type Future = TimeoutFuture; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.service.poll_ready(cx) + } + + fn call(&mut self, request: R) -> Self::Future { + TimeoutFuture { + inner: self.service.call(request), + sleep: tokio::time::sleep(self.timeout), + } + } +} + +/// The error type for the `Timeout` service. +#[derive(Debug, PartialEq, Eq)] +pub struct TimeoutExceeded; + +#[pin_project] +#[derive(Debug)] +pub struct TimeoutFuture { + #[pin] + inner: F, + #[pin] + sleep: Sleep, +} + +impl Future for TimeoutFuture +where + F: Future>, + E: From, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + + match this.inner.poll(cx) { + Poll::Ready(v) => return Poll::Ready(v), + Poll::Pending => {} + } + + // Now check the timeout + match this.sleep.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(_) => Poll::Ready(Err(TimeoutExceeded.into())), + } + } +} + +/// This is similar to tower's Timeout Layer except it requires +/// the error of the service to implement `From`. +/// +/// If the inner service does not complete within the specified duration, +/// the response will be aborted with the error `TimeoutExceeded`. +#[derive(Debug, Clone)] +pub struct TimeoutLayer { + timeout: Duration, +} + +impl TimeoutLayer { + /// Creates a new `TimeoutLayer` with the specified delay. 
+ pub fn new(timeout: Duration) -> Self { + Self { timeout } + } +} + +impl Layer for TimeoutLayer { + type Service = Timeout; + + fn layer(&self, service: S) -> Self::Service { + Timeout::new(service, self.timeout) + } +} + +#[cfg(test)] +mod tests { + use tokio::time::Duration; + use tower::{ServiceBuilder, ServiceExt}; + + use super::*; + + #[tokio::test] + async fn test_timeout() { + let delay = Duration::from_millis(100); + let mut service = ServiceBuilder::new() + .layer(TimeoutLayer::new(delay)) + .service_fn(|_| async { + // sleep for 1 sec + tokio::time::sleep(Duration::from_secs(1)).await; + Ok::<_, TimeoutExceeded>(()) + }); + + let res = service.ready().await.unwrap().call(()).await; + assert_eq!(res, Err(TimeoutExceeded)); + } +} diff --git a/quickwit/quickwit-common/src/tower/transport.rs b/quickwit/quickwit-common/src/tower/transport.rs index 7cbe70fb192..8944427f7db 100644 --- a/quickwit/quickwit-common/src/tower/transport.rs +++ b/quickwit/quickwit-common/src/tower/transport.rs @@ -209,7 +209,6 @@ pub async fn make_channel(socket_addr: SocketAddr) -> Channel { .expect("provided arguments should be valid"); Endpoint::from(uri) .connect_timeout(Duration::from_secs(5)) - .timeout(Duration::from_secs(30)) .connect_lazy() } diff --git a/quickwit/quickwit-config/src/node_config/mod.rs b/quickwit/quickwit-config/src/node_config/mod.rs index 2321bd0399a..569dd94c55a 100644 --- a/quickwit/quickwit-config/src/node_config/mod.rs +++ b/quickwit/quickwit-config/src/node_config/mod.rs @@ -221,6 +221,8 @@ pub struct SearcherConfig { // TODO document and fix if necessary. 
#[serde(default, skip_serializing_if = "Option::is_none")] pub split_cache: Option, + #[serde(default = "SearcherConfig::default_request_timeout_secs")] + request_timeout_secs: NonZeroU64, } impl Default for SearcherConfig { @@ -234,11 +236,19 @@ impl Default for SearcherConfig { aggregation_memory_limit: ByteSize::mb(500), aggregation_bucket_limit: 65000, split_cache: None, + request_timeout_secs: Self::default_request_timeout_secs(), } } } impl SearcherConfig { + /// The timeout after which a search should be cancelled + pub fn request_timeout(&self) -> Duration { + Duration::from_secs(self.request_timeout_secs.get()) + } + fn default_request_timeout_secs() -> NonZeroU64 { + NonZeroU64::new(30).unwrap() + } fn validate(&self) -> anyhow::Result<()> { if let Some(split_cache_limits) = self.split_cache { if self.max_num_concurrent_split_searches diff --git a/quickwit/quickwit-config/src/node_config/serialize.rs b/quickwit/quickwit-config/src/node_config/serialize.rs index 81b9260f01d..806a7abd520 100644 --- a/quickwit/quickwit-config/src/node_config/serialize.rs +++ b/quickwit/quickwit-config/src/node_config/serialize.rs @@ -611,6 +611,7 @@ mod tests { max_num_concurrent_split_searches: 150, max_num_concurrent_split_streams: 120, split_cache: None, + request_timeout_secs: NonZeroU64::new(30).unwrap(), } ); assert_eq!( diff --git a/quickwit/quickwit-proto/src/control_plane/mod.rs b/quickwit/quickwit-proto/src/control_plane/mod.rs index 5a1becaf186..8184851845e 100644 --- a/quickwit/quickwit-proto/src/control_plane/mod.rs +++ b/quickwit/quickwit-proto/src/control_plane/mod.rs @@ -19,7 +19,7 @@ use quickwit_actors::AskError; use quickwit_common::rate_limited_error; -use quickwit_common::tower::{MakeLoadShedError, RpcName}; +use quickwit_common::tower::{MakeLoadShedError, RpcName, TimeoutExceeded}; use thiserror; use crate::metastore::{MetastoreError, OpenShardSubrequest}; @@ -44,6 +44,12 @@ pub enum ControlPlaneError { Unavailable(String), } +impl From for 
ControlPlaneError { + fn from(_timeout_exceeded: TimeoutExceeded) -> Self { + Self::Timeout("tower layer timeout".to_string()) + } +} + impl From for ControlPlaneError { fn from(task_cancelled: quickwit_common::tower::TaskCancelled) -> Self { ControlPlaneError::Internal(task_cancelled.to_string()) diff --git a/quickwit/quickwit-proto/src/indexing/mod.rs b/quickwit/quickwit-proto/src/indexing/mod.rs index 16e96495d94..b621f447ef1 100644 --- a/quickwit/quickwit-proto/src/indexing/mod.rs +++ b/quickwit/quickwit-proto/src/indexing/mod.rs @@ -26,7 +26,7 @@ use bytesize::ByteSize; use quickwit_actors::AskError; use quickwit_common::pubsub::Event; use quickwit_common::rate_limited_error; -use quickwit_common::tower::{MakeLoadShedError, RpcName}; +use quickwit_common::tower::{MakeLoadShedError, RpcName, TimeoutExceeded}; use serde::{Deserialize, Serialize}; use thiserror; @@ -52,6 +52,11 @@ pub enum IndexingError { #[error("service unavailable: {0}")] Unavailable(String), } +impl From for IndexingError { + fn from(_timeout_exceeded: TimeoutExceeded) -> Self { + Self::Timeout("tower layer timeout".to_string()) + } +} impl ServiceError for IndexingError { fn error_code(&self) -> ServiceErrorCode { diff --git a/quickwit/quickwit-proto/src/ingest/mod.rs b/quickwit/quickwit-proto/src/ingest/mod.rs index d55c808fbcb..48a410cd5ba 100644 --- a/quickwit/quickwit-proto/src/ingest/mod.rs +++ b/quickwit/quickwit-proto/src/ingest/mod.rs @@ -67,6 +67,12 @@ pub enum IngestV2Error { Unavailable(String), } +impl From for IngestV2Error { + fn from(_: quickwit_common::tower::TimeoutExceeded) -> IngestV2Error { + IngestV2Error::Timeout("tower layer timeout".to_string()) + } +} + impl From for IngestV2Error { fn from(task_cancelled: quickwit_common::tower::TaskCancelled) -> IngestV2Error { IngestV2Error::Internal(task_cancelled.to_string()) diff --git a/quickwit/quickwit-search/src/error.rs b/quickwit/quickwit-search/src/error.rs index 0dbdff10e41..17f44f49e1e 100644 --- 
a/quickwit/quickwit-search/src/error.rs +++ b/quickwit/quickwit-search/src/error.rs @@ -115,6 +115,12 @@ impl From for SearchError { } } +impl From for SearchError { + fn from(_elapsed: tokio::time::error::Elapsed) -> Self { + SearchError::Timeout("timeout exceeded".to_string()) + } +} + impl From for SearchError { fn from(error: postcard::Error) -> Self { SearchError::Internal(format!("Postcard error: {error}")) diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index 761c51bd16b..cbad5d87728 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -1119,7 +1119,11 @@ pub async fn multi_leaf_search( leaf_request_tasks.push(leaf_request_future); } - let leaf_responses = try_join_all(leaf_request_tasks).await?; + let leaf_responses: Vec> = tokio::time::timeout( + searcher_context.searcher_config.request_timeout(), + try_join_all(leaf_request_tasks), + ) + .await??; let merge_collector = make_merge_collector(&search_request, &aggregation_limits)?; let mut incremental_merge_collector = IncrementalCollector::new(merge_collector); for result in leaf_responses { @@ -1145,6 +1149,7 @@ pub async fn multi_leaf_search( } /// Resolves storage and calls leaf_search +#[allow(clippy::too_many_arguments)] async fn resolve_storage_and_leaf_search( searcher_context: Arc, search_request: Arc, diff --git a/quickwit/quickwit-search/src/retry/search.rs b/quickwit/quickwit-search/src/retry/search.rs index f6932d92d3b..d8578dd0e7f 100644 --- a/quickwit/quickwit-search/src/retry/search.rs +++ b/quickwit/quickwit-search/src/retry/search.rs @@ -58,6 +58,7 @@ impl RetryPolicy for LeafSea .retain(|request| !request.split_offsets.is_empty()); Some(request) } + Err(SearchError::Timeout(_)) => None, // Don't retry on timeout Err(_) => Some(request), } } diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index ab244c9297a..6a7a252a0cd 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ 
b/quickwit/quickwit-serve/src/lib.rs @@ -74,7 +74,7 @@ use quickwit_common::runtimes::RuntimesConfig; use quickwit_common::tower::{ BalanceChannel, BoxFutureInfaillible, BufferLayer, Change, CircuitBreakerEvaluator, ConstantRate, EstimateRateLayer, EventListenerLayer, GrpcMetricsLayer, LoadShedLayer, - RateLimitLayer, RetryLayer, RetryPolicy, SmaRateEstimator, + RateLimitLayer, RetryLayer, RetryPolicy, SmaRateEstimator, TimeoutLayer, }; use quickwit_common::uri::Uri; use quickwit_common::{get_bool_from_env, spawn_named_task}; @@ -174,6 +174,9 @@ static METASTORE_GRPC_CLIENT_METRICS_LAYER: Lazy = static METASTORE_GRPC_SERVER_METRICS_LAYER: Lazy = Lazy::new(|| GrpcMetricsLayer::new("metastore", "server")); +static GRPC_TIMEOUT_LAYER: Lazy = + Lazy::new(|| TimeoutLayer::new(Duration::from_secs(30))); + struct QuickwitServices { pub node_config: Arc, pub cluster: Cluster, @@ -946,6 +949,7 @@ async fn setup_ingest_v2( } else { let ingester_service = IngesterServiceClient::tower() .stack_layer(INGEST_GRPC_CLIENT_METRICS_LAYER.clone()) + .stack_layer(GRPC_TIMEOUT_LAYER.clone()) .build_from_channel( node.grpc_advertise_addr(), node.channel(), @@ -990,6 +994,7 @@ async fn setup_searcher( .await?; let search_service_clone = search_service.clone(); let max_message_size = node_config.grpc_config.max_message_size; + let request_timeout = node_config.searcher_config.request_timeout(); let searcher_change_stream = cluster_change_stream.filter_map(move |cluster_change| { let search_service_clone = search_service_clone.clone(); Box::pin(async move { @@ -1009,7 +1014,7 @@ async fn setup_searcher( SearchServiceClient::from_service(search_service_clone, grpc_addr); Some(Change::Insert(grpc_addr, search_client)) } else { - let timeout_channel = Timeout::new(node.channel(), Duration::from_secs(30)); + let timeout_channel = Timeout::new(node.channel(), request_timeout); let search_client = create_search_client_from_channel( grpc_addr, timeout_channel, @@ -1145,6 +1150,7 @@ fn 
setup_indexer_pool( } else { let client = IndexingServiceClient::tower() .stack_layer(INDEXING_GRPC_CLIENT_METRICS_LAYER.clone()) + .stack_layer(GRPC_TIMEOUT_LAYER.clone()) .build_from_channel( node.grpc_advertise_addr(), node.channel(),