Skip to content

Commit

Permalink
refactor: break tools out from monitor into agent-utils
Browse files Browse the repository at this point in the history
  • Loading branch information
prestwich committed Aug 10, 2022
1 parent 24347f7 commit 7afda1e
Show file tree
Hide file tree
Showing 24 changed files with 344 additions and 284 deletions.
18 changes: 18 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ resolver = "2"

members = [
"accumulator",
"agent-utils",
"nomad-types",
"nomad-core",
"nomad-base",
Expand Down
26 changes: 26 additions & 0 deletions agent-utils/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
[package]
name = "agent-utils"
version = "0.1.0"
edition = "2021"
authors = ["James Prestwich <[email protected]>"]
description = "Utils for building better agents"
repository = "https://github.com/nomad-xyz/rust"
license = "MIT OR Apache-2.0"
keywords = ["Ethereum", "Nomad"]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "1.0.1", features = ["rt", "macros"] }
tracing = "0.1.35"
ethers = { git = "https://github.com/gakonst/ethers-rs", branch = "master", default-features = false }

tracing-subscriber = "0.2.15"
eyre = "0.6.8"
warp = "0.3.2"
async-trait = "0.1.56"
futures-util = "0.3.21"
tracing-test = "0.2.3"

nomad-core = { path = "../nomad-core", default-features = false }
nomad-xyz-configuration = { version = "0.1.0-rc.25", path = "../configuration" }
11 changes: 11 additions & 0 deletions agent-utils/src/aliases.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
use std::collections::HashMap;

use tokio::{sync::mpsc, task::JoinHandle};

/// Join handle of a spawned tokio task whose output is a
/// [`crate::TaskResult`] over the task type `Task`.
pub type Restartable<Task> = JoinHandle<crate::TaskResult<Task>>;

/// Receiving half of an unbounded tokio mpsc channel carrying `T`.
pub type Faucet<T> = mpsc::UnboundedReceiver<T>;
/// Sending half of an unbounded tokio mpsc channel carrying `T`.
pub type Sink<T> = mpsc::UnboundedSender<T>;

/// Map from a borrowed string key to a value of type `T`.
// NOTE(review): keys are presumably network names — confirm against callers.
pub type NetworkMap<'a, T> = HashMap<&'a str, T>;
/// Two-level map: a borrowed string key to an inner [`NetworkMap`].
// NOTE(review): presumably keyed home name -> replica name — confirm against callers.
pub type HomeReplicaMap<'a, T> = HashMap<&'a str, NetworkMap<'a, T>>;
1 change: 1 addition & 0 deletions agent-utils/src/init.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

181 changes: 181 additions & 0 deletions agent-utils/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
pub mod aliases;
pub mod init;
pub mod macros;
pub mod pipe;
pub mod utils;

use std::panic;

pub use aliases::*;

use tokio::task::JoinHandle;

/// Outcome a task reports when it stops running.
#[derive(Debug)]
pub enum TaskResult<T> {
/// The task failed but handed itself back, so it can be spawned again.
Recoverable {
/// The task instance to restart.
task: T,
/// The error that interrupted the task.
err: eyre::Report,
},
/// The task failed without handing itself back, so it cannot be restarted.
Unrecoverable {
/// The error that ended the task.
err: eyre::Report,
/// Whether `ProcessStep::run_until_panic` logs `err` at `error` level
/// (`true`) or at `trace` level (`false`).
worth_logging: bool,
},
}

pub trait ProcessStep: std::fmt::Display {
fn spawn(self) -> Restartable<Self>
where
Self: 'static + Send + Sync + Sized;

/// Run the task until it panics. Errors result in a task restart with the
/// same channels. This means that an error causes the task to lose only
/// the data that is in-scope when it faults.
fn run_until_panic(self) -> JoinHandle<()>
where
Self: 'static + Send + Sync + Sized,
{
let task_description = format!("{}", self);
tokio::spawn(async move {
let mut handle = self.spawn();
loop {
let result = handle.await;

let again = match result {
Ok(TaskResult::Recoverable { task, err }) => {
tracing::warn!(
error = %err,
task = task_description.as_str(),
"Restarting task",
);
task
}

Ok(TaskResult::Unrecoverable { err, worth_logging }) => {
if worth_logging {
tracing::error!(err = %err, task = task_description.as_str(), "Unrecoverable error encountered");
} else {
tracing::trace!(err = %err, task = task_description.as_str(), "Unrecoverable error encountered");
}
break;
}

Err(e) => {
let panic_res = e.try_into_panic();

if panic_res.is_err() {
tracing::trace!(
task = task_description.as_str(),
"Internal task cancelled",
);
break;
}
let p = panic_res.unwrap();
tracing::error!(task = task_description.as_str(), "Internal task panicked");
panic::resume_unwind(p);
}
};

utils::noisy_sleep(15_000).await;
handle = again.spawn();
}
})
}
}

#[cfg(test)]
mod test {
    use std::{fmt, time::Duration};

    use crate::{ProcessStep, Restartable, TaskResult};

    /// Task that always fails recoverably, handing itself back for a restart.
    struct RecoverableTask;

    impl fmt::Display for RecoverableTask {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.write_str("RecoverableTask")
        }
    }

    impl ProcessStep for RecoverableTask {
        fn spawn(self) -> Restartable<Self>
        where
            Self: 'static + Send + Sync + Sized,
        {
            tokio::spawn(async move {
                let err = eyre::eyre!("This error was recoverable");
                TaskResult::Recoverable { task: self, err }
            })
        }
    }

    /// A recoverable failure is logged, and the supervising task can still be
    /// cancelled from outside.
    #[tokio::test]
    #[tracing_test::traced_test]
    async fn test_recovery() {
        let supervisor = RecoverableTask.run_until_panic();
        // Leave time for the first failure to be logged before aborting.
        tokio::time::sleep(Duration::from_secs(2)).await;
        supervisor.abort();
        let outcome = supervisor.await;

        assert!(logs_contain("RecoverableTask"));
        assert!(logs_contain("Restarting task"));
        assert!(logs_contain("This error was recoverable"));
        assert!(matches!(&outcome, Err(e) if e.is_cancelled()));
    }

    /// Task that always fails without handing itself back.
    struct UnrecoverableTask;

    impl fmt::Display for UnrecoverableTask {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.write_str("UnrecoverableTask")
        }
    }

    impl ProcessStep for UnrecoverableTask {
        fn spawn(self) -> Restartable<Self>
        where
            Self: 'static + Send + Sync + Sized,
        {
            tokio::spawn(async move {
                let err = eyre::eyre!("This error was unrecoverable");
                TaskResult::Unrecoverable {
                    err,
                    worth_logging: true,
                }
            })
        }
    }

    /// An unrecoverable failure is logged and ends the supervisor cleanly.
    #[tokio::test]
    #[tracing_test::traced_test]
    async fn test_unrecoverable() {
        let outcome = UnrecoverableTask.run_until_panic().await;

        assert!(logs_contain("UnrecoverableTask"));
        assert!(logs_contain("Unrecoverable error encountered"));
        assert!(logs_contain("This error was unrecoverable"));
        assert!(outcome.is_ok());
    }

    /// Task that panics as soon as it runs.
    struct PanicTask;

    impl fmt::Display for PanicTask {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.write_str("PanicTask")
        }
    }

    impl ProcessStep for PanicTask {
        fn spawn(self) -> Restartable<Self>
        where
            Self: 'static + Send + Sync + Sized,
        {
            tokio::spawn(async move { panic!("intentional panic :)") })
        }
    }

    /// A panic in the inner task is logged and re-raised by the supervisor.
    #[tokio::test]
    #[tracing_test::traced_test]
    async fn test_panic() {
        let outcome = PanicTask.run_until_panic().await;

        assert!(logs_contain("PanicTask"));
        assert!(logs_contain("Internal task panicked"));
        assert!(matches!(&outcome, Err(e) if e.is_panic()));
    }
}
8 changes: 4 additions & 4 deletions agents/monitor/src/macros.rs → agent-utils/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ macro_rules! bail_task_if {
($cond:expr, $self:ident, $err:expr) => {
if $cond {
let err = eyre::eyre!($err);
return $crate::steps::TaskResult::Recoverable { task: $self, err };
return $crate::TaskResult::Recoverable { task: $self, err };
}
};
}
Expand All @@ -22,7 +22,7 @@ macro_rules! unwrap_channel_item_unrecoverable {
tracing::debug!(
task = %$self, "inbound channel broke"
);
return $crate::steps::TaskResult::Unrecoverable{err: eyre::eyre!("inbound channel broke"), worth_logging: false}
return $crate::TaskResult::Unrecoverable{err: eyre::eyre!("inbound channel broke"), worth_logging: false}
}
$channel_item.unwrap()
}};
Expand All @@ -38,7 +38,7 @@ macro_rules! unwrap_pipe_item_unrecoverable {
tracing::debug!(
task = %$self, "inbound pipe broke"
);
return $crate::steps::TaskResult::Unrecoverable{err: eyre::eyre!("inbound pipe broke"), worth_logging: false}
return $crate::TaskResult::Unrecoverable{err: eyre::eyre!("inbound pipe broke"), worth_logging: false}
}
$pipe_item.unwrap()
}};
Expand All @@ -59,7 +59,7 @@ macro_rules! unwrap_result_recoverable {
macro_rules! send_unrecoverable {
($tx:expr, $item:expr, $self:ident) => {
if $tx.send($item).is_err() {
return $crate::steps::TaskResult::Unrecoverable {
return $crate::TaskResult::Unrecoverable {
err: eyre::eyre!("outbound channel broke"),
worth_logging: false,
};
Expand Down
42 changes: 42 additions & 0 deletions agent-utils/src/pipe.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use std::fmt::Debug;

use eyre::bail;

use crate::aliases::*;

/// Holds at most one item taken from an inbound [`Faucet`] and forwards it to
/// an outbound [`Sink`] once the holder is done with it.
#[derive(Debug)]
pub struct Pipe<T> {
// Inbound channel the next item is received from.
rx: Faucet<T>,
// Outbound channel the held item is sent to by `finish`.
tx: Sink<T>,
// The item currently held, if any.
contents: Option<T>,
}

impl<T> Pipe<T>
where
T: Debug + Send + Sync + 'static,
{
pub fn new(rx: Faucet<T>, tx: Sink<T>, contents: Option<T>) -> Self {
Self { rx, tx, contents }
}

pub fn read(&self) -> Option<&T> {
self.contents.as_ref()
}

pub fn finish(&mut self) -> eyre::Result<()> {
if let Some(contents) = self.contents.take() {
self.tx.send(contents)?;
}
Ok(())
}

pub async fn next(&mut self) -> eyre::Result<&T> {
self.finish()?;

self.contents = self.rx.recv().await;
if self.contents.is_none() {
bail!("rx broke")
}
Ok(self.read().expect("checked"))
}
}
10 changes: 9 additions & 1 deletion agents/monitor/src/utils.rs → agent-utils/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::{collections::HashMap, pin::Pin};

use ethers::prelude::rand::{rngs::ThreadRng, Rng};
use tokio::sync::mpsc::UnboundedReceiver;

// polls all channels in a hashmap
pub(crate) fn nexts<K: ToOwned, T>(
pub fn nexts<K: ToOwned, T>(
map: &mut HashMap<K, UnboundedReceiver<T>>,
) -> Vec<Pin<Box<impl std::future::Future<Output = (<K as ToOwned>::Owned, Option<T>)> + '_>>> {
map.iter_mut()
Expand All @@ -15,3 +16,10 @@ pub(crate) fn nexts<K: ToOwned, T>(
.map(Box::pin)
.collect()
}

/// Sleep for `approx_millis` milliseconds plus a random 0..1000 ms of noise.
///
/// The added noise (up to a second) causes production tasks to not be synced.
/// The sum saturates at `u64::MAX` milliseconds instead of overflowing.
pub fn noisy_sleep(approx_millis: u64) -> tokio::time::Sleep {
    let noise = ThreadRng::default().gen_range(0..1000u64);
    // A plain `+` would panic in debug builds and wrap to a short sleep in
    // release builds when `approx_millis` is within 1000 of `u64::MAX`.
    let total_millis = approx_millis.saturating_add(noise);
    tokio::time::sleep(std::time::Duration::from_millis(total_millis))
}
4 changes: 3 additions & 1 deletion agents/monitor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ tracing = "0.1.35"
ethers = { git = "https://github.com/gakonst/ethers-rs", branch = "master", default-features = false }

nomad-core = { path = "../../nomad-core", default-features = false }
agent-utils = { path = "../../agent-utils" }
nomad-xyz-configuration = { version = "0.1.0-rc.25", path = "../../configuration" }
nomad-ethereum = { version = "0.1.0", path = "../../chains/nomad-ethereum" }

tracing-subscriber = "0.2.15"
eyre = "0.6.8"
nomad-ethereum = { version = "0.1.0", path = "../../chains/nomad-ethereum" }
warp = "0.3.2"
async-trait = "0.1.56"
futures-util = "0.3.21"
Expand Down
3 changes: 2 additions & 1 deletion agents/monitor/src/domain.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{collections::HashMap, sync::Arc};

use agent_utils::ProcessStep;
use nomad_ethereum::bindings::{home::Home, replica::Replica};
use nomad_xyz_configuration::{contracts::CoreContracts, NomadConfig};
use tokio::sync::mpsc::unbounded_channel;
Expand All @@ -15,7 +16,7 @@ use crate::{
relay_wait::RelayWait,
update_wait::UpdateWait,
},
DispatchFaucet, ProcessFaucet, ProcessStep, Provider, RelayFaucet, UpdateFaucet,
DispatchFaucet, ProcessFaucet, Provider, RelayFaucet, UpdateFaucet,
};

#[derive(Debug)]
Expand Down
Loading

0 comments on commit 7afda1e

Please sign in to comment.