From c55261bfd672c0e5a0a1c2528df08cb2832200f8 Mon Sep 17 00:00:00 2001 From: Adrien Guillo Date: Thu, 9 Nov 2023 10:41:57 -0500 Subject: [PATCH] Rate limit ingestion per shard to 5 MB/s --- quickwit/quickwit-common/src/tower/rate.rs | 5 + .../quickwit-ingest/src/ingest_v2/fetch.rs | 3 + .../quickwit-ingest/src/ingest_v2/ingester.rs | 129 ++++++++++++- quickwit/quickwit-ingest/src/ingest_v2/mod.rs | 2 + .../src/ingest_v2/rate_limiter.rs | 176 ++++++++++++++++++ .../src/ingest_v2/replication.rs | 3 + quickwit/quickwit-serve/src/lib.rs | 3 +- 7 files changed, 315 insertions(+), 6 deletions(-) create mode 100644 quickwit/quickwit-ingest/src/ingest_v2/rate_limiter.rs diff --git a/quickwit/quickwit-common/src/tower/rate.rs b/quickwit/quickwit-common/src/tower/rate.rs index a46a50293a5..58238f2254b 100644 --- a/quickwit/quickwit-common/src/tower/rate.rs +++ b/quickwit/quickwit-common/src/tower/rate.rs @@ -53,6 +53,11 @@ impl ConstantRate { let work = bytes.as_u64(); Self::new(work, period) } + + pub fn bytes_per_sec(bytes: ByteSize) -> Self { + let work = bytes.as_u64(); + Self::new(work, Duration::from_secs(1)) + } } impl Rate for ConstantRate { diff --git a/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs b/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs index 919a12498c6..7016be1259d 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs @@ -626,6 +626,7 @@ mod tests { let state = Arc::new(RwLock::new(IngesterState { mrecordlog, shards: HashMap::new(), + rate_limiters: HashMap::new(), replication_streams: HashMap::new(), replication_tasks: HashMap::new(), })); @@ -794,6 +795,7 @@ mod tests { let state = Arc::new(RwLock::new(IngesterState { mrecordlog, shards: HashMap::new(), + rate_limiters: HashMap::new(), replication_streams: HashMap::new(), replication_tasks: HashMap::new(), })); @@ -860,6 +862,7 @@ mod tests { let state = Arc::new(RwLock::new(IngesterState { mrecordlog, shards: HashMap::new(), + 
rate_limiters: HashMap::new(), replication_streams: HashMap::new(), replication_tasks: HashMap::new(), })); diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index e70737166fd..292ec92f12d 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -49,6 +49,7 @@ use tracing::{error, info, warn}; use super::fetch::FetchTask; use super::models::{IngesterShard, PrimaryShard}; use super::mrecordlog_utils::{append_eof_record_if_necessary, check_enough_capacity}; +use super::rate_limiter::{RateLimiter, RateLimiterSettings}; use super::replication::{ ReplicationStreamTask, ReplicationStreamTaskHandle, ReplicationTask, ReplicationTaskHandle, SYN_REPLICATION_STREAM_CAPACITY, @@ -73,6 +74,7 @@ pub struct Ingester { state: Arc>, disk_capacity: ByteSize, memory_capacity: ByteSize, + rate_limiter: RateLimiter, replication_factor: usize, } @@ -87,6 +89,7 @@ impl fmt::Debug for Ingester { pub(super) struct IngesterState { pub mrecordlog: MultiRecordLog, pub shards: HashMap, + pub rate_limiters: HashMap, // Replication stream opened with followers. pub replication_streams: HashMap, // Replication tasks running for each replication stream opened with leaders. 
@@ -100,6 +103,7 @@ impl Ingester { wal_dir_path: &Path, disk_capacity: ByteSize, memory_capacity: ByteSize, + rate_limiter_settings: RateLimiterSettings, replication_factor: usize, ) -> IngestV2Result { let mrecordlog = MultiRecordLog::open_with_prefs( @@ -112,15 +116,19 @@ impl Ingester { let inner = IngesterState { mrecordlog, shards: HashMap::new(), + rate_limiters: HashMap::new(), replication_streams: HashMap::new(), replication_tasks: HashMap::new(), }; + let rate_limiter = RateLimiter::from_settings(rate_limiter_settings); + let mut ingester = Self { self_node_id, ingester_pool, state: Arc::new(RwLock::new(inner)), disk_capacity, memory_capacity, + rate_limiter, replication_factor, }; info!( @@ -152,7 +160,7 @@ impl Ingester { let solo_shard = SoloShard::new(ShardState::Closed, Position::Eof); let shard = IngesterShard::Solo(solo_shard); - state_guard.shards.insert(queue_id, shard); + state_guard.shards.insert(queue_id.clone(), shard); } Ok(()) } @@ -178,6 +186,10 @@ impl Ingester { }); } }; + let mut rate_limiter = self.rate_limiter.clone(); + rate_limiter.reset(); + state.rate_limiters.insert(queue_id.clone(), rate_limiter); + let shard = if let Some(follower_id) = follower_id_opt { self.init_replication_stream(state, leader_id, follower_id) .await?; @@ -316,8 +328,10 @@ impl IngesterService for Ingester { self.memory_capacity, requested_capacity, ) { - warn!("failed to persist records: {error}"); - + warn!( + "failed to persist records to ingester `{}`: {error}", + self.self_node_id + ); let persist_failure = PersistFailure { subrequest_id: subrequest.subrequest_id, index_uid: subrequest.index_uid, @@ -328,6 +342,24 @@ impl IngesterService for Ingester { persist_failures.push(persist_failure); continue; } + let rate_limiter = state_guard + .rate_limiters + .get_mut(&queue_id) + .expect("rate limiter should be initialized"); + + if !rate_limiter.acquire(requested_capacity) { + warn!("failed to persist records to shard `{queue_id}`: rate limited"); + + let 
persist_failure = PersistFailure { + subrequest_id: subrequest.subrequest_id, + index_uid: subrequest.index_uid, + source_id: subrequest.source_id, + shard_id: subrequest.shard_id, + reason: PersistFailureReason::RateLimited as i32, + }; + persist_failures.push(persist_failure); + continue; + } let current_position_inclusive: Position = if force_commit { let encoded_mrecords = doc_batch .docs() @@ -602,6 +634,7 @@ mod tests { use std::net::SocketAddr; use bytes::Bytes; + use quickwit_common::tower::ConstantRate; use quickwit_proto::ingest::ingester::{ IngesterServiceGrpcServer, IngesterServiceGrpcServerAdapter, PersistSubrequest, TruncateSubrequest, @@ -621,6 +654,7 @@ mod tests { let wal_dir_path = tempdir.path(); let disk_capacity = ByteSize::mb(256); let memory_capacity = ByteSize::mb(1); + let rate_limiter_settings = RateLimiterSettings::default(); let replication_factor = 2; let mut ingester = Ingester::try_new( self_node_id.clone(), @@ -628,6 +662,7 @@ mod tests { wal_dir_path, disk_capacity, memory_capacity, + rate_limiter_settings, replication_factor, ) .await @@ -726,6 +761,7 @@ mod tests { let wal_dir_path = tempdir.path(); let disk_capacity = ByteSize::mb(256); let memory_capacity = ByteSize::mb(1); + let rate_limiter_settings = RateLimiterSettings::default(); let replication_factor = 1; let mut ingester = Ingester::try_new( self_node_id.clone(), @@ -733,6 +769,7 @@ mod tests { wal_dir_path, disk_capacity, memory_capacity, + rate_limiter_settings, replication_factor, ) .await @@ -825,6 +862,7 @@ mod tests { let wal_dir_path = tempdir.path(); let disk_capacity = ByteSize::mb(256); let memory_capacity = ByteSize::mb(1); + let rate_limiter_settings = RateLimiterSettings::default(); let replication_factor = 1; let mut ingester = Ingester::try_new( self_node_id.clone(), @@ -832,6 +870,7 @@ mod tests { wal_dir_path, disk_capacity, memory_capacity, + rate_limiter_settings, replication_factor, ) .await @@ -871,6 +910,7 @@ mod tests { let disk_capacity = 
ByteSize::mb(256); let memory_capacity = ByteSize::mb(1); + let rate_limiter_settings = RateLimiterSettings::default(); let replication_factor = 2; let mut leader = Ingester::try_new( @@ -879,6 +919,7 @@ mod tests { wal_dir_path, disk_capacity, memory_capacity, + rate_limiter_settings, replication_factor, ) .await @@ -894,6 +935,7 @@ mod tests { wal_dir_path, disk_capacity, memory_capacity, + rate_limiter_settings, replication_factor, ) .await @@ -1020,6 +1062,7 @@ mod tests { let wal_dir_path = tempdir.path(); let disk_capacity = ByteSize::mb(256); let memory_capacity = ByteSize::mb(1); + let rate_limiter_settings = RateLimiterSettings::default(); let replication_factor = 2; let mut leader = Ingester::try_new( leader_id.clone(), @@ -1027,6 +1070,7 @@ mod tests { wal_dir_path, disk_capacity, memory_capacity, + rate_limiter_settings, replication_factor, ) .await @@ -1051,6 +1095,7 @@ mod tests { let wal_dir_path = tempdir.path(); let disk_capacity = ByteSize::mb(256); let memory_capacity = ByteSize::mb(1); + let rate_limiter_settings = RateLimiterSettings::default(); let replication_factor = 2; let follower = Ingester::try_new( follower_id.clone(), @@ -1058,6 +1103,7 @@ mod tests { wal_dir_path, disk_capacity, memory_capacity, + rate_limiter_settings, replication_factor, ) .await @@ -1192,6 +1238,7 @@ mod tests { let wal_dir_path = tempdir.path(); let disk_capacity = ByteSize::mib(256); let memory_capacity = ByteSize::mib(1); + let rate_limiter_settings = RateLimiterSettings::default(); let replication_factor = 1; let mut ingester = Ingester::try_new( self_node_id.clone(), @@ -1199,6 +1246,7 @@ mod tests { wal_dir_path, disk_capacity, memory_capacity, + rate_limiter_settings, replication_factor, ) .await @@ -1218,7 +1266,7 @@ mod tests { let persist_request = PersistRequest { leader_id: self_node_id.to_string(), - commit_type: CommitTypeV2::Force as i32, + commit_type: CommitTypeV2::Auto as i32, subrequests: vec![PersistSubrequest { subrequest_id: 0, index_uid: 
"test-index:0".to_string(), @@ -1249,6 +1297,71 @@ mod tests { solo_shard_01.assert_replication_position(Position::Beginning); } + #[tokio::test] + async fn test_ingester_persist_rate_limited() { + let tempdir = tempfile::tempdir().unwrap(); + let self_node_id: NodeId = "test-ingester-0".into(); + let ingester_pool = IngesterPool::default(); + let wal_dir_path = tempdir.path(); + let disk_capacity = ByteSize::mib(256); + let memory_capacity = ByteSize::mib(1); + let rate_limiter_settings = RateLimiterSettings { + burst_limit: ByteSize(1), + rate_limit: ConstantRate::bytes_per_sec(ByteSize(1)), + refill_period: Duration::from_millis(100), + }; + let replication_factor = 1; + let mut ingester = Ingester::try_new( + self_node_id.clone(), + ingester_pool, + wal_dir_path, + disk_capacity, + memory_capacity, + rate_limiter_settings, + replication_factor, + ) + .await + .unwrap(); + + let persist_request = PersistRequest { + leader_id: self_node_id.to_string(), + commit_type: CommitTypeV2::Auto as i32, + subrequests: vec![PersistSubrequest { + subrequest_id: 0, + index_uid: "test-index:0".to_string(), + source_id: "test-source".to_string(), + shard_id: 1, + follower_id: None, + doc_batch: Some(DocBatchV2::for_test(["test-doc-010"])), + }], + }; + let persist_response = ingester.persist(persist_request).await.unwrap(); + assert_eq!(persist_response.leader_id, "test-ingester-0"); + assert_eq!(persist_response.successes.len(), 0); + assert_eq!(persist_response.failures.len(), 1); + + let persist_failure = &persist_response.failures[0]; + assert_eq!(persist_failure.subrequest_id, 0); + assert_eq!(persist_failure.index_uid, "test-index:0"); + assert_eq!(persist_failure.source_id, "test-source"); + assert_eq!(persist_failure.shard_id, 1); + assert_eq!(persist_failure.reason(), PersistFailureReason::RateLimited); + + let state_guard = ingester.state.read().await; + assert_eq!(state_guard.shards.len(), 1); + + let queue_id_01 = queue_id("test-index:0", "test-source", 1); + + let 
solo_shard_01 = state_guard.shards.get(&queue_id_01).unwrap(); + solo_shard_01.assert_is_solo(); + solo_shard_01.assert_is_open(); + solo_shard_01.assert_replication_position(Position::Beginning); + + state_guard + .mrecordlog + .assert_records_eq(&queue_id_01, .., &[]); + } + #[tokio::test] async fn test_ingester_persist_resource_exhausted() { let tempdir = tempfile::tempdir().unwrap(); @@ -1257,6 +1370,7 @@ mod tests { let wal_dir_path = tempdir.path(); let disk_capacity = ByteSize(0); let memory_capacity = ByteSize(0); + let rate_limiter_settings = RateLimiterSettings::default(); let replication_factor = 1; let mut ingester = Ingester::try_new( self_node_id.clone(), @@ -1264,6 +1378,7 @@ mod tests { wal_dir_path, disk_capacity, memory_capacity, + rate_limiter_settings, replication_factor, ) .await @@ -1271,7 +1386,7 @@ mod tests { let persist_request = PersistRequest { leader_id: self_node_id.to_string(), - commit_type: CommitTypeV2::Force as i32, + commit_type: CommitTypeV2::Auto as i32, subrequests: vec![PersistSubrequest { subrequest_id: 0, index_uid: "test-index:0".to_string(), @@ -1318,6 +1433,7 @@ mod tests { let wal_dir_path = tempdir.path(); let disk_capacity = ByteSize::mb(256); let memory_capacity = ByteSize::mb(1); + let rate_limiter_settings = RateLimiterSettings::default(); let replication_factor = 1; let mut ingester = Ingester::try_new( self_node_id.clone(), @@ -1325,6 +1441,7 @@ mod tests { wal_dir_path, disk_capacity, memory_capacity, + rate_limiter_settings, replication_factor, ) .await @@ -1420,6 +1537,7 @@ mod tests { let wal_dir_path = tempdir.path(); let disk_capacity = ByteSize::mb(256); let memory_capacity = ByteSize::mb(1); + let rate_limiter_settings = RateLimiterSettings::default(); let replication_factor = 1; let mut ingester = Ingester::try_new( self_node_id.clone(), @@ -1427,6 +1545,7 @@ mod tests { wal_dir_path, disk_capacity, memory_capacity, + rate_limiter_settings, replication_factor, ) .await diff --git 
a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs index ec950ada275..345de74ca08 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs @@ -22,6 +22,7 @@ mod ingester; mod models; mod mrecord; mod mrecordlog_utils; +mod rate_limiter; mod replication; mod router; mod shard_table; @@ -39,6 +40,7 @@ pub use self::fetch::{FetchStreamError, MultiFetchStream}; pub use self::ingester::Ingester; use self::mrecord::MRECORD_HEADER_LEN; pub use self::mrecord::{decoded_mrecords, MRecord}; +pub use self::rate_limiter::RateLimiterSettings; pub use self::router::IngestRouter; pub type IngesterPool = Pool; diff --git a/quickwit/quickwit-ingest/src/ingest_v2/rate_limiter.rs b/quickwit/quickwit-ingest/src/ingest_v2/rate_limiter.rs new file mode 100644 index 00000000000..fd68ac2ca0f --- /dev/null +++ b/quickwit/quickwit-ingest/src/ingest_v2/rate_limiter.rs @@ -0,0 +1,176 @@ +// Copyright (C) 2023 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ +use std::time::{Duration, Instant}; + +use bytesize::ByteSize; +use quickwit_common::tower::{ConstantRate, Rate}; + +#[derive(Debug, Clone, Copy)] +pub struct RateLimiterSettings { + pub burst_limit: ByteSize, + pub rate_limit: ConstantRate, + pub refill_period: Duration, +} + +impl Default for RateLimiterSettings { + fn default() -> Self { + // 10 MB burst limit. + let burst_limit = ByteSize::mb(10); + // 5 MB/s rate limit. + let rate_limit = ConstantRate::bytes_per_sec(ByteSize::mb(5)); + // Refill every 100ms. + let refill_period = Duration::from_millis(100); + + Self { + burst_limit, + rate_limit, + refill_period, + } + } +} + +/// A bursty token-based rate limiter. +#[derive(Debug, Clone)] +pub(super) struct RateLimiter { + capacity: u64, + available: u64, + refill_amount: u64, + refill_period: Duration, + refill_period_micros: u64, + refill_at: Instant, +} + +impl RateLimiter { + pub fn from_settings(settings: RateLimiterSettings) -> Self { + let capacity = settings.burst_limit.as_u64(); + + let work = settings.rate_limit.work() as u128; + let refill_period = settings.refill_period; + let rate_limit_period = settings.rate_limit.period(); + let refill_amount = work * refill_period.as_nanos() / rate_limit_period.as_nanos(); + + Self { + capacity, + available: capacity, + refill_amount: refill_amount as u64, + refill_period, + refill_period_micros: refill_period.as_micros() as u64, + refill_at: Instant::now() + refill_period, + } + } + + pub fn acquire(&mut self, capacity: ByteSize) -> bool { + if self.acquire_inner(capacity.as_u64()) { + true + } else { + self.refill(Instant::now()); + self.acquire_inner(capacity.as_u64()) + } + } + + pub fn reset(&mut self) { + self.available = self.capacity; + self.refill_at = Instant::now() + self.refill_period; + } + + fn acquire_inner(&mut self, capacity: u64) -> bool { + if self.available >= capacity { + self.available -= capacity; + true + } else { + false + } + } + + fn refill(&mut self, now: Instant) { + if now < 
self.refill_at { + return; + } + let elapsed = (now - self.refill_at).as_micros() as u64; + // More than one refill period may have elapsed so we need to take that into account. + let refill = self.refill_amount + self.refill_amount * elapsed / self.refill_period_micros; + self.available = std::cmp::min(self.available + refill, self.capacity); + self.refill_at = now + self.refill_period; + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_rate_limiter() { + let settings = RateLimiterSettings { + burst_limit: ByteSize::mb(2), + rate_limit: ConstantRate::bytes_per_sec(ByteSize::mb(1)), + refill_period: Duration::from_millis(100), + }; + let mut rate_limiter = RateLimiter::from_settings(settings); + assert_eq!(rate_limiter.capacity, ByteSize::mb(2).as_u64()); + assert_eq!(rate_limiter.available, ByteSize::mb(2).as_u64()); + assert_eq!(rate_limiter.refill_amount, ByteSize::kb(100).as_u64()); + assert_eq!(rate_limiter.refill_period, Duration::from_millis(100)); + + assert!(rate_limiter.acquire(ByteSize::mb(1))); + assert!(rate_limiter.acquire(ByteSize::mb(1))); + assert!(!rate_limiter.acquire(ByteSize::kb(1))); + + std::thread::sleep(Duration::from_millis(100)); + + assert!(rate_limiter.acquire(ByteSize::kb(100))); + assert!(!rate_limiter.acquire(ByteSize::kb(20))); + + std::thread::sleep(Duration::from_millis(250)); + + assert!(rate_limiter.acquire(ByteSize::kb(125))); + assert!(rate_limiter.acquire(ByteSize::kb(125))); + assert!(!rate_limiter.acquire(ByteSize::kb(20))); + } + + #[test] + fn test_rate_limiter_refill() { + let settings = RateLimiterSettings { + burst_limit: ByteSize::mb(2), + rate_limit: ConstantRate::bytes_per_sec(ByteSize::mb(1)), + refill_period: Duration::from_millis(100), + }; + let mut rate_limiter = RateLimiter::from_settings(settings); + + rate_limiter.available = 0; + let now = Instant::now(); + rate_limiter.refill(now); + assert_eq!(rate_limiter.available, 0); + + rate_limiter.available = 0; + let now = now + 
Duration::from_millis(100); + rate_limiter.refill(now); + assert_eq!(rate_limiter.available, ByteSize::kb(100).as_u64()); + + rate_limiter.available = 0; + let now = now + Duration::from_millis(110); + rate_limiter.refill(now); + assert_eq!(rate_limiter.available, ByteSize::kb(110).as_u64()); + + rate_limiter.available = 0; + let now = now + Duration::from_millis(210); + rate_limiter.refill(now); + assert_eq!(rate_limiter.available, ByteSize::kb(210).as_u64()); + } +} diff --git a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs index f2e124ade40..57cd36dc3ec 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs @@ -676,6 +676,7 @@ mod tests { let state = Arc::new(RwLock::new(IngesterState { mrecordlog, shards: HashMap::new(), + rate_limiters: HashMap::new(), replication_streams: HashMap::new(), replication_tasks: HashMap::new(), })); @@ -839,6 +840,7 @@ mod tests { let state = Arc::new(RwLock::new(IngesterState { mrecordlog, shards: HashMap::new(), + rate_limiters: HashMap::new(), replication_streams: HashMap::new(), replication_tasks: HashMap::new(), })); @@ -919,6 +921,7 @@ mod tests { let state = Arc::new(RwLock::new(IngesterState { mrecordlog, shards: HashMap::new(), + rate_limiters: HashMap::new(), replication_streams: HashMap::new(), replication_tasks: HashMap::new(), })); diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index 310aa7ae85d..e700fe05ca5 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -69,7 +69,7 @@ use quickwit_indexing::actors::IndexingService; use quickwit_indexing::start_indexing_service; use quickwit_ingest::{ start_ingest_api_service, GetMemoryCapacity, IngestApiService, IngestRequest, IngestRouter, - IngestServiceClient, Ingester, IngesterPool, + IngestServiceClient, Ingester, IngesterPool, RateLimiterSettings, }; use 
quickwit_janitor::{start_janitor_service, JanitorService}; use quickwit_metastore::{ @@ -574,6 +574,7 @@ async fn setup_ingest_v2( &wal_dir_path, config.ingest_api_config.max_queue_disk_usage, config.ingest_api_config.max_queue_memory_usage, + RateLimiterSettings::default(), replication_factor, ) .await?;