From 6340156ab2eac9990a1f13ae6f8f3c2599df5fd2 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Fri, 10 May 2024 19:16:40 +0900 Subject: [PATCH] Configurable router timeout. --- .../quickwit-ingest/src/ingest_v2/router.rs | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/quickwit/quickwit-ingest/src/ingest_v2/router.rs b/quickwit/quickwit-ingest/src/ingest_v2/router.rs index 59fc86119ae..eaffb574a0b 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/router.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/router.rs @@ -19,7 +19,7 @@ use std::collections::{HashMap, HashSet}; use std::fmt; -use std::sync::{Arc, Weak}; +use std::sync::{Arc, OnceLock, Weak}; use std::time::Duration; use async_trait::async_trait; @@ -53,12 +53,23 @@ use super::IngesterPool; use crate::{get_ingest_router_buffer_size, LeaderId}; /// Duration after which ingest requests time out with [`IngestV2Error::Timeout`]. -pub(super) const INGEST_REQUEST_TIMEOUT: Duration = if cfg!(any(test, feature = "testsuite")) { +const DEFAULT_INGEST_REQUEST_TIMEOUT: Duration = if cfg!(any(test, feature = "testsuite")) { Duration::from_millis(10) } else { Duration::from_secs(35) }; +fn ingest_request_timeout() -> Duration { + static TIMEOUT: OnceLock = OnceLock::new(); + *TIMEOUT.get_or_init(|| { + let duration_ms = quickwit_common::get_from_env( + "QW_INGEST_REQUEST_TIMEOUT_MS", + DEFAULT_INGEST_REQUEST_TIMEOUT.as_millis() as u64, + ); + Duration::from_millis(duration_ms) + }) +} + const MAX_PERSIST_ATTEMPTS: usize = 5; type PersistResult = (PersistRequestSummary, IngestV2Result); @@ -436,7 +447,7 @@ impl IngestRouter { .map_err(|_| { let message = format!( "ingest request timed out after {} seconds", - INGEST_REQUEST_TIMEOUT.as_secs() + timeout_duration.as_secs() ); IngestV2Error::Timeout(message) })? @@ -460,7 +471,7 @@ impl IngestRouterService for IngestRouter { .try_acquire_many_owned(request_size_bytes as u32) .map_err(|_| IngestV2Error::TooManyRequests)?; - self.ingest_timeout(ingest_request, INGEST_REQUEST_TIMEOUT) + self.ingest_timeout(ingest_request, ingest_request_timeout()) .await } }