Introduce a searcher config option to time out GET requests.
This is done at the storage level rather than in the S3 client, as the
S3 client only has a global operation timeout parameter.

Here the timeout is proportional to the length (in bytes) of the slice
being downloaded.

By default, if the searcher config does not define a timeout
explicitly, this PR is a no-op.

Closes #5466
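
For reference, the policy is enabled from the searcher configuration. Below is a minimal sketch of what parsing that section could look like, using a mirror of the struct added in this diff; the standalone program, the mirror struct, and the concrete values are illustrative assumptions, while the field names come from the diff itself.

// Hypothetical sketch: parse a `storage_timeout_policy` section with
// serde_yaml into a minimal mirror of the struct added in this PR.
// Field names come from the diff below; the values are illustrative.
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct StorageTimeoutPolicy {
    min_throughtput_bytes_per_secs: u64, // spelling matches the actual field
    timeout_millis: u64,
    repeat: usize,
}

fn main() -> Result<(), serde_yaml::Error> {
    let yaml = r#"
min_throughtput_bytes_per_secs: 100000
timeout_millis: 2000
repeat: 2
"#;
    let policy: StorageTimeoutPolicy = serde_yaml::from_str(yaml)?;
    println!("{policy:?}");
    Ok(())
}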
fulmicoton committed Oct 1, 2024
1 parent 7b22075 commit 0b02059
Showing 5 changed files with 80 additions and 7 deletions.
2 changes: 1 addition & 1 deletion quickwit/quickwit-config/src/lib.rs
@@ -75,7 +75,7 @@ pub use crate::metastore_config::{
 };
 pub use crate::node_config::{
     IndexerConfig, IngestApiConfig, JaegerConfig, NodeConfig, SearcherConfig, SplitCacheLimits,
-    DEFAULT_QW_CONFIG_PATH,
+    StorageTimeoutPolicy, DEFAULT_QW_CONFIG_PATH,
 };
 use crate::source_config::serialize::{SourceConfigV0_7, SourceConfigV0_8, VersionedSourceConfig};
 pub use crate::storage_config::{
35 changes: 35 additions & 0 deletions quickwit/quickwit-config/src/node_config/mod.rs
@@ -223,6 +223,40 @@ pub struct SearcherConfig {
     pub split_cache: Option<SplitCacheLimits>,
     #[serde(default = "SearcherConfig::default_request_timeout_secs")]
     request_timeout_secs: NonZeroU64,
+    #[serde(default)]
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub storage_timeout_policy: Option<StorageTimeoutPolicy>,
 }
 
+/// Configuration controlling how quickly a searcher should time out a
+/// `get_slice` request and retry it.
+///
+/// [Amazon's best practices](https://docs.aws.amazon.com/whitepapers/latest/s3-optimizing-performance-best-practices/timeouts-and-retries-for-latency-sensitive-applications.html)
+/// suggest that to ensure low latency, it is best to:
+/// - retry small GET requests after 2s
+/// - retry large GET requests when the throughput is below some percentile.
+///
+/// This policy is inspired by this guidance. It does not track instantaneous
+/// throughput, but computes an overall timeout using the following formula:
+/// `timeout_offset + num_bytes_get_request / min_throughtput`
+#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
+pub struct StorageTimeoutPolicy {
+    min_throughtput_bytes_per_secs: u64,
+    timeout_millis: u64,
+    repeat: usize,
+}
+
+impl StorageTimeoutPolicy {
+    pub fn compute_timeout(&self, num_bytes: usize) -> impl Iterator<Item = Duration> {
+        let min_download_time_secs: f64 = if self.min_throughtput_bytes_per_secs == 0 {
+            0.0f64
+        } else {
+            num_bytes as f64 / self.min_throughtput_bytes_per_secs as f64
+        };
+        let timeout = Duration::from_millis(self.timeout_millis)
+            + Duration::from_secs_f64(min_download_time_secs);
+        std::iter::repeat(timeout).take(self.repeat)
+    }
+}
+
 impl Default for SearcherConfig {
@@ -237,6 +271,7 @@ impl Default for SearcherConfig {
             aggregation_bucket_limit: 65000,
             split_cache: None,
             request_timeout_secs: Self::default_request_timeout_secs(),
+            storage_timeout_policy: None,
         }
     }
 }
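
To make the timeout formula concrete, here is a standalone sketch of the same computation with illustrative numbers (not defaults): an 8 MiB slice with a 2s offset and a 1 MB/s throughput floor yields a per-attempt timeout of roughly 10.4s.

// Standalone sketch of StorageTimeoutPolicy::compute_timeout from the
// diff above; parameter values below are illustrative, not defaults.
use std::time::Duration;

fn compute_timeout(
    num_bytes: usize,
    timeout_millis: u64,
    min_throughtput_bytes_per_secs: u64,
    repeat: usize,
) -> impl Iterator<Item = Duration> {
    let min_download_time_secs = if min_throughtput_bytes_per_secs == 0 {
        0.0
    } else {
        num_bytes as f64 / min_throughtput_bytes_per_secs as f64
    };
    let timeout = Duration::from_millis(timeout_millis)
        + Duration::from_secs_f64(min_download_time_secs);
    std::iter::repeat(timeout).take(repeat)
}

fn main() {
    // 8 MiB at a 1 MB/s floor: each of the two attempts gets
    // 2s + 8.39s ≈ 10.39s before it is considered too slow.
    for timeout in compute_timeout(8 * 1024 * 1024, 2_000, 1_000_000, 2) {
        println!("{timeout:?}");
    }
}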
29 changes: 25 additions & 4 deletions quickwit/quickwit-search/src/leaf.rs
@@ -35,8 +35,8 @@ use quickwit_proto::search::{
 use quickwit_query::query_ast::{BoolQuery, QueryAst, QueryAstTransformer, RangeQuery, TermQuery};
 use quickwit_query::tokenizers::TokenizerManager;
 use quickwit_storage::{
-    wrap_storage_with_cache, BundleStorage, MemorySizedCache, OwnedBytes, SplitCache, Storage,
-    StorageResolver,
+    wrap_storage_with_cache, AggressiveRetryStorage, BundleStorage, MemorySizedCache, OwnedBytes,
+    SplitCache, Storage, StorageResolver,
 };
 use tantivy::aggregation::agg_req::{AggregationVariants, Aggregations};
 use tantivy::aggregation::AggregationLimitsGuard;
@@ -134,13 +134,34 @@ pub(crate) async fn open_index_with_caches(
     tokenizer_manager: Option<&TokenizerManager>,
     ephemeral_unbounded_cache: bool,
 ) -> anyhow::Result<Index> {
-    let (hotcache_bytes, bundle_storage) =
-        open_split_bundle(searcher_context, index_storage, split_and_footer_offsets).await?;
+    // If configured in the searcher config, add a layer that retries
+    // `get_slice` requests when they take too long.
+    //
+    // The goal here is to ensure low latency.
+    let index_storage_with_aggressive_timeout_retry: Arc<dyn Storage> =
+        if let Some(storage_timeout_policy) =
+            &searcher_context.searcher_config.storage_timeout_policy
+        {
+            Arc::new(AggressiveRetryStorage::new(
+                index_storage,
+                storage_timeout_policy.clone(),
+            ))
+        } else {
+            index_storage
+        };
+
+    let (hotcache_bytes, bundle_storage) = open_split_bundle(
+        searcher_context,
+        index_storage_with_aggressive_timeout_retry,
+        split_and_footer_offsets,
+    )
+    .await?;
 
     let bundle_storage_with_cache = wrap_storage_with_cache(
         searcher_context.fast_fields_cache.clone(),
         Arc::new(bundle_storage),
     );
 
     let directory = StorageDirectory::new(bundle_storage_with_cache);
 
     let hot_directory = if ephemeral_unbounded_cache {
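
The body of the new `AggressiveRetryStorage` is not part of the hunks shown here. A plausible minimal sketch of the wrapping logic it names — race each `get_slice` against the computed deadline, then fall back to one unbounded attempt — could look as follows; the simplified `Storage` trait, the mirror of the policy, and every detail of the implementation are assumptions, not the actual module.

// Hedged sketch only: the real aggressive_retry_storage module is added by
// this commit but its contents are not shown in this diff.
use std::ops::Range;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;

// Simplified stand-in for quickwit's Storage trait (assumption).
#[async_trait]
pub trait Storage: Send + Sync + 'static {
    async fn get_slice(&self, path: &str, range: Range<usize>) -> anyhow::Result<Vec<u8>>;
}

// Minimal mirror of the policy defined in node_config/mod.rs above.
#[derive(Clone)]
pub struct StorageTimeoutPolicy {
    pub min_throughtput_bytes_per_secs: u64,
    pub timeout_millis: u64,
    pub repeat: usize,
}

impl StorageTimeoutPolicy {
    fn compute_timeout(&self, num_bytes: usize) -> impl Iterator<Item = Duration> {
        let secs = if self.min_throughtput_bytes_per_secs == 0 {
            0.0
        } else {
            num_bytes as f64 / self.min_throughtput_bytes_per_secs as f64
        };
        let timeout = Duration::from_millis(self.timeout_millis) + Duration::from_secs_f64(secs);
        std::iter::repeat(timeout).take(self.repeat)
    }
}

pub struct AggressiveRetryStorage {
    underlying: Arc<dyn Storage>,
    policy: StorageTimeoutPolicy,
}

impl AggressiveRetryStorage {
    pub fn new(underlying: Arc<dyn Storage>, policy: StorageTimeoutPolicy) -> Self {
        AggressiveRetryStorage { underlying, policy }
    }
}

#[async_trait]
impl Storage for AggressiveRetryStorage {
    async fn get_slice(&self, path: &str, range: Range<usize>) -> anyhow::Result<Vec<u8>> {
        for timeout in self.policy.compute_timeout(range.len()) {
            match tokio::time::timeout(timeout, self.underlying.get_slice(path, range.clone()))
                .await
            {
                Ok(result) => return result,
                // Too slow: drop the in-flight request and issue a fresh one.
                Err(_elapsed) => {}
            }
        }
        // Final attempt without a deadline, so a configured policy can only
        // add retries, never turn a slow success into a hard failure.
        self.underlying.get_slice(path, range).await
    }
}

Racing the request against a deadline and re-issuing it matches the hedged-request idea from the AWS guidance quoted in the config docs.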
2 changes: 2 additions & 0 deletions quickwit/quickwit-storage/src/lib.rs
@@ -30,6 +30,7 @@
 //! - etc.
 //!
 //! The `BundleStorage` bundles together multiple files into a single file.
+mod aggressive_retry_storage;
 mod cache;
 mod debouncer;
 mod file_descriptor_cache;
@@ -63,6 +64,7 @@ pub use split_cache::SplitCache;
 pub use tantivy::directory::OwnedBytes;
 pub use versioned_component::VersionedComponent;
 
+pub use self::aggressive_retry_storage::AggressiveRetryStorage;
 pub use self::bundle_storage::{BundleStorage, BundleStorageFileOffsets};
 #[cfg(any(test, feature = "testsuite"))]
 pub use self::cache::MockStorageCache;
19 changes: 17 additions & 2 deletions quickwit/quickwit-storage/src/metrics.rs
@@ -21,7 +21,7 @@
 
 use once_cell::sync::Lazy;
 use quickwit_common::metrics::{
-    new_counter, new_counter_with_labels, new_gauge, IntCounter, IntGauge,
+    new_counter, new_counter_vec, new_counter_with_labels, new_gauge, IntCounter, IntGauge,
 };
 
 /// Counters associated to storage operations.
@@ -32,6 +32,7 @@ pub struct StorageMetrics {
     pub fast_field_cache: CacheMetrics,
     pub split_footer_cache: CacheMetrics,
     pub searcher_split_cache: CacheMetrics,
+    pub get_slice_timeout_total_by_attempts: [IntCounter; 5],
     pub object_storage_get_total: IntCounter,
     pub object_storage_put_total: IntCounter,
     pub object_storage_put_parts: IntCounter,
@@ -41,14 +42,28 @@
 
 impl Default for StorageMetrics {
     fn default() -> Self {
+        let get_slice_timeout_total = new_counter_vec(
+            "get_slice_timeout_total",
+            "Number of `get_slice` operations that timed out",
+            "storage",
+            &[],
+            ["attempt"],
+        );
+        let get_slice_timeout_total_by_attempts = [
+            get_slice_timeout_total.with_label_values(["0"]),
+            get_slice_timeout_total.with_label_values(["1"]),
+            get_slice_timeout_total.with_label_values(["2"]),
+            get_slice_timeout_total.with_label_values(["3"]),
+            get_slice_timeout_total.with_label_values(["4"]),
+        ];
         StorageMetrics {
             fast_field_cache: CacheMetrics::for_component("fastfields"),
             fd_cache_metrics: CacheMetrics::for_component("fd"),
             partial_request_cache: CacheMetrics::for_component("partial_request"),
             searcher_split_cache: CacheMetrics::for_component("searcher_split"),
             shortlived_cache: CacheMetrics::for_component("shortlived"),
             split_footer_cache: CacheMetrics::for_component("splitfooter"),
+            get_slice_timeout_total_by_attempts,
             object_storage_get_total: new_counter(
                 "object_storage_gets_total",
                 "Number of objects fetched.",
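
The call site for these per-attempt counters is not in the hunks shown; presumably the retry layer bumps the counter matching the attempt that timed out. A hypothetical helper (name and signature assumed, not from the commit) could clamp the attempt index into the five labeled buckets:

// Hypothetical helper: record a `get_slice` timeout for a given retry
// attempt, clamping indices >= 5 into the last labeled bucket.
use prometheus::IntCounter;

fn record_get_slice_timeout(counters: &[IntCounter; 5], attempt: usize) {
    let idx = attempt.min(counters.len() - 1);
    counters[idx].inc();
}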
