Skip to content

Commit

Permalink
add request_timeout_secs config to searcher config (#5402)
Browse files Browse the repository at this point in the history
* add request_timeout config

On very large datasets the fixed timeouts are too low for some queries.
This PR adds a setting to configure the timeout.

Two settings are introduced:
- `request_timeout` on the node config
- `QW_REQUEST_TIMEOUT` env parameter

Currently there are two timeouts when doing a distributed search request: one from the Quickwit cluster when opening a
channel, and one from the search client.
The timeout is applied to both (this means all cluster connections have
the same request_timeout applied, not only those to search nodes).

Related: #5241

* move timeout to search config, add timeout tower layer

* cancel search after timeout

* use tokio::timeout

* use global timeoutlayer
  • Loading branch information
PSeitz authored Sep 17, 2024
1 parent 740b2ba commit 7d357fa
Show file tree
Hide file tree
Showing 12 changed files with 197 additions and 6 deletions.
2 changes: 2 additions & 0 deletions quickwit/quickwit-common/src/tower/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ mod rate;
mod rate_estimator;
mod rate_limit;
mod retry;
mod timeout;
mod transport;

use std::error;
Expand All @@ -55,6 +56,7 @@ pub use rate::{ConstantRate, Rate};
pub use rate_estimator::{RateEstimator, SmaRateEstimator};
pub use rate_limit::{RateLimit, RateLimitLayer};
pub use retry::{RetryLayer, RetryPolicy};
pub use timeout::{Timeout, TimeoutExceeded, TimeoutLayer};
pub use transport::{make_channel, warmup_channel, BalanceChannel};

pub type BoxError = Box<dyn error::Error + Send + Sync + 'static>;
Expand Down
144 changes: 144 additions & 0 deletions quickwit/quickwit-common/src/tower/timeout.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Copyright (C) 2024 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at [email protected].
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

use pin_project::pin_project;
use tokio::time::Sleep;
use tower::{Layer, Service};

/// Middleware that applies a fixed timeout to every call made to the wrapped service.
///
/// When the timeout elapses before the inner service answers, the call resolves to the
/// inner service's own error type, built from [`TimeoutExceeded`].
#[derive(Debug, Clone)]
pub struct Timeout<S> {
    /// The wrapped service that actually handles requests.
    service: S,
    /// Maximum duration a single call is allowed to take.
    timeout: Duration,
}

impl<S> Timeout<S> {
    /// Wraps `service` so that each of its calls is bounded by `timeout`.
    pub fn new(service: S, timeout: Duration) -> Self {
        Self { service, timeout }
    }
}

impl<S, R> Service<R> for Timeout<S>
where
    S: Service<R>,
    S::Error: From<TimeoutExceeded>,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = TimeoutFuture<S::Future>;

    /// Readiness is delegated to the inner service; the timeout only covers `call`.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.service.poll_ready(cx)
    }

    /// Dispatches `request` to the inner service and pairs the resulting future with a
    /// timer that is created at call time.
    fn call(&mut self, request: R) -> Self::Future {
        // The inner call is issued before the timer is created, as in the original
        // struct-literal evaluation order.
        let inner = self.service.call(request);
        let sleep = tokio::time::sleep(self.timeout);
        TimeoutFuture { inner, sleep }
    }
}

/// The error type for the `Timeout` service.
///
/// It is produced when the timer of a timed-out call fires before the inner service
/// completes. Service error types opt in to the timeout middleware by implementing
/// `From<TimeoutExceeded>`.
///
/// The type implements [`std::fmt::Display`] and [`std::error::Error`] so it can be
/// logged, boxed as `dyn Error`, or propagated with `?` like any other error.
#[derive(Debug, PartialEq, Eq)]
pub struct TimeoutExceeded;

impl std::fmt::Display for TimeoutExceeded {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("request timeout exceeded")
    }
}

impl std::error::Error for TimeoutExceeded {}

/// Future returned by the `Timeout` service.
///
/// It resolves with the inner future's output if that future finishes first, and with
/// an error converted from [`TimeoutExceeded`] once the timer fires.
#[pin_project]
#[derive(Debug)]
pub struct TimeoutFuture<F> {
    /// The in-flight call to the wrapped service.
    #[pin]
    inner: F,
    /// Timer that completes when the timeout has elapsed.
    #[pin]
    sleep: Sleep,
}

impl<F, T, E> Future for TimeoutFuture<F>
where
    F: Future<Output = Result<T, E>>,
    E: From<TimeoutExceeded>,
{
    type Output = Result<T, E>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();

        // The inner future is polled first, so a response that is already available
        // wins over a timer that fires during the same poll.
        if let Poll::Ready(output) = this.inner.poll(cx) {
            return Poll::Ready(output);
        }

        // Still pending: the call fails only once the timer has completed.
        this.sleep
            .poll(cx)
            .map(|()| Err(TimeoutExceeded.into()))
    }
}

/// A layer that wraps services with a request timeout.
///
/// It is similar to tower's Timeout Layer, except that it requires the error type of
/// the wrapped service to implement `From<TimeoutExceeded>`.
///
/// If the inner service does not complete within the configured duration, the call is
/// aborted and fails with an error built from `TimeoutExceeded`.
#[derive(Debug, Clone)]
pub struct TimeoutLayer {
    /// Timeout handed to every service produced by this layer.
    timeout: Duration,
}

impl TimeoutLayer {
    /// Builds a `TimeoutLayer` whose services time out after `timeout`.
    pub fn new(timeout: Duration) -> Self {
        TimeoutLayer { timeout }
    }
}

impl<S> Layer<S> for TimeoutLayer {
type Service = Timeout<S>;

fn layer(&self, service: S) -> Self::Service {
Timeout::new(service, self.timeout)
}
}

#[cfg(test)]
mod tests {
    use tokio::time::Duration;
    use tower::{ServiceBuilder, ServiceExt};

    use super::*;

    /// A handler that takes 1 second behind a 100 ms timeout must fail with
    /// `TimeoutExceeded`.
    #[tokio::test]
    async fn test_timeout() {
        let timeout_layer = TimeoutLayer::new(Duration::from_millis(100));
        let mut slow_service = ServiceBuilder::new()
            .layer(timeout_layer)
            .service_fn(|_| async {
                // The handler sleeps far longer than the timeout allows.
                tokio::time::sleep(Duration::from_secs(1)).await;
                Ok::<_, TimeoutExceeded>(())
            });

        let outcome = slow_service.ready().await.unwrap().call(()).await;
        assert_eq!(outcome, Err(TimeoutExceeded));
    }
}
1 change: 0 additions & 1 deletion quickwit/quickwit-common/src/tower/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,6 @@ pub async fn make_channel(socket_addr: SocketAddr) -> Channel {
.expect("provided arguments should be valid");
Endpoint::from(uri)
.connect_timeout(Duration::from_secs(5))
.timeout(Duration::from_secs(30))
.connect_lazy()
}

Expand Down
10 changes: 10 additions & 0 deletions quickwit/quickwit-config/src/node_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,8 @@ pub struct SearcherConfig {
// TODO document and fix if necessary.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub split_cache: Option<SplitCacheLimits>,
#[serde(default = "SearcherConfig::default_request_timeout_secs")]
request_timeout_secs: NonZeroU64,
}

impl Default for SearcherConfig {
Expand All @@ -234,11 +236,19 @@ impl Default for SearcherConfig {
aggregation_memory_limit: ByteSize::mb(500),
aggregation_bucket_limit: 65000,
split_cache: None,
request_timeout_secs: Self::default_request_timeout_secs(),
}
}
}

impl SearcherConfig {
/// The timeout after which a search should be cancelled.
///
/// Built from the `request_timeout_secs` setting, which is expressed in seconds.
pub fn request_timeout(&self) -> Duration {
    let timeout_secs: u64 = self.request_timeout_secs.get();
    Duration::from_secs(timeout_secs)
}
/// Default value of `request_timeout_secs` (30 seconds).
///
/// Used both as the serde default for the field and by the `Default` impl.
fn default_request_timeout_secs() -> NonZeroU64 {
    // Evaluated at compile time: a zero default fails the build instead of panicking
    // at runtime.
    const DEFAULT_REQUEST_TIMEOUT_SECS: NonZeroU64 = match NonZeroU64::new(30) {
        Some(secs) => secs,
        None => panic!("default request timeout must be non-zero"),
    };
    DEFAULT_REQUEST_TIMEOUT_SECS
}
fn validate(&self) -> anyhow::Result<()> {
if let Some(split_cache_limits) = self.split_cache {
if self.max_num_concurrent_split_searches
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-config/src/node_config/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,7 @@ mod tests {
max_num_concurrent_split_searches: 150,
max_num_concurrent_split_streams: 120,
split_cache: None,
request_timeout_secs: NonZeroU64::new(30).unwrap(),
}
);
assert_eq!(
Expand Down
8 changes: 7 additions & 1 deletion quickwit/quickwit-proto/src/control_plane/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

use quickwit_actors::AskError;
use quickwit_common::rate_limited_error;
use quickwit_common::tower::{MakeLoadShedError, RpcName};
use quickwit_common::tower::{MakeLoadShedError, RpcName, TimeoutExceeded};
use thiserror;

use crate::metastore::{MetastoreError, OpenShardSubrequest};
Expand All @@ -44,6 +44,12 @@ pub enum ControlPlaneError {
Unavailable(String),
}

impl From<TimeoutExceeded> for ControlPlaneError {
fn from(_timeout_exceeded: TimeoutExceeded) -> Self {
Self::Timeout("tower layer timeout".to_string())
}
}

impl From<quickwit_common::tower::TaskCancelled> for ControlPlaneError {
fn from(task_cancelled: quickwit_common::tower::TaskCancelled) -> Self {
ControlPlaneError::Internal(task_cancelled.to_string())
Expand Down
7 changes: 6 additions & 1 deletion quickwit/quickwit-proto/src/indexing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use bytesize::ByteSize;
use quickwit_actors::AskError;
use quickwit_common::pubsub::Event;
use quickwit_common::rate_limited_error;
use quickwit_common::tower::{MakeLoadShedError, RpcName};
use quickwit_common::tower::{MakeLoadShedError, RpcName, TimeoutExceeded};
use serde::{Deserialize, Serialize};
use thiserror;

Expand All @@ -52,6 +52,11 @@ pub enum IndexingError {
#[error("service unavailable: {0}")]
Unavailable(String),
}
impl From<TimeoutExceeded> for IndexingError {
fn from(_timeout_exceeded: TimeoutExceeded) -> Self {
Self::Timeout("tower layer timeout".to_string())
}
}

impl ServiceError for IndexingError {
fn error_code(&self) -> ServiceErrorCode {
Expand Down
6 changes: 6 additions & 0 deletions quickwit/quickwit-proto/src/ingest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ pub enum IngestV2Error {
Unavailable(String),
}

/// Converts a timeout raised by the tower timeout layer into `IngestV2Error::Timeout`.
impl From<quickwit_common::tower::TimeoutExceeded> for IngestV2Error {
    fn from(_timeout_exceeded: quickwit_common::tower::TimeoutExceeded) -> Self {
        Self::Timeout(String::from("tower layer timeout"))
    }
}

impl From<quickwit_common::tower::TaskCancelled> for IngestV2Error {
fn from(task_cancelled: quickwit_common::tower::TaskCancelled) -> IngestV2Error {
IngestV2Error::Internal(task_cancelled.to_string())
Expand Down
6 changes: 6 additions & 0 deletions quickwit/quickwit-search/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,12 @@ impl From<TantivyError> for SearchError {
}
}

impl From<tokio::time::error::Elapsed> for SearchError {
fn from(_elapsed: tokio::time::error::Elapsed) -> Self {
SearchError::Timeout("timeout exceeded".to_string())
}
}

impl From<postcard::Error> for SearchError {
fn from(error: postcard::Error) -> Self {
SearchError::Internal(format!("Postcard error: {error}"))
Expand Down
7 changes: 6 additions & 1 deletion quickwit/quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1119,7 +1119,11 @@ pub async fn multi_leaf_search(
leaf_request_tasks.push(leaf_request_future);
}

let leaf_responses = try_join_all(leaf_request_tasks).await?;
let leaf_responses: Vec<crate::Result<LeafSearchResponse>> = tokio::time::timeout(
searcher_context.searcher_config.request_timeout(),
try_join_all(leaf_request_tasks),
)
.await??;
let merge_collector = make_merge_collector(&search_request, &aggregation_limits)?;
let mut incremental_merge_collector = IncrementalCollector::new(merge_collector);
for result in leaf_responses {
Expand All @@ -1145,6 +1149,7 @@ pub async fn multi_leaf_search(
}

/// Resolves storage and calls leaf_search
#[allow(clippy::too_many_arguments)]
async fn resolve_storage_and_leaf_search(
searcher_context: Arc<SearcherContext>,
search_request: Arc<SearchRequest>,
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-search/src/retry/search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ impl RetryPolicy<LeafSearchRequest, LeafSearchResponse, SearchError> for LeafSea
.retain(|request| !request.split_offsets.is_empty());
Some(request)
}
Err(SearchError::Timeout(_)) => None, // Don't retry on timeout
Err(_) => Some(request),
}
}
Expand Down
10 changes: 8 additions & 2 deletions quickwit/quickwit-serve/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ use quickwit_common::runtimes::RuntimesConfig;
use quickwit_common::tower::{
BalanceChannel, BoxFutureInfaillible, BufferLayer, Change, CircuitBreakerEvaluator,
ConstantRate, EstimateRateLayer, EventListenerLayer, GrpcMetricsLayer, LoadShedLayer,
RateLimitLayer, RetryLayer, RetryPolicy, SmaRateEstimator,
RateLimitLayer, RetryLayer, RetryPolicy, SmaRateEstimator, TimeoutLayer,
};
use quickwit_common::uri::Uri;
use quickwit_common::{get_bool_from_env, spawn_named_task};
Expand Down Expand Up @@ -174,6 +174,9 @@ static METASTORE_GRPC_CLIENT_METRICS_LAYER: Lazy<GrpcMetricsLayer> =
static METASTORE_GRPC_SERVER_METRICS_LAYER: Lazy<GrpcMetricsLayer> =
Lazy::new(|| GrpcMetricsLayer::new("metastore", "server"));

/// Shared timeout layer with a fixed 30-second limit, cloned onto gRPC client stacks.
static GRPC_TIMEOUT_LAYER: Lazy<TimeoutLayer> = Lazy::new(|| {
    let grpc_timeout = Duration::from_secs(30);
    TimeoutLayer::new(grpc_timeout)
});

struct QuickwitServices {
pub node_config: Arc<NodeConfig>,
pub cluster: Cluster,
Expand Down Expand Up @@ -946,6 +949,7 @@ async fn setup_ingest_v2(
} else {
let ingester_service = IngesterServiceClient::tower()
.stack_layer(INGEST_GRPC_CLIENT_METRICS_LAYER.clone())
.stack_layer(GRPC_TIMEOUT_LAYER.clone())
.build_from_channel(
node.grpc_advertise_addr(),
node.channel(),
Expand Down Expand Up @@ -990,6 +994,7 @@ async fn setup_searcher(
.await?;
let search_service_clone = search_service.clone();
let max_message_size = node_config.grpc_config.max_message_size;
let request_timeout = node_config.searcher_config.request_timeout();
let searcher_change_stream = cluster_change_stream.filter_map(move |cluster_change| {
let search_service_clone = search_service_clone.clone();
Box::pin(async move {
Expand All @@ -1009,7 +1014,7 @@ async fn setup_searcher(
SearchServiceClient::from_service(search_service_clone, grpc_addr);
Some(Change::Insert(grpc_addr, search_client))
} else {
let timeout_channel = Timeout::new(node.channel(), Duration::from_secs(30));
let timeout_channel = Timeout::new(node.channel(), request_timeout);
let search_client = create_search_client_from_channel(
grpc_addr,
timeout_channel,
Expand Down Expand Up @@ -1145,6 +1150,7 @@ fn setup_indexer_pool(
} else {
let client = IndexingServiceClient::tower()
.stack_layer(INDEXING_GRPC_CLIENT_METRICS_LAYER.clone())
.stack_layer(GRPC_TIMEOUT_LAYER.clone())
.build_from_channel(
node.grpc_advertise_addr(),
node.channel(),
Expand Down

0 comments on commit 7d357fa

Please sign in to comment.