Skip to content

Commit

Permalink
Introduce a searcher config option to timeout get requests.
Browse files Browse the repository at this point in the history
This is done at a storage instead of the s3 client as the s3 client
only had a global operation timeout parameter.

Here the timeout is proportional to the length (in byte) of the
slice being downloaded.

By default, if the searcher config does not define a timeout
explicitly, this PR is no-op.

Closes #5466
  • Loading branch information
fulmicoton committed Oct 2, 2024
1 parent 7b22075 commit aa9c516
Show file tree
Hide file tree
Showing 10 changed files with 370 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,12 @@
"fast_field_cache_capacity": "10G",
"split_footer_cache_capacity": "1G",
"max_num_concurrent_split_streams": 120,
"max_num_concurrent_split_searches": 150
"max_num_concurrent_split_searches": 150,
"storage_timeout_policy": {
"min_throughtput_bytes_per_secs": 100000,
"timeout_millis": 2000,
"repeat": 2
}
},
"jaeger": {
"enable_endpoint": true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ split_footer_cache_capacity = "1G"
max_num_concurrent_split_streams = 120
max_num_concurrent_split_searches = 150

[searcher.storage_timeout_policy]
min_throughtput_bytes_per_secs = 100000
timeout_millis = 2000
repeat = 2

[jaeger]
enable_endpoint = true
lookback_period_hours = 24
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ searcher:
split_footer_cache_capacity: 1G
max_num_concurrent_split_streams: 120
max_num_concurrent_split_searches: 150
storage_timeout_policy:
min_throughtput_bytes_per_secs: 100000
timeout_millis: 2000
repeat: 2

jaeger:
enable_endpoint: true
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub use crate::metastore_config::{
};
pub use crate::node_config::{
IndexerConfig, IngestApiConfig, JaegerConfig, NodeConfig, SearcherConfig, SplitCacheLimits,
DEFAULT_QW_CONFIG_PATH,
StorageTimeoutPolicy, DEFAULT_QW_CONFIG_PATH,
};
use crate::source_config::serialize::{SourceConfigV0_7, SourceConfigV0_8, VersionedSourceConfig};
pub use crate::storage_config::{
Expand Down
35 changes: 35 additions & 0 deletions quickwit/quickwit-config/src/node_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,40 @@ pub struct SearcherConfig {
pub split_cache: Option<SplitCacheLimits>,
#[serde(default = "SearcherConfig::default_request_timeout_secs")]
request_timeout_secs: NonZeroU64,
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub storage_timeout_policy: Option<StorageTimeoutPolicy>,
}

/// Configuration controlling how fast a searcher should timeout a `get_slice`
/// request to retry it.
///
/// [Amazon's best practise](https://docs.aws.amazon.com/whitepapers/latest/s3-optimizing-performance-best-practices/timeouts-and-retries-for-latency-sensitive-applications.html)
/// suggests that to ensure low latency, it is best to:
/// - retry small GET request after 2s
/// - retry large GET request when the throughput is below some percentile.
///
/// This policy is inspired by this guidance. It does not track instanteneous throughput, but
/// computes an overall timeout using the following formula:
/// `timeout_offset + num_bytes_get_request / min_throughtput`
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct StorageTimeoutPolicy {
pub min_throughtput_bytes_per_secs: u64,
pub timeout_millis: u64,
pub repeat: usize,
}

impl StorageTimeoutPolicy {
pub fn compute_timeout(&self, num_bytes: usize) -> impl Iterator<Item = Duration> {
let min_download_time_secs: f64 = if self.min_throughtput_bytes_per_secs == 0 {
0.0f64
} else {
num_bytes as f64 / self.min_throughtput_bytes_per_secs as f64
};
let timeout = Duration::from_millis(self.timeout_millis)
+ Duration::from_secs_f64(min_download_time_secs);
std::iter::repeat(timeout).take(self.repeat)
}
}

impl Default for SearcherConfig {
Expand All @@ -237,6 +271,7 @@ impl Default for SearcherConfig {
aggregation_bucket_limit: 65000,
split_cache: None,
request_timeout_secs: Self::default_request_timeout_secs(),
storage_timeout_policy: None,
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions quickwit/quickwit-config/src/node_config/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,11 @@ mod tests {
max_num_concurrent_split_streams: 120,
split_cache: None,
request_timeout_secs: NonZeroU64::new(30).unwrap(),
storage_timeout_policy: Some(crate::StorageTimeoutPolicy {
min_throughtput_bytes_per_secs: 100_000,
timeout_millis: 2_000,
repeat: 2
})
}
);
assert_eq!(
Expand Down
27 changes: 24 additions & 3 deletions quickwit/quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use quickwit_query::query_ast::{BoolQuery, QueryAst, QueryAstTransformer, RangeQ
use quickwit_query::tokenizers::TokenizerManager;
use quickwit_storage::{
wrap_storage_with_cache, BundleStorage, MemorySizedCache, OwnedBytes, SplitCache, Storage,
StorageResolver,
StorageResolver, TimeoutAndRetryStorage,
};
use tantivy::aggregation::agg_req::{AggregationVariants, Aggregations};
use tantivy::aggregation::AggregationLimitsGuard;
Expand Down Expand Up @@ -134,13 +134,34 @@ pub(crate) async fn open_index_with_caches(
tokenizer_manager: Option<&TokenizerManager>,
ephemeral_unbounded_cache: bool,
) -> anyhow::Result<Index> {
let (hotcache_bytes, bundle_storage) =
open_split_bundle(searcher_context, index_storage, split_and_footer_offsets).await?;
// Let's a layer to retry `get_slice` requests if they are taking too long,
// if configured in the searcher config.
//
// The goal here is too ensure a low latency.

let index_storage_with_retry_on_timeout = if let Some(storage_timeout_policy) =
&searcher_context.searcher_config.storage_timeout_policy
{
Arc::new(TimeoutAndRetryStorage::new(
index_storage,
storage_timeout_policy.clone(),
))
} else {
index_storage
};

let (hotcache_bytes, bundle_storage) = open_split_bundle(
searcher_context,
index_storage_with_retry_on_timeout,
split_and_footer_offsets,
)
.await?;

let bundle_storage_with_cache = wrap_storage_with_cache(
searcher_context.fast_fields_cache.clone(),
Arc::new(bundle_storage),
);

let directory = StorageDirectory::new(bundle_storage_with_cache);

let hot_directory = if ephemeral_unbounded_cache {
Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ mod debouncer;
mod file_descriptor_cache;
mod metrics;
mod storage;
mod timeout_and_retry_storage;
pub use debouncer::AsyncDebouncer;
pub(crate) use debouncer::DebouncedStorage;

Expand Down Expand Up @@ -92,6 +93,7 @@ pub use self::test_suite::{
storage_test_multi_part_upload, storage_test_single_part_upload, storage_test_suite,
test_write_and_bulk_delete,
};
pub use self::timeout_and_retry_storage::TimeoutAndRetryStorage;
pub use crate::error::{
BulkDeleteError, DeleteFailure, StorageError, StorageErrorKind, StorageResolverError,
StorageResult,
Expand Down
19 changes: 17 additions & 2 deletions quickwit/quickwit-storage/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

use once_cell::sync::Lazy;
use quickwit_common::metrics::{
new_counter, new_counter_with_labels, new_gauge, IntCounter, IntGauge,
new_counter, new_counter_vec, new_counter_with_labels, new_gauge, IntCounter, IntGauge,
};

/// Counters associated to storage operations.
Expand All @@ -32,6 +32,7 @@ pub struct StorageMetrics {
pub fast_field_cache: CacheMetrics,
pub split_footer_cache: CacheMetrics,
pub searcher_split_cache: CacheMetrics,
pub get_slice_timeout_total_by_attempts: [IntCounter; 5],
pub object_storage_get_total: IntCounter,
pub object_storage_put_total: IntCounter,
pub object_storage_put_parts: IntCounter,
Expand All @@ -41,14 +42,28 @@ pub struct StorageMetrics {

impl Default for StorageMetrics {
fn default() -> Self {
let get_slice_timeout_total = new_counter_vec(
"get_slice_timeout_total",
"Number of `get_slice` operations that timed out",
"storage",
&[],
["attempt"],
);
let get_slice_timeout_total_by_attempts = [
get_slice_timeout_total.with_label_values(["0"]),
get_slice_timeout_total.with_label_values(["1"]),
get_slice_timeout_total.with_label_values(["2"]),
get_slice_timeout_total.with_label_values(["3"]),
get_slice_timeout_total.with_label_values(["4"]),
];
StorageMetrics {
fast_field_cache: CacheMetrics::for_component("fastfields"),
fd_cache_metrics: CacheMetrics::for_component("fd"),
partial_request_cache: CacheMetrics::for_component("partial_request"),
searcher_split_cache: CacheMetrics::for_component("searcher_split"),
shortlived_cache: CacheMetrics::for_component("shortlived"),
split_footer_cache: CacheMetrics::for_component("splitfooter"),

get_slice_timeout_total_by_attempts,
object_storage_get_total: new_counter(
"object_storage_gets_total",
"Number of objects fetched.",
Expand Down
Loading

0 comments on commit aa9c516

Please sign in to comment.