Skip to content

Commit

Permalink
Apply suggestions from code review
Browse files Browse the repository at this point in the history
Co-authored-by: trinity-1686a <[email protected]>
  • Loading branch information
fulmicoton and trinity-1686a committed Oct 2, 2024
1 parent aa9c516 commit 2a78e2b
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 26 deletions.
6 changes: 4 additions & 2 deletions quickwit/quickwit-config/src/node_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,9 @@ pub struct SearcherConfig {
pub struct StorageTimeoutPolicy {
pub min_throughtput_bytes_per_secs: u64,
pub timeout_millis: u64,
pub repeat: usize,
// Disclaimer: this is a number of retry, so the overall max number of
// attempts is `max_num_retries + 1``.
pub max_num_retries: usize,
}

impl StorageTimeoutPolicy {
Expand All @@ -255,7 +257,7 @@ impl StorageTimeoutPolicy {
};
let timeout = Duration::from_millis(self.timeout_millis)
+ Duration::from_secs_f64(min_download_time_secs);
std::iter::repeat(timeout).take(self.repeat)
std::iter::repeat(timeout).take(self.max_num_retries + 1)
}
}

Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ pub(crate) async fn open_index_with_caches(
tokenizer_manager: Option<&TokenizerManager>,
ephemeral_unbounded_cache: bool,
) -> anyhow::Result<Index> {
// Let's a layer to retry `get_slice` requests if they are taking too long,
// Let's add a storage proxy to retry `get_slice` requests if they are taking too long,
// if configured in the searcher config.
//
// The goal here is too ensure a low latency.
Expand Down
27 changes: 15 additions & 12 deletions quickwit/quickwit-storage/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ pub struct StorageMetrics {
pub fast_field_cache: CacheMetrics,
pub split_footer_cache: CacheMetrics,
pub searcher_split_cache: CacheMetrics,
pub get_slice_timeout_total_by_attempts: [IntCounter; 5],
pub get_slice_timeout_successes: [IntCounter; 3],
pub get_slice_timeout_all_timeouts: IntCounter,
pub object_storage_get_total: IntCounter,
pub object_storage_put_total: IntCounter,
pub object_storage_put_parts: IntCounter,
Expand All @@ -42,28 +43,30 @@ pub struct StorageMetrics {

impl Default for StorageMetrics {
fn default() -> Self {
let get_slice_timeout_total = new_counter_vec(
"get_slice_timeout_total",
"Number of `get_slice` operations that timed out",
let get_slice_timeout_outcome_total_vec = new_counter_vec(
"get_slice_timeout_outcome",
"Outcome of get_slice operations. success_after_1_timeout means the operation \
succeeded after a retry caused by a timeout.",
"storage",
&[],
["attempt"],
["outcome"],
);
let get_slice_timeout_total_by_attempts = [
get_slice_timeout_total.with_label_values(["0"]),
get_slice_timeout_total.with_label_values(["1"]),
get_slice_timeout_total.with_label_values(["2"]),
get_slice_timeout_total.with_label_values(["3"]),
get_slice_timeout_total.with_label_values(["4"]),
let get_slice_timeout_successes = [
get_slice_timeout_outcome_total_vec.with_label_values(["success_after_0_timeout"]),
get_slice_timeout_outcome_total_vec.with_label_values(["success_after_1_timeout"]),
get_slice_timeout_outcome_total_vec.with_label_values(["success_after_2+_timeout"]),
];
let get_slice_timeout_all_timeouts =
get_slice_timeout_outcome_total_vec.with_label_values(["all_timeouts"]);
StorageMetrics {
fast_field_cache: CacheMetrics::for_component("fastfields"),
fd_cache_metrics: CacheMetrics::for_component("fd"),
partial_request_cache: CacheMetrics::for_component("partial_request"),
searcher_split_cache: CacheMetrics::for_component("searcher_split"),
shortlived_cache: CacheMetrics::for_component("shortlived"),
split_footer_cache: CacheMetrics::for_component("splitfooter"),
get_slice_timeout_total_by_attempts,
get_slice_timeout_successes,
get_slice_timeout_all_timeouts,
object_storage_get_total: new_counter(
"object_storage_gets_total",
"Number of objects fetched.",
Expand Down
37 changes: 26 additions & 11 deletions quickwit/quickwit-storage/src/timeout_and_retry_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,20 @@ use std::path::Path;
use std::sync::Arc;

use async_trait::async_trait;
use quickwit_common::rate_limited_info;
use quickwit_common::uri::Uri;
use quickwit_common::{rate_limited_info, rate_limited_warn};
use quickwit_config::StorageTimeoutPolicy;
use tantivy::directory::OwnedBytes;
use tokio::io::AsyncRead;
use tracing::warn;

use crate::storage::SendableAsync;
use crate::{BulkDeleteError, PutPayload, Storage, StorageErrorKind, StorageResult};

/// Storage proxy that implements a retry operation if the underlying storage
/// takes too long.
///
/// This is useful in order to unsure a low latency on S3.
/// This is useful in order to ensure a low latency on S3.
/// Retrying agressively is recommended for S3.

/// <https://docs.aws.amazon.com/whitepapers/latest/s3-optimizing-performance-best-practices/timeouts-and-retries-for-latency-sensitive-applications.html>
Expand Down Expand Up @@ -99,20 +100,25 @@ impl Storage for TimeoutAndRetryStorage {
.enumerate()
{
let get_slice_fut = self.underlying.get_slice(path, range.clone());
// TODO test avoid aborting timed out requests. #5468
match tokio::time::timeout(timeout_duration, get_slice_fut).await {
Ok(result) => return result,
Err(_elapsed) => {
if let Some(get_slice_timeout_count) = crate::STORAGE_METRICS
.get_slice_timeout_total_by_attempts
Ok(result) => {
crate::STORAGE_METRICS
.get_slice_timeout_successes
.get(attempt_id)
{
get_slice_timeout_count.inc();
}
.or(crate::STORAGE_METRICS.get_slice_timeout_successes.last())
.unwrap()
.inc();
return result;
}
Err(_elapsed) => {
rate_limited_info!(limit_per_min=60, num_bytes=num_bytes, path=%path.display(), timeout_secs=timeout_duration.as_secs_f32(), "get timeout elapsed");
continue;
}
}
}
rate_limited_warn!(limit_per_min=60, num_bytes=num_bytes, path=%path.display(), "all get_slice attempts timeouted");
crate::STORAGE_METRICS.get_slice_timeout_all_timeouts.inc();
return Err(
StorageErrorKind::Timeout.with_error(anyhow::anyhow!("internal timeout on get_slice"))
);
Expand Down Expand Up @@ -157,6 +163,8 @@ mod tests {
use std::sync::Mutex;
use std::time::Duration;

use tokio::time::Instant;

use super::*;

#[derive(Debug)]
Expand Down Expand Up @@ -244,14 +252,16 @@ mod tests {
async fn test_timeout_and_retry_storage() {
tokio::time::pause();

let path = Path::new("foo/bar");
let timeout_policy = StorageTimeoutPolicy {
min_throughtput_bytes_per_secs: 100_000,
timeout_millis: 2_000,
repeat: 2,
max_num_retries: 1,
};

let path = Path::new("foo/bar");

{
let now = Instant::now();
let storage_with_delay =
StorageWithDelay::new(vec![Duration::from_secs(5), Duration::from_secs(3)]);
let storage =
Expand All @@ -260,12 +270,17 @@ mod tests {
storage.get_slice(path, 10..100).await.unwrap_err().kind,
StorageErrorKind::Timeout
);
let elapsed = now.elapsed().as_millis();
assert!(elapsed.abs_diff(2 * 2_000) < 100);
}
{
let now = Instant::now();
let storage_with_delay =
StorageWithDelay::new(vec![Duration::from_secs(5), Duration::from_secs(1)]);
let storage = TimeoutAndRetryStorage::new(Arc::new(storage_with_delay), timeout_policy);
assert!(storage.get_slice(path, 10..100).await.is_ok(),);
let elapsed = now.elapsed().as_millis();
assert!(elapsed.abs_diff(2_000 + 1_000) < 100);
}
}
}

0 comments on commit 2a78e2b

Please sign in to comment.