Skip to content

Commit

Permalink
Debounce shard pruning request (#5374)
Browse files Browse the repository at this point in the history
* Add debounce to shard prune request

* Use pruning interval as control plane cooldown

* Address smaller review comments

* Simplify debounce logic

* Use LruCache for debounce map

* Refactor and test CooldownMap

* Revert grpc number reuse

* Rename grpc field interval to interval_secs
  • Loading branch information
rdettai authored Sep 19, 2024
1 parent 565becd commit f3dd8f2
Show file tree
Hide file tree
Showing 18 changed files with 533 additions and 126 deletions.
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions quickwit/quickwit-control-plane/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ bytesize = { workspace = true }
fnv = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
lru = { workspace = true }
mockall = { workspace = true, optional = true }
once_cell = { workspace = true }
rand = { workspace = true }
Expand Down
45 changes: 42 additions & 3 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use std::collections::{BTreeSet, HashMap};
use std::fmt;
use std::fmt::Formatter;
use std::num::NonZeroUsize;
use std::time::Duration;

use anyhow::Context;
Expand Down Expand Up @@ -49,14 +50,15 @@ use quickwit_proto::metastore::{
serde_utils, AddSourceRequest, CreateIndexRequest, CreateIndexResponse, DeleteIndexRequest,
DeleteShardsRequest, DeleteSourceRequest, EmptyResponse, FindIndexTemplateMatchesRequest,
IndexMetadataResponse, IndexTemplateMatch, MetastoreError, MetastoreResult, MetastoreService,
MetastoreServiceClient, ToggleSourceRequest, UpdateIndexRequest,
MetastoreServiceClient, PruneShardsRequest, ToggleSourceRequest, UpdateIndexRequest,
};
use quickwit_proto::types::{IndexUid, NodeId, ShardId, SourceUid};
use quickwit_proto::types::{IndexId, IndexUid, NodeId, ShardId, SourceId, SourceUid};
use serde::Serialize;
use serde_json::{json, Value as JsonValue};
use tokio::sync::watch;
use tracing::{debug, error, info};

use crate::cooldown_map::{CooldownMap, CooldownStatus};
use crate::debouncer::Debouncer;
use crate::indexing_scheduler::{IndexingScheduler, IndexingSchedulerState};
use crate::ingest::ingest_controller::{IngestControllerStats, RebalanceShardsCallback};
Expand All @@ -71,13 +73,16 @@ pub(crate) const CONTROL_PLAN_LOOP_INTERVAL: Duration = if cfg!(any(test, featur
Duration::from_secs(5)
};

/// Minimum period between two identical shard pruning operations.
const PRUNE_SHARDS_DEFAULT_COOLDOWN_PERIOD: Duration = Duration::from_secs(120);

/// Minimum period between two rebuild plan operations.
const REBUILD_PLAN_COOLDOWN_PERIOD: Duration = Duration::from_secs(2);

#[derive(Debug)]
struct ControlPlanLoop;

#[derive(Debug, Default)]
#[derive(Debug, Default, Clone, Copy)]
struct RebuildPlan;

pub struct ControlPlane {
Expand All @@ -94,6 +99,7 @@ pub struct ControlPlane {
ingest_controller: IngestController,
metastore: MetastoreServiceClient,
model: ControlPlaneModel,
prune_shard_cooldown: CooldownMap<(IndexId, SourceId)>,
rebuild_plan_debouncer: Debouncer,
readiness_tx: watch::Sender<bool>,
// Disables the control loop. This is useful for unit testing.
Expand Down Expand Up @@ -177,6 +183,7 @@ impl ControlPlane {
ingest_controller,
metastore: metastore.clone(),
model: Default::default(),
prune_shard_cooldown: CooldownMap::new(NonZeroUsize::new(1024).unwrap()),
rebuild_plan_debouncer: Debouncer::new(REBUILD_PLAN_COOLDOWN_PERIOD),
readiness_tx,
disable_control_loop,
Expand Down Expand Up @@ -772,6 +779,38 @@ impl Handler<DeleteSourceRequest> for ControlPlane {
}
}

#[async_trait]
impl Handler<PruneShardsRequest> for ControlPlane {
    type Reply = ControlPlaneResult<EmptyResponse>;

    /// Forwards a shard pruning request to the metastore unless an identical
    /// request (same index ID and source ID) is still in cooldown.
    ///
    /// The cooldown period is taken from the request's `interval_secs` when it
    /// is set, and falls back to `PRUNE_SHARDS_DEFAULT_COOLDOWN_PERIOD`
    /// otherwise. A debounced request is acknowledged with an empty response,
    /// exactly like a request that was actually executed; a metastore failure
    /// is passed through `convert_metastore_error`.
    async fn handle(
        &mut self,
        request: PruneShardsRequest,
        _ctx: &ActorContext<Self>,
    ) -> Result<ControlPlaneResult<EmptyResponse>, ActorExitStatus> {
        let cooldown_period = match request.interval_secs {
            Some(interval_secs) => Duration::from_secs(u64::from(interval_secs)),
            None => PRUNE_SHARDS_DEFAULT_COOLDOWN_PERIOD,
        };
        let cooldown_key = (
            request.index_uid().index_id.clone(),
            request.source_id.clone(),
        );

        // A very basic debounce is enough here, missing one call to the pruning API is fine
        let cooldown_status = self
            .prune_shard_cooldown
            .update(cooldown_key, cooldown_period);
        if matches!(cooldown_status, CooldownStatus::InCooldown) {
            // Debounced: report success without reaching the metastore.
            return Ok(Ok(EmptyResponse {}));
        }

        match self.metastore.prune_shards(request).await {
            Ok(_) => Ok(Ok(EmptyResponse {})),
            Err(metastore_error) => convert_metastore_error(metastore_error),
        }
    }
}

// This is neither a proxied call nor a metastore callback.
#[async_trait]
impl Handler<GetOrCreateOpenShardsRequest> for ControlPlane {
Expand Down
164 changes: 164 additions & 0 deletions quickwit/quickwit-control-plane/src/cooldown_map.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// Copyright (C) 2024 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at hello@quickwit.io.
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::fmt::Debug;
use std::hash::Hash;
use std::num::NonZeroUsize;
use std::time::{Duration, Instant};

use lru::LruCache;

/// A map that keeps track of a cooldown deadline for each of its keys.
///
/// Deadlines are stored in an [`LruCache`] so that the map stays bounded.
/// When a new key is inserted while the map is full, the least recently used
/// entry is checked: if its deadline has already passed the insertion goes
/// ahead at the current capacity, otherwise the capacity is doubled first so
/// that an entry still in cooldown is not discarded.
pub struct CooldownMap<K>(LruCache<K, Instant>);

/// Cooldown state of a key at the time it is checked.
///
/// This is a plain two-state flag, so it is `Copy` and fully comparable
/// (`Eq`), which lets callers store, pass and compare it freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CooldownStatus {
    /// The key was not in cooldown: the guarded operation may run.
    Ready,
    /// The key's cooldown deadline has not been reached yet: the guarded
    /// operation should be skipped.
    InCooldown,
}

impl<K: Hash + Eq> CooldownMap<K> {
    /// Creates an empty map that initially holds at most `capacity` keys.
    pub fn new(capacity: NonZeroUsize) -> Self {
        let deadlines = LruCache::new(capacity);
        Self(deadlines)
    }

    /// Checks whether `key` is in cooldown and, if it is not, starts a new
    /// cooldown of `cooldown_interval` for it.
    ///
    /// Returns the status observed *before* the update: `Ready` means the
    /// caller may proceed (and the key is now in cooldown), `InCooldown`
    /// means the existing deadline was left untouched.
    pub fn update(&mut self, key: K, cooldown_interval: Duration) -> CooldownStatus {
        let known_deadline = self.0.get_mut(&key);
        let now = Instant::now();

        // Known key: either still cooling down, or expired and re-armed in place.
        if let Some(deadline) = known_deadline {
            if *deadline > now {
                return CooldownStatus::InCooldown;
            }
            *deadline = now + cooldown_interval;
            return CooldownStatus::Ready;
        }

        // Unknown key: make sure inserting it cannot push out an entry that
        // is still in cooldown.
        let capacity = self.0.cap().get();
        let is_full = self.0.len() == capacity;
        let lru_in_cooldown =
            is_full && matches!(self.0.peek_lru(), Some((_, deadline)) if *deadline > now);
        if lru_in_cooldown {
            // the oldest entry is not outdated, grow the LRU
            self.0.resize(NonZeroUsize::new(capacity * 2).unwrap());
        }
        self.0.push(key, now + cooldown_interval);
        CooldownStatus::Ready
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A full map whose entries are all still in cooldown must grow rather
    /// than forget any of them.
    #[test]
    fn test_cooldown_map_resize() {
        let period = Duration::from_secs(1);
        let mut map = CooldownMap::new(NonZeroUsize::new(2).unwrap());

        // Fill the map: the first call arms the cooldown, the second is rejected.
        for key in ["test_key1", "test_key2"] {
            assert_eq!(map.update(key, period), CooldownStatus::Ready);
            assert_eq!(map.update(key, period), CooldownStatus::InCooldown);
        }

        // Hitting the capacity, the map should grow transparently
        assert_eq!(map.update("test_key3", period), CooldownStatus::Ready);
        for key in ["test_key1", "test_key2"] {
            assert_eq!(map.update(key, period), CooldownStatus::InCooldown);
        }
        assert_eq!(map.0.cap(), NonZeroUsize::new(4).unwrap());
    }

    /// Each key follows its own deadline: the short one becomes ready again
    /// while the long one is still cooling down.
    #[test]
    fn test_cooldown_map_expired() {
        let short_period = Duration::from_millis(100);
        let long_period = Duration::from_secs(5);
        let mut map = CooldownMap::new(NonZeroUsize::new(2).unwrap());

        let initial = [
            map.update("test_key_short", short_period),
            map.update("test_key_long", long_period),
        ];
        assert_eq!(initial, [CooldownStatus::Ready, CooldownStatus::Ready]);

        std::thread::sleep(short_period.mul_f32(1.1));

        let after_sleep = [
            map.update("test_key_short", short_period),
            map.update("test_key_long", long_period),
        ];
        assert_eq!(
            after_sleep,
            [CooldownStatus::Ready, CooldownStatus::InCooldown]
        );
    }

    /// Once the least recently used entry has expired, a new key takes its
    /// place and the map does not grow.
    #[test]
    fn test_cooldown_map_eviction() {
        let short_period = Duration::from_millis(100);
        let long_period = Duration::from_secs(5);
        let mut map = CooldownMap::new(NonZeroUsize::new(2).unwrap());

        let initial = [
            map.update("test_key_short", short_period),
            map.update("test_key_long_1", long_period),
        ];
        assert_eq!(initial, [CooldownStatus::Ready, CooldownStatus::Ready]);

        // after the cooldown period `test_key_short` should be evicted when adding a new key
        std::thread::sleep(short_period.mul_f32(1.1));
        let len_before = map.0.len();
        assert_eq!(len_before, 2);

        let inserted = map.update("test_key_long_2", long_period);
        assert_eq!(inserted, CooldownStatus::Ready);

        let len_after = map.0.len();
        assert_eq!(len_after, 2);
    }
}
14 changes: 5 additions & 9 deletions quickwit/quickwit-control-plane/src/debouncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use std::sync::{Arc, Mutex};
use std::time::Duration;

use quickwit_actors::{Actor, ActorContext, DeferableReplyHandler, Handler};
use quickwit_actors::{Actor, ActorContext, Handler};

/// A debouncer is a helper to debounce events.
///
Expand Down Expand Up @@ -93,15 +93,15 @@ impl Debouncer {

fn emit_message<A, M>(&self, ctx: &ActorContext<A>)
where
A: Actor + Handler<M> + DeferableReplyHandler<M>,
A: Actor + Handler<M>,
M: Default + std::fmt::Debug + Send + Sync + 'static,
{
let _ = ctx.mailbox().send_message_with_high_priority(M::default());
}

fn schedule_post_cooldown_callback<A, M>(&self, ctx: &ActorContext<A>)
where
A: Actor + Handler<M> + DeferableReplyHandler<M>,
A: Actor + Handler<M>,
M: Default + std::fmt::Debug + Send + Sync + 'static,
{
let ctx_clone = ctx.clone();
Expand All @@ -116,12 +116,8 @@ impl Debouncer {
.schedule_event(callback, self.cooldown_period);
}

pub fn self_send_with_cooldown<M>(
&self,
ctx: &ActorContext<impl Handler<M> + DeferableReplyHandler<M>>,
) where
M: Default + std::fmt::Debug + Send + Sync + 'static,
{
pub fn self_send_with_cooldown<M>(&self, ctx: &ActorContext<impl Handler<M>>)
where M: Default + std::fmt::Debug + Send + Sync + 'static {
let cooldown_state = self.accept_transition(Transition::Emit);
match cooldown_state {
DebouncerState::NoCooldown => {
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-control-plane/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub struct IndexerNodeInfo {

pub type IndexerPool = Pool<NodeId, IndexerNodeInfo>;

mod cooldown_map;
mod debouncer;
#[cfg(test)]
mod tests;
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ impl QueueSharedState {
source_id: source_id.clone(),
max_age_secs,
max_count,
interval_secs: Some(pruning_interval.as_secs() as u32),
})
.await;
if let Err(err) = result {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ use quickwit_proto::metastore::{
ListShardsRequest, ListShardsResponse, ListSplitsRequest, ListSplitsResponse,
ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreResult, MetastoreService,
MetastoreServiceClient, MetastoreServiceStream, OpenShardsRequest, OpenShardsResponse,
PruneShardsRequest, PruneShardsResponse, PublishSplitsRequest, ResetSourceCheckpointRequest,
StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest, UpdateSplitsDeleteOpstampRequest,
PruneShardsRequest, PublishSplitsRequest, ResetSourceCheckpointRequest, StageSplitsRequest,
ToggleSourceRequest, UpdateIndexRequest, UpdateSplitsDeleteOpstampRequest,
UpdateSplitsDeleteOpstampResponse,
};

Expand Down Expand Up @@ -120,6 +120,12 @@ impl MetastoreService for ControlPlaneMetastore {
Ok(response)
}

// Proxied through the control plane, which debounces pruning requests before
// they reach the metastore. An error returned by the control plane is
// propagated with `?`; on success an empty response is returned to the caller.
async fn prune_shards(&self, request: PruneShardsRequest) -> MetastoreResult<EmptyResponse> {
self.control_plane.prune_shards(request).await?;
Ok(EmptyResponse {})
}

// Other metastore API calls.

async fn index_metadata(
Expand Down Expand Up @@ -237,14 +243,6 @@ impl MetastoreService for ControlPlaneMetastore {
self.metastore.delete_shards(request).await
}

async fn prune_shards(
&self,
request: PruneShardsRequest,
) -> MetastoreResult<PruneShardsResponse> {
// TODO this call should go through the control plane which should apply debounce
self.metastore.prune_shards(request).await
}

// Index Template API

async fn create_index_template(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ use quickwit_proto::metastore::{
AcquireShardsRequest, AcquireShardsResponse, DeleteQuery, DeleteShardsRequest,
DeleteShardsResponse, DeleteTask, EntityKind, ListShardsSubrequest, ListShardsSubresponse,
MetastoreError, MetastoreResult, OpenShardSubrequest, OpenShardSubresponse, PruneShardsRequest,
PruneShardsResponse,
};
use quickwit_proto::types::{IndexUid, PublishToken, SourceId, SplitId};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -667,7 +666,7 @@ impl FileBackedIndex {
pub(crate) fn prune_shards(
&mut self,
request: PruneShardsRequest,
) -> MetastoreResult<MutationOccurred<PruneShardsResponse>> {
) -> MetastoreResult<MutationOccurred<()>> {
self.get_shards_for_source_mut(&request.source_id)?
.prune_shards(request)
}
Expand Down
Loading

0 comments on commit f3dd8f2

Please sign in to comment.