From 66b025fd521b4f75400ae25ffd1b1011e02fb43e Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Tue, 11 Jun 2024 18:44:56 +0900 Subject: [PATCH] Issue/5082 initial sleep to unsync indexers (#5083) * Removing the indexer local concept of t0 for a global definition * Added initialize time. Also, this PR increased NUDGE_TIME from 2s to 5s. Closes #5082 --- .../src/actors/cooperative_indexing.rs | 58 +++++++++++++++++-- .../quickwit-indexing/src/actors/indexer.rs | 9 +++ 2 files changed, 61 insertions(+), 6 deletions(-) diff --git a/quickwit/quickwit-indexing/src/actors/cooperative_indexing.rs b/quickwit/quickwit-indexing/src/actors/cooperative_indexing.rs index b2ab848e0d5..07ddb1c80ef 100644 --- a/quickwit/quickwit-indexing/src/actors/cooperative_indexing.rs +++ b/quickwit/quickwit-indexing/src/actors/cooperative_indexing.rs @@ -21,13 +21,17 @@ use std::hash::{DefaultHasher, Hash, Hasher}; use std::sync::Arc; use std::time::Duration; +use once_cell::sync::Lazy; use quickwit_proto::indexing::{CpuCapacity, PipelineMetrics, PIPELINE_FULL_CAPACITY}; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tokio::time::Instant; /// We allow ourselves to adjust the sleep time by at most `NUDGE_TOLERANCE` /// in order to steer a pipeline to its phase. -const NUDGE_TOLERANCE: Duration = Duration::from_secs(2); +const NUDGE_TOLERANCE: Duration = Duration::from_secs(5); + +// Origin of time. It is used to compute the phase of the pipeline. +static ORIGIN_OF_TIME: Lazy = Lazy::new(Instant::now); /// Cooperative indexing is a mechanism to deal with a large amount of pipelines. /// @@ -77,7 +81,6 @@ pub(crate) struct CooperativeIndexingCycle { target_phase: Duration, commit_timeout: Duration, indexing_permits: Arc, - t0: Instant, } impl CooperativeIndexingCycle { @@ -104,14 +107,31 @@ impl CooperativeIndexingCycle { commit_timeout: Duration, indexing_permits: Arc, ) -> CooperativeIndexingCycle { + // Force the initial of the origin of time. + let _t0 = *ORIGIN_OF_TIME; CooperativeIndexingCycle { target_phase, commit_timeout, indexing_permits, - t0: Instant::now(), } } + pub fn initial_sleep_duration(&self) -> Duration { + let t0 = *ORIGIN_OF_TIME; + let commit_timeout_millis = self.commit_timeout.as_millis() as u64; + let current_phase_millis: u64 = t0.elapsed().as_millis() as u64 % commit_timeout_millis; + let target_phase_millis: u64 = self.target_phase.as_millis() as u64 % commit_timeout_millis; + let initial_sleep_millis: u64 = (commit_timeout_millis + target_phase_millis + - current_phase_millis) + % commit_timeout_millis; + if initial_sleep_millis + 2 * NUDGE_TOLERANCE.as_millis() as u64 > commit_timeout_millis { + // We are reasonably close to the target phase. No need to sleep. The nudge + // will be enough. + return Duration::default(); + } + Duration::from_millis(initial_sleep_millis) + } + pub async fn cooperative_indexing_period(&self) -> CooperativeIndexingPeriod { let t_wake = Instant::now(); let permit = Semaphore::acquire_owned(self.indexing_permits.clone()) @@ -123,7 +143,6 @@ impl CooperativeIndexingCycle { t_work_start, commit_timeout: self.commit_timeout, target_phase: self.target_phase, - t0: self.t0, _permit: permit, } } @@ -136,7 +155,6 @@ pub(crate) struct CooperativeIndexingPeriod { t_work_start: Instant, commit_timeout: Duration, target_phase: Duration, - t0: Instant, _permit: OwnedSemaphorePermit, } @@ -161,7 +179,8 @@ impl CooperativeIndexingPeriod { fn compute_sleep_duration(&self, t_work_end: Instant) -> Duration { let commit_timeout_millis = self.commit_timeout.as_millis() as u64; - let phase_millis: u64 = ((t_work_end - self.t0).as_millis() as u64) % commit_timeout_millis; + let phase_millis: u64 = + ((t_work_end - *ORIGIN_OF_TIME).as_millis() as u64) % commit_timeout_millis; let delta_phase: i64 = phase_millis as i64 - self.target_phase.as_millis() as i64; // delta phase is within (-commit_timeout_millis, commit_timeout_millis) // We fold it back to [-commit_timeout_millis/2, commit_timeout_millis/2) @@ -227,6 +246,33 @@ mod tests { ); } + #[tokio::test] + async fn test_initial_sleep_time() { + tokio::time::pause(); + let t0 = *ORIGIN_OF_TIME; + for target_phase_secs in [0, 1, 2, 5, 10, 15, 20, 25, 29, 30, 1_000] { + for start_time_secs in [0, 1, 2, 5, 10, 15, 20, 25, 29, 30] { + let target_phase = Duration::from_secs(target_phase_secs); + let semaphore = Arc::new(Semaphore::new(1)); + tokio::time::sleep(Duration::from_secs(start_time_secs)).await; + let cooperative_indexing = CooperativeIndexingCycle::new_with_phase( + target_phase, + Duration::from_secs(30), + semaphore.clone(), + ); + let initial_sleep_duration: Duration = + cooperative_indexing.initial_sleep_duration(); + tokio::time::sleep(initial_sleep_duration).await; + let target_phase_millis = cooperative_indexing.target_phase.as_millis() as i64; + let commit_timeout_ms = cooperative_indexing.commit_timeout.as_millis() as i64; + let phase_millis = + (t0.elapsed().as_millis() as i64 - target_phase_millis) % commit_timeout_ms; + assert!(phase_millis >= -100, "{phase_millis}"); + assert!(phase_millis <= (NUDGE_TOLERANCE.as_millis() as i64) * 2 + 100); + } + } + } + #[tokio::test] async fn test_cooperative_indexing_simple() { tokio::time::pause(); diff --git a/quickwit/quickwit-indexing/src/actors/indexer.rs b/quickwit/quickwit-indexing/src/actors/indexer.rs index 22de7b30290..f561d2869ab 100644 --- a/quickwit/quickwit-indexing/src/actors/indexer.rs +++ b/quickwit/quickwit-indexing/src/actors/indexer.rs @@ -398,6 +398,15 @@ impl Actor for Indexer { false } + async fn initialize(&mut self, ctx: &ActorContext) -> Result<(), ActorExitStatus> { + if let Some(cooperative_indexing_cycle) = &self.indexer_state.cooperative_indexing_opt { + let initial_sleep_duration = cooperative_indexing_cycle.initial_sleep_duration(); + ctx.pause(); + ctx.schedule_self_msg(initial_sleep_duration, Command::Resume); + } + Ok(()) + } + async fn on_drained_messages( &mut self, ctx: &ActorContext,