Skip to content

Commit

Permalink
sidechain: make slotstream synchronous
Browse files Browse the repository at this point in the history
  • Loading branch information
OverOrion committed Jul 20, 2023
1 parent 65d6903 commit ebce1c5
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 28 deletions.
11 changes: 2 additions & 9 deletions service/src/sidechain_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use crate::{
parentchain_handler::HandleParentchain,
Config,
};
use futures::executor::block_on;
use itp_enclave_api::{
direct_request::DirectRequest, enclave_base::EnclaveBase, sidechain::Sidechain,
};
Expand Down Expand Up @@ -90,15 +89,9 @@ where
let sidechain_enclave_api = enclave;
println!("[+] Spawning thread for sidechain block production");
tokio::task::spawn_blocking(move || {
let future = start_slot_worker(
|| execute_trusted_calls(sidechain_enclave_api.as_ref()),
SLOT_DURATION,
);
block_on(future);
start_slot_worker(|| execute_trusted_calls(sidechain_enclave_api.as_ref()), SLOT_DURATION);
println!("[!] Sidechain block production loop has terminated");
})
.await
.map_err(|e| Error::Custom(Box::new(e)))?;
});

// ------------------------------------------------------------------------
// start sidechain pruning loop
Expand Down
30 changes: 11 additions & 19 deletions sidechain/consensus/slots/src/slot_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,25 @@
//! provides generic functionality for slots.

use crate::time_until_next_slot;
use futures_timer::Delay;
use std::time::Duration;
use std::{thread, time::Duration};

/// Executes given `task` repeatedly when the next slot becomes available.
pub async fn start_slot_worker<F>(task: F, slot_duration: Duration)
pub fn start_slot_worker<F>(task: F, slot_duration: Duration)
where
F: Fn(),
{
let mut slot_stream = SlotStream::new(slot_duration);

loop {
slot_stream.next_slot().await;
slot_stream.next_slot();
task();
}
}

/// Stream to calculate the slot schedule with.
pub struct SlotStream {
slot_duration: Duration,
inner_delay: Option<Delay>,
inner_delay: Option<std::time::Instant>,
}

impl SlotStream {
Expand All @@ -53,25 +52,18 @@ impl SlotStream {
impl SlotStream {
/// Waits for the duration of `inner_delay`.
/// Upon timeout, `inner_delay` is reset according to the time left until next slot.
pub async fn next_slot(&mut self) {
self.inner_delay = match self.inner_delay.take() {
None => {
// Delay is not initialized in this case,
// so we have to initialize with the time until the next slot.
let wait_dur = time_until_next_slot(self.slot_duration);
Some(Delay::new(wait_dur))
},
Some(d) => Some(d),
};

if let Some(inner_delay) = self.inner_delay.take() {
inner_delay.await;
pub fn next_slot(&mut self) {
if let Some(inner_delay) = self.inner_delay {
if inner_delay > std::time::Instant::now() {
// Sleep until the next slot is available
thread::sleep(inner_delay - std::time::Instant::now());
}
}

let ends_in = time_until_next_slot(self.slot_duration);

// Re-schedule delay for next slot.
self.inner_delay = Some(Delay::new(ends_in));
self.inner_delay = Some(std::time::Instant::now() + ends_in);
}
}

Expand Down

0 comments on commit ebce1c5

Please sign in to comment.