Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ethexe): introduce test-only field status for Service #4182

Merged
merged 12 commits into from
Sep 5, 2024
44 changes: 44 additions & 0 deletions ethexe/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ use std::{
};
use utils::*;

#[cfg(test)]
pub use {futures::lock::Mutex, tests::Status};

/// ethexe service.
pub struct Service {
db: Database,
Expand All @@ -61,6 +64,10 @@ pub struct Service {
validator: Option<ethexe_validator::Validator>,
metrics_service: Option<MetricsService>,
rpc: Option<ethexe_rpc::RpcService>,

// service status
#[cfg(test)]
status: Arc<Mutex<Status>>,
}

// TODO: consider to move this to another module #4176
Expand Down Expand Up @@ -191,6 +198,8 @@ impl Service {
metrics_service,
rpc,
block_time: config.block_time,
#[cfg(test)]
status: Default::default(),
})
}

Expand Down Expand Up @@ -229,6 +238,8 @@ impl Service {
validator,
metrics_service,
rpc,
#[cfg(test)]
status: Default::default(),
}
}

Expand Down Expand Up @@ -416,6 +427,8 @@ impl Service {
metrics_service,
rpc,
block_time,
#[cfg(test)]
status,
} = self;

if let Some(metrics_service) = metrics_service {
Expand Down Expand Up @@ -459,6 +472,11 @@ impl Service {
let mut collection_round_timer = StoppableTimer::new(block_time / 4);
let mut validation_round_timer = StoppableTimer::new(block_time / 4);

#[cfg(test)]
{
*status.lock().await = Status::Active;
}

loop {
tokio::select! {
observer_event = observer_events.next() => {
Expand Down Expand Up @@ -542,6 +560,10 @@ impl Service {
}
}

#[cfg(test)]
{
*status.lock().await = Status::Terminated;
}
Ok(())
}

Expand Down Expand Up @@ -782,6 +804,12 @@ impl Service {
}
}
}

#[cfg(test)]
/// Get the pointer of service status
pub fn status(&self) -> Arc<Mutex<Status>> {
self.status.clone()
}
}

mod utils {
Expand Down Expand Up @@ -843,6 +871,22 @@ mod tests {
};
use tempfile::tempdir;

/// Service status
#[derive(Default, PartialEq)]
pub enum Status {
#[default]
Pending,
Active,
Terminated,
}

impl Status {
/// If the service is active
pub fn active(&self) -> bool {
*self == Status::Active
}
}

#[tokio::test]
async fn basics() {
gear_utils::init_default_logger();
Expand Down
52 changes: 37 additions & 15 deletions ethexe/cli/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

//! Integration tests.

use crate::service::Service;
use crate::service::{Service, Status};
use alloy::{
node_bindings::{Anvil, AnvilInstance},
providers::{ext::AnvilApi, Provider},
Expand All @@ -35,10 +35,13 @@ use ethexe_processor::Processor;
use ethexe_sequencer::Sequencer;
use ethexe_signer::Signer;
use ethexe_validator::Validator;
use futures::StreamExt;
use futures::{lock::Mutex, StreamExt};
use gear_core::ids::prelude::*;
use gprimitives::{ActorId, CodeId, MessageId, H160, H256};
use std::{sync::Arc, time::Duration};
use std::{
sync::Arc,
time::{Duration, SystemTime},
};
use tokio::{
sync::{
mpsc::{self, Receiver},
Expand Down Expand Up @@ -154,6 +157,7 @@ struct TestEnv {
sender_address: ActorId,
block_time: Duration,
running_service_handle: Option<JoinHandle<Result<()>>>,
service_status: Option<Arc<Mutex<Status>>>,
}

impl TestEnv {
Expand Down Expand Up @@ -233,6 +237,7 @@ impl TestEnv {
sender_address: ActorId::from(H160::from(sender_address.0)),
block_time,
running_service_handle: None,
service_status: None,
};

Ok(env)
Expand Down Expand Up @@ -290,13 +295,11 @@ impl TestEnv {
None,
);

self.service_status = Some(service.status());

let handle = task::spawn(service.run());
self.running_service_handle = Some(handle);

// Sleep to wait for the new service to start
// TODO: find a better way to wait for the service to start #4099
tokio::time::sleep(Duration::from_secs(1)).await;

self.service_initialized().await?;
Ok(())
}

Expand Down Expand Up @@ -325,6 +328,26 @@ impl TestEnv {

Ok((tx_hash, code_id))
}

/// Wait for service initialized
pub async fn service_initialized(&self) -> Result<()> {
let Some(status) = &self.service_status else {
return Err(anyhow!("Service not start"));
};

let now = SystemTime::now();
loop {
if status.lock().await.active() {
return Ok(());
}

if now.elapsed()? > self.block_time {
break;
}
}

Err(anyhow!("Service initialization timed out."))
}
}

impl Drop for TestEnv {
Expand Down Expand Up @@ -664,9 +687,9 @@ async fn ping_reorg() {
.await
.unwrap();

// Await for service block with user reply handling
// TODO: this is for better logs reading only, should find a better solution #4099
tokio::time::sleep(env.block_time).await;
env.service_initialized()
.await
.expect("service uninitalized");

log::info!("📗 Reverting to the program creation snapshot");
provider
Expand Down Expand Up @@ -748,10 +771,9 @@ async fn ping_reorg() {
.await
.unwrap();

// Await for service block with user reply handling
// TODO: this is for better logs reading only, should find a better solution #4099
tokio::time::sleep(Duration::from_secs(1)).await;

env.service_initialized()
.await
.expect("service uninitalized");
log::info!("📗 Done");
}

Expand Down
Loading