Skip to content

Commit

Permalink
Bugfix in the rate estimator
Browse files Browse the repository at this point in the history
The rate estimator was returning inflated results whenever
we did not record any work during a bucket period.

The fix does two things:
- It replaces the 1-bit color logic `(bucket_ord / num_buckets) % 2` with a more generic and more accurate 8-bit hash, `bucket_ord % 241`.

A collision may only happen if no write targeted this specific bucket for `num_buckets x 241 x bucket_period`.

- Upon read, it checks the hash stored in each bucket and dismisses the buckets whose hash does not match.
  • Loading branch information
fulmicoton committed Mar 8, 2024
1 parent aa56186 commit 9235400
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 81 deletions.
222 changes: 144 additions & 78 deletions quickwit/quickwit-common/src/tower/rate_estimator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

use super::{ConstantRate, Rate};
use super::Rate;

pub trait RateEstimator: Rate {
fn update(&mut self, started_at: Instant, ended_at: Instant, work: u64);
Expand All @@ -40,9 +40,8 @@ struct InnerSmaRateEstimator {
anchor: Instant,
buckets: Box<[Bucket]>,
bucket_period_millis: u64,
num_buckets: u64,
period: Duration,
period_millis: u64,
num_buckets: u64,
}

impl SmaRateEstimator {
Expand Down Expand Up @@ -75,22 +74,28 @@ impl SmaRateEstimator {
buckets: buckets.into_boxed_slice(),
bucket_period_millis: bucket_period.as_millis() as u64,
num_buckets: num_buckets.get() as u64,
period,
period_millis: period.as_millis() as u64,
};
Self {
inner: Arc::new(inner),
}
}

/// Seeds the rate estimator with an initial rate.
pub fn with_initial_rate(self, initial_rate: ConstantRate) -> Self {
let initial_work = initial_rate.work() * self.inner.bucket_period_millis
/ initial_rate.period().as_millis() as u64;
for bucket_ord in 0..self.inner.num_buckets {
self.inner.buckets[bucket_ord as usize].increment_work(initial_work, 0);
}
self
/// Returns the work recorded for the bucket ordinal `bucket_ord`.
///
/// The ordinal is mapped onto its slot in the ring of buckets; the slot itself
/// reports 0 when the hash it stores does not match `bucket_ord`.
fn work_in_bucket(&self, bucket_ord: u64) -> u64 {
    // Reduce in `u64` *before* narrowing to `usize`, like `update` does. Casting
    // first would truncate large ordinals on targets where `usize` is narrower
    // than 64 bits, so reads and writes could resolve to different slots.
    let num_slots = self.inner.buckets.len() as u64;
    let bucket_idx = (bucket_ord % num_slots) as usize;
    self.inner.buckets[bucket_idx].work_for_bucket(bucket_ord)
}

/// Estimates the amount of work performed per `period`, as observed at `now`.
///
/// The estimate sums the work of the `num_buckets - 1` bucket ordinals that
/// precede the bucket containing `now` (that current bucket is excluded), then
/// rescales the sum from the duration of that window to `period`.
///
/// The divisor is always the full window of `num_buckets - 1` bucket periods,
/// even when fewer ordinals exist yet (the range start saturates at 0).
///
/// Returns 0 for a single-bucket estimator: the window is then empty and the
/// division below would otherwise be a division by zero.
fn work_at(&self, now: Instant) -> u64 {
    let elapsed_ms = now.duration_since(self.inner.anchor).as_millis() as u64;
    let current_bucket_ord = elapsed_ms / self.inner.bucket_period_millis;

    // Number of buckets in the averaging window (everything but the current one).
    let num_window_buckets = self.inner.num_buckets - 1;
    if num_window_buckets == 0 {
        return 0;
    }

    let first_bucket_ord = current_bucket_ord.saturating_sub(num_window_buckets);
    let cumulative_work: u64 = (first_bucket_ord..current_bucket_ord)
        .map(|bucket_ord| self.work_in_bucket(bucket_ord))
        .sum();

    (cumulative_work * self.inner.period_millis)
        / (self.inner.bucket_period_millis * num_window_buckets)
}
}

Expand All @@ -100,77 +105,103 @@ impl Rate for SmaRateEstimator {
/// This estimation is computed by summing the amount of work performed tracked in the previous
/// `n-1` buckets and dividing it by the duration of the `n-1` periods.
fn work(&self) -> u64 {
let now = Instant::now();
let elapsed = now.duration_since(self.inner.anchor).as_millis() as u64;
let current_bucket_ord = elapsed / self.inner.bucket_period_millis;
let current_bucket_idx = (current_bucket_ord % self.inner.num_buckets) as usize;
let cumulative_work: u64 = self
.inner
.buckets
.iter()
.enumerate()
.filter(|(bucket_idx, _)| *bucket_idx != current_bucket_idx)
.map(|(_, bucket)| bucket.work())
.sum();
let num_bucket = self.inner.num_buckets - 1;
(cumulative_work * self.inner.period_millis)
/ (self.inner.bucket_period_millis * num_bucket)
self.work_at(Instant::now())
}

fn period(&self) -> Duration {
self.inner.period
Duration::from_millis(self.inner.period_millis)
}
}

/// Maps a bucket ordinal to the 8-bit tag stored next to a bucket's work counter.
///
/// The tag is simply `bucket_ord % 241`. 241 is prime and fits in a `u8` (it is
/// not the largest such prime, 251 is; any prime below 256 would do).
///
/// Being prime, 241 is coprime with every `num_buckets` that is not a multiple
/// of 241. By the Chinese remainder theorem, the pair (slot index, tag) then
/// identifies `bucket_ord` modulo `num_buckets * 241`.
#[inline]
fn compute_bucket_ord_hash(bucket_ord: u64) -> u8 {
    const HASH_MODULUS: u64 = 241;
    // The remainder is always below 241, so narrowing to `u8` is lossless.
    (bucket_ord % HASH_MODULUS) as u8
}

impl RateEstimator for SmaRateEstimator {
fn update(&mut self, _started_at: Instant, ended_at: Instant, work: u64) {
let elapsed = ended_at.duration_since(self.inner.anchor).as_millis() as u64;
let num_buckets = self.inner.num_buckets;
let bucket_ord = elapsed / self.inner.bucket_period_millis;
let bucket_idx = bucket_ord % num_buckets;
let bucket_color = ((bucket_ord / num_buckets) & 1) << 63;
let bucket = &self.inner.buckets[bucket_idx as usize];
bucket.increment_work(work, bucket_color);
let bucket = &self.inner.buckets[(bucket_ord % num_buckets) as usize];
bucket.increment_work(work, bucket_ord);
}
}

/// Rate estimator bucket. The 63 least significant bits of the atomic integer store the amount of
/// work, while the most significant bit is used to indicate whether the bucket needs to be reset.
/// The reset bit is also called the "color" of a bucket in an attempt to make the code more
/// readable. After each complete pass over the buckets, the color is flipped. The color `0`
/// corresponds to the even passes, while the color `1` corresponds to the odd passes.
/// Rate estimator bucket. The 56 least significant bits of the atomic integer store the amount of
/// work, while the most significant 8 bits are encoding a well-thought hash of the bucket ord.
///
/// The hash is used to ensure that we know exactly when to reset the bucket's work.
#[derive(Debug, Default)]
struct Bucket {
// This atomic packs two values:
// - low bits [0..56): the amount of work recorded in the bucket.
// - high bits [56..64): an 8-bit hash of the bucket ord, i.e. `bucket_ord % 241`
//   (see `compute_bucket_ord_hash`), not the raw low 8 bits of the ord.
bits: AtomicU64,
}

impl Bucket {
const COLOR_MASK: u64 = 1 << 63;
const WORK_MASK: u64 = (1u64 << 56) - 1;

const WORK_MASK: u64 = u64::MAX - Self::COLOR_MASK;
/// Decoded view of the `u64` stored in a bucket's atomic.
struct BucketVal {
/// Amount of work, held in the low 56 bits of the packed value.
work: u64,
/// Hash of the bucket ord, held in the high 8 bits of the packed value.
bucket_ord_hash: u8,
}

fn work(&self) -> u64 {
self.bits.load(Ordering::Relaxed) & Self::WORK_MASK
/// Unpacks the raw bits of a bucket into its work counter and bucket ord hash.
impl From<u64> for BucketVal {
    #[inline]
    fn from(bucket_bits: u64) -> BucketVal {
        // The top 8 bits carry the hash, everything below them is the work counter.
        let bucket_ord_hash = (bucket_bits >> 56) as u8;
        let work = bucket_bits & WORK_MASK;
        BucketVal {
            work,
            bucket_ord_hash,
        }
    }
}

fn increment_work(&self, work: u64, expected_bucket_color: u64) {
impl From<BucketVal> for u64 {
#[inline]
fn from(value: BucketVal) -> Self {
(value.bucket_ord_hash as u64) << 56 | value.work
}
}

impl Bucket {
fn work_for_bucket(&self, bucket_ord: u64) -> u64 {
let bucket_val = BucketVal::from(self.bits.load(Ordering::Relaxed));
if bucket_val.bucket_ord_hash == compute_bucket_ord_hash(bucket_ord) {
bucket_val.work
} else {
0
}
}

fn increment_work(&self, work: u64, bucket_ord: u64) {
let expected_bucket_ord_hash: u8 = compute_bucket_ord_hash(bucket_ord);
let current_bits = self.bits.fetch_add(work, Ordering::Relaxed) + work;
let current_bucket_color = current_bits & Self::COLOR_MASK;
let bucket_val = BucketVal::from(current_bits);

// If the current bucket color is not the expected one, we need to flip its color and reset
// the amount of work.
if current_bucket_color != expected_bucket_color {
// This is not the bucket we targetted, we need to retry and update the bucket with the new
// bucket_ord and a resetted value.
if bucket_val.bucket_ord_hash != expected_bucket_ord_hash {
let mut expected_bits = current_bits;
let new_color = !current_bits & Self::COLOR_MASK;
let new_bits = new_color | work;
let new_bits: u64 = BucketVal {
work,
bucket_ord_hash: expected_bucket_ord_hash,
}
.into();

while let Err(current_bits) = self.bits.compare_exchange(
expected_bits,
new_bits,
Ordering::AcqRel,
Ordering::Acquire,
) {
if current_bits & Self::COLOR_MASK == new_color {
if BucketVal::from(current_bits).bucket_ord_hash == expected_bucket_ord_hash {
// Some thread managed to successfully flip the color. We're good.
self.bits.fetch_add(work, Ordering::Relaxed);
break;
Expand All @@ -192,32 +223,31 @@ mod tests {

#[test]
fn test_bucket() {
const RED: u64 = 0 << 63;
const BLACK: u64 = 1 << 63;

let bucket = Bucket::default();
assert_eq!(bucket.work(), 0);
assert_eq!(bucket.work_for_bucket(0u64), 0);

// First pass, the bucket is red.
bucket.increment_work(1, RED);
assert_eq!(bucket.work(), 1);
bucket.increment_work(1, 0u64);
assert_eq!(bucket.work_for_bucket(0u64), 1);
assert_eq!(bucket.work_for_bucket(1u64), 0);

bucket.increment_work(2, RED);
assert_eq!(bucket.work(), 3);
bucket.increment_work(2, 0u64);
assert_eq!(bucket.work_for_bucket(0u64), 3);

// Second pass, the bucket is now black.
bucket.increment_work(5, BLACK);
assert_eq!(bucket.work(), 5);
bucket.increment_work(5, 1u64);
assert_eq!(bucket.work_for_bucket(1u64), 5);
assert_eq!(bucket.work_for_bucket(0u64), 0);

bucket.increment_work(7, BLACK);
assert_eq!(bucket.work(), 12);
bucket.increment_work(7, 1u64);
assert_eq!(bucket.work_for_bucket(1u64), 12);

// Third pass, the bucket is red again.
bucket.increment_work(9, RED);
assert_eq!(bucket.work(), 9);
bucket.increment_work(9, 2u64);
assert_eq!(bucket.work_for_bucket(2u64), 9);

bucket.increment_work(11, RED);
assert_eq!(bucket.work(), 20);
bucket.increment_work(11, 2u64);
assert_eq!(bucket.work_for_bucket(2u64), 20);

for num_threads in [1, 2, 3, 5, 10, 20] {
let barrier = Arc::new(Barrier::new(num_threads));
Expand All @@ -234,13 +264,13 @@ mod tests {
barrier.wait();
// First time we increment the work in this second pass. All the threads will
// attempt to flip the bucket's color. Only one should succeed.
bucket.increment_work(i as u64, BLACK);
bucket.increment_work(i as u64, 3u64);
}));
}
for handle in handles {
handle.join().unwrap();
}
assert_eq!(bucket.work(), cumulative_work);
assert_eq!(bucket.work_for_bucket(3u64), cumulative_work);
}
}

Expand All @@ -259,36 +289,72 @@ mod tests {
let started_at = anchor;
let ended_at = started_at + Duration::from_millis(0);
estimator.update(started_at, ended_at, 100);
assert_eq!(estimator.inner.buckets[0].work(), 100);
assert_eq!(estimator.inner.buckets[0].work_for_bucket(0), 100);

let ended_at = started_at + Duration::from_millis(999);
estimator.update(started_at, ended_at, 200);
assert_eq!(estimator.inner.buckets[0].work(), 300);
assert_eq!(estimator.inner.buckets[0].work_for_bucket(0), 300);

assert_eq!(estimator.work(), 0);
assert_eq!(estimator.work_at(anchor), 0);

let ended_at = started_at + Duration::from_millis(1_000);
estimator.update(started_at, ended_at, 300);
assert_eq!(estimator.inner.buckets[1].work(), 300);
assert_eq!(estimator.inner.buckets[1].work_for_bucket(1), 300);

let ended_at = started_at + Duration::from_millis(1_999);
estimator.update(started_at, ended_at, 600);
assert_eq!(estimator.inner.buckets[1].work(), 900);
assert_eq!(estimator.inner.buckets[1].work_for_bucket(1), 900);

assert_eq!(estimator.work(), 45);
assert_eq!(
estimator.work_at(anchor + Duration::from_secs(2)),
(300 + 900) / 20
);

let ended_at = started_at + Duration::from_millis(2_000);
estimator.update(started_at, ended_at, 800);
assert_eq!(estimator.inner.buckets[2].work(), 800);
assert_eq!(estimator.inner.buckets[2].work_for_bucket(2), 800);

let ended_at = started_at + Duration::from_millis(2_999);
estimator.update(started_at, ended_at, 1_000);
assert_eq!(estimator.inner.buckets[2].work(), 1_800);
assert_eq!(estimator.inner.buckets[2].work_for_bucket(2), 1_800);

assert_eq!(estimator.work(), 135);
assert_eq!(estimator.work_at(anchor + Duration::from_secs(3)), 135);

let ended_at = started_at + Duration::from_millis(3_000);
estimator.update(started_at, ended_at, 500);
assert_eq!(estimator.inner.buckets[0].work(), 500);
assert_eq!(estimator.inner.buckets[0].work_for_bucket(0), 0);
assert_eq!(estimator.inner.buckets[0].work_for_bucket(3), 500);
}

#[test]
fn test_sma_rate_skipped_bucket() {
    // 10 buckets of 1s each; the estimated rate is expressed per 1s period.
    let mut estimator = SmaRateEstimator::new(
        NonZeroUsize::new(10).unwrap(),
        Duration::from_secs(1),
        Duration::from_secs(1),
    );

    assert_eq!(estimator.work(), 0);

    let anchor = estimator.inner.anchor;

    // Record 100 units of work in each of the bucket ordinals 0..=9.
    for elapsed_secs in 0..10u64 {
        let ended_at = anchor + Duration::from_secs(elapsed_secs);
        estimator.update(ended_at, ended_at, 100);
    }

    // At t = 10s the current bucket is #10, so the window covers #1..=#9:
    // 900 units of work over 9 seconds.
    assert_eq!(estimator.work_at(anchor + Duration::from_secs(10)), 100);

    // Nothing is recorded between t = 10s and t = 15s.
    //
    // At t = 15s the current bucket is #15, so the window covers #6..=#14:
    // - #6..=#9 hold 100 units each;
    // - #10..=#14 were never written: their slots still carry the hashes of
    //   #0..=#4, so they are read as 0.
    //
    // Expected estimate: 400 / 9 = 44 (integer division).
    assert_eq!(estimator.work_at(anchor + Duration::from_secs(15)), 44);
}
}
4 changes: 1 addition & 3 deletions quickwit/quickwit-serve/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,13 +261,11 @@ async fn start_ingest_client_if_needed(
)
.await?;
let num_buckets = NonZeroUsize::new(60).expect("60 should be non-zero");
let initial_rate = ConstantRate::new(ByteSize::mib(50).as_u64(), Duration::from_secs(1));
let rate_estimator = SmaRateEstimator::new(
num_buckets,
Duration::from_secs(10),
Duration::from_millis(100),
)
.with_initial_rate(initial_rate);
);
let memory_capacity = ingest_api_service.ask(GetMemoryCapacity).await?;
let min_rate = ConstantRate::new(ByteSize::mib(1).as_u64(), Duration::from_millis(100));
let rate_modulator = RateModulator::new(rate_estimator.clone(), memory_capacity, min_rate);
Expand Down

0 comments on commit 9235400

Please sign in to comment.