Skip to content

Commit

Permalink
Refactor and test CooldownMap
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Sep 19, 2024
1 parent 815b032 commit 69d5ee4
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 38 deletions.
52 changes: 14 additions & 38 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,12 @@ use std::collections::{BTreeSet, HashMap};
use std::fmt;
use std::fmt::Formatter;
use std::num::NonZeroUsize;
use std::time::{Duration, Instant};
use std::time::Duration;

use anyhow::Context;
use async_trait::async_trait;
use futures::stream::FuturesUnordered;
use futures::{Future, StreamExt};
use lru::LruCache;
use quickwit_actors::{
Actor, ActorContext, ActorExitStatus, ActorHandle, DeferableReplyHandler, Handler, Mailbox,
Supervisor, Universe, WeakMailbox,
Expand Down Expand Up @@ -59,6 +58,7 @@ use serde_json::{json, Value as JsonValue};
use tokio::sync::watch;
use tracing::{debug, error, info};

use crate::cooldown_map::{CooldownMap, CooldownStatus};
use crate::debouncer::Debouncer;
use crate::indexing_scheduler::{IndexingScheduler, IndexingSchedulerState};
use crate::ingest::ingest_controller::{IngestControllerStats, RebalanceShardsCallback};
Expand Down Expand Up @@ -99,7 +99,7 @@ pub struct ControlPlane {
ingest_controller: IngestController,
metastore: MetastoreServiceClient,
model: ControlPlaneModel,
next_prune_shard_requests: LruCache<(IndexId, SourceId), Instant>,
prune_shard_cooldown: CooldownMap<(IndexId, SourceId)>,
rebuild_plan_debouncer: Debouncer,
readiness_tx: watch::Sender<bool>,
// Disables the control loop. This is useful for unit testing.
Expand Down Expand Up @@ -183,7 +183,7 @@ impl ControlPlane {
ingest_controller,
metastore: metastore.clone(),
model: Default::default(),
next_prune_shard_requests: LruCache::new(NonZeroUsize::new(1024).unwrap()),
prune_shard_cooldown: CooldownMap::new(NonZeroUsize::new(1024).unwrap()),
rebuild_plan_debouncer: Debouncer::new(REBUILD_PLAN_COOLDOWN_PERIOD),
readiness_tx,
disable_control_loop,
Expand Down Expand Up @@ -788,44 +788,20 @@ impl Handler<PruneShardsRequest> for ControlPlane {
request: PruneShardsRequest,
_ctx: &ActorContext<Self>,
) -> Result<ControlPlaneResult<EmptyResponse>, ActorExitStatus> {
// A very basic debounce is enough here, missing one call to the pruning API is fine
let next_prune_shard_request = self.next_prune_shard_requests.get_mut(&(
request.index_uid().index_id.clone(),
request.source_id.clone(),
));
let interval = request
.interval
.map(|interval| Duration::from_secs(interval as u64))
.unwrap_or_else(|| PRUNE_SHARDS_DEFAULT_COOLDOWN_PERIOD);
let should_call = if let Some(deadline) = next_prune_shard_request {
let now = Instant::now();
if now >= *deadline {
*deadline = now + interval;
true
} else {
false
}
} else {
let capacity: usize = self.next_prune_shard_requests.cap().into();
if self.next_prune_shard_requests.len() == capacity {
if let Some((_, deadline)) = self.next_prune_shard_requests.peek_lru() {
if *deadline > Instant::now() {
// the oldest is not outdated, grow the LRU
self.next_prune_shard_requests
.resize(NonZeroUsize::new(capacity * 2).unwrap());
}
}
}
self.next_prune_shard_requests.push(
(
request.index_uid().index_id.clone(),
request.source_id.clone(),
),
Instant::now() + interval,
);
true
};
if should_call {

// A very basic debounce is enough here, missing one call to the pruning API is fine
let status = self.prune_shard_cooldown.update(
(
request.index_uid().index_id.clone(),
request.source_id.clone(),
),
interval,
);
if let CooldownStatus::Ready = status {
if let Err(metastore_error) = self.metastore.prune_shards(request).await {
return convert_metastore_error(metastore_error);
};
Expand Down
164 changes: 164 additions & 0 deletions quickwit/quickwit-control-plane/src/cooldown_map.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// Copyright (C) 2024 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at [email protected].
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::fmt::Debug;
use std::hash::Hash;
use std::num::NonZeroUsize;
use std::time::{Duration, Instant};

use lru::LruCache;

/// A map that keeps track of a cooldown deadline for each of its keys.
///
/// Internally it uses an [`LruCache`] to prune the oldest entries when the
/// capacity is reached. If the capacity is reached but the oldest entry is not
/// outdated, the capacity is extended (2x).
pub struct CooldownMap<K>(LruCache<K, Instant>);

#[derive(Debug, PartialEq)]
pub enum CooldownStatus {
Ready,
InCooldown,
}

impl<K: Hash + Eq> CooldownMap<K> {
pub fn new(capacity: NonZeroUsize) -> Self {
Self(LruCache::new(capacity))
}

/// Updates the deadline for the given key if it isn't currently in cooldown.
///
/// The status returned is the one before the update (after an update, the
/// status is always `InCooldown`).
pub fn update(&mut self, key: K, cooldown_interval: Duration) -> CooldownStatus {
let deadline_opt = self.0.get_mut(&key);
let now = Instant::now();
if let Some(deadline) = deadline_opt {
if *deadline > now {
CooldownStatus::InCooldown
} else {
*deadline = now + cooldown_interval;
CooldownStatus::Ready
}
} else {
let capacity: usize = self.0.cap().into();
if self.0.len() == capacity {
if let Some((_, deadline)) = self.0.peek_lru() {
if *deadline > now {
// the oldest entry is not outdated, grow the LRU
self.0.resize(NonZeroUsize::new(capacity * 2).unwrap());
}
}
}
self.0.push(key, now + cooldown_interval);
CooldownStatus::Ready
}
}
}
#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_cooldown_map_resize() {
let mut cooldown_map = CooldownMap::new(NonZeroUsize::new(2).unwrap());
let cooldown_interval = Duration::from_secs(1);
assert_eq!(
cooldown_map.update("test_key1", cooldown_interval),
CooldownStatus::Ready
);
assert_eq!(
cooldown_map.update("test_key1", cooldown_interval),
CooldownStatus::InCooldown
);
assert_eq!(
cooldown_map.update("test_key2", cooldown_interval),
CooldownStatus::Ready
);
assert_eq!(
cooldown_map.update("test_key2", cooldown_interval),
CooldownStatus::InCooldown
);
// Hitting the capacity, the map should grow transparently
assert_eq!(
cooldown_map.update("test_key3", cooldown_interval),
CooldownStatus::Ready
);
assert_eq!(
cooldown_map.update("test_key1", cooldown_interval),
CooldownStatus::InCooldown
);
assert_eq!(
cooldown_map.update("test_key2", cooldown_interval),
CooldownStatus::InCooldown
);
assert_eq!(cooldown_map.0.cap(), NonZeroUsize::new(4).unwrap());
}

#[test]
fn test_cooldown_map_expired() {
let mut cooldown_map = CooldownMap::new(NonZeroUsize::new(2).unwrap());
let cooldown_interval_short = Duration::from_millis(100);
let cooldown_interval_long = Duration::from_secs(5);

assert_eq!(
cooldown_map.update("test_key_short", cooldown_interval_short),
CooldownStatus::Ready
);
assert_eq!(
cooldown_map.update("test_key_long", cooldown_interval_long),
CooldownStatus::Ready
);

std::thread::sleep(cooldown_interval_short.mul_f32(1.1));
assert_eq!(
cooldown_map.update("test_key_short", cooldown_interval_short),
CooldownStatus::Ready
);
assert_eq!(
cooldown_map.update("test_key_long", cooldown_interval_long),
CooldownStatus::InCooldown
);
}

#[test]
fn test_cooldown_map_eviction() {
let mut cooldown_map = CooldownMap::new(NonZeroUsize::new(2).unwrap());
let cooldown_interval_short = Duration::from_millis(100);
let cooldown_interval_long = Duration::from_secs(5);

assert_eq!(
cooldown_map.update("test_key_short", cooldown_interval_short),
CooldownStatus::Ready
);
assert_eq!(
cooldown_map.update("test_key_long_1", cooldown_interval_long),
CooldownStatus::Ready
);

// after the cooldown period `test_key_short` should be evicted when adding a new key
std::thread::sleep(cooldown_interval_short.mul_f32(1.1));
assert_eq!(cooldown_map.0.len(), 2);
assert_eq!(
cooldown_map.update("test_key_long_2", cooldown_interval_long),
CooldownStatus::Ready
);
assert_eq!(cooldown_map.0.len(), 2);
}
}
1 change: 1 addition & 0 deletions quickwit/quickwit-control-plane/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub struct IndexerNodeInfo {

pub type IndexerPool = Pool<NodeId, IndexerNodeInfo>;

mod cooldown_map;
mod debouncer;
#[cfg(test)]
mod tests;

0 comments on commit 69d5ee4

Please sign in to comment.