refactor(agents): chain-centric watcher #224

Draft — wants to merge 6 commits into main
30 changes: 0 additions & 30 deletions Cargo.lock

(Generated file; diff not rendered by default.)

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -13,7 +13,7 @@ members = [
   "agents/kathy",
   "agents/updater",
   "agents/relayer",
-  "agents/watcher",
+  # "agents/watcher",
   "agents/processor",
   "tools/kms-cli",
   "tools/nomad-cli",
3 changes: 2 additions & 1 deletion agents/kathy/src/kathy.rs
@@ -67,12 +67,13 @@ impl NomadAgent for Kathy {
     }
 
     fn build_channel(&self, replica: &str) -> Self::Channel {
+        let home = self.connections().home().expect("!home");
         Self::Channel {
             base: self.channel_base(replica),
             home_lock: self.home_lock.clone(),
             generator: self.generator.clone(),
             messages_dispatched: self.messages_dispatched.with_label_values(&[
-                self.home().name(),
+                home.name(),
                 replica,
                 Self::AGENT_NAME,
             ]),
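A standalone sketch, not part of this diff: `messages_dispatched`, like the analogous counters touched in the processor and relayer below, is a prometheus counter vec whose children are selected by label tuple via `with_label_values`. The label names and values below are illustrative, not taken from this repo; the refactor only changes where the first label value comes from.

```rust
use prometheus::{IntCounterVec, Opts};

fn main() {
    // A counter vec labeled the way the agents label theirs: (home, replica, agent).
    let messages_dispatched = IntCounterVec::new(
        Opts::new("messages_dispatched", "Messages dispatched by kathy"),
        &["home", "replica", "agent"],
    )
    .expect("valid metric options");

    // After the refactor the first value is `home.name()` from AgentConnections
    // rather than `self.home().name()`; the metric itself is unchanged.
    messages_dispatched
        .with_label_values(&["homechain", "replicachain", "kathy"])
        .inc();

    assert_eq!(
        messages_dispatched
            .with_label_values(&["homechain", "replicachain", "kathy"])
            .get(),
        1
    );
}
```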
14 changes: 9 additions & 5 deletions agents/processor/src/processor.rs
@@ -357,10 +357,11 @@ impl NomadAgent for Processor {
     }
 
     fn build_channel(&self, replica: &str) -> Self::Channel {
+        let home = self.connections().home().expect("!home");
         Self::Channel {
             base: self.channel_base(replica),
             next_message_nonce: self.next_message_nonces.with_label_values(&[
-                self.home().name(),
+                home.name(),
                 replica,
                 Self::AGENT_NAME,
             ]),
@@ -395,15 +396,16 @@
         self.assert_home_not_failed().await??;
 
         info!("Starting Processor tasks");
+        let home = self.connections().home().expect("!home");
 
         // tree sync
         info!("Starting ProverSync");
-        let db = NomadDB::new(self.home().name(), self.db());
+        let db = NomadDB::new(home.name(), self.db());
         let sync = ProverSync::from_disk(db.clone());
         let prover_sync_task = sync.spawn();
 
         info!("Starting indexer");
-        let home_sync_task = self.home().sync();
+        let home_sync_task = home.sync();
 
         let home_fail_watch_task = self.watch_home_fail(self.interval);
 
@@ -415,10 +417,11 @@
         if !self.subsidized_remotes.is_empty() {
             // Get intersection of specified remotes (replicas in settings)
             // and subsidized remotes
+            let replicas = self.connections().replicas().expect("!replicas");
             let specified_subsidized: Vec<&str> = self
                 .subsidized_remotes
                 .iter()
-                .filter(|r| self.replicas().contains_key(*r))
+                .filter(|r| replicas.contains_key(*r))
                 .map(AsRef::as_ref)
                 .collect();
 
@@ -430,7 +433,8 @@
         // if we have a bucket, add a task to push to it
         if let Some(config) = &self.config {
             info!(bucket = %config.bucket, "Starting S3 push tasks");
-            let pusher = Pusher::new(self.core.home.name(), &config.bucket, db.clone()).await;
+            let home = self.connections().home().expect("!home");
+            let pusher = Pusher::new(home.name(), &config.bucket, db.clone()).await;
             tasks.push(pusher.spawn())
         }
 
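A standalone sketch, not part of this diff: the intersection step in the hunk above is plain iterator filtering over the replicas map, now fetched once from `connections()` instead of borrowed from `self`. Types and chain names below are simplified placeholders.

```rust
use std::collections::HashMap;

/// Keep only the subsidized remotes that are actually configured as replicas.
fn specified_subsidized<'a>(
    subsidized_remotes: &'a [String],
    replicas: &HashMap<String, ()>,
) -> Vec<&'a str> {
    subsidized_remotes
        .iter()
        .filter(|r| replicas.contains_key(*r))
        .map(AsRef::as_ref)
        .collect()
}

fn main() {
    let subsidized = vec!["remote-a".to_string(), "remote-b".to_string()];
    let mut replicas = HashMap::new();
    replicas.insert("remote-a".to_string(), ());

    // Only "remote-a" is both subsidized and configured as a replica.
    assert_eq!(specified_subsidized(&subsidized, &replicas), vec!["remote-a"]);
}
```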
10 changes: 5 additions & 5 deletions agents/relayer/src/relayer.rs
@@ -155,10 +155,11 @@ impl NomadAgent for Relayer {
     }
 
     fn build_channel(&self, replica: &str) -> Self::Channel {
+        let home = self.connections().home().expect("!home");
         Self::Channel {
             base: self.channel_base(replica),
             updates_relayed_count: self.updates_relayed_counts.with_label_values(&[
-                self.home().name(),
+                home.name(),
                 replica,
                 Self::AGENT_NAME,
             ]),
@@ -195,8 +196,8 @@
 mod test {
     use ethers::prelude::{ProviderError, H256};
     use nomad_base::{
-        chains::PageSettings, CommonIndexers, ContractSync, ContractSyncMetrics, CoreMetrics,
-        HomeIndexers, IndexSettings, NomadDB,
+        chains::PageSettings, AgentConnections, CommonIndexers, ContractSync, ContractSyncMetrics,
+        CoreMetrics, HomeIndexers, IndexSettings, NomadDB,
     };
     use nomad_core::ChainCommunicationError;
     use nomad_test::mocks::{MockHomeContract, MockIndexer, MockReplicaContract};
@@ -293,8 +294,7 @@ mod test {
 
         // Setting agent
         let core = AgentCore {
-            home,
-            replicas,
+            connections: AgentConnections::Default { home, replicas },
             db,
             metrics,
             indexer: IndexSettings::default(),
10 changes: 6 additions & 4 deletions agents/updater/src/updater.rs
@@ -39,7 +39,8 @@ impl Updater {
         finalization_seconds: u64,
         core: AgentCore,
     ) -> Self {
-        let home_name = core.home.name();
+        let home = core.connections.home().expect("!home");
+        let home_name = home.name();
         let signed_attestation_count = core
             .metrics
             .new_int_counter(
@@ -73,9 +74,10 @@
 
 impl From<&Updater> for UpdaterChannel {
     fn from(updater: &Updater) -> Self {
+        let home = updater.connections().home().expect("!home");
         UpdaterChannel {
-            home: updater.home(),
-            db: NomadDB::new(updater.home().name(), updater.db()),
+            home: home.clone(),
+            db: NomadDB::new(home.name(), updater.db()),
             signer: updater.signer.clone(),
             signed_attestation_count: updater.signed_attestation_count.clone(),
             submitted_update_count: updater.submitted_update_count.clone(),
@@ -199,7 +201,7 @@ impl NomadAgent for Updater {
         let home_fail_watch_task = self.watch_home_fail(self.interval_seconds);
 
         info!("Starting updater sync task...");
-        let sync_task = self.home().sync();
+        let sync_task = self.connections().home().expect("!home").sync();
 
         // Run a single error-catching task for producing and submitting
         // updates. While we use the agent channel pattern, this run task
85 changes: 61 additions & 24 deletions nomad-base/src/agent.rs
@@ -25,13 +25,55 @@ use tokio::{task::JoinHandle, time::sleep};
 
 const MAX_EXPONENTIAL: u32 = 7; // 2^7 = 128 second timeout
 
+/// General or agent-specific connection map
+#[derive(Debug, Clone)]
+pub enum AgentConnections {
+    /// Connections for watchers
+    Watcher {
+        /// A map of boxed Homes
+        homes: HashMap<String, Arc<CachingHome>>,
+        // ...
+    },
+    /// Connections for other agents
+    Default {
+        /// A boxed Home
+        home: Arc<CachingHome>,
+        /// A map of boxed Replicas
+        replicas: HashMap<String, Arc<CachingReplica>>,
+    },
+}
+
+/// Accessor methods for AgentConnections
+impl AgentConnections {
+    /// Get an optional clone of home
+    pub fn home(&self) -> Option<Arc<CachingHome>> {
+        use AgentConnections::*;
+        match self {
+            Default { home, .. } => Some(home.clone()),
+            _ => None,
+        }
+    }
+
+    /// Get an optional clone of the map of replicas
+    pub fn replicas(&self) -> Option<HashMap<String, Arc<CachingReplica>>> {
+        use AgentConnections::*;
+        match self {
+            Default { replicas, .. } => Some(replicas.clone()),
+            _ => None,
+        }
+    }
+
+    /// Get an optional clone of a replica by its name
+    pub fn replica_by_name(&self, name: &str) -> Option<Arc<CachingReplica>> {
+        self.replicas().and_then(|r| r.get(name).map(Clone::clone))
+    }
+}
+
 /// Properties shared across all agents
 #[derive(Debug, Clone)]
 pub struct AgentCore {
-    /// A boxed Home
-    pub home: Arc<CachingHome>,
-    /// A map of boxed Replicas
-    pub replicas: HashMap<String, Arc<CachingReplica>>,
+    /// Agent connections
+    pub connections: AgentConnections,
     /// A persistent KV Store (currently implemented as rocksdb)
     pub db: DB,
     /// Prometheus metrics
@@ -78,10 +120,14 @@ pub trait NomadAgent: Send + Sync + Sized + std::fmt::Debug + AsRef<AgentCore> {
 
     /// Build channel base for home <> replica channel
     fn channel_base(&self, replica: &str) -> ChannelBase {
+        let home = self.connections().home().expect("!home");
         ChannelBase {
-            home: self.home(),
-            replica: self.replica_by_name(replica).expect("!replica exist"),
-            db: NomadDB::new(self.home().name(), self.db()),
+            home: home.clone(),
+            replica: self
+                .connections()
+                .replica_by_name(replica)
+                .expect("!replica exist"),
+            db: NomadDB::new(home.name(), self.db()),
         }
     }
 
@@ -95,19 +141,9 @@ pub trait NomadAgent: Send + Sync + Sized + std::fmt::Debug + AsRef<AgentCore> {
         self.as_ref().db.clone()
     }
 
-    /// Return a reference to a home contract
-    fn home(&self) -> Arc<CachingHome> {
-        self.as_ref().home.clone()
-    }
-
-    /// Get a reference to the replicas map
-    fn replicas(&self) -> &HashMap<String, Arc<CachingReplica>> {
-        &self.as_ref().replicas
-    }
-
-    /// Get a reference to a replica by its name
-    fn replica_by_name(&self, name: &str) -> Option<Arc<CachingReplica>> {
-        self.replicas().get(name).map(Clone::clone)
+    /// Return a reference to the connections object
+    fn connections(&self) -> &AgentConnections {
+        &self.as_ref().connections
     }
 
     /// Run the agent with the given home and replica
@@ -200,7 +236,8 @@ pub trait NomadAgent: Send + Sync + Sized + std::fmt::Debug + AsRef<AgentCore> {
         let span = info_span!("run_all");
         tokio::spawn(async move {
            // this is the unused must use
-            let names: Vec<&str> = self.replicas().keys().map(|k| k.as_str()).collect();
+            let replicas = self.connections().replicas().expect("!replicas");
+            let names: Vec<&str> = replicas.keys().map(|k| k.as_str()).collect();
 
            // quick check that at least 1 replica is configured
            names
@@ -214,7 +251,7 @@ pub trait NomadAgent: Send + Sync + Sized + std::fmt::Debug + AsRef<AgentCore> {
            if Self::AGENT_NAME != "kathy" {
                // Only the processor needs to index messages so default is
                // just indexing updates
-               let sync_task = self.home().sync();
+               let sync_task = self.connections().home().expect("!home").sync();
 
                tasks.push(sync_task);
            }
@@ -236,7 +273,7 @@ pub trait NomadAgent: Send + Sync + Sized + std::fmt::Debug + AsRef<AgentCore> {
     #[allow(clippy::unit_arg)]
     fn watch_home_fail(&self, interval: u64) -> Instrumented<JoinHandle<Result<()>>> {
         let span = info_span!("home_watch");
-        let home = self.home();
+        let home = self.connections().home().expect("!home");
         let home_failure_checks = self.metrics().home_failure_checks();
         let home_failure_observations = self.metrics().home_failure_observations();
 
@@ -259,7 +296,7 @@ pub trait NomadAgent: Send + Sync + Sized + std::fmt::Debug + AsRef<AgentCore> {
     fn assert_home_not_failed(&self) -> Instrumented<JoinHandle<Result<()>>> {
         use nomad_core::Common;
         let span = info_span!("check_home_state");
-        let home = self.home();
+        let home = self.connections().home().expect("!home");
         tokio::spawn(async move {
             if home.state().await? == nomad_core::State::Failed {
                 Err(BaseError::FailedHome.into())
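A standalone sketch, not part of this diff: the net effect of the `agent.rs` changes is to replace the infallible `home()`/`replicas()` trait helpers with `Option`-returning accessors on `AgentConnections`, since the new `Watcher` variant carries a map of homes rather than a single home. The sketch below uses a plain `String` as a stand-in for the real `Arc<CachingHome>`/`Arc<CachingReplica>` handles.

```rust
use std::{collections::HashMap, sync::Arc};

// Stand-in for Arc<CachingHome> / Arc<CachingReplica>.
type Chain = String;

enum AgentConnections {
    Watcher { homes: HashMap<String, Arc<Chain>> },
    Default { home: Arc<Chain>, replicas: HashMap<String, Arc<Chain>> },
}

impl AgentConnections {
    // Some(home) only for the Default variant, mirroring the accessor above.
    fn home(&self) -> Option<Arc<Chain>> {
        match self {
            AgentConnections::Default { home, .. } => Some(home.clone()),
            _ => None,
        }
    }
}

fn main() {
    let default = AgentConnections::Default {
        home: Arc::new("home-chain".to_string()),
        replicas: HashMap::new(),
    };
    let watcher = AgentConnections::Watcher { homes: HashMap::new() };

    // Non-watcher agents unwrap eagerly, matching the diff's `.expect("!home")`.
    assert_eq!(*default.home().expect("!home"), "home-chain");
    // The Watcher variant has no single home, so the accessor yields None.
    assert!(watcher.home().is_none());
}
```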
8 changes: 4 additions & 4 deletions nomad-base/src/settings/mod.rs
@@ -11,8 +11,9 @@
 //! corresponding env file and/or secrets.json file.
 
 use crate::{
-    agent::AgentCore, CachingHome, CachingReplica, CommonIndexerVariants, CommonIndexers,
-    ContractSync, ContractSyncMetrics, HomeIndexerVariants, HomeIndexers, Homes, NomadDB, Replicas,
+    agent::AgentCore, AgentConnections, CachingHome, CachingReplica, CommonIndexerVariants,
+    CommonIndexers, ContractSync, ContractSyncMetrics, HomeIndexerVariants, HomeIndexers, Homes,
+    NomadDB, Replicas,
 };
 use color_eyre::{eyre::bail, Result};
 use nomad_core::{db::DB, Common, ContractLocator};
@@ -419,8 +420,7 @@ impl Settings {
         .await?;
 
         Ok(AgentCore {
-            home,
-            replicas,
+            connections: AgentConnections::Default { home, replicas },
             db,
             settings: self.clone(),
             metrics,