diff --git a/quickwit/quickwit-ingest/src/ingest_v2/router.rs b/quickwit/quickwit-ingest/src/ingest_v2/router.rs index 59fc86119ae..6ad33f62c5c 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/router.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/router.rs @@ -19,7 +19,7 @@ use std::collections::{HashMap, HashSet}; use std::fmt; -use std::sync::{Arc, Weak}; +use std::sync::{Arc, OnceLock, Weak}; use std::time::Duration; use async_trait::async_trait; @@ -53,12 +53,20 @@ use super::IngesterPool; use crate::{get_ingest_router_buffer_size, LeaderId}; /// Duration after which ingest requests time out with [`IngestV2Error::Timeout`]. -pub(super) const INGEST_REQUEST_TIMEOUT: Duration = if cfg!(any(test, feature = "testsuite")) { +const DEFAULT_INGEST_REQUEST_TIMEOUT: Duration = if cfg!(any(test, feature = "testsuite")) { Duration::from_millis(10) } else { - Duration::from_secs(35) + Duration::from_millis(500) }; +fn ingest_request_timeout() -> Duration { + static TIMEOUT: OnceLock = OnceLock::new(); + *TIMEOUT.get_or_init(|| { + let duration_ms = quickwit_common::get_from_env("QW_INGEST_REQUEST_TIMEOUT_MS", DEFAULT_INGEST_REQUEST_TIMEOUT.as_millis() as u64); + Duration::from_millis(duration_ms) + }) +} + const MAX_PERSIST_ATTEMPTS: usize = 5; type PersistResult = (PersistRequestSummary, IngestV2Result); @@ -436,7 +444,7 @@ impl IngestRouter { .map_err(|_| { let message = format!( "ingest request timed out after {} seconds", - INGEST_REQUEST_TIMEOUT.as_secs() + timeout_duration.as_secs() ); IngestV2Error::Timeout(message) })? @@ -460,7 +468,7 @@ impl IngestRouterService for IngestRouter { .try_acquire_many_owned(request_size_bytes as u32) .map_err(|_| IngestV2Error::TooManyRequests)?; - self.ingest_timeout(ingest_request, INGEST_REQUEST_TIMEOUT) + self.ingest_timeout(ingest_request, ingest_request_timeout()) .await } }