diff --git a/quickwit/quickwit-common/Cargo.toml b/quickwit/quickwit-common/Cargo.toml index 675522c7d9f..83170a8ec56 100644 --- a/quickwit/quickwit-common/Cargo.toml +++ b/quickwit/quickwit-common/Cargo.toml @@ -37,7 +37,7 @@ siphasher = { workspace = true } tempfile = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } -tokio-metrics ={ workspace = true } +tokio-metrics = { workspace = true } tokio-stream = { workspace = true } tonic = { workspace = true } tower = { workspace = true } @@ -51,3 +51,4 @@ named_tasks = ["tokio/tracing"] serde_json = { workspace = true } tempfile = { workspace = true } proptest = { workspace = true } +tokio = { workspace = true, features = ["test-util"] } diff --git a/quickwit/quickwit-common/src/tower/circuit_breaker.rs b/quickwit/quickwit-common/src/tower/circuit_breaker.rs new file mode 100644 index 00000000000..ae80516ae83 --- /dev/null +++ b/quickwit/quickwit-common/src/tower/circuit_breaker.rs @@ -0,0 +1,372 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::future::Future; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll}; +use std::time::Duration; + +use pin_project::pin_project; +use prometheus::IntCounter; +use tokio::time::Instant; +use tower::{Layer, Service}; + +/// The circuit breaker layer implements the [circuit breaker pattern](https://martinfowler.com/bliki/CircuitBreaker.html). +/// +/// It counts the errors emitted by the inner service, and if the number of errors exceeds a certain +/// threshold within a certain time window, it will "open" the circuit. +/// +/// Requests will then be rejected for a given timeout. +/// After this timeout, the circuit breaker ends up in a HalfOpen state. It will allow a single +/// request to pass through. Depending on the result of this request, the circuit breaker will +/// either close the circuit again or open it again. +/// +/// Implementation detail: +/// +/// A circuit breaker needs to have some logic to estimate the chances for the next request +/// to fail. In this implementation, we use a simple heuristic that does not take in account +/// successes. We simply count the number or errors which happened in the last window. +/// +/// The circuit breaker does not attempt to measure accurately the error rate. +/// Instead, it counts errors, and check for the time window in which these errors occurred. +/// This approach is accurate enough, robust, very easy to code and avoids calling the +/// `Instant::now()` at every error in the open state. +#[derive(Debug, Clone)] +pub struct CircuitBreakerLayer { + max_error_count_per_time_window: u32, + time_window: Duration, + timeout: Duration, + evaluator: Evaluator, + circuit_break_total: prometheus::IntCounter, +} + +pub trait CircuitBreakerEvaluator: Clone { + type Response; + type Error; + fn is_circuit_breaker_error(&self, output: &Result) -> bool; + fn make_circuit_breaker_output(&self) -> Self::Error; + fn make_layer( + self, + max_num_errors_per_secs: u32, + timeout: Duration, + circuit_break_total: prometheus::IntCounter, + ) -> CircuitBreakerLayer { + CircuitBreakerLayer { + max_error_count_per_time_window: max_num_errors_per_secs, + time_window: Duration::from_secs(1), + timeout, + evaluator: self, + circuit_break_total, + } + } +} + +impl Layer for CircuitBreakerLayer { + type Service = CircuitBreaker; + + fn layer(&self, service: S) -> CircuitBreaker { + let time_window = Duration::from_millis(self.time_window.as_millis() as u64); + let timeout = Duration::from_millis(self.timeout.as_millis() as u64); + CircuitBreaker { + underlying: service, + circuit_breaker_inner: Arc::new(Mutex::new(CircuitBreakerInner { + max_error_count_per_time_window: self.max_error_count_per_time_window, + time_window, + timeout, + state: CircuitBreakerState::Closed(ClosedState { + error_counter: 0u32, + error_window_end: Instant::now() + time_window, + }), + evaluator: self.evaluator.clone(), + circuit_break_total: self.circuit_break_total.clone(), + })), + } + } +} + +struct CircuitBreakerInner { + max_error_count_per_time_window: u32, + time_window: Duration, + timeout: Duration, + evaluator: Evaluator, + state: CircuitBreakerState, + circuit_break_total: IntCounter, +} + +impl CircuitBreakerInner { + fn get_state(&mut self) -> CircuitBreakerState { + let new_state = match self.state { + CircuitBreakerState::Open { until } => { + let now = Instant::now(); + if now < until { + CircuitBreakerState::Open { until } + } else { + CircuitBreakerState::HalfOpen + } + } + other => other, + }; + self.state = new_state; + new_state + } + + fn receive_error(&mut self) { + match self.state { + CircuitBreakerState::HalfOpen => { + self.circuit_break_total.inc(); + self.state = CircuitBreakerState::Open { + until: Instant::now() + self.timeout, + } + } + CircuitBreakerState::Open { .. } => {} + CircuitBreakerState::Closed(ClosedState { + error_counter, + error_window_end, + }) => { + if error_counter < self.max_error_count_per_time_window { + self.state = CircuitBreakerState::Closed(ClosedState { + error_counter: error_counter + 1, + error_window_end, + }); + return; + } + let now = Instant::now(); + if now < error_window_end { + self.circuit_break_total.inc(); + self.state = CircuitBreakerState::Open { + until: now + self.timeout, + }; + } else { + self.state = CircuitBreakerState::Closed(ClosedState { + error_counter: 0u32, + error_window_end: now + self.time_window, + }); + } + } + } + } + + fn receive_success(&mut self) { + match self.state { + CircuitBreakerState::HalfOpen | CircuitBreakerState::Open { .. } => { + self.state = CircuitBreakerState::Closed(ClosedState { + error_counter: 0u32, + error_window_end: Instant::now() + self.time_window, + }); + } + CircuitBreakerState::Closed { .. } => { + // We could actually take that as a signal. + } + } + } +} + +#[derive(Clone)] +pub struct CircuitBreaker { + underlying: S, + circuit_breaker_inner: Arc>>, +} + +impl std::fmt::Debug for CircuitBreaker { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.debug_struct("CircuitBreaker").finish() + } +} + +#[derive(Debug, Clone, Copy)] +enum CircuitBreakerState { + Open { until: Instant }, + HalfOpen, + Closed(ClosedState), +} + +#[derive(Debug, Clone, Copy)] +struct ClosedState { + error_counter: u32, + error_window_end: Instant, +} + +impl Service for CircuitBreaker +where + S: Service, + Evaluator: CircuitBreakerEvaluator, +{ + type Response = S::Response; + type Error = S::Error; + type Future = CircuitBreakerFuture; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + let mut inner = self.circuit_breaker_inner.lock().unwrap(); + let state = inner.get_state(); + match state { + CircuitBreakerState::Closed { .. } | CircuitBreakerState::HalfOpen => { + self.underlying.poll_ready(cx) + } + CircuitBreakerState::Open { .. } => { + let circuit_break_error = inner.evaluator.make_circuit_breaker_output(); + Poll::Ready(Err(circuit_break_error)) + } + } + } + + fn call(&mut self, request: R) -> Self::Future { + CircuitBreakerFuture { + underlying_fut: self.underlying.call(request), + circuit_breaker_inner: self.circuit_breaker_inner.clone(), + } + } +} + +#[pin_project] +pub struct CircuitBreakerFuture { + #[pin] + underlying_fut: F, + circuit_breaker_inner: Arc>>, +} + +impl Future for CircuitBreakerFuture +where + F: Future>, + Evaluator: CircuitBreakerEvaluator, +{ + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let circuit_breaker_inner = self.circuit_breaker_inner.clone(); + let poll_res = self.project().underlying_fut.poll(cx); + match poll_res { + Poll::Pending => Poll::Pending, + Poll::Ready(result) => { + let mut circuit_breaker_inner_lock = circuit_breaker_inner.lock().unwrap(); + let is_circuit_breaker_error = circuit_breaker_inner_lock + .evaluator + .is_circuit_breaker_error(&result); + if is_circuit_breaker_error { + circuit_breaker_inner_lock.receive_error(); + } else { + circuit_breaker_inner_lock.receive_success(); + } + Poll::Ready(result) + } + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::atomic::{AtomicBool, Ordering}; + + use tower::{ServiceBuilder, ServiceExt}; + + use super::*; + + #[derive(Debug)] + enum TestError { + CircuitBreak, + ServiceError, + } + + #[derive(Debug, Clone, Copy)] + struct TestCircuitBreakerEvaluator; + + impl CircuitBreakerEvaluator for TestCircuitBreakerEvaluator { + type Response = (); + type Error = TestError; + + fn is_circuit_breaker_error(&self, output: &Result) -> bool { + output.is_err() + } + + fn make_circuit_breaker_output(&self) -> TestError { + TestError::CircuitBreak + } + } + + #[tokio::test] + async fn test_circuit_breaker() { + tokio::time::pause(); + let test_switch: Arc = Arc::new(AtomicBool::new(true)); + + const TIMEOUT: Duration = Duration::from_millis(500); + + let int_counter: prometheus::IntCounter = + IntCounter::new("circuit_break_total_test", "test circuit breaker counter").unwrap(); + let mut service = ServiceBuilder::new() + .layer(TestCircuitBreakerEvaluator.make_layer(10, TIMEOUT, int_counter)) + .service_fn(|_| async { + if test_switch.load(Ordering::Relaxed) { + Ok(()) + } else { + Err(TestError::ServiceError) + } + }); + + service.ready().await.unwrap().call(()).await.unwrap(); + + for _ in 0..1_000 { + service.ready().await.unwrap().call(()).await.unwrap(); + } + + test_switch.store(false, Ordering::Relaxed); + + let mut service_error_count = 0; + let mut circuit_break_count = 0; + for _ in 0..1_000 { + match service.ready().await { + Ok(service) => { + service.call(()).await.unwrap_err(); + service_error_count += 1; + } + Err(_circuit_breaker_error) => { + circuit_break_count += 1; + } + } + } + + assert_eq!(service_error_count + circuit_break_count, 1_000); + assert_eq!(service_error_count, 11); + + tokio::time::advance(TIMEOUT).await; + + // The test request at half open fails. + for _ in 0..1_000 { + match service.ready().await { + Ok(service) => { + service.call(()).await.unwrap_err(); + service_error_count += 1; + } + Err(_circuit_breaker_error) => { + circuit_break_count += 1; + } + } + } + + assert_eq!(service_error_count + circuit_break_count, 2_000); + assert_eq!(service_error_count, 12); + + test_switch.store(true, Ordering::Relaxed); + tokio::time::advance(TIMEOUT).await; + + // The test request at half open succeeds. + for _ in 0..1_000 { + service.ready().await.unwrap().call(()).await.unwrap(); + } + } +} diff --git a/quickwit/quickwit-common/src/tower/mod.rs b/quickwit/quickwit-common/src/tower/mod.rs index a39f28ba0fe..0d761e0bcfc 100644 --- a/quickwit/quickwit-common/src/tower/mod.rs +++ b/quickwit/quickwit-common/src/tower/mod.rs @@ -21,6 +21,7 @@ mod box_layer; mod box_service; mod buffer; mod change; +mod circuit_breaker; mod delay; mod estimate_rate; mod event_listener; @@ -41,6 +42,7 @@ pub use box_layer::BoxLayer; pub use box_service::BoxService; pub use buffer::{Buffer, BufferError, BufferLayer}; pub use change::Change; +pub use circuit_breaker::{CircuitBreaker, CircuitBreakerEvaluator, CircuitBreakerLayer}; pub use delay::{Delay, DelayLayer}; pub use estimate_rate::{EstimateRate, EstimateRateLayer}; pub use event_listener::{EventListener, EventListenerLayer}; diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index aa88416ef36..47f04c022b6 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -71,9 +71,9 @@ use quickwit_common::rate_limiter::RateLimiterSettings; use quickwit_common::retry::RetryParams; use quickwit_common::runtimes::RuntimesConfig; use quickwit_common::tower::{ - BalanceChannel, BoxFutureInfaillible, BufferLayer, Change, ConstantRate, EstimateRateLayer, - EventListenerLayer, GrpcMetricsLayer, LoadShedLayer, RateLimitLayer, RetryLayer, RetryPolicy, - SmaRateEstimator, + BalanceChannel, BoxFutureInfaillible, BufferLayer, Change, CircuitBreakerEvaluator, + ConstantRate, EstimateRateLayer, EventListenerLayer, GrpcMetricsLayer, LoadShedLayer, + RateLimitLayer, RetryLayer, RetryPolicy, SmaRateEstimator, }; use quickwit_common::uri::Uri; use quickwit_common::{get_bool_from_env, spawn_named_task}; @@ -100,8 +100,10 @@ use quickwit_proto::control_plane::ControlPlaneServiceClient; use quickwit_proto::indexing::{IndexingServiceClient, ShardPositionsUpdate}; use quickwit_proto::ingest::ingester::{ IngesterService, IngesterServiceClient, IngesterServiceTowerLayerStack, IngesterStatus, + PersistFailureReason, PersistResponse, }; use quickwit_proto::ingest::router::IngestRouterServiceClient; +use quickwit_proto::ingest::IngestV2Error; use quickwit_proto::metastore::{ EntityKind, ListIndexesMetadataRequest, MetastoreError, MetastoreService, MetastoreServiceClient, @@ -786,6 +788,32 @@ pub async fn serve_quickwit( Ok(actor_exit_statuses) } +#[derive(Clone, Copy)] +struct PersistCircuitBreakerEvaluator; + +impl CircuitBreakerEvaluator for PersistCircuitBreakerEvaluator { + type Response = PersistResponse; + + type Error = IngestV2Error; + + fn is_circuit_breaker_error(&self, output: &Result) -> bool { + let Ok(persist_response) = output.as_ref() else { + return false; + }; + for persist_failure in &persist_response.failures { + // This is the error we return when the WAL is full. + if persist_failure.reason() == PersistFailureReason::ResourceExhausted { + return true; + } + } + false + } + + fn make_circuit_breaker_output(&self) -> IngestV2Error { + IngestV2Error::TooManyRequests + } +} + /// Stack of layers to use on the server side of the ingester service. fn ingester_service_layer_stack( layer_stack: IngesterServiceTowerLayerStack, @@ -793,6 +821,14 @@ fn ingester_service_layer_stack( layer_stack .stack_layer(INGEST_GRPC_SERVER_METRICS_LAYER.clone()) .stack_persist_layer(quickwit_common::tower::OneTaskPerCallLayer) + .stack_persist_layer( + // "3" may seem a little bit low, but we only consider error caused by a full WAL. + PersistCircuitBreakerEvaluator.make_layer( + 3, + Duration::from_millis(500), + crate::metrics::SERVE_METRICS.circuit_break_total.clone(), + ), + ) .stack_open_replication_stream_layer(quickwit_common::tower::OneTaskPerCallLayer) .stack_init_shards_layer(quickwit_common::tower::OneTaskPerCallLayer) .stack_retain_shards_layer(quickwit_common::tower::OneTaskPerCallLayer) diff --git a/quickwit/quickwit-serve/src/metrics.rs b/quickwit/quickwit-serve/src/metrics.rs index 7a2365a761f..7df17b09fb1 100644 --- a/quickwit/quickwit-serve/src/metrics.rs +++ b/quickwit/quickwit-serve/src/metrics.rs @@ -19,19 +19,27 @@ use once_cell::sync::Lazy; use quickwit_common::metrics::{ - new_counter_vec, new_gauge_vec, new_histogram_vec, HistogramVec, IntCounterVec, IntGaugeVec, + new_counter, new_counter_vec, new_gauge_vec, new_histogram_vec, HistogramVec, IntCounter, + IntCounterVec, IntGaugeVec, }; -pub struct RestMetrics { +pub struct ServeMetrics { pub http_requests_total: IntCounterVec<2>, pub request_duration_secs: HistogramVec<2>, pub ongoing_requests: IntGaugeVec<1>, pub pending_requests: IntGaugeVec<1>, + pub circuit_break_total: IntCounter, } -impl Default for RestMetrics { +impl Default for ServeMetrics { fn default() -> Self { - RestMetrics { + let circuit_break_total = new_counter( + "circuit_break_total", + "Circuit breaker counter", + "grpc", + &[], + ); + ServeMetrics { http_requests_total: new_counter_vec( "http_requests_total", "Total number of HTTP requests processed.", @@ -61,9 +69,10 @@ impl Default for RestMetrics { &[], ["endpoint_group"], ), + circuit_break_total, } } } /// Serve counters exposes a bunch a set of metrics about the request received to quickwit. -pub static SERVE_METRICS: Lazy = Lazy::new(RestMetrics::default); +pub static SERVE_METRICS: Lazy = Lazy::new(ServeMetrics::default);