Skip to content

Commit

Permalink
Implement Tower retry policy (#3751)
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload authored Aug 17, 2023
1 parent e1ce66f commit 8fd9ac4
Show file tree
Hide file tree
Showing 10 changed files with 319 additions and 52 deletions.
2 changes: 1 addition & 1 deletion quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ tokio-util = { version = "0.7", features = ["full"] }
toml = "0.7.6"
tonic = { version = "0.9.0", features = ["gzip"] }
tonic-build = "0.9.0"
tower = { version = "0.4.13", features = ["balance", "buffer", "load", "util"] }
tower = { version = "0.4.13", features = ["balance", "buffer", "load", "retry", "util"] }
tower-http = { version = "0.4.0", features = ["compression-gzip", "cors"] }
tracing = "0.1.37"
tracing-opentelemetry = "0.19.0"
Expand Down
8 changes: 4 additions & 4 deletions quickwit/quickwit-aws/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ documentation = "https://quickwit.io/docs/"

[dependencies]
aws-config = { workspace = true }
aws-sdk-kinesis = { workspace = true, optional = true }
aws-sdk-s3 = { workspace = true }
aws-smithy-client = { workspace = true }
aws-smithy-async = { workspace = true }
aws-sdk-kinesis = { workspace = true, optional = true }
aws-smithy-client = { workspace = true }
aws-types = { workspace = true }

anyhow = { workspace = true }
Expand All @@ -24,10 +24,10 @@ hyper = { workspace = true }
hyper-rustls = { workspace = true }
once_cell = { workspace = true }
rand = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
tower = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true }

[dev-dependencies]
quickwit-actors = { workspace = true, features = ["testsuite"] }
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ mod progress;
pub mod pubsub;
pub mod rand;
pub mod rendezvous_hasher;
pub mod retry;
pub mod runtimes;
pub mod shared_consts;
pub mod sorted_iter;
Expand Down
72 changes: 72 additions & 0 deletions quickwit/quickwit-common/src/retry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright (C) 2023 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at hello@quickwit.io.
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::time::Duration;

use rand::Rng;

/// Default value of [`RetryParams::max_attempts`].
const DEFAULT_MAX_ATTEMPTS: usize = 30;

/// Default value of [`RetryParams::base_delay`]: the delay that gets doubled after each retry.
const DEFAULT_BASE_DELAY: Duration = Duration::from_millis(250);

/// Default value of [`RetryParams::max_delay`]: the cap applied to the exponential delay before
/// jitter is added.
const DEFAULT_MAX_DELAY: Duration = Duration::from_secs(20);

/// Marks a value (typically an error) as being worth retrying or not.
///
/// Implementors that do not override [`Retryable::is_retryable`] are never retried.
pub trait Retryable {
    /// Returns `true` if the operation that produced `self` may be attempted again.
    ///
    /// The provided implementation returns `false`.
    fn is_retryable(&self) -> bool {
        false
    }
}

/// Parameters driving the retry backoff computation.
#[derive(Debug, Clone, Copy)]
pub struct RetryParams {
    /// Delay that is doubled for every retry already performed.
    pub base_delay: Duration,
    /// Upper bound applied to the exponential delay before jitter is added.
    pub max_delay: Duration,
    /// Maximum number of attempts, the initial one included.
    pub max_attempts: usize,
}

impl Default for RetryParams {
fn default() -> Self {
Self {
base_delay: DEFAULT_BASE_DELAY,
max_delay: DEFAULT_MAX_DELAY,
max_attempts: DEFAULT_MAX_ATTEMPTS,
}
}
}

impl RetryParams {
    /// Computes the delay after which a new attempt should be performed.
    ///
    /// The delay grows exponentially with `num_retries` (`base_delay * 2^num_retries`), is capped
    /// at `max_delay`, and is then randomized with "equal jitter": half of the capped delay is
    /// kept as is and a random amount between zero and that half (inclusive) is added on top.
    /// The returned delay therefore always lies between half of the capped delay and the capped
    /// delay itself.
    ///
    /// Implementation and default values originate from the Java SDK. See also: <https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/>.
    ///
    /// All intermediate computations saturate, so arbitrarily large values of `num_retries` or
    /// of the configured durations never overflow: the result simply stays capped at `max_delay`.
    pub fn compute_delay(&self, num_retries: usize) -> Duration {
        // Clamp instead of truncating with a bare `as` cast: a wrapped exponent or a wrapped
        // millisecond count would silently produce a much shorter delay than intended.
        let exponent = num_retries.min(u32::MAX as usize) as u32;
        let base_delay_ms = self.base_delay.as_millis().min(u64::MAX as u128) as u64;
        let max_delay_ms = self.max_delay.as_millis().min(u64::MAX as u128) as u64;

        // `2^exponent` overflows `u64` as soon as `exponent >= 64`, and the product can overflow
        // earlier. Saturating keeps the value at `u64::MAX`, which the cap below brings back to
        // `max_delay`.
        let delay_ms = base_delay_ms.saturating_mul(2u64.saturating_pow(exponent));
        let ceil_delay_ms = delay_ms.min(max_delay_ms);
        let half_delay_ms = ceil_delay_ms / 2;
        // `half_delay_ms <= u64::MAX / 2`, so neither the `+ 1` nor the final sum can overflow.
        let jitter_range = 0..half_delay_ms + 1;
        let jittered_delay_ms = half_delay_ms + rand::thread_rng().gen_range(jitter_range);
        Duration::from_millis(jittered_delay_ms)
    }

    /// Returns parameters with millisecond-scale delays so that tests exercising retries run
    /// quickly. `max_attempts` keeps its default value.
    #[cfg(any(test, feature = "testsuite"))]
    pub fn for_test() -> Self {
        Self {
            base_delay: Duration::from_millis(1),
            max_delay: Duration::from_millis(2),
            ..Default::default()
        }
    }
}
1 change: 1 addition & 0 deletions quickwit/quickwit-common/src/tower/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ mod pool;
mod rate;
mod rate_estimator;
mod rate_limit;
mod retry;
mod transport;

use std::error;
Expand Down
224 changes: 224 additions & 0 deletions quickwit/quickwit-common/src/tower/retry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
// Copyright (C) 2023 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at hello@quickwit.io.
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::any::type_name;
use std::fmt;
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::Future;
use pin_project::pin_project;
use tokio::time::Sleep;
use tower::retry::Policy;
use tracing::debug;

use crate::retry::{RetryParams, Retryable};

/// Tower retry policy that retries requests whose error is [`Retryable`], waiting for a
/// backoff delay computed from [`RetryParams`] between attempts.
#[derive(Debug, Clone, Copy, Default)]
pub struct RetryPolicy {
    /// Number of retries scheduled so far for the current request.
    num_retries: usize,
    /// Backoff settings and attempt budget.
    retry_params: RetryParams,
}

impl From<RetryParams> for RetryPolicy {
fn from(retry_params: RetryParams) -> Self {
Self {
num_retries: 0,
retry_params,
}
}
}

/// Future handed back by [`RetryPolicy`] when a request should be retried: it waits for the
/// backoff delay and then yields the policy to use for the next attempt.
#[pin_project]
pub struct RetryFuture {
    /// Policy returned once the sleep has completed.
    retry_policy: RetryPolicy,
    /// Backoff timer; pinned because `Sleep` must be polled through a `Pin`.
    #[pin]
    sleep_fut: Sleep,
}

impl Future for RetryFuture {
    type Output = RetryPolicy;

    /// Polls the inner sleep; once it has elapsed, resolves to a copy of the stored policy.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let projection = self.project();
        match projection.sleep_fut.poll(cx) {
            Poll::Ready(()) => Poll::Ready(*projection.retry_policy),
            Poll::Pending => Poll::Pending,
        }
    }
}

impl<R, T, E> Policy<R, T, E> for RetryPolicy
where
    R: Clone,
    E: fmt::Debug + Retryable,
{
    type Future = RetryFuture;

    /// Decides whether a request should be attempted again.
    ///
    /// Returns `None` (no retry) when the request succeeded, when the error is not retryable,
    /// or when the attempt budget (`max_attempts`) is exhausted. Otherwise returns a future
    /// that sleeps for the computed backoff delay and resolves to the policy carrying the
    /// incremented retry counter.
    fn retry(&self, _request: &R, result: Result<&T, &E>) -> Option<Self::Future> {
        // A successful response never triggers a retry.
        let error = result.err()?;
        let num_attempts = self.num_retries + 1;

        let should_retry =
            error.is_retryable() && num_attempts < self.retry_params.max_attempts;
        if !should_retry {
            return None;
        }
        let delay = self.retry_params.compute_delay(self.num_retries);
        debug!(
            num_attempts=%num_attempts,
            delay_millis=%delay.as_millis(),
            error=?error,
            "{} request failed, retrying.", type_name::<R>()
        );
        Some(RetryFuture {
            retry_policy: Self {
                num_retries: num_attempts,
                retry_params: self.retry_params,
            },
            sleep_fut: tokio::time::sleep(delay),
        })
    }

    /// Requests are always replayable: the retried request is a clone of the original.
    fn clone_request(&self, request: &R) -> Option<R> {
        Some(request.clone())
    }
}

#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Mutex};

    use futures::future::{ready, Ready};
    use tower::retry::RetryLayer;
    use tower::{Layer, Service, ServiceExt};

    use super::*;

    /// Test error wrapper that states explicitly whether the failure may be retried.
    #[derive(Debug, Eq, PartialEq)]
    pub enum Retry<E> {
        Permanent(E),
        Transient(E),
    }

    impl<E> Retryable for Retry<E> {
        fn is_retryable(&self) -> bool {
            matches!(self, Retry::Transient(_))
        }
    }

    /// Service whose responses are scripted by the request it receives.
    #[derive(Debug, Clone, Default)]
    struct HelloService;

    type HelloResults = Arc<Mutex<Vec<Result<(), Retry<()>>>>>;

    /// Request carrying the scripted results (popped from the back, one per attempt) and a
    /// counter of attempts shared between all its clones.
    #[derive(Debug, Clone, Default)]
    struct HelloRequest {
        num_attempts: Arc<AtomicUsize>,
        results: HelloResults,
    }

    impl Service<HelloRequest> for HelloService {
        type Response = ();
        type Error = Retry<()>;
        type Future = Ready<Result<(), Retry<()>>>;

        fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            Poll::Ready(Ok(()))
        }

        fn call(&mut self, request: HelloRequest) -> Self::Future {
            request.num_attempts.fetch_add(1, Ordering::Relaxed);
            let mut scripted_results = request
                .results
                .lock()
                .expect("The lock should not be poisoned.");
            // Once the script is exhausted, fail permanently so that retries stop.
            let result = scripted_results
                .pop()
                .unwrap_or(Err(Retry::Permanent(())));
            ready(result)
        }
    }

    /// Sends a request scripted with `results` through `service` and returns the final outcome
    /// along with the number of attempts that reached the inner service.
    async fn call_with_results<S>(
        service: &mut S,
        results: Vec<Result<(), Retry<()>>>,
    ) -> (Result<(), Retry<()>>, usize)
    where
        S: Service<HelloRequest, Response = (), Error = Retry<()>>,
    {
        let hello_request = HelloRequest {
            results: Arc::new(Mutex::new(results)),
            ..Default::default()
        };
        let outcome = service
            .ready()
            .await
            .unwrap()
            .call(hello_request.clone())
            .await;
        let num_attempts = hello_request.num_attempts.load(Ordering::Relaxed);
        (outcome, num_attempts)
    }

    #[tokio::test]
    async fn test_retry_policy() {
        let retry_policy = RetryPolicy::from(RetryParams::for_test());
        let retry_layer = RetryLayer::new(retry_policy);
        let mut retry_hello_service = retry_layer.layer(HelloService);

        // Immediate success: a single attempt.
        let (outcome, num_attempts) =
            call_with_results(&mut retry_hello_service, vec![Ok(())]).await;
        outcome.unwrap();
        assert_eq!(num_attempts, 1);

        // One transient failure followed by a success: two attempts.
        let (outcome, num_attempts) = call_with_results(
            &mut retry_hello_service,
            vec![Ok(()), Err(Retry::Transient(()))],
        )
        .await;
        outcome.unwrap();
        assert_eq!(num_attempts, 2);

        // Three transient failures, then the script runs out and the service fails permanently.
        let (outcome, num_attempts) = call_with_results(
            &mut retry_hello_service,
            vec![
                Err(Retry::Transient(())),
                Err(Retry::Transient(())),
                Err(Retry::Transient(())),
            ],
        )
        .await;
        outcome.unwrap_err();
        assert_eq!(num_attempts, 4);

        // Empty script: permanent failure on the first attempt, no retry.
        let (outcome, num_attempts) =
            call_with_results(&mut retry_hello_service, Vec::new()).await;
        outcome.unwrap_err();
        assert_eq!(num_attempts, 1);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use super::retry::Retryable;
use quickwit_common::retry::Retryable;

use crate::MetastoreError;

impl Retryable for MetastoreError {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ mod retry;
mod test;

use async_trait::async_trait;
use quickwit_common::retry::RetryParams;
use quickwit_common::uri::Uri;
use quickwit_config::{IndexConfig, SourceConfig};
use quickwit_proto::metastore::{DeleteQuery, DeleteTask};
use quickwit_proto::IndexUid;

use self::retry::{retry, RetryParams};
use self::retry::retry;
use crate::checkpoint::IndexCheckpointDelta;
use crate::{
IndexMetadata, ListIndexesQuery, ListSplitsQuery, Metastore, MetastoreResult, Split,
Expand Down
Loading

0 comments on commit 8fd9ac4

Please sign in to comment.