diff --git a/quickwit/quickwit-search/src/error.rs b/quickwit/quickwit-search/src/error.rs index 0dbdff10e41..17f44f49e1e 100644 --- a/quickwit/quickwit-search/src/error.rs +++ b/quickwit/quickwit-search/src/error.rs @@ -115,6 +115,12 @@ impl From for SearchError { } } +impl From for SearchError { + fn from(_elapsed: tokio::time::error::Elapsed) -> Self { + SearchError::Timeout("timeout exceeded".to_string()) + } +} + impl From for SearchError { fn from(error: postcard::Error) -> Self { SearchError::Internal(format!("Postcard error: {error}")) diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index 0b6040d542c..cbad5d87728 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -22,7 +22,6 @@ use std::ops::Bound; use std::path::PathBuf; use std::str::FromStr; use std::sync::{Arc, Mutex, RwLock}; -use std::time::{Duration, Instant}; use anyhow::Context; use futures::future::try_join_all; @@ -1055,35 +1054,6 @@ impl CanSplitDoBetter { } } -#[derive(Debug, Clone)] -pub struct TimeoutHandle { - start_time: Instant, - timeout_after: Duration, -} -impl TimeoutHandle { - /// For now we just take the defined timeout duration from the config. - /// Ideally we would observe the connection and abort if the connection is closed. - pub fn new(max_duration: Duration) -> Self { - Self { - start_time: Instant::now(), - timeout_after: max_duration, - } - } - pub fn err_on_timeout(&self) -> Result<(), SearchError> { - if self.is_timedout() { - warn!("timeout reached, cancel search"); - return Err(SearchError::Timeout(format!( - "exceeded {:?}", - self.timeout_after - ))); - } - Ok(()) - } - fn is_timedout(&self) -> bool { - self.start_time.elapsed() > self.timeout_after - } -} - /// `multi_leaf_search` searches multiple indices and multiple splits. #[instrument(skip_all, fields(index = ?leaf_search_request.search_request.as_ref().unwrap().index_id_patterns))] pub async fn multi_leaf_search( @@ -1091,7 +1061,6 @@ pub async fn multi_leaf_search( leaf_search_request: LeafSearchRequest, storage_resolver: &StorageResolver, ) -> Result { - let timeout = TimeoutHandle::new(searcher_context.searcher_config.request_timeout()); let search_request: Arc = leaf_search_request .search_request .ok_or_else(|| SearchError::Internal("no search request".to_string()))? @@ -1144,15 +1113,17 @@ pub async fn multi_leaf_search( leaf_search_request_ref.split_offsets, doc_mapper, aggregation_limits.clone(), - timeout.clone(), ) .in_current_span(), ); leaf_request_tasks.push(leaf_request_future); } - let leaf_responses = try_join_all(leaf_request_tasks).await?; - timeout.err_on_timeout()?; + let leaf_responses: Vec> = tokio::time::timeout( + searcher_context.searcher_config.request_timeout(), + try_join_all(leaf_request_tasks), + ) + .await??; let merge_collector = make_merge_collector(&search_request, &aggregation_limits)?; let mut incremental_merge_collector = IncrementalCollector::new(merge_collector); for result in leaf_responses { @@ -1187,7 +1158,6 @@ async fn resolve_storage_and_leaf_search( splits: Vec, doc_mapper: Arc, aggregations_limits: AggregationLimits, - timeout: TimeoutHandle, ) -> crate::Result { let storage = storage_resolver.resolve(&index_uri).await?; @@ -1198,7 +1168,6 @@ async fn resolve_storage_and_leaf_search( splits, doc_mapper, aggregations_limits, - timeout, ) .await } @@ -1240,7 +1209,6 @@ pub async fn leaf_search( splits: Vec, doc_mapper: Arc, aggregations_limits: AggregationLimits, - timeout: TimeoutHandle, ) -> Result { info!(splits_num = splits.len(), split_offsets = ?PrettySample::new(&splits, 5)); @@ -1263,7 +1231,6 @@ pub async fn leaf_search( let incremental_merge_collector = Arc::new(Mutex::new(incremental_merge_collector)); for (split, mut request) in split_with_req { - timeout.err_on_timeout()?; let leaf_split_search_permit = searcher_context.leaf_search_split_semaphore .clone() .acquire_owned() diff --git a/quickwit/quickwit-search/src/tests.rs b/quickwit/quickwit-search/src/tests.rs index 4f7d02813c4..a13286b948d 100644 --- a/quickwit/quickwit-search/src/tests.rs +++ b/quickwit/quickwit-search/src/tests.rs @@ -18,10 +18,8 @@ // along with this program. If not, see . use std::collections::{BTreeMap, BTreeSet}; -use std::time::Duration; use assert_json_diff::{assert_json_eq, assert_json_include}; -use leaf::TimeoutHandle; use quickwit_config::SearcherConfig; use quickwit_doc_mapper::tag_pruning::extract_tags_from_query; use quickwit_doc_mapper::DefaultDocMapper; @@ -1067,7 +1065,6 @@ async fn test_search_util(test_sandbox: &TestSandbox, query: &str) -> Vec { splits_offsets, test_sandbox.doc_mapper(), agg_limits, - TimeoutHandle::new(Duration::from_secs(30)), ) .await .unwrap();