Skip to content

Commit

Permalink
feat(ethexe): introduce test-only field status for Service (#4182)
Browse files Browse the repository at this point in the history
  • Loading branch information
clearloop authored Sep 5, 2024
1 parent 86843d3 commit 6b4608a
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 15 deletions.
44 changes: 44 additions & 0 deletions ethexe/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ use std::{
};
use utils::*;

#[cfg(test)]
pub use {futures::lock::Mutex, tests::Status};

/// ethexe service.
pub struct Service {
db: Database,
Expand All @@ -61,6 +64,10 @@ pub struct Service {
validator: Option<ethexe_validator::Validator>,
metrics_service: Option<MetricsService>,
rpc: Option<ethexe_rpc::RpcService>,

// service status
#[cfg(test)]
status: Arc<Mutex<Status>>,
}

// TODO: consider to move this to another module #4176
Expand Down Expand Up @@ -191,6 +198,8 @@ impl Service {
metrics_service,
rpc,
block_time: config.block_time,
#[cfg(test)]
status: Default::default(),
})
}

Expand Down Expand Up @@ -229,6 +238,8 @@ impl Service {
validator,
metrics_service,
rpc,
#[cfg(test)]
status: Default::default(),
}
}

Expand Down Expand Up @@ -416,6 +427,8 @@ impl Service {
metrics_service,
rpc,
block_time,
#[cfg(test)]
status,
} = self;

if let Some(metrics_service) = metrics_service {
Expand Down Expand Up @@ -459,6 +472,11 @@ impl Service {
let mut collection_round_timer = StoppableTimer::new(block_time / 4);
let mut validation_round_timer = StoppableTimer::new(block_time / 4);

#[cfg(test)]
{
*status.lock().await = Status::Active;
}

loop {
tokio::select! {
observer_event = observer_events.next() => {
Expand Down Expand Up @@ -542,6 +560,10 @@ impl Service {
}
}

#[cfg(test)]
{
*status.lock().await = Status::Terminated;
}
Ok(())
}

Expand Down Expand Up @@ -782,6 +804,12 @@ impl Service {
}
}
}

#[cfg(test)]
/// Get the pointer of service status
pub fn status(&self) -> Arc<Mutex<Status>> {
self.status.clone()
}
}

mod utils {
Expand Down Expand Up @@ -843,6 +871,22 @@ mod tests {
};
use tempfile::tempdir;

/// Service status
#[derive(Default, PartialEq)]
pub enum Status {
#[default]
Pending,
Active,
Terminated,
}

impl Status {
/// If the service is active
pub fn active(&self) -> bool {
*self == Status::Active
}
}

#[tokio::test]
async fn basics() {
gear_utils::init_default_logger();
Expand Down
52 changes: 37 additions & 15 deletions ethexe/cli/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

//! Integration tests.

use crate::service::Service;
use crate::service::{Service, Status};
use alloy::{
node_bindings::{Anvil, AnvilInstance},
providers::{ext::AnvilApi, Provider},
Expand All @@ -35,10 +35,13 @@ use ethexe_processor::Processor;
use ethexe_sequencer::Sequencer;
use ethexe_signer::Signer;
use ethexe_validator::Validator;
use futures::StreamExt;
use futures::{lock::Mutex, StreamExt};
use gear_core::ids::prelude::*;
use gprimitives::{ActorId, CodeId, MessageId, H160, H256};
use std::{sync::Arc, time::Duration};
use std::{
sync::Arc,
time::{Duration, SystemTime},
};
use tokio::{
sync::{
mpsc::{self, Receiver},
Expand Down Expand Up @@ -154,6 +157,7 @@ struct TestEnv {
sender_address: ActorId,
block_time: Duration,
running_service_handle: Option<JoinHandle<Result<()>>>,
service_status: Option<Arc<Mutex<Status>>>,
}

impl TestEnv {
Expand Down Expand Up @@ -233,6 +237,7 @@ impl TestEnv {
sender_address: ActorId::from(H160::from(sender_address.0)),
block_time,
running_service_handle: None,
service_status: None,
};

Ok(env)
Expand Down Expand Up @@ -290,13 +295,11 @@ impl TestEnv {
None,
);

self.service_status = Some(service.status());

let handle = task::spawn(service.run());
self.running_service_handle = Some(handle);

// Sleep to wait for the new service to start
// TODO: find a better way to wait for the service to start #4099
tokio::time::sleep(Duration::from_secs(1)).await;

self.service_initialized().await?;
Ok(())
}

Expand Down Expand Up @@ -325,6 +328,26 @@ impl TestEnv {

Ok((tx_hash, code_id))
}

/// Wait for service initialized
pub async fn service_initialized(&self) -> Result<()> {
let Some(status) = &self.service_status else {
return Err(anyhow!("Service not start"));
};

let now = SystemTime::now();
loop {
if status.lock().await.active() {
return Ok(());
}

if now.elapsed()? > self.block_time {
break;
}
}

Err(anyhow!("Service initialization timed out."))
}
}

impl Drop for TestEnv {
Expand Down Expand Up @@ -664,9 +687,9 @@ async fn ping_reorg() {
.await
.unwrap();

// Await for service block with user reply handling
// TODO: this is for better logs reading only, should find a better solution #4099
tokio::time::sleep(env.block_time).await;
env.service_initialized()
.await
.expect("service uninitalized");

log::info!("📗 Reverting to the program creation snapshot");
provider
Expand Down Expand Up @@ -748,10 +771,9 @@ async fn ping_reorg() {
.await
.unwrap();

// Await for service block with user reply handling
// TODO: this is for better logs reading only, should find a better solution #4099
tokio::time::sleep(Duration::from_secs(1)).await;

env.service_initialized()
.await
.expect("service uninitalized");
log::info!("📗 Done");
}

Expand Down

0 comments on commit 6b4608a

Please sign in to comment.