Skip to content

Commit

Permalink
Implement shard auto-scaling (#4208)
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload authored Nov 30, 2023
1 parent 9679d56 commit 546df3d
Show file tree
Hide file tree
Showing 22 changed files with 2,405 additions and 1,098 deletions.
2 changes: 1 addition & 1 deletion quickwit/quickwit-actors/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ pub use universe::Universe;
pub use self::actor_context::ActorContext;
pub use self::actor_state::ActorState;
pub use self::channel_with_priority::{QueueCapacity, RecvError, SendError, TrySendError};
pub use self::mailbox::{Inbox, Mailbox};
pub use self::mailbox::{Inbox, Mailbox, WeakMailbox};
pub use self::registry::ActorObservation;
pub use self::supervisor::{Supervisor, SupervisorMetrics, SupervisorState};

Expand Down
9 changes: 9 additions & 0 deletions quickwit/quickwit-actors/src/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,15 @@ pub struct WeakMailbox<A: Actor> {
ref_count: Weak<AtomicUsize>,
}

impl<A: Actor> Clone for WeakMailbox<A> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
ref_count: self.ref_count.clone(),
}
}
}

impl<A: Actor> WeakMailbox<A> {
pub fn upgrade(&self) -> Option<Mailbox<A>> {
let inner = self.inner.upgrade()?;
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ mod path_hasher;
mod progress;
pub mod pubsub;
pub mod rand;
pub mod rate_limiter;
pub mod rendezvous_hasher;
pub mod retry;
pub mod runtimes;
pub mod shared_consts;
pub mod sorted_iter;

pub mod stream_utils;
pub mod temp_dir;
#[cfg(any(test, feature = "testsuite"))]
Expand Down
16 changes: 12 additions & 4 deletions quickwit/quickwit-common/src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::time::Duration;

use async_trait::async_trait;
use tokio::sync::Mutex as TokioMutex;
use tracing::warn;

use crate::type_map::TypeMap;

Expand Down Expand Up @@ -123,10 +124,17 @@ impl EventBroker {
for subscription in typed_subscriptions.values() {
let event = event.clone();
let subscriber_clone = subscription.subscriber.clone();
tokio::spawn(tokio::time::timeout(Duration::from_secs(600), async move {
let mut subscriber_lock = subscriber_clone.lock().await;
subscriber_lock.handle_event(event).await;
}));
let handle_event_fut = async move {
if tokio::time::timeout(Duration::from_secs(1), async {
subscriber_clone.lock().await.handle_event(event).await
})
.await
.is_err()
{
warn!("`{}` event handler timed out", std::any::type_name::<E>());
}
};
tokio::spawn(handle_event_fut);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,16 @@
use std::time::{Duration, Instant};

use bytesize::ByteSize;
use quickwit_common::tower::{ConstantRate, Rate};

use crate::tower::{ConstantRate, Rate};

#[derive(Debug, Clone, Copy)]
pub struct RateLimiterSettings {
// After a long period of inactivity, the rate limiter can accumulate some "credits"
// up to what we call a `burst_limit`.
//
// Until these credits are expired, the rate limiter may exceed temporarily its rate limit.
pub burst_limit: ByteSize,
pub burst_limit: u64,
pub rate_limit: ConstantRate,
// The refill period has an effect on the resolution at which the
// rate limiting is enforced.
Expand All @@ -40,7 +41,7 @@ pub struct RateLimiterSettings {
impl Default for RateLimiterSettings {
fn default() -> Self {
// 10 MB burst limit.
let burst_limit = ByteSize::mb(10);
let burst_limit = ByteSize::mb(10).as_u64();
// 5 MB/s rate limit.
let rate_limit = ConstantRate::bytes_per_sec(ByteSize::mb(5));
// Refill every 100ms.
Expand All @@ -56,9 +57,11 @@ impl Default for RateLimiterSettings {

/// A bursty token-based rate limiter.
#[derive(Debug, Clone)]
pub(super) struct RateLimiter {
capacity: u64,
available: u64,
pub struct RateLimiter {
// Maximum number of permits that can be accumulated.
max_capacity: u64,
// Number of permits available.
available_permits: u64,
refill_amount: u64,
refill_period: Duration,
refill_period_micros: u64,
Expand All @@ -68,35 +71,48 @@ pub(super) struct RateLimiter {
impl RateLimiter {
/// Creates a new rate limiter from the given settings.
pub fn from_settings(settings: RateLimiterSettings) -> Self {
let capacity = settings.burst_limit.as_u64();

let max_capacity = settings.burst_limit;
let refill_period = settings.refill_period;
let rate_limit = settings.rate_limit.rescale(refill_period);
let now = Instant::now();

Self {
capacity,
available: capacity,
max_capacity,
available_permits: max_capacity,
refill_amount: rate_limit.work(),
refill_period,
refill_period_micros: refill_period.as_micros() as u64,
refill_at: now + refill_period,
}
}

/// Acquires some capacity from the rate limiter. Returns whether the capacity was available.
pub fn acquire(&mut self, capacity: ByteSize) -> bool {
if self.acquire_inner(capacity.as_u64()) {
/// Returns the number of permits available.
pub fn available_permits(&self) -> u64 {
self.available_permits
}

/// Acquires some permits from the rate limiter. Returns whether the permits were acquired.
pub fn acquire(&mut self, num_permits: u64) -> bool {
if self.acquire_inner(num_permits) {
true
} else {
self.refill(Instant::now());
self.acquire_inner(capacity.as_u64())
self.acquire_inner(num_permits)
}
}

fn acquire_inner(&mut self, capacity: u64) -> bool {
if self.available >= capacity {
self.available -= capacity;
pub fn acquire_bytes(&mut self, bytes: ByteSize) -> bool {
self.acquire(bytes.as_u64())
}

/// Gives back some unused permits to the rate limiter.
pub fn release(&mut self, num_permits: u64) {
self.available_permits = self.max_capacity.min(self.available_permits + num_permits);
}

fn acquire_inner(&mut self, num_permits: u64) -> bool {
if self.available_permits >= num_permits {
self.available_permits -= num_permits;
true
} else {
false
Expand All @@ -110,7 +126,7 @@ impl RateLimiter {
let elapsed = (now - self.refill_at).as_micros() as u64;
// More than one refill period may have elapsed so we need to take that into account.
let refill = self.refill_amount + self.refill_amount * elapsed / self.refill_period_micros;
self.available = std::cmp::min(self.available + refill, self.capacity);
self.available_permits = self.max_capacity.min(self.available_permits + refill);
self.refill_at = now + self.refill_period;
}
}
Expand All @@ -120,61 +136,79 @@ mod tests {
use super::*;

#[test]
fn test_rate_limiter() {
fn test_rate_limiter_acquire() {
let settings = RateLimiterSettings {
burst_limit: ByteSize::mb(2),
burst_limit: ByteSize::mb(2).as_u64(),
rate_limit: ConstantRate::bytes_per_sec(ByteSize::mb(1)),
refill_period: Duration::from_millis(100),
};
let mut rate_limiter = RateLimiter::from_settings(settings);
assert_eq!(rate_limiter.capacity, ByteSize::mb(2).as_u64());
assert_eq!(rate_limiter.available, ByteSize::mb(2).as_u64());
assert_eq!(rate_limiter.max_capacity, ByteSize::mb(2).as_u64());
assert_eq!(rate_limiter.available_permits, ByteSize::mb(2).as_u64());
assert_eq!(rate_limiter.refill_amount, ByteSize::kb(100).as_u64());
assert_eq!(rate_limiter.refill_period, Duration::from_millis(100));

assert!(rate_limiter.acquire(ByteSize::mb(1)));
assert!(rate_limiter.acquire(ByteSize::mb(1)));
assert!(!rate_limiter.acquire(ByteSize::kb(1)));
assert!(rate_limiter.acquire_bytes(ByteSize::mb(1)));
assert!(rate_limiter.acquire_bytes(ByteSize::mb(1)));
assert!(!rate_limiter.acquire_bytes(ByteSize::kb(1)));

std::thread::sleep(Duration::from_millis(100));

assert!(rate_limiter.acquire(ByteSize::kb(100)));
assert!(!rate_limiter.acquire(ByteSize::kb(20)));
assert!(rate_limiter.acquire_bytes(ByteSize::kb(100)));
assert!(!rate_limiter.acquire_bytes(ByteSize::kb(20)));

std::thread::sleep(Duration::from_millis(250));

assert!(rate_limiter.acquire(ByteSize::kb(125)));
assert!(rate_limiter.acquire(ByteSize::kb(125)));
assert!(!rate_limiter.acquire(ByteSize::kb(20)));
assert!(rate_limiter.acquire_bytes(ByteSize::kb(125)));
assert!(rate_limiter.acquire_bytes(ByteSize::kb(125)));
assert!(!rate_limiter.acquire_bytes(ByteSize::kb(20)));
}

#[test]
fn test_rate_limiter_release() {
let settings = RateLimiterSettings {
burst_limit: 1,
rate_limit: ConstantRate::bytes_per_sec(ByteSize::mb(1)),
refill_period: Duration::from_millis(100),
};
let mut rate_limiter = RateLimiter::from_settings(settings);
rate_limiter.acquire(1);
assert_eq!(rate_limiter.available_permits, 0);

rate_limiter.release(1);
assert_eq!(rate_limiter.available_permits, 1);

rate_limiter.release(1);
assert_eq!(rate_limiter.available_permits, 1);
}

#[test]
fn test_rate_limiter_refill() {
let settings = RateLimiterSettings {
burst_limit: ByteSize::mb(2),
burst_limit: ByteSize::mb(2).as_u64(),
rate_limit: ConstantRate::bytes_per_sec(ByteSize::mb(1)),
refill_period: Duration::from_millis(100),
};
let mut rate_limiter = RateLimiter::from_settings(settings);

rate_limiter.available = 0;
rate_limiter.available_permits = 0;
let now = Instant::now();
rate_limiter.refill(now);
assert_eq!(rate_limiter.available, 0);
assert_eq!(rate_limiter.available_permits, 0);

rate_limiter.available = 0;
rate_limiter.available_permits = 0;
let now = now + Duration::from_millis(100);
rate_limiter.refill(now);
assert_eq!(rate_limiter.available, ByteSize::kb(100).as_u64());
assert_eq!(rate_limiter.available_permits, ByteSize::kb(100).as_u64());

rate_limiter.available = 0;
rate_limiter.available_permits = 0;
let now = now + Duration::from_millis(110);
rate_limiter.refill(now);
assert_eq!(rate_limiter.available, ByteSize::kb(110).as_u64());
assert_eq!(rate_limiter.available_permits, ByteSize::kb(110).as_u64());

rate_limiter.available = 0;
rate_limiter.available_permits = 0;
let now = now + Duration::from_millis(210);
rate_limiter.refill(now);
assert_eq!(rate_limiter.available, ByteSize::kb(210).as_u64());
assert_eq!(rate_limiter.available_permits, ByteSize::kb(210).as_u64());
}
}
6 changes: 3 additions & 3 deletions quickwit/quickwit-common/src/tower/rate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,18 @@ impl ConstantRate {
/// # Panics
///
/// This function panics if `period` is 0.
pub fn new(work: u64, period: Duration) -> Self {
pub const fn new(work: u64, period: Duration) -> Self {
assert!(!period.is_zero());

Self { work, period }
}

pub fn bytes_per_period(bytes: ByteSize, period: Duration) -> Self {
pub const fn bytes_per_period(bytes: ByteSize, period: Duration) -> Self {
let work = bytes.as_u64();
Self::new(work, period)
}

pub fn bytes_per_sec(bytes: ByteSize) -> Self {
pub const fn bytes_per_sec(bytes: ByteSize) -> Self {
Self::bytes_per_period(bytes, Duration::from_secs(1))
}

Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-control-plane/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ documentation = "https://quickwit.io/docs/"
[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
fnv = { workspace = true }
dyn-clone = { workspace = true }
fnv = { workspace = true }
http = { workspace = true }
hyper = { workspace = true }
itertools = { workspace = true }
Expand Down
Loading

0 comments on commit 546df3d

Please sign in to comment.