From 1cdf1a545cff431b73929eff0c39b08f22b7c40c Mon Sep 17 00:00:00 2001 From: N Date: Sat, 17 Feb 2024 19:30:31 -0500 Subject: [PATCH] feat: tx events + flk rpc subscriber --- Cargo.lock | 4 +- core/application/src/env.rs | 8 +++ core/blockstore/src/lib.rs | 1 + core/consensus/src/consensus.rs | 22 +++++++-- core/consensus/src/execution.rs | 31 +++++++++++- core/indexer/src/tests.rs | 1 + core/interfaces/src/consensus.rs | 10 +++- core/interfaces/src/rpc.rs | 4 ++ core/mock/src/consensus.rs | 4 +- core/origin-demuxer/src/tests.rs | 1 + core/origin-http/src/tests.rs | 1 + core/origin-ipfs/src/tests.rs | 1 + core/pinger/src/tests.rs | 1 + core/rep-collector/src/tests.rs | 2 + core/rpc/Cargo.toml | 4 +- core/rpc/src/api/flk.rs | 7 ++- core/rpc/src/event.rs | 72 +++++++++++++++++++++++++++ core/rpc/src/lib.rs | 48 +++++++++--------- core/rpc/src/logic/flk_impl.rs | 35 ++++++++++++- core/rpc/src/tests.rs | 51 +++++++++++++++++++ core/signer/src/tests.rs | 5 ++ core/test-utils/src/consensus.rs | 4 +- core/test-utils/src/transaction.rs | 3 ++ core/types/src/application.rs | 79 +++++++++++++++++++++++++++++- core/types/src/response.rs | 4 +- core/types/src/state.rs | 10 ++++ core/types/src/transaction.rs | 27 ++++++++++ 27 files changed, 397 insertions(+), 43 deletions(-) create mode 100644 core/rpc/src/event.rs diff --git a/Cargo.lock b/Cargo.lock index 5ea2ff3ba..8b3072ce0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2808,7 +2808,7 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "330c60081dcc4c72131f8eb70510f1ac07223e5d4163db481a04a0befcffa412" dependencies = [ - "libloading 0.7.4", + "libloading 0.8.1", ] [[package]] @@ -6128,10 +6128,10 @@ dependencies = [ "anyhow", "async-trait", "autometrics", - "axum 0.6.20", "clap 4.4.12", "ethers", "fleek-crypto", + "futures", "hp-fixed", "hyper 0.14.28", "infusion", diff --git a/core/application/src/env.rs b/core/application/src/env.rs index c290f31d2..79fce685f 100644 --- a/core/application/src/env.rs +++ b/core/application/src/env.rs @@ -175,6 +175,13 @@ impl Env { response.change_epoch = true; } + let mut event = None; + if let TransactionResponse::Success(_) = results { + if let Some(e) = txn.event() { + event = Some(e); + } + } + let receipt = TransactionReceipt { block_hash: block.digest, block_number, @@ -183,6 +190,7 @@ impl Env { from: txn.sender(), to: txn.to(), response: results, + event, }; /* Todo(dalton): Check if the transaction resulted in the committee change(Like a current validator getting slashed) if so acknowledge that in the block response diff --git a/core/blockstore/src/lib.rs b/core/blockstore/src/lib.rs index a78d0f39f..9e8adf822 100644 --- a/core/blockstore/src/lib.rs +++ b/core/blockstore/src/lib.rs @@ -182,6 +182,7 @@ mod tests { transactions_to_lose: HashSet::new(), new_block_interval: Duration::from_secs(5), }; + let consensus = MockConsensus::::init( consensus_config, &signer, diff --git a/core/consensus/src/consensus.rs b/core/consensus/src/consensus.rs index 35f6f19f8..eb6427827 100644 --- a/core/consensus/src/consensus.rs +++ b/core/consensus/src/consensus.rs @@ -1,6 +1,6 @@ use std::collections::BTreeMap; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::{Duration, SystemTime}; use affair::{Executor, TokioSpawn}; @@ -12,7 +12,7 @@ use lightning_interfaces::config::ConfigConsumer; use lightning_interfaces::consensus::{ConsensusInterface, MempoolSocket}; use lightning_interfaces::infu_collection::{c, Collection}; use lightning_interfaces::signer::{SignerInterface, SubmitTxSocket}; -use lightning_interfaces::types::{Epoch, EpochInfo, UpdateMethod}; +use lightning_interfaces::types::{Epoch, EpochInfo, Event, UpdateMethod}; use lightning_interfaces::{ ApplicationInterface, BroadcastInterface, @@ -33,7 +33,7 @@ use narwhal_node::NodeStorage; use prometheus::Registry; use resolved_pathbuf::ResolvedPathBuf; use serde::{Deserialize, Serialize}; -use tokio::sync::{mpsc, Mutex, Notify}; +use tokio::sync::{mpsc, Notify}; use tokio::{pin, select, task, time}; use tracing::{error, info}; use typed_store::DBMetrics; @@ -316,6 +316,10 @@ impl + 'static, NE: Emitter> pub fn shutdown(&self) { self.execution_state.shutdown(); } + + fn set_event_tx(&mut self, tx: tokio::sync::mpsc::Sender>) { + self.execution_state.set_event_tx(tx); + } } impl WithStartAndShutdown for Consensus { @@ -333,9 +337,10 @@ impl WithStartAndShutdown for Consensus { let mut epoch_state = self .epoch_state .lock() - .await + .expect("Mutex poisened") .take() .expect("Consensus was tried to start before initialization"); + self.is_running.store(true, Ordering::Relaxed); task::spawn(async move { @@ -453,6 +458,15 @@ impl ConsensusInterface for Consensus { fn mempool(&self) -> MempoolSocket { self.mempool_socket.clone() } + + fn set_event_tx(&mut self, tx: tokio::sync::mpsc::Sender>) { + self.epoch_state + .lock() + .expect("Mutex poisened") + .as_mut() + .expect("Consensus was tried to start before initialization") + .set_event_tx(tx); + } } #[derive(Debug, Serialize, Deserialize, Clone, IsVariant, From, TryInto)] diff --git a/core/consensus/src/execution.rs b/core/consensus/src/execution.rs index 5a41803dc..1dea83965 100644 --- a/core/consensus/src/execution.rs +++ b/core/consensus/src/execution.rs @@ -1,9 +1,9 @@ -use std::sync::Arc; +use std::sync::{Arc, OnceLock}; use async_trait::async_trait; use fastcrypto::hash::HashFunction; use fleek_blake3 as blake3; -use lightning_interfaces::types::{Block, Epoch, Metadata, NodeIndex, TransactionRequest}; +use lightning_interfaces::types::{Block, Epoch, Event, Metadata, NodeIndex, TransactionRequest}; use lightning_interfaces::{ Emitter, ExecutionEngineSocket, @@ -75,6 +75,8 @@ pub struct Execution { index_socket: Option, /// Notifications emitter notifier: NE, + /// Send the event to the RPC + event_tx: OnceLock>>, } impl Execution { @@ -93,6 +95,7 @@ impl Execution { query_runner, index_socket, notifier, + event_tx: OnceLock::new(), } } @@ -124,6 +127,26 @@ impl Execution { let results = self.executor.run(block).await.unwrap(); info!("Consensus submitted new block to application"); + match self.event_tx.get() { + Some(tx) => { + if let Err(e) = tx + .send( + results + .txn_receipts + .iter() + .filter_map(|r| r.event.clone()) + .collect(), + ) + .await + { + error!("We could not send a message to the RPC: {e}"); + } + }, + None => { + error!("Once Cell not initialized, this is a bug"); + }, + } + if results.change_epoch { change_epoch = true; self.notifier.epoch_changed(); @@ -162,6 +185,10 @@ impl Execution { self.notifier.shutdown(); self.executor.downgrade(); } + + pub fn set_event_tx(&self, tx: mpsc::Sender>) { + self.event_tx.set(tx).unwrap(); + } } #[async_trait] diff --git a/core/indexer/src/tests.rs b/core/indexer/src/tests.rs index 9f81c3515..fdd0372e3 100644 --- a/core/indexer/src/tests.rs +++ b/core/indexer/src/tests.rs @@ -124,6 +124,7 @@ async fn test_submission() { transactions_to_lose: HashSet::new(), new_block_interval: Duration::from_secs(5), }; + let consensus = MockConsensus::::init( consensus_config, &signer, diff --git a/core/interfaces/src/consensus.rs b/core/interfaces/src/consensus.rs index 15a2bd1c6..ef6b2c135 100644 --- a/core/interfaces/src/consensus.rs +++ b/core/interfaces/src/consensus.rs @@ -7,7 +7,7 @@ use crate::common::WithStartAndShutdown; use crate::config::ConfigConsumer; use crate::infu_collection::Collection; use crate::signer::SignerInterface; -use crate::types::TransactionRequest; +use crate::types::{Event, TransactionRequest}; use crate::{ ApplicationInterface, ArchiveInterface, @@ -15,6 +15,7 @@ use crate::{ ConfigProviderInterface, IndexSocket, NotifierInterface, + RpcInterface, }; /// A socket that gives services and other sub-systems the required functionality to @@ -54,9 +55,14 @@ pub trait ConsensusInterface: ) } + fn _post(&mut self, rpc: ::RpcInterface) { + self.set_event_tx(rpc.event_tx()); + } + type Certificate: LightningMessage + Clone; /// Create a new consensus service with the provided config and executor. + #[allow(clippy::too_many_arguments)] fn init>( config: Self::Config, signer: &S, @@ -72,4 +78,6 @@ pub trait ConsensusInterface: /// transaction to the consensus. #[blank = Socket::raw_bounded(64).0] fn mempool(&self) -> MempoolSocket; + + fn set_event_tx(&mut self, tx: tokio::sync::mpsc::Sender>); } diff --git a/core/interfaces/src/rpc.rs b/core/interfaces/src/rpc.rs index e0fe90638..14f262ab5 100644 --- a/core/interfaces/src/rpc.rs +++ b/core/interfaces/src/rpc.rs @@ -1,9 +1,11 @@ use infusion::c; +use tokio::sync::mpsc; use crate::common::WithStartAndShutdown; use crate::config::ConfigConsumer; use crate::consensus::MempoolSocket; use crate::infu_collection::Collection; +use crate::types::Event; use crate::{ ApplicationInterface, ArchiveInterface, @@ -51,4 +53,6 @@ pub trait RpcInterface: signer: &C::SignerInterface, archive_socket: Option>, ) -> anyhow::Result; + + fn event_tx(&self) -> mpsc::Sender>; } diff --git a/core/mock/src/consensus.rs b/core/mock/src/consensus.rs index f19a74088..fb4b5978f 100644 --- a/core/mock/src/consensus.rs +++ b/core/mock/src/consensus.rs @@ -9,7 +9,7 @@ use axum::extract::State; use axum::routing::post; use axum::{Json, Router}; use lightning_interfaces::infu_collection::{c, Collection}; -use lightning_interfaces::types::{Block, TransactionRequest}; +use lightning_interfaces::types::{Block, Event, TransactionRequest}; use lightning_interfaces::{ ApplicationInterface, BroadcastInterface, @@ -184,6 +184,8 @@ impl ConsensusInterface for MockConsensus { fn mempool(&self) -> MempoolSocket { self.mempool.clone() } + + fn set_event_tx(&mut self, _tx: mpsc::Sender>) {} } impl ConfigConsumer for MockConsensus { diff --git a/core/origin-demuxer/src/tests.rs b/core/origin-demuxer/src/tests.rs index e9f6575ab..db0080442 100644 --- a/core/origin-demuxer/src/tests.rs +++ b/core/origin-demuxer/src/tests.rs @@ -154,6 +154,7 @@ async fn create_app_state(test_name: String) -> AppState { transactions_to_lose: HashSet::new(), new_block_interval: Duration::from_secs(5), }; + let consensus = MockConsensus::::init( consensus_config, &signer, diff --git a/core/origin-http/src/tests.rs b/core/origin-http/src/tests.rs index afd141ad1..619e0483b 100644 --- a/core/origin-http/src/tests.rs +++ b/core/origin-http/src/tests.rs @@ -152,6 +152,7 @@ async fn create_app_state(test_name: String) -> AppState { transactions_to_lose: HashSet::new(), new_block_interval: Duration::from_secs(5), }; + let consensus = MockConsensus::::init( consensus_config, &signer, diff --git a/core/origin-ipfs/src/tests.rs b/core/origin-ipfs/src/tests.rs index 1ee6fb3c6..41b74d827 100644 --- a/core/origin-ipfs/src/tests.rs +++ b/core/origin-ipfs/src/tests.rs @@ -151,6 +151,7 @@ async fn create_app_state(test_name: String) -> AppState { transactions_to_lose: HashSet::new(), new_block_interval: Duration::from_secs(5), }; + let consensus = MockConsensus::::init( consensus_config, &signer, diff --git a/core/pinger/src/tests.rs b/core/pinger/src/tests.rs index ea9d06420..b0c5374ae 100644 --- a/core/pinger/src/tests.rs +++ b/core/pinger/src/tests.rs @@ -125,6 +125,7 @@ async fn init_pinger() -> Pinger { transactions_to_lose: HashSet::new(), new_block_interval: Duration::from_secs(5), }; + let consensus = MockConsensus::::init( consensus_config, &signer, diff --git a/core/rep-collector/src/tests.rs b/core/rep-collector/src/tests.rs index b6f68fc84..a10118e5f 100644 --- a/core/rep-collector/src/tests.rs +++ b/core/rep-collector/src/tests.rs @@ -149,6 +149,7 @@ async fn test_query() { transactions_to_lose: HashSet::new(), new_block_interval: Duration::from_secs(5), }; + let consensus = MockConsensus::::init( consensus_config, &signer, @@ -515,6 +516,7 @@ async fn test_reputation_calculation_and_query() { ¬ifier1, ) .unwrap(); + let consensus2 = MockConsensus::::init( consensus_config, &signer2, diff --git a/core/rpc/Cargo.toml b/core/rpc/Cargo.toml index 17d7966eb..0ecffe65b 100644 --- a/core/rpc/Cargo.toml +++ b/core/rpc/Cargo.toml @@ -36,8 +36,8 @@ lightning-openrpc = { path = "../rpc-openrpc" } lightning-openrpc-macros = { path = "../rpc-openrpc-macros" } lightning-utils = { path = "../utils" } alloy-primitives = "0.5.2" -axum.workspace = true -hyper = "0.14.27" +hyper = { version = "0.14.27", features = ["server", "full"] } +futures.workspace = true [dev-dependencies] diff --git a/core/rpc/src/api/flk.rs b/core/rpc/src/api/flk.rs index 4d45ff86b..6ba0324d6 100644 --- a/core/rpc/src/api/flk.rs +++ b/core/rpc/src/api/flk.rs @@ -2,13 +2,15 @@ use std::time::Duration; use fleek_crypto::{EthAddress, NodePublicKey}; use hp_fixed::unsigned::HpUfixed; -use jsonrpsee::core::RpcResult; +use jsonrpsee::core::{RpcResult, SubscriptionResult}; use jsonrpsee::proc_macros::rpc; use lightning_interfaces::types::{ AccountInfo, Blake3Hash, Epoch, EpochInfo, + Event, + EventType, NodeIndex, NodeInfo, NodeInfoWithIndex, @@ -191,4 +193,7 @@ pub trait FleekApi { #[method(name = "metrics")] async fn metrics(&self) -> RpcResult; + + #[subscription(name = "subscribe", item = Event)] + async fn handle_subscription(&self, event_type: Option) -> SubscriptionResult; } diff --git a/core/rpc/src/event.rs b/core/rpc/src/event.rs new file mode 100644 index 000000000..97d5cd310 --- /dev/null +++ b/core/rpc/src/event.rs @@ -0,0 +1,72 @@ +use lightning_types::Event; +use tokio::sync::{broadcast, mpsc, oneshot}; + +pub struct EventDistributor { + /// A sender that receives events from the application layer + event_tx: mpsc::Sender>, + + /// A clonable reciever that can be used to register listeners + broadcast_tx: broadcast::Sender, + + /// Shutdown signal + shutdown: oneshot::Sender<()>, +} + +impl EventDistributor { + pub fn spawn() -> Self { + let (event_tx, mut event_rx) = mpsc::channel(100); + let (broadcast_tx, _) = broadcast::channel(100); + let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); + + let this = Self { + event_tx, + broadcast_tx: broadcast_tx.clone(), + shutdown: shutdown_tx, + }; + + tokio::spawn(async move { + tokio::select! { + _ = Self::forward(&mut event_rx, &broadcast_tx) => { + tracing::info!("consensous dropped the tx") + } + _ = shutdown_rx => { + tracing::trace!("Event recieved shutdown signal") + } + } + }); + + this + } + + async fn forward(rx: &mut mpsc::Receiver>, tx: &broadcast::Sender) { + while let Some(events) = rx.recv().await { + if tx.receiver_count() > 0 { + for event in events { + match tx.send(event) { + Ok(_) => {}, + Err(e) => { + tracing::error!( + "The event broadcast channel failed to send and event {:?}", + e + ) + }, + } + } + } + } + } + + pub async fn shutdown(self) { + let _ = self.shutdown.send(()); + } +} + +impl EventDistributor { + pub fn sender(&self) -> mpsc::Sender> { + self.event_tx.clone() + } + + pub fn register_listener(&self) -> broadcast::Receiver { + self.broadcast_tx.subscribe() + } +} diff --git a/core/rpc/src/lib.rs b/core/rpc/src/lib.rs index 48d092f81..98e9b75d5 100644 --- a/core/rpc/src/lib.rs +++ b/core/rpc/src/lib.rs @@ -7,6 +7,7 @@ use hyper::service::{make_service_fn, service_fn}; use jsonrpsee::server::{stop_channel, Server as JSONRPCServer, ServerHandle}; use jsonrpsee::{Methods, RpcModule}; use lightning_interfaces::infu_collection::{c, Collection}; +use lightning_interfaces::types::Event; use lightning_interfaces::{ ApplicationInterface, ArchiveRequest, @@ -28,13 +29,15 @@ use crate::api::AdminApiServer; pub use crate::api::{EthApiServer, FleekApiServer, NetApiServer}; pub use crate::config::Config; use crate::error::RPCError; +use crate::event::EventDistributor; use crate::logic::AdminApi; pub use crate::logic::{EthApi, FleekApi, NetApi}; pub mod api; -mod api_types; +pub mod api_types; pub mod config; -mod error; +pub mod error; +pub mod event; mod logic; #[cfg(test)] @@ -49,10 +52,10 @@ pub(crate) struct Data { pub consensus_public_key: ConsensusPublicKey, /// If this is some it means the node is in archive mode pub archive_socket: Option>, + pub event_handler: EventDistributor, } impl Data { - #[allow(dead_code)] pub(crate) async fn query_runner( &self, epoch: Option, @@ -92,7 +95,7 @@ pub struct Rpc { // need interior mutability to support restarts handle: Mutex>, - _data: Arc>, + data: Arc>, } async fn health() -> &'static str { @@ -208,19 +211,11 @@ impl WithStartAndShutdown for Rpc { .body(hyper::Body::empty()) } }, - _ => { - if method == hyper::Method::POST { - match json_rpc_service.call(req).await { - Ok(res) => Ok(res), - Err(err) => hyper::Response::builder() - .status(StatusCode::INTERNAL_SERVER_ERROR) - .body(hyper::Body::from(err.to_string())), - } - } else { - hyper::Response::builder() - .status(StatusCode::NOT_FOUND) - .body(hyper::Body::empty()) - } + _ => match json_rpc_service.call(req).await { + Ok(res) => Ok(res), + Err(err) => hyper::Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body(hyper::Body::from(err.to_string())), }, } } @@ -229,15 +224,11 @@ impl WithStartAndShutdown for Rpc { }); let addr = self.config.addr(); + let server = hyper::Server::bind(&addr).serve(make_service); + tokio::spawn(async move { - match axum::Server::bind(&addr) - .serve(make_service) - .with_graceful_shutdown(async move { stop.shutdown().await }) - .await - { - Ok(_) => (), - Err(err) => tracing::error!("RPC server error: {}", err), - } + let graceful = server.with_graceful_shutdown(async move { stop.shutdown().await }); + graceful.await.expect("Rpc Server to start"); }); *self.handle.lock().await = Some(server_handle); @@ -278,6 +269,7 @@ impl RpcInterface for Rpc { node_public_key: signer.get_ed25519_pk(), consensus_public_key: signer.get_bls_pk(), archive_socket, + event_handler: EventDistributor::spawn(), }); let module = Self::create_modules_from_config(&config, data.clone())?; @@ -289,9 +281,13 @@ impl RpcInterface for Rpc { module, admin_module, handle: Mutex::new(None), - _data: data, + data, }) } + + fn event_tx(&self) -> tokio::sync::mpsc::Sender> { + self.data.event_handler.sender() + } } impl ConfigConsumer for Rpc { diff --git a/core/rpc/src/logic/flk_impl.rs b/core/rpc/src/logic/flk_impl.rs index 91f33d9ec..aceb77e69 100644 --- a/core/rpc/src/logic/flk_impl.rs +++ b/core/rpc/src/logic/flk_impl.rs @@ -3,7 +3,8 @@ use std::time::Duration; use fleek_crypto::{EthAddress, NodePublicKey}; use hp_fixed::unsigned::HpUfixed; -use jsonrpsee::core::RpcResult; +use jsonrpsee::core::{RpcResult, SubscriptionResult}; +use jsonrpsee::{PendingSubscriptionSink, SubscriptionMessage}; use lightning_interfaces::infu_collection::Collection; use lightning_interfaces::types::{ AccountInfo, @@ -27,6 +28,7 @@ use lightning_interfaces::types::{ Value, }; use lightning_interfaces::{PagingParams, SyncQueryRunnerInterface}; +use lightning_types::EventType; use lightning_utils::application::QueryRunnerExt; use crate::api::FleekApiServer; @@ -422,4 +424,35 @@ impl FleekApiServer for FleekApi { Err(err) => Err(RPCError::custom(err.to_string()).into()), } } + + async fn handle_subscription( + &self, + pending: PendingSubscriptionSink, + event_type: Option, + ) -> SubscriptionResult { + let sink = pending.accept().await?; + + let mut rx = self.data.event_handler.register_listener(); + + while let Ok(event) = rx.recv().await { + if let Some(ref typee) = event_type { + if &event.event_type() != typee { + continue; + } + } + + tracing::trace!(?event, "sending event to subscriber"); + + if sink + .send(SubscriptionMessage::from_json(&event)?) + .await + .is_err() + { + tracing::trace!("flk subscription closed"); + break; + } + } + + Ok(()) + } } diff --git a/core/rpc/src/tests.rs b/core/rpc/src/tests.rs index cc59c2e2d..da7fe316e 100644 --- a/core/rpc/src/tests.rs +++ b/core/rpc/src/tests.rs @@ -60,12 +60,14 @@ use lightning_origin_demuxer::OriginDemuxer; use lightning_pool::{muxer, Config as PoolConfig, PoolProvider}; use lightning_rep_collector::ReputationAggregator; use lightning_signer::{Config as SignerConfig, Signer}; +use lightning_types::Event; use lightning_utils::application::QueryRunnerExt; use lightning_utils::rpc as utils; use reqwest::Client; use serde::{Deserialize, Serialize}; use serde_json::json; +use crate::api::FleekApiClient; use crate::config::Config as RpcConfig; use crate::Rpc; @@ -1358,3 +1360,52 @@ async fn test_admin_rpc_store() -> Result<()> { Ok(()) } + +#[tokio::test(flavor = "multi_thread")] +// #[traced_test] +async fn test_rpc_events() -> Result<()> { + // Create keys + let owner_secret_key = AccountOwnerSecretKey::generate(); + let owner_public_key = owner_secret_key.to_pk(); + + // Init application service + let mut genesis = Genesis::load().unwrap(); + genesis.account.push(GenesisAccount { + public_key: owner_public_key.into(), + flk_balance: 1000u64.into(), + stables_balance: 0, + bandwidth_balance: 0, + }); + + let port = 30023; + let (rpc, _) = init_rpc_without_consensus(Some(genesis), port) + .await + .unwrap(); + + rpc.start().await; + wait_for_server_start(port).await?; + + let sender = rpc.event_tx(); + + let client = jsonrpsee::ws_client::WsClientBuilder::default() + .build(&format!("ws://127.0.0.1:{port}/rpc/v0")) + .await?; + + let mut sub = FleekApiClient::handle_subscription(&client, None).await?; + + let event = Event::transfer( + EthAddress::from([0; 20]), + EthAddress::from([1; 20]), + EthAddress::from([2; 20]), + HpUfixed::<18>::from(10_u16), + ); + + sender + .send(vec![event.clone()]) + .await + .expect("can send event"); + + assert_eq!(sub.next().await.expect("An event from the sub")?, event); + + Ok(()) +} diff --git a/core/signer/src/tests.rs b/core/signer/src/tests.rs index 7158042a7..ecaefbdaf 100644 --- a/core/signer/src/tests.rs +++ b/core/signer/src/tests.rs @@ -93,6 +93,7 @@ async fn test_send_two_txs_in_a_row() { transactions_to_lose: HashSet::new(), new_block_interval: Duration::from_secs(5), }; + let consensus = MockConsensus::::init( consensus_config, &signer, @@ -196,6 +197,7 @@ async fn test_retry_send() { transactions_to_lose: HashSet::from([2]), // drop the 2nd transaction arriving new_block_interval: Duration::from_secs(5), }; + let consensus = MockConsensus::::init( consensus_config, &signer, @@ -247,6 +249,7 @@ async fn test_shutdown() { let (update_socket, query_runner) = (app.transaction_executor(), app.sync_query()); let mut signer = Signer::::init(Config::test(), query_runner.clone()).unwrap(); let notifier = Notifier::::init(&app); + let consensus = MockConsensus::::init( ConsensusConfig::default(), &signer, @@ -280,6 +283,7 @@ async fn test_shutdown_and_start_again() { let (update_socket, query_runner) = (app.transaction_executor(), app.sync_query()); let mut signer = Signer::::init(Config::test(), query_runner.clone()).unwrap(); let notifier = Notifier::::init(&app); + let consensus = MockConsensus::::init( ConsensusConfig::default(), &signer, @@ -329,6 +333,7 @@ async fn test_sign_raw_digest() { let (update_socket, query_runner) = (app.transaction_executor(), app.sync_query()); let mut signer = Signer::::init(Config::test(), query_runner.clone()).unwrap(); let notifier = Notifier::::init(&app); + let consensus = MockConsensus::::init( ConsensusConfig::default(), &signer, diff --git a/core/test-utils/src/consensus.rs b/core/test-utils/src/consensus.rs index d5d0efdf4..f93c4b7e8 100644 --- a/core/test-utils/src/consensus.rs +++ b/core/test-utils/src/consensus.rs @@ -8,7 +8,7 @@ use lightning_interfaces::config::ConfigConsumer; use lightning_interfaces::consensus::{ConsensusInterface, MempoolSocket}; use lightning_interfaces::infu_collection::{c, Collection}; use lightning_interfaces::signer::SignerInterface; -use lightning_interfaces::types::{Block, TransactionRequest}; +use lightning_interfaces::types::{Block, Event, TransactionRequest}; use lightning_interfaces::{ ApplicationInterface, BroadcastInterface, @@ -80,6 +80,8 @@ impl ConsensusInterface for MockConsensus { fn mempool(&self) -> MempoolSocket { self.socket.clone() } + + fn set_event_tx(&mut self, _tx: mpsc::Sender>) {} } impl WithStartAndShutdown for MockConsensus { diff --git a/core/test-utils/src/transaction.rs b/core/test-utils/src/transaction.rs index 5b4f1d7e8..b3089f1f2 100644 --- a/core/test-utils/src/transaction.rs +++ b/core/test-utils/src/transaction.rs @@ -66,6 +66,7 @@ pub fn get_index_request(block_index: u8, parent_hash: [u8; 32]) -> IndexRequest from: tx.payload.sender, to: to.clone(), response: TransactionResponse::Success(ExecutionData::None), + event: None, }); } @@ -78,6 +79,7 @@ pub fn get_index_request(block_index: u8, parent_hash: [u8; 32]) -> IndexRequest from: TransactionSender::AccountOwner(EthAddress(tx.from.0)), to: to.clone(), response: TransactionResponse::Success(ExecutionData::None), + event: None, }); } @@ -93,6 +95,7 @@ pub fn get_index_request(block_index: u8, parent_hash: [u8; 32]) -> IndexRequest from: tx.payload.sender, to: to.clone(), response: TransactionResponse::Success(ExecutionData::None), + event: None, } }) .collect(); diff --git a/core/types/src/application.rs b/core/types/src/application.rs index 71f7908b4..2f905cf51 100644 --- a/core/types/src/application.rs +++ b/core/types/src/application.rs @@ -1,7 +1,7 @@ //! The types used by the Application interface. use ethers::types::{Block as EthersBlock, H256, U64}; -use fleek_crypto::NodePublicKey; +use fleek_crypto::{EthAddress, NodePublicKey}; use hp_fixed::unsigned::HpUfixed; use serde::{Deserialize, Serialize}; @@ -13,6 +13,83 @@ pub const MAX_UPDATES_CONTENT_REGISTRY: usize = 100; /// Max number of delivery acknowledgements allowed per transaction. pub const MAX_DELIVERY_ACKNOWLEDGMENTS: usize = 1000; +macro_rules! create_events { + ( + pub enum Event { + $( + $variant:ident { + $( + $field:ident: $type:ty + ),* $(,)? + } + ),* $(,)? + } + ) => { + #[derive(Eq, Hash, Debug, PartialEq, Serialize, Deserialize, Clone, schemars::JsonSchema)] + pub enum Event { + $( + $variant { + $( + $field: $type + ),* + } + ),* + } + + #[derive(Eq, Hash, Debug, PartialEq, Serialize, Deserialize, Clone, schemars::JsonSchema)] + pub enum EventType { + $( + $variant, + )* + } + + impl Event { + pub fn event_type(&self) -> EventType { + match self { + $( + Self::$variant { .. } => EventType::$variant, + )* + } + } + } + } +} + +create_events! { + pub enum Event { + Transfer { + token: EthAddress, + from: EthAddress, + to: EthAddress, + amount: HpUfixed<18>, + }, + ServiceEvent { + service_id: u32, + event: Vec, + }, + } +} + +impl Event { + pub fn transfer( + token: EthAddress, + from: EthAddress, + to: EthAddress, + amount: HpUfixed<18>, + ) -> Self { + Self::Transfer { + token, + from, + to, + amount, + } + } + + pub fn service_event(service_id: u32, event: Vec) -> Self { + Self::ServiceEvent { service_id, event } + } +} + /// The response generated from executing an entire batch of transactions (aka a block). #[derive(Debug, Hash, Clone)] pub struct BlockExecutionResponse { diff --git a/core/types/src/response.rs b/core/types/src/response.rs index 7d3fc763f..5e19c7fd5 100644 --- a/core/types/src/response.rs +++ b/core/types/src/response.rs @@ -3,7 +3,7 @@ use fleek_crypto::{EthAddress, TransactionSender}; use serde::{Deserialize, Serialize}; use super::{Epoch, NodeInfo}; -use crate::UpdateMethod; +use crate::{Event, UpdateMethod}; /// Info on a Narwhal epoch #[derive( @@ -50,6 +50,8 @@ pub struct TransactionReceipt { pub to: TransactionDestination, /// The results of the transaction pub response: TransactionResponse, + /// The event that was emitted by the transaction + pub event: Option, } /// What state function a transaction was calling. If an ethereum transaction it will either be diff --git a/core/types/src/state.rs b/core/types/src/state.rs index c8d3bbcfa..9ca3d9b90 100644 --- a/core/types/src/state.rs +++ b/core/types/src/state.rs @@ -26,6 +26,16 @@ pub enum Tokens { FLK, } +impl Tokens { + pub fn address(&self) -> EthAddress { + // todo!(n) + match self { + Tokens::USDC => EthAddress([0; 20]), + Tokens::FLK => EthAddress([1; 20]), + } + } +} + #[derive(Debug, PartialEq, Serialize, Deserialize, Clone, Default, schemars::JsonSchema)] pub struct NodeServed { pub served: CommodityServed, diff --git a/core/types/src/transaction.rs b/core/types/src/transaction.rs index 05ec7f617..361188c05 100644 --- a/core/types/src/transaction.rs +++ b/core/types/src/transaction.rs @@ -17,6 +17,7 @@ use serde::{Deserialize, Serialize}; use super::{ Epoch, + Event, ProofOfConsensus, ProofOfMisbehavior, ProtocolParams, @@ -162,6 +163,32 @@ impl TransactionRequest { .as_u32(), } } + + pub fn event(&self) -> Option { + if let TransactionSender::AccountOwner(sender) = self.sender() { + match self { + Self::UpdateRequest(payload) => match &payload.payload.method { + UpdateMethod::Transfer { amount, token, to } => Some(Event::transfer( + token.address(), + sender, + *to, + amount.clone(), + )), + UpdateMethod::SubmitDeliveryAcknowledgmentAggregation { + service_id, + metadata: event, + .. + } => event + .to_owned() + .map(|e| Event::service_event(*service_id, e)), + _ => None, + }, + _ => None, + } + } else { + None + } + } } impl TryFrom<&TransactionRequest> for Vec {