Skip to content

Commit

Permalink
Use `tokio::time::timeout`
Browse files Browse the repository at this point in the history
  • Loading branch information
PSeitz committed Sep 16, 2024
1 parent c8a5035 commit a1410cd
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 41 deletions.
6 changes: 6 additions & 0 deletions quickwit/quickwit-search/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,12 @@ impl From<TantivyError> for SearchError {
}
}

/// Maps a `tokio::time::timeout` expiry onto the search-level timeout error,
/// so `?` can be used directly on timed-out futures in search code paths.
impl From<tokio::time::error::Elapsed> for SearchError {
    fn from(_elapsed: tokio::time::error::Elapsed) -> Self {
        // The `Elapsed` value carries no useful detail; emit a fixed message.
        Self::Timeout(String::from("timeout exceeded"))
    }
}

impl From<postcard::Error> for SearchError {
fn from(error: postcard::Error) -> Self {
SearchError::Internal(format!("Postcard error: {error}"))
Expand Down
43 changes: 5 additions & 38 deletions quickwit/quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use std::ops::Bound;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant};

use anyhow::Context;
use futures::future::try_join_all;
Expand Down Expand Up @@ -1055,43 +1054,13 @@ impl CanSplitDoBetter {
}
}

/// Tracks how long a search request has been running so that long-running
/// leaf searches can be cancelled once the configured timeout is exceeded.
/// Cloning is cheap: the handle is just a start instant plus a duration.
#[derive(Debug, Clone)]
pub struct TimeoutHandle {
/// Instant the handle was created (i.e. when the search started).
start_time: Instant,
/// Maximum allowed elapsed time before `err_on_timeout` reports a timeout.
timeout_after: Duration,
}
impl TimeoutHandle {
    /// Creates a handle whose deadline is `max_duration` from now.
    ///
    /// For now we just take the defined timeout duration from the config.
    /// Ideally we would observe the connection and abort if the connection is closed.
    pub fn new(max_duration: Duration) -> Self {
        let start_time = Instant::now();
        Self {
            start_time,
            timeout_after: max_duration,
        }
    }

    /// Returns `Err(SearchError::Timeout)` (after logging a warning) once the
    /// deadline has passed; otherwise returns `Ok(())`.
    pub fn err_on_timeout(&self) -> Result<(), SearchError> {
        if !self.is_timedout() {
            return Ok(());
        }
        warn!("timeout reached, cancel search");
        Err(SearchError::Timeout(format!(
            "exceeded {:?}",
            self.timeout_after
        )))
    }

    /// True when strictly more than `timeout_after` has elapsed since creation.
    fn is_timedout(&self) -> bool {
        self.start_time.elapsed() > self.timeout_after
    }
}

/// `multi_leaf_search` searches multiple indices and multiple splits.
#[instrument(skip_all, fields(index = ?leaf_search_request.search_request.as_ref().unwrap().index_id_patterns))]
pub async fn multi_leaf_search(
searcher_context: Arc<SearcherContext>,
leaf_search_request: LeafSearchRequest,
storage_resolver: &StorageResolver,
) -> Result<LeafSearchResponse, SearchError> {
let timeout = TimeoutHandle::new(searcher_context.searcher_config.request_timeout());
let search_request: Arc<SearchRequest> = leaf_search_request
.search_request
.ok_or_else(|| SearchError::Internal("no search request".to_string()))?
Expand Down Expand Up @@ -1144,15 +1113,17 @@ pub async fn multi_leaf_search(
leaf_search_request_ref.split_offsets,
doc_mapper,
aggregation_limits.clone(),
timeout.clone(),
)
.in_current_span(),
);
leaf_request_tasks.push(leaf_request_future);
}

let leaf_responses = try_join_all(leaf_request_tasks).await?;
timeout.err_on_timeout()?;
let leaf_responses: Vec<crate::Result<LeafSearchResponse>> = tokio::time::timeout(
searcher_context.searcher_config.request_timeout(),
try_join_all(leaf_request_tasks),
)
.await??;
let merge_collector = make_merge_collector(&search_request, &aggregation_limits)?;
let mut incremental_merge_collector = IncrementalCollector::new(merge_collector);
for result in leaf_responses {
Expand Down Expand Up @@ -1187,7 +1158,6 @@ async fn resolve_storage_and_leaf_search(
splits: Vec<SplitIdAndFooterOffsets>,
doc_mapper: Arc<dyn DocMapper>,
aggregations_limits: AggregationLimits,
timeout: TimeoutHandle,
) -> crate::Result<LeafSearchResponse> {
let storage = storage_resolver.resolve(&index_uri).await?;

Expand All @@ -1198,7 +1168,6 @@ async fn resolve_storage_and_leaf_search(
splits,
doc_mapper,
aggregations_limits,
timeout,
)
.await
}
Expand Down Expand Up @@ -1240,7 +1209,6 @@ pub async fn leaf_search(
splits: Vec<SplitIdAndFooterOffsets>,
doc_mapper: Arc<dyn DocMapper>,
aggregations_limits: AggregationLimits,
timeout: TimeoutHandle,
) -> Result<LeafSearchResponse, SearchError> {
info!(splits_num = splits.len(), split_offsets = ?PrettySample::new(&splits, 5));

Expand All @@ -1263,7 +1231,6 @@ pub async fn leaf_search(
let incremental_merge_collector = Arc::new(Mutex::new(incremental_merge_collector));

for (split, mut request) in split_with_req {
timeout.err_on_timeout()?;
let leaf_split_search_permit = searcher_context.leaf_search_split_semaphore
.clone()
.acquire_owned()
Expand Down
3 changes: 0 additions & 3 deletions quickwit/quickwit-search/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::{BTreeMap, BTreeSet};
use std::time::Duration;

use assert_json_diff::{assert_json_eq, assert_json_include};
use leaf::TimeoutHandle;
use quickwit_config::SearcherConfig;
use quickwit_doc_mapper::tag_pruning::extract_tags_from_query;
use quickwit_doc_mapper::DefaultDocMapper;
Expand Down Expand Up @@ -1067,7 +1065,6 @@ async fn test_search_util(test_sandbox: &TestSandbox, query: &str) -> Vec<u32> {
splits_offsets,
test_sandbox.doc_mapper(),
agg_limits,
TimeoutHandle::new(Duration::from_secs(30)),
)
.await
.unwrap();
Expand Down

0 comments on commit a1410cd

Please sign in to comment.