From 1fba1d12eac4ae73c1a165e550fb634989406d28 Mon Sep 17 00:00:00 2001
From: Paul Masurel
Date: Fri, 5 Jul 2024 10:17:50 +0900
Subject: [PATCH] Adds a circuit breaker layer. (#5134)

The piece that estimates whether the next request is likely to fail is
extremely simplistic for the moment. It simply counts the number of errors
(not taking successes into account) that happened in a given time window.

The reason is that, for the moment, we want to use it for persist requests
when the WAL is full. On airmail, the aggressive retry logic of the client
was causing a massive gRPC storm on the faulty indexer node, taking all of
its CPU and preventing it from getting out of that state.

In this case, the error estimation logic is very simple: a full WAL
guarantees that no further persist request will be successful for a little
while.
---
 quickwit/quickwit-common/Cargo.toml        |   3 +-
 .../src/tower/circuit_breaker.rs           | 372 ++++++++++++++++++
 quickwit/quickwit-common/src/tower/mod.rs  |   2 +
 quickwit/quickwit-serve/src/lib.rs         |  42 +-
 quickwit/quickwit-serve/src/metrics.rs     |  19 +-
 5 files changed, 429 insertions(+), 9 deletions(-)
 create mode 100644 quickwit/quickwit-common/src/tower/circuit_breaker.rs

diff --git a/quickwit/quickwit-common/Cargo.toml b/quickwit/quickwit-common/Cargo.toml
index 675522c7d9f..83170a8ec56 100644
--- a/quickwit/quickwit-common/Cargo.toml
+++ b/quickwit/quickwit-common/Cargo.toml
@@ -37,7 +37,7 @@ siphasher = { workspace = true }
 tempfile = { workspace = true }
 thiserror = { workspace = true }
 tokio = { workspace = true }
-tokio-metrics ={ workspace = true }
+tokio-metrics = { workspace = true }
 tokio-stream = { workspace = true }
 tonic = { workspace = true }
 tower = { workspace = true }
@@ -51,3 +51,4 @@ named_tasks = ["tokio/tracing"]
 serde_json = { workspace = true }
 tempfile = { workspace = true }
 proptest = { workspace = true }
+tokio = { workspace = true, features = ["test-util"] }
diff --git a/quickwit/quickwit-common/src/tower/circuit_breaker.rs b/quickwit/quickwit-common/src/tower/circuit_breaker.rs
new file mode 100644
index 00000000000..ae80516ae83
--- /dev/null
+++ b/quickwit/quickwit-common/src/tower/circuit_breaker.rs
@@ -0,0 +1,372 @@
+// Copyright (C) 2024 Quickwit, Inc.
+//
+// Quickwit is offered under the AGPL v3.0 and as commercial software.
+// For commercial licensing, contact us at hello@quickwit.io.
+//
+// AGPL:
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+use std::future::Future;
+use std::pin::Pin;
+use std::sync::{Arc, Mutex};
+use std::task::{Context, Poll};
+use std::time::Duration;
+
+use pin_project::pin_project;
+use prometheus::IntCounter;
+use tokio::time::Instant;
+use tower::{Layer, Service};
+
+/// The circuit breaker layer implements the [circuit breaker pattern](https://martinfowler.com/bliki/CircuitBreaker.html).
+///
+/// It counts the errors emitted by the inner service, and if the number of errors exceeds a
+/// certain threshold within a certain time window, it will "open" the circuit.
+///
+/// Requests will then be rejected for a given timeout.
+/// After this timeout, the circuit breaker ends up in a HalfOpen state. It will allow a single
+/// request to pass through. Depending on the result of this request, the circuit breaker will
+/// either close the circuit again or open it again.
+///
+/// Implementation detail:
+///
+/// A circuit breaker needs some logic to estimate the chance that the next request will fail.
+/// In this implementation, we use a simple heuristic that does not take successes into account.
+/// We simply count the number of errors that happened in the last window.
+///
+/// The circuit breaker does not attempt to measure the error rate accurately.
+/// Instead, it counts errors and checks the time window in which these errors occurred.
+/// This approach is accurate enough, robust, very easy to code, and avoids calling
+/// `Instant::now()` on every error while the circuit is closed.
+#[derive(Debug, Clone)]
+pub struct CircuitBreakerLayer<Evaluator> {
+    max_error_count_per_time_window: u32,
+    time_window: Duration,
+    timeout: Duration,
+    evaluator: Evaluator,
+    circuit_break_total: prometheus::IntCounter,
+}
+
+pub trait CircuitBreakerEvaluator: Clone {
+    type Response;
+    type Error;
+    fn is_circuit_breaker_error(&self, output: &Result<Self::Response, Self::Error>) -> bool;
+    fn make_circuit_breaker_output(&self) -> Self::Error;
+    fn make_layer(
+        self,
+        max_num_errors_per_secs: u32,
+        timeout: Duration,
+        circuit_break_total: prometheus::IntCounter,
+    ) -> CircuitBreakerLayer<Self> {
+        CircuitBreakerLayer {
+            max_error_count_per_time_window: max_num_errors_per_secs,
+            time_window: Duration::from_secs(1),
+            timeout,
+            evaluator: self,
+            circuit_break_total,
+        }
+    }
+}
+
+impl<S, Evaluator: CircuitBreakerEvaluator> Layer<S> for CircuitBreakerLayer<Evaluator> {
+    type Service = CircuitBreaker<S, Evaluator>;
+
+    fn layer(&self, service: S) -> CircuitBreaker<S, Evaluator> {
+        let time_window = Duration::from_millis(self.time_window.as_millis() as u64);
+        let timeout = Duration::from_millis(self.timeout.as_millis() as u64);
+        CircuitBreaker {
+            underlying: service,
+            circuit_breaker_inner: Arc::new(Mutex::new(CircuitBreakerInner {
+                max_error_count_per_time_window: self.max_error_count_per_time_window,
+                time_window,
+                timeout,
+                state: CircuitBreakerState::Closed(ClosedState {
+                    error_counter: 0u32,
+                    error_window_end: Instant::now() + time_window,
+                }),
+                evaluator: self.evaluator.clone(),
+                circuit_break_total: self.circuit_break_total.clone(),
+            })),
+        }
+    }
+}
+
+struct CircuitBreakerInner<Evaluator> {
+    max_error_count_per_time_window: u32,
+    time_window: Duration,
+    timeout: Duration,
+    evaluator: Evaluator,
+    state: CircuitBreakerState,
+    circuit_break_total: IntCounter,
+}
+
+impl<Evaluator: CircuitBreakerEvaluator> CircuitBreakerInner<Evaluator> {
+    fn get_state(&mut self) -> CircuitBreakerState {
+        let new_state = match self.state {
+            CircuitBreakerState::Open { until } => {
+                let now = Instant::now();
+                if now < until {
+                    CircuitBreakerState::Open { until }
+                } else {
+                    CircuitBreakerState::HalfOpen
+                }
+            }
+            other => other,
+        };
+        self.state = new_state;
+        new_state
+    }
+
+    fn receive_error(&mut self) {
+        match self.state {
+            CircuitBreakerState::HalfOpen => {
+                self.circuit_break_total.inc();
+                self.state = CircuitBreakerState::Open {
+                    until: Instant::now() + self.timeout,
+                }
+            }
+            CircuitBreakerState::Open { .. } => {}
+            CircuitBreakerState::Closed(ClosedState {
+                error_counter,
+                error_window_end,
+            }) => {
+                if error_counter < self.max_error_count_per_time_window {
+                    self.state = CircuitBreakerState::Closed(ClosedState {
+                        error_counter: error_counter + 1,
+                        error_window_end,
+                    });
+                    return;
+                }
+                let now = Instant::now();
+                if now < error_window_end {
+                    self.circuit_break_total.inc();
+                    self.state = CircuitBreakerState::Open {
+                        until: now + self.timeout,
+                    };
+                } else {
+                    self.state = CircuitBreakerState::Closed(ClosedState {
+                        error_counter: 0u32,
+                        error_window_end: now + self.time_window,
+                    });
+                }
+            }
+        }
+    }
+
+    fn receive_success(&mut self) {
+        match self.state {
+            CircuitBreakerState::HalfOpen | CircuitBreakerState::Open { .. } => {
+                self.state = CircuitBreakerState::Closed(ClosedState {
+                    error_counter: 0u32,
+                    error_window_end: Instant::now() + self.time_window,
+                });
+            }
+            CircuitBreakerState::Closed { .. } => {
+                // We could actually take that as a signal.
+            }
+        }
+    }
+}
+
+#[derive(Clone)]
+pub struct CircuitBreaker<S, Evaluator> {
+    underlying: S,
+    circuit_breaker_inner: Arc<Mutex<CircuitBreakerInner<Evaluator>>>,
+}
+
+impl<S, Evaluator> std::fmt::Debug for CircuitBreaker<S, Evaluator> {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        f.debug_struct("CircuitBreaker").finish()
+    }
+}
+
+#[derive(Debug, Clone, Copy)]
+enum CircuitBreakerState {
+    Open { until: Instant },
+    HalfOpen,
+    Closed(ClosedState),
+}
+
+#[derive(Debug, Clone, Copy)]
+struct ClosedState {
+    error_counter: u32,
+    error_window_end: Instant,
+}
+
+impl<S, R, Evaluator> Service<R> for CircuitBreaker<S, Evaluator>
+where
+    S: Service<R, Response = Evaluator::Response, Error = Evaluator::Error>,
+    Evaluator: CircuitBreakerEvaluator,
+{
+    type Response = S::Response;
+    type Error = S::Error;
+    type Future = CircuitBreakerFuture<S::Future, Evaluator>;
+
+    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        let mut inner = self.circuit_breaker_inner.lock().unwrap();
+        let state = inner.get_state();
+        match state {
+            CircuitBreakerState::Closed { .. } | CircuitBreakerState::HalfOpen => {
+                self.underlying.poll_ready(cx)
+            }
+            CircuitBreakerState::Open { .. } => {
+                let circuit_break_error = inner.evaluator.make_circuit_breaker_output();
+                Poll::Ready(Err(circuit_break_error))
+            }
+        }
+    }
+
+    fn call(&mut self, request: R) -> Self::Future {
+        CircuitBreakerFuture {
+            underlying_fut: self.underlying.call(request),
+            circuit_breaker_inner: self.circuit_breaker_inner.clone(),
+        }
+    }
+}
+
+#[pin_project]
+pub struct CircuitBreakerFuture<F, Evaluator> {
+    #[pin]
+    underlying_fut: F,
+    circuit_breaker_inner: Arc<Mutex<CircuitBreakerInner<Evaluator>>>,
+}
+
+impl<F, Evaluator> Future for CircuitBreakerFuture<F, Evaluator>
+where
+    F: Future<Output = Result<Evaluator::Response, Evaluator::Error>>,
+    Evaluator: CircuitBreakerEvaluator,
+{
+    type Output = F::Output;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let circuit_breaker_inner = self.circuit_breaker_inner.clone();
+        let poll_res = self.project().underlying_fut.poll(cx);
+        match poll_res {
+            Poll::Pending => Poll::Pending,
+            Poll::Ready(result) => {
+                let mut circuit_breaker_inner_lock = circuit_breaker_inner.lock().unwrap();
+                let is_circuit_breaker_error = circuit_breaker_inner_lock
+                    .evaluator
+                    .is_circuit_breaker_error(&result);
+                if is_circuit_breaker_error {
+                    circuit_breaker_inner_lock.receive_error();
+                } else {
+                    circuit_breaker_inner_lock.receive_success();
+                }
+                Poll::Ready(result)
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::atomic::{AtomicBool, Ordering};
+
+    use tower::{ServiceBuilder, ServiceExt};
+
+    use super::*;
+
+    #[derive(Debug)]
+    enum TestError {
+        CircuitBreak,
+        ServiceError,
+    }
+
+    #[derive(Debug, Clone, Copy)]
+    struct TestCircuitBreakerEvaluator;
+
+    impl CircuitBreakerEvaluator for TestCircuitBreakerEvaluator {
+        type Response = ();
+        type Error = TestError;
+
+        fn is_circuit_breaker_error(&self, output: &Result<(), TestError>) -> bool {
+            output.is_err()
+        }
+
+        fn make_circuit_breaker_output(&self) -> TestError {
+            TestError::CircuitBreak
+        }
+    }
+
+    #[tokio::test]
+    async fn test_circuit_breaker() {
+        tokio::time::pause();
+        let test_switch: Arc<AtomicBool> = Arc::new(AtomicBool::new(true));
+
+        const TIMEOUT: Duration = Duration::from_millis(500);
+
+        let int_counter: prometheus::IntCounter =
+            IntCounter::new("circuit_break_total_test", "test circuit breaker counter").unwrap();
+        // The service closure keeps its own handle on the switch so that the returned
+        // future does not borrow from the closure.
+        let test_switch_clone = test_switch.clone();
+        let mut service = ServiceBuilder::new()
+            .layer(TestCircuitBreakerEvaluator.make_layer(10, TIMEOUT, int_counter))
+            .service_fn(move |_| {
+                let test_switch = test_switch_clone.clone();
+                async move {
+                    if test_switch.load(Ordering::Relaxed) {
+                        Ok(())
+                    } else {
+                        Err(TestError::ServiceError)
+                    }
+                }
+            });
+
+        service.ready().await.unwrap().call(()).await.unwrap();
+
+        for _ in 0..1_000 {
+            service.ready().await.unwrap().call(()).await.unwrap();
+        }
+
+        test_switch.store(false, Ordering::Relaxed);
+
+        let mut service_error_count = 0;
+        let mut circuit_break_count = 0;
+        for _ in 0..1_000 {
+            match service.ready().await {
+                Ok(service) => {
+                    service.call(()).await.unwrap_err();
+                    service_error_count += 1;
+                }
+                Err(_circuit_breaker_error) => {
+                    circuit_break_count += 1;
+                }
+            }
+        }
+
+        assert_eq!(service_error_count + circuit_break_count, 1_000);
+        assert_eq!(service_error_count, 11);
+
+        tokio::time::advance(TIMEOUT).await;
+
+        // The test request at half open fails.
+        for _ in 0..1_000 {
+            match service.ready().await {
+                Ok(service) => {
+                    service.call(()).await.unwrap_err();
+                    service_error_count += 1;
+                }
+                Err(_circuit_breaker_error) => {
+                    circuit_break_count += 1;
+                }
+            }
+        }
+
+        assert_eq!(service_error_count + circuit_break_count, 2_000);
+        assert_eq!(service_error_count, 12);
+
+        test_switch.store(true, Ordering::Relaxed);
+        tokio::time::advance(TIMEOUT).await;
+
+        // The test request at half open succeeds.
+        for _ in 0..1_000 {
+            service.ready().await.unwrap().call(()).await.unwrap();
+        }
+    }
+}
diff --git a/quickwit/quickwit-common/src/tower/mod.rs b/quickwit/quickwit-common/src/tower/mod.rs
index a39f28ba0fe..0d761e0bcfc 100644
--- a/quickwit/quickwit-common/src/tower/mod.rs
+++ b/quickwit/quickwit-common/src/tower/mod.rs
@@ -21,6 +21,7 @@ mod box_layer;
 mod box_service;
 mod buffer;
 mod change;
+mod circuit_breaker;
 mod delay;
 mod estimate_rate;
 mod event_listener;
@@ -41,6 +42,7 @@ pub use box_layer::BoxLayer;
 pub use box_service::BoxService;
 pub use buffer::{Buffer, BufferError, BufferLayer};
 pub use change::Change;
+pub use circuit_breaker::{CircuitBreaker, CircuitBreakerEvaluator, CircuitBreakerLayer};
 pub use delay::{Delay, DelayLayer};
 pub use estimate_rate::{EstimateRate, EstimateRateLayer};
 pub use event_listener::{EventListener, EventListenerLayer};
diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs
index aa88416ef36..47f04c022b6 100644
--- a/quickwit/quickwit-serve/src/lib.rs
+++ b/quickwit/quickwit-serve/src/lib.rs
@@ -71,9 +71,9 @@ use quickwit_common::rate_limiter::RateLimiterSettings;
 use quickwit_common::retry::RetryParams;
 use quickwit_common::runtimes::RuntimesConfig;
 use quickwit_common::tower::{
-    BalanceChannel, BoxFutureInfaillible, BufferLayer, Change, ConstantRate, EstimateRateLayer,
-    EventListenerLayer, GrpcMetricsLayer, LoadShedLayer, RateLimitLayer, RetryLayer, RetryPolicy,
-    SmaRateEstimator,
+    BalanceChannel, BoxFutureInfaillible, BufferLayer, Change, CircuitBreakerEvaluator,
+    ConstantRate, EstimateRateLayer, EventListenerLayer, GrpcMetricsLayer, LoadShedLayer,
+    RateLimitLayer, RetryLayer, RetryPolicy, SmaRateEstimator,
 };
 use quickwit_common::uri::Uri;
 use quickwit_common::{get_bool_from_env, spawn_named_task};
@@ -100,8 +100,10 @@ use quickwit_proto::control_plane::ControlPlaneServiceClient;
 use quickwit_proto::indexing::{IndexingServiceClient, ShardPositionsUpdate};
 use quickwit_proto::ingest::ingester::{
     IngesterService, IngesterServiceClient, IngesterServiceTowerLayerStack, IngesterStatus,
+    PersistFailureReason, PersistResponse,
 };
 use quickwit_proto::ingest::router::IngestRouterServiceClient;
+use quickwit_proto::ingest::IngestV2Error;
 use quickwit_proto::metastore::{
     EntityKind, ListIndexesMetadataRequest, MetastoreError, MetastoreService,
     MetastoreServiceClient,
@@ -786,6 +788,32 @@ pub async fn serve_quickwit(
     Ok(actor_exit_statuses)
 }
 
+#[derive(Clone, Copy)]
+struct PersistCircuitBreakerEvaluator;
+
+impl CircuitBreakerEvaluator for PersistCircuitBreakerEvaluator {
+    type Response = PersistResponse;
+
+    type Error = IngestV2Error;
+
+    fn is_circuit_breaker_error(&self, output: &Result<PersistResponse, IngestV2Error>) -> bool {
+        let Ok(persist_response) = output.as_ref() else {
+            return false;
+        };
+        for persist_failure in &persist_response.failures {
+            // This is the error we return when the WAL is full.
+            if persist_failure.reason() == PersistFailureReason::ResourceExhausted {
+                return true;
+            }
+        }
+        false
+    }
+
+    fn make_circuit_breaker_output(&self) -> IngestV2Error {
+        IngestV2Error::TooManyRequests
+    }
+}
+
 /// Stack of layers to use on the server side of the ingester service.
 fn ingester_service_layer_stack(
     layer_stack: IngesterServiceTowerLayerStack,
@@ -793,6 +821,14 @@
     layer_stack
         .stack_layer(INGEST_GRPC_SERVER_METRICS_LAYER.clone())
         .stack_persist_layer(quickwit_common::tower::OneTaskPerCallLayer)
+        .stack_persist_layer(
+            // "3" may seem a little low, but we only consider errors caused by a full WAL.
+            PersistCircuitBreakerEvaluator.make_layer(
+                3,
+                Duration::from_millis(500),
+                crate::metrics::SERVE_METRICS.circuit_break_total.clone(),
+            ),
+        )
         .stack_open_replication_stream_layer(quickwit_common::tower::OneTaskPerCallLayer)
         .stack_init_shards_layer(quickwit_common::tower::OneTaskPerCallLayer)
         .stack_retain_shards_layer(quickwit_common::tower::OneTaskPerCallLayer)
diff --git a/quickwit/quickwit-serve/src/metrics.rs b/quickwit/quickwit-serve/src/metrics.rs
index 7a2365a761f..7df17b09fb1 100644
--- a/quickwit/quickwit-serve/src/metrics.rs
+++ b/quickwit/quickwit-serve/src/metrics.rs
@@ -19,19 +19,27 @@
 
 use once_cell::sync::Lazy;
 use quickwit_common::metrics::{
-    new_counter_vec, new_gauge_vec, new_histogram_vec, HistogramVec, IntCounterVec, IntGaugeVec,
+    new_counter, new_counter_vec, new_gauge_vec, new_histogram_vec, HistogramVec, IntCounter,
+    IntCounterVec, IntGaugeVec,
 };
 
-pub struct RestMetrics {
+pub struct ServeMetrics {
     pub http_requests_total: IntCounterVec<2>,
     pub request_duration_secs: HistogramVec<2>,
     pub ongoing_requests: IntGaugeVec<1>,
     pub pending_requests: IntGaugeVec<1>,
+    pub circuit_break_total: IntCounter,
 }
 
-impl Default for RestMetrics {
+impl Default for ServeMetrics {
     fn default() -> Self {
-        RestMetrics {
+        let circuit_break_total = new_counter(
+            "circuit_break_total",
+            "Circuit breaker counter",
+            "grpc",
+            &[],
+        );
+        ServeMetrics {
             http_requests_total: new_counter_vec(
                 "http_requests_total",
                 "Total number of HTTP requests processed.",
@@ -61,9 +69,10 @@ impl Default for RestMetrics {
                 &[],
                 ["endpoint_group"],
             ),
+            circuit_break_total,
         }
     }
 }
 
 /// Serve counters exposes a set of metrics about the requests received by quickwit.
-pub static SERVE_METRICS: Lazy<RestMetrics> = Lazy::new(RestMetrics::default);
+pub static SERVE_METRICS: Lazy<ServeMetrics> = Lazy::new(ServeMetrics::default);
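Usage sketch (illustrative only, not part of the patch): the snippet below shows how a
service other than the ingester could adopt the new layer. `MyError`, `MyEvaluator`, the
thresholds, and the counter name are made up for the example; only `CircuitBreakerEvaluator`
and `make_layer(max_errors_per_window, timeout, counter)` come from the code added above.

    use std::time::Duration;

    use prometheus::IntCounter;
    use quickwit_common::tower::CircuitBreakerEvaluator;
    use tower::{Service, ServiceBuilder, ServiceExt};

    #[derive(Debug)]
    enum MyError {
        // Returned by the circuit breaker itself while the circuit is open.
        CircuitOpen,
        // Returned by the wrapped service; counts toward opening the circuit.
        Internal,
    }

    #[derive(Clone, Copy)]
    struct MyEvaluator;

    impl CircuitBreakerEvaluator for MyEvaluator {
        type Response = ();
        type Error = MyError;

        // Only `Internal` errors are counted by the circuit breaker.
        fn is_circuit_breaker_error(&self, output: &Result<(), MyError>) -> bool {
            matches!(output, Err(MyError::Internal))
        }

        fn make_circuit_breaker_output(&self) -> MyError {
            MyError::CircuitOpen
        }
    }

    async fn call_through_circuit_breaker() -> Result<(), MyError> {
        // Hypothetical counter; in quickwit-serve this is SERVE_METRICS.circuit_break_total.
        let counter = IntCounter::new("my_circuit_break_total", "circuit breaker trips").unwrap();
        // Tolerate up to 5 qualifying errors per 1s window, then reject requests for 500ms.
        let mut service = ServiceBuilder::new()
            .layer(MyEvaluator.make_layer(5, Duration::from_millis(500), counter))
            .service_fn(|_req: ()| async { Ok::<(), MyError>(()) });
        service.ready().await?.call(()).await
    }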