Skip to content

Commit

Permalink
Added initialize time.
Browse files Browse the repository at this point in the history
Also, this PR increased NUDGE_TIME from 2s to 5s.

Closes #5082
  • Loading branch information
fulmicoton committed Jun 5, 2024
1 parent 567faf1 commit b9c95a0
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 1 deletion.
13 changes: 12 additions & 1 deletion quickwit/quickwit-indexing/src/actors/cooperative_indexing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use tokio::time::Instant;

/// We allow ourselves to adjust the sleep time by at most `NUDGE_TOLERANCE`
/// in order to steer a pipeline to its phase.
const NUDGE_TOLERANCE: Duration = Duration::from_secs(2);
const NUDGE_TOLERANCE: Duration = Duration::from_secs(5);

// Origin of time. It is used to compute the phase of the pipeline.
static ORIGIN_OF_TIME: Lazy<Instant> = Lazy::new(Instant::now);
Expand Down Expand Up @@ -107,13 +107,24 @@ impl CooperativeIndexingCycle {
commit_timeout: Duration,
indexing_permits: Arc<Semaphore>,
) -> CooperativeIndexingCycle {
// Force the initial of the origin of time.
let _to = *ORIGIN_OF_TIME;
CooperativeIndexingCycle {
target_phase,
commit_timeout,
indexing_permits,
}
}

pub fn initial_sleep_duration(&self) -> Duration {
let t0 = *ORIGIN_OF_TIME;
let commit_timeout_ms = self.commit_timeout.as_millis() as u64;
let current_phase_millis: u64 = t0.elapsed().as_millis() as u64 % commit_timeout_ms;
let target_phase_millis: u64 = self.target_phase.as_millis() as u64 % commit_timeout_ms;
let initial_sleep_millis: u64 = (commit_timeout_ms + target_phase_millis - current_phase_millis) % commit_timeout_ms;
Duration::from_millis(initial_sleep_millis)
}

pub async fn cooperative_indexing_period(&self) -> CooperativeIndexingPeriod {
let t_wake = Instant::now();
let permit = Semaphore::acquire_owned(self.indexing_permits.clone())
Expand Down
9 changes: 9 additions & 0 deletions quickwit/quickwit-indexing/src/actors/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,15 @@ impl Actor for Indexer {
false
}

async fn initialize(&mut self, ctx: &ActorContext<Self>) -> Result<(), ActorExitStatus> {
if let Some(cooperative_indexing_cycle) = &self.indexer_state.cooperative_indexing_opt {
let initial_sleep_duration = cooperative_indexing_cycle.initial_sleep_duration();
ctx.pause();
ctx.schedule_self_msg(initial_sleep_duration, Command::Resume);
}
Ok(())
}

async fn on_drained_messages(
&mut self,
ctx: &ActorContext<Self>,
Expand Down

0 comments on commit b9c95a0

Please sign in to comment.