diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml
index ebc7eeaac68..b458d28b9a4 100644
--- a/quickwit/Cargo.toml
+++ b/quickwit/Cargo.toml
@@ -170,7 +170,7 @@ tokio-util = { version = "0.7", features = ["full"] }
toml = "0.7.6"
tonic = { version = "0.9.0", features = ["gzip"] }
tonic-build = "0.9.0"
-tower = { version = "0.4.13", features = ["balance", "buffer", "load", "util"] }
+tower = { version = "0.4.13", features = ["balance", "buffer", "load", "retry", "util"] }
tower-http = { version = "0.4.0", features = ["compression-gzip", "cors"] }
tracing = "0.1.37"
tracing-opentelemetry = "0.19.0"
diff --git a/quickwit/quickwit-aws/Cargo.toml b/quickwit/quickwit-aws/Cargo.toml
index 170d9fbb28b..132837094d5 100644
--- a/quickwit/quickwit-aws/Cargo.toml
+++ b/quickwit/quickwit-aws/Cargo.toml
@@ -11,10 +11,10 @@ documentation = "https://quickwit.io/docs/"
[dependencies]
aws-config = { workspace = true }
+aws-sdk-kinesis = { workspace = true, optional = true }
aws-sdk-s3 = { workspace = true }
-aws-smithy-client = { workspace = true }
aws-smithy-async = { workspace = true }
-aws-sdk-kinesis = { workspace = true, optional = true }
+aws-smithy-client = { workspace = true }
aws-types = { workspace = true }
anyhow = { workspace = true }
@@ -24,10 +24,10 @@ hyper = { workspace = true }
hyper-rustls = { workspace = true }
once_cell = { workspace = true }
rand = { workspace = true }
+thiserror = { workspace = true }
tokio = { workspace = true }
-tracing = { workspace = true }
tower = { workspace = true }
-thiserror = { workspace = true }
+tracing = { workspace = true }
[dev-dependencies]
quickwit-actors = { workspace = true, features = ["testsuite"] }
diff --git a/quickwit/quickwit-common/src/lib.rs b/quickwit/quickwit-common/src/lib.rs
index 02d8072083d..df054c685a3 100644
--- a/quickwit/quickwit-common/src/lib.rs
+++ b/quickwit/quickwit-common/src/lib.rs
@@ -32,6 +32,7 @@ mod progress;
pub mod pubsub;
pub mod rand;
pub mod rendezvous_hasher;
+pub mod retry;
pub mod runtimes;
pub mod shared_consts;
pub mod sorted_iter;
diff --git a/quickwit/quickwit-common/src/retry.rs b/quickwit/quickwit-common/src/retry.rs
new file mode 100644
index 00000000000..455abdbc4a2
--- /dev/null
+++ b/quickwit/quickwit-common/src/retry.rs
@@ -0,0 +1,72 @@
+// Copyright (C) 2023 Quickwit, Inc.
+//
+// Quickwit is offered under the AGPL v3.0 and as commercial software.
+// For commercial licensing, contact us at hello@quickwit.io.
+//
+// AGPL:
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+use std::time::Duration;
+
+use rand::Rng;
+
+const DEFAULT_MAX_ATTEMPTS: usize = 30;
+const DEFAULT_BASE_DELAY: Duration = Duration::from_millis(250);
+const DEFAULT_MAX_DELAY: Duration = Duration::from_secs(20);
+
+pub trait Retryable {
+ fn is_retryable(&self) -> bool {
+ false
+ }
+}
+
+#[derive(Debug, Clone, Copy)]
+pub struct RetryParams {
+ pub base_delay: Duration,
+ pub max_delay: Duration,
+ pub max_attempts: usize,
+}
+
+impl Default for RetryParams {
+ fn default() -> Self {
+ Self {
+ base_delay: DEFAULT_BASE_DELAY,
+ max_delay: DEFAULT_MAX_DELAY,
+ max_attempts: DEFAULT_MAX_ATTEMPTS,
+ }
+ }
+}
+
+impl RetryParams {
+ /// Computes the delay after which a new attempt should be performed. The randomized delay
+ /// increases after each attempt (exponential backoff and full jitter). Implementation and
+ /// default values originate from the Java SDK. See also: .
+ pub fn compute_delay(&self, num_retries: usize) -> Duration {
+ let delay_ms = self.base_delay.as_millis() as u64 * 2u64.pow(num_retries as u32);
+ let ceil_delay_ms = delay_ms.min(self.max_delay.as_millis() as u64);
+ let half_delay_ms = ceil_delay_ms / 2;
+ let jitter_range = 0..half_delay_ms + 1;
+ let jittered_delay_ms = half_delay_ms + rand::thread_rng().gen_range(jitter_range);
+ Duration::from_millis(jittered_delay_ms)
+ }
+
+ #[cfg(any(test, feature = "testsuite"))]
+ pub fn for_test() -> Self {
+ Self {
+ base_delay: Duration::from_millis(1),
+ max_delay: Duration::from_millis(2),
+ ..Default::default()
+ }
+ }
+}
diff --git a/quickwit/quickwit-common/src/tower/mod.rs b/quickwit/quickwit-common/src/tower/mod.rs
index 79d5dd5e0e9..3434bf925c7 100644
--- a/quickwit/quickwit-common/src/tower/mod.rs
+++ b/quickwit/quickwit-common/src/tower/mod.rs
@@ -27,6 +27,7 @@ mod pool;
mod rate;
mod rate_estimator;
mod rate_limit;
+mod retry;
mod transport;
use std::error;
diff --git a/quickwit/quickwit-common/src/tower/retry.rs b/quickwit/quickwit-common/src/tower/retry.rs
new file mode 100644
index 00000000000..faac6c557e2
--- /dev/null
+++ b/quickwit/quickwit-common/src/tower/retry.rs
@@ -0,0 +1,224 @@
+// Copyright (C) 2023 Quickwit, Inc.
+//
+// Quickwit is offered under the AGPL v3.0 and as commercial software.
+// For commercial licensing, contact us at hello@quickwit.io.
+//
+// AGPL:
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+use std::any::type_name;
+use std::fmt;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+use futures::Future;
+use pin_project::pin_project;
+use tokio::time::Sleep;
+use tower::retry::Policy;
+use tracing::debug;
+
+use crate::retry::{RetryParams, Retryable};
+
+#[derive(Debug, Clone, Copy, Default)]
+pub struct RetryPolicy {
+ num_retries: usize,
+ retry_params: RetryParams,
+}
+
+impl From for RetryPolicy {
+ fn from(retry_params: RetryParams) -> Self {
+ Self {
+ num_retries: 0,
+ retry_params,
+ }
+ }
+}
+
+#[pin_project]
+pub struct RetryFuture {
+ retry_policy: RetryPolicy,
+ #[pin]
+ sleep_fut: Sleep,
+}
+
+impl Future for RetryFuture {
+ type Output = RetryPolicy;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll {
+ let this = self.project();
+ this.sleep_fut.poll(cx).map(|_| *this.retry_policy)
+ }
+}
+
+impl Policy for RetryPolicy
+where
+ R: Clone,
+ E: fmt::Debug + Retryable,
+{
+ type Future = RetryFuture;
+
+ fn retry(&self, _request: &R, result: Result<&T, &E>) -> Option {
+ match result {
+ Ok(_) => None,
+ Err(error) => {
+ let num_attempts = self.num_retries + 1;
+
+ if !error.is_retryable() || num_attempts >= self.retry_params.max_attempts {
+ None
+ } else {
+ let delay = self.retry_params.compute_delay(self.num_retries);
+ debug!(
+ num_attempts=%num_attempts,
+ delay_millis=%delay.as_millis(),
+ error=?error,
+ "{} request failed, retrying.", type_name::()
+ );
+ let retry_policy = Self {
+ num_retries: num_attempts,
+ retry_params: self.retry_params,
+ };
+ let sleep_fut = tokio::time::sleep(delay);
+ let retry_fut = RetryFuture {
+ retry_policy,
+ sleep_fut,
+ };
+ Some(retry_fut)
+ }
+ }
+ }
+ }
+
+ fn clone_request(&self, request: &R) -> Option {
+ Some(request.clone())
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::sync::atomic::{AtomicUsize, Ordering};
+ use std::sync::{Arc, Mutex};
+
+ use futures::future::{ready, Ready};
+ use tower::retry::RetryLayer;
+ use tower::{Layer, Service, ServiceExt};
+
+ use super::*;
+
+ #[derive(Debug, Eq, PartialEq)]
+ pub enum Retry {
+ Permanent(E),
+ Transient(E),
+ }
+
+ impl Retryable for Retry {
+ fn is_retryable(&self) -> bool {
+ match self {
+ Retry::Permanent(_) => false,
+ Retry::Transient(_) => true,
+ }
+ }
+ }
+
+ #[derive(Debug, Clone, Default)]
+ struct HelloService;
+
+ type HelloResults = Arc>>>>;
+
+ #[derive(Debug, Clone, Default)]
+ struct HelloRequest {
+ num_attempts: Arc,
+ results: HelloResults,
+ }
+
+ impl Service for HelloService {
+ type Response = ();
+ type Error = Retry<()>;
+ type Future = Ready>>;
+
+ fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn call(&mut self, request: HelloRequest) -> Self::Future {
+ request.num_attempts.fetch_add(1, Ordering::Relaxed);
+ let result = request
+ .results
+ .lock()
+ .expect("The lock should not be poisoned.")
+ .pop()
+ .unwrap_or(Err(Retry::Permanent(())));
+ ready(result)
+ }
+ }
+
+ #[tokio::test]
+ async fn test_retry_policy() {
+ let retry_policy = RetryPolicy::from(RetryParams::for_test());
+ let retry_layer = RetryLayer::new(retry_policy);
+ let mut retry_hello_service = retry_layer.layer(HelloService);
+
+ let hello_request = HelloRequest {
+ results: Arc::new(Mutex::new(vec![Ok(())])),
+ ..Default::default()
+ };
+ retry_hello_service
+ .ready()
+ .await
+ .unwrap()
+ .call(hello_request.clone())
+ .await
+ .unwrap();
+ assert_eq!(hello_request.num_attempts.load(Ordering::Relaxed), 1);
+
+ let hello_request = HelloRequest {
+ results: Arc::new(Mutex::new(vec![Ok(()), Err(Retry::Transient(()))])),
+ ..Default::default()
+ };
+ retry_hello_service
+ .ready()
+ .await
+ .unwrap()
+ .call(hello_request.clone())
+ .await
+ .unwrap();
+ assert_eq!(hello_request.num_attempts.load(Ordering::Relaxed), 2);
+
+ let hello_request = HelloRequest {
+ results: Arc::new(Mutex::new(vec![
+ Err(Retry::Transient(())),
+ Err(Retry::Transient(())),
+ Err(Retry::Transient(())),
+ ])),
+ ..Default::default()
+ };
+ retry_hello_service
+ .ready()
+ .await
+ .unwrap()
+ .call(hello_request.clone())
+ .await
+ .unwrap_err();
+ assert_eq!(hello_request.num_attempts.load(Ordering::Relaxed), 4);
+
+ let hello_request = HelloRequest::default();
+ retry_hello_service
+ .ready()
+ .await
+ .unwrap()
+ .call(hello_request.clone())
+ .await
+ .unwrap_err();
+ assert_eq!(hello_request.num_attempts.load(Ordering::Relaxed), 1);
+ }
+}
diff --git a/quickwit/quickwit-metastore/src/metastore/retrying_metastore/error.rs b/quickwit/quickwit-metastore/src/metastore/retrying_metastore/error.rs
index 4974471aa4a..f3054c17e7d 100644
--- a/quickwit/quickwit-metastore/src/metastore/retrying_metastore/error.rs
+++ b/quickwit/quickwit-metastore/src/metastore/retrying_metastore/error.rs
@@ -17,7 +17,8 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see .
-use super::retry::Retryable;
+use quickwit_common::retry::Retryable;
+
use crate::MetastoreError;
impl Retryable for MetastoreError {
diff --git a/quickwit/quickwit-metastore/src/metastore/retrying_metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/retrying_metastore/mod.rs
index d4f0d66cf6e..115655699fe 100644
--- a/quickwit/quickwit-metastore/src/metastore/retrying_metastore/mod.rs
+++ b/quickwit/quickwit-metastore/src/metastore/retrying_metastore/mod.rs
@@ -23,12 +23,13 @@ mod retry;
mod test;
use async_trait::async_trait;
+use quickwit_common::retry::RetryParams;
use quickwit_common::uri::Uri;
use quickwit_config::{IndexConfig, SourceConfig};
use quickwit_proto::metastore::{DeleteQuery, DeleteTask};
use quickwit_proto::IndexUid;
-use self::retry::{retry, RetryParams};
+use self::retry::retry;
use crate::checkpoint::IndexCheckpointDelta;
use crate::{
IndexMetadata, ListIndexesQuery, ListSplitsQuery, Metastore, MetastoreResult, Split,
diff --git a/quickwit/quickwit-metastore/src/metastore/retrying_metastore/retry.rs b/quickwit/quickwit-metastore/src/metastore/retrying_metastore/retry.rs
index 384e10b73dc..4e27fb5dd3b 100644
--- a/quickwit/quickwit-metastore/src/metastore/retrying_metastore/retry.rs
+++ b/quickwit/quickwit-metastore/src/metastore/retrying_metastore/retry.rs
@@ -18,39 +18,11 @@
// along with this program. If not, see .
use std::fmt::Debug;
-use std::time::Duration;
use futures::Future;
-use rand::Rng;
+use quickwit_common::retry::{RetryParams, Retryable};
use tracing::{debug, warn};
-const DEFAULT_MAX_RETRY_ATTEMPTS: usize = 30;
-const DEFAULT_BASE_DELAY: Duration = Duration::from_millis(if cfg!(test) { 1 } else { 250 });
-const DEFAULT_MAX_DELAY: Duration = Duration::from_millis(if cfg!(test) { 1 } else { 20_000 });
-
-pub trait Retryable {
- fn is_retryable(&self) -> bool {
- false
- }
-}
-
-#[derive(Clone)]
-pub struct RetryParams {
- pub base_delay: Duration,
- pub max_delay: Duration,
- pub max_attempts: usize,
-}
-
-impl Default for RetryParams {
- fn default() -> Self {
- Self {
- base_delay: DEFAULT_BASE_DELAY,
- max_delay: DEFAULT_MAX_DELAY,
- max_attempts: DEFAULT_MAX_RETRY_ATTEMPTS,
- }
- }
-}
-
/// Retry with exponential backoff and full jitter. Implementation and default values originate from
/// the Java SDK. See also: .
pub async fn retry(retry_params: &RetryParams, f: F) -> Result
@@ -59,12 +31,12 @@ where
Fut: Future