Skip to content

Commit

Permalink
Use LruCache for debounce map
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Sep 18, 2024
1 parent d703a8b commit 815b032
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 15 deletions.
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions quickwit/quickwit-control-plane/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ bytesize = { workspace = true }
fnv = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
lru = { workspace = true }
mockall = { workspace = true, optional = true }
once_cell = { workspace = true }
rand = { workspace = true }
Expand Down
43 changes: 28 additions & 15 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::hash_map::Entry;
use std::collections::{BTreeSet, HashMap};
use std::fmt;
use std::fmt::Formatter;
use std::num::NonZeroUsize;
use std::time::{Duration, Instant};

use anyhow::Context;
use async_trait::async_trait;
use futures::stream::FuturesUnordered;
use futures::{Future, StreamExt};
use lru::LruCache;
use quickwit_actors::{
Actor, ActorContext, ActorExitStatus, ActorHandle, DeferableReplyHandler, Handler, Mailbox,
Supervisor, Universe, WeakMailbox,
Expand Down Expand Up @@ -98,7 +99,7 @@ pub struct ControlPlane {
ingest_controller: IngestController,
metastore: MetastoreServiceClient,
model: ControlPlaneModel,
next_prune_shard_requests: HashMap<(IndexId, SourceId), Instant>,
next_prune_shard_requests: LruCache<(IndexId, SourceId), Instant>,
rebuild_plan_debouncer: Debouncer,
readiness_tx: watch::Sender<bool>,
// Disables the control loop. This is useful for unit testing.
Expand Down Expand Up @@ -182,7 +183,7 @@ impl ControlPlane {
ingest_controller,
metastore: metastore.clone(),
model: Default::default(),
next_prune_shard_requests: HashMap::new(),
next_prune_shard_requests: LruCache::new(NonZeroUsize::new(1024).unwrap()),
rebuild_plan_debouncer: Debouncer::new(REBUILD_PLAN_COOLDOWN_PERIOD),
readiness_tx,
disable_control_loop,
Expand Down Expand Up @@ -788,29 +789,41 @@ impl Handler<PruneShardsRequest> for ControlPlane {
_ctx: &ActorContext<Self>,
) -> Result<ControlPlaneResult<EmptyResponse>, ActorExitStatus> {
// A very basic debounce is enough here, missing one call to the pruning API is fine
let next_prune_shard_request = self.next_prune_shard_requests.entry((
let next_prune_shard_request = self.next_prune_shard_requests.get_mut(&(
request.index_uid().index_id.clone(),
request.source_id.clone(),
));
let interval = request
.interval
.map(|interval| Duration::from_secs(interval as u64))
.unwrap_or_else(|| PRUNE_SHARDS_DEFAULT_COOLDOWN_PERIOD);
let should_call = match next_prune_shard_request {
Entry::Vacant(entry) => {
entry.insert(Instant::now() + interval);
let should_call = if let Some(deadline) = next_prune_shard_request {
let now = Instant::now();
if now >= *deadline {
*deadline = now + interval;
true
} else {
false
}
Entry::Occupied(mut entry) => {
let deadline = entry.get_mut();
let now = Instant::now();
if now >= *deadline {
*deadline = now + interval;
true
} else {
false
} else {
let capacity: usize = self.next_prune_shard_requests.cap().into();
if self.next_prune_shard_requests.len() == capacity {
if let Some((_, deadline)) = self.next_prune_shard_requests.peek_lru() {
if *deadline > Instant::now() {
// the oldest is not outdated, grow the LRU
self.next_prune_shard_requests
.resize(NonZeroUsize::new(capacity * 2).unwrap());
}
}
}
self.next_prune_shard_requests.push(
(
request.index_uid().index_id.clone(),
request.source_id.clone(),
),
Instant::now() + interval,
);
true
};
if should_call {
if let Err(metastore_error) = self.metastore.prune_shards(request).await {
Expand Down

0 comments on commit 815b032

Please sign in to comment.