Skip to content

Commit

Permalink
feat: tx events + flk rpc subscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
nhtyy committed Feb 18, 2024
1 parent 68e55f8 commit 1cdf1a5
Show file tree
Hide file tree
Showing 27 changed files with 397 additions and 43 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2808,7 +2808,7 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "330c60081dcc4c72131f8eb70510f1ac07223e5d4163db481a04a0befcffa412"
dependencies = [
"libloading 0.7.4",
"libloading 0.8.1",
]

[[package]]
Expand Down Expand Up @@ -6128,10 +6128,10 @@ dependencies = [
"anyhow",
"async-trait",
"autometrics",
"axum 0.6.20",
"clap 4.4.12",
"ethers",
"fleek-crypto",
"futures",
"hp-fixed",
"hyper 0.14.28",
"infusion",
Expand Down
8 changes: 8 additions & 0 deletions core/application/src/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,13 @@ impl Env<UpdatePerm> {
response.change_epoch = true;
}

let mut event = None;
if let TransactionResponse::Success(_) = results {
if let Some(e) = txn.event() {
event = Some(e);
}
}

let receipt = TransactionReceipt {
block_hash: block.digest,
block_number,
Expand All @@ -183,6 +190,7 @@ impl Env<UpdatePerm> {
from: txn.sender(),
to: txn.to(),
response: results,
event,
};
/* Todo(dalton): Check if the transaction resulted in the committee change(Like a current validator getting slashed)
if so acknowledge that in the block response
Expand Down
1 change: 1 addition & 0 deletions core/blockstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ mod tests {
transactions_to_lose: HashSet::new(),
new_block_interval: Duration::from_secs(5),
};

let consensus = MockConsensus::<TestBinding>::init(
consensus_config,
&signer,
Expand Down
22 changes: 18 additions & 4 deletions core/consensus/src/consensus.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::BTreeMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};

use affair::{Executor, TokioSpawn};
Expand All @@ -12,7 +12,7 @@ use lightning_interfaces::config::ConfigConsumer;
use lightning_interfaces::consensus::{ConsensusInterface, MempoolSocket};
use lightning_interfaces::infu_collection::{c, Collection};
use lightning_interfaces::signer::{SignerInterface, SubmitTxSocket};
use lightning_interfaces::types::{Epoch, EpochInfo, UpdateMethod};
use lightning_interfaces::types::{Epoch, EpochInfo, Event, UpdateMethod};
use lightning_interfaces::{
ApplicationInterface,
BroadcastInterface,
Expand All @@ -33,7 +33,7 @@ use narwhal_node::NodeStorage;
use prometheus::Registry;
use resolved_pathbuf::ResolvedPathBuf;
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, Mutex, Notify};
use tokio::sync::{mpsc, Notify};
use tokio::{pin, select, task, time};
use tracing::{error, info};
use typed_store::DBMetrics;
Expand Down Expand Up @@ -316,6 +316,10 @@ impl<Q: SyncQueryRunnerInterface, P: PubSub<PubSubMsg> + 'static, NE: Emitter>
pub fn shutdown(&self) {
self.execution_state.shutdown();
}

fn set_event_tx(&mut self, tx: tokio::sync::mpsc::Sender<Vec<Event>>) {
self.execution_state.set_event_tx(tx);
}
}

impl<C: Collection> WithStartAndShutdown for Consensus<C> {
Expand All @@ -333,9 +337,10 @@ impl<C: Collection> WithStartAndShutdown for Consensus<C> {
let mut epoch_state = self
.epoch_state
.lock()
.await
.expect("Mutex poisened")
.take()
.expect("Consensus was tried to start before initialization");

self.is_running.store(true, Ordering::Relaxed);

task::spawn(async move {
Expand Down Expand Up @@ -453,6 +458,15 @@ impl<C: Collection> ConsensusInterface<C> for Consensus<C> {
fn mempool(&self) -> MempoolSocket {
self.mempool_socket.clone()
}

fn set_event_tx(&mut self, tx: tokio::sync::mpsc::Sender<Vec<Event>>) {
self.epoch_state
.lock()
.expect("Mutex poisened")
.as_mut()
.expect("Consensus was tried to start before initialization")
.set_event_tx(tx);
}
}

#[derive(Debug, Serialize, Deserialize, Clone, IsVariant, From, TryInto)]
Expand Down
31 changes: 29 additions & 2 deletions core/consensus/src/execution.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::sync::Arc;
use std::sync::{Arc, OnceLock};

use async_trait::async_trait;
use fastcrypto::hash::HashFunction;
use fleek_blake3 as blake3;
use lightning_interfaces::types::{Block, Epoch, Metadata, NodeIndex, TransactionRequest};
use lightning_interfaces::types::{Block, Epoch, Event, Metadata, NodeIndex, TransactionRequest};
use lightning_interfaces::{
Emitter,
ExecutionEngineSocket,
Expand Down Expand Up @@ -75,6 +75,8 @@ pub struct Execution<Q: SyncQueryRunnerInterface, NE: Emitter> {
index_socket: Option<IndexSocket>,
/// Notifications emitter
notifier: NE,
/// Send the event to the RPC
event_tx: OnceLock<mpsc::Sender<Vec<Event>>>,
}

impl<Q: SyncQueryRunnerInterface, NE: Emitter> Execution<Q, NE> {
Expand All @@ -93,6 +95,7 @@ impl<Q: SyncQueryRunnerInterface, NE: Emitter> Execution<Q, NE> {
query_runner,
index_socket,
notifier,
event_tx: OnceLock::new(),
}
}

Expand Down Expand Up @@ -124,6 +127,26 @@ impl<Q: SyncQueryRunnerInterface, NE: Emitter> Execution<Q, NE> {
let results = self.executor.run(block).await.unwrap();
info!("Consensus submitted new block to application");

match self.event_tx.get() {
Some(tx) => {
if let Err(e) = tx
.send(
results
.txn_receipts
.iter()
.filter_map(|r| r.event.clone())
.collect(),
)
.await
{
error!("We could not send a message to the RPC: {e}");
}
},
None => {
error!("Once Cell not initialized, this is a bug");
},
}

if results.change_epoch {
change_epoch = true;
self.notifier.epoch_changed();
Expand Down Expand Up @@ -162,6 +185,10 @@ impl<Q: SyncQueryRunnerInterface, NE: Emitter> Execution<Q, NE> {
self.notifier.shutdown();
self.executor.downgrade();
}

pub fn set_event_tx(&self, tx: mpsc::Sender<Vec<Event>>) {
self.event_tx.set(tx).unwrap();
}
}

#[async_trait]
Expand Down
1 change: 1 addition & 0 deletions core/indexer/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ async fn test_submission() {
transactions_to_lose: HashSet::new(),
new_block_interval: Duration::from_secs(5),
};

let consensus = MockConsensus::<TestBinding>::init(
consensus_config,
&signer,
Expand Down
10 changes: 9 additions & 1 deletion core/interfaces/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ use crate::common::WithStartAndShutdown;
use crate::config::ConfigConsumer;
use crate::infu_collection::Collection;
use crate::signer::SignerInterface;
use crate::types::TransactionRequest;
use crate::types::{Event, TransactionRequest};
use crate::{
ApplicationInterface,
ArchiveInterface,
BroadcastInterface,
ConfigProviderInterface,
IndexSocket,
NotifierInterface,
RpcInterface,
};

/// A socket that gives services and other sub-systems the required functionality to
Expand Down Expand Up @@ -54,9 +55,14 @@ pub trait ConsensusInterface<C: Collection>:
)
}

fn _post(&mut self, rpc: ::RpcInterface) {
self.set_event_tx(rpc.event_tx());
}

type Certificate: LightningMessage + Clone;

/// Create a new consensus service with the provided config and executor.
#[allow(clippy::too_many_arguments)]
fn init<S: SignerInterface<C>>(
config: Self::Config,
signer: &S,
Expand All @@ -72,4 +78,6 @@ pub trait ConsensusInterface<C: Collection>:
/// transaction to the consensus.
#[blank = Socket::raw_bounded(64).0]
fn mempool(&self) -> MempoolSocket;

fn set_event_tx(&mut self, tx: tokio::sync::mpsc::Sender<Vec<Event>>);
}
4 changes: 4 additions & 0 deletions core/interfaces/src/rpc.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use infusion::c;
use tokio::sync::mpsc;

use crate::common::WithStartAndShutdown;
use crate::config::ConfigConsumer;
use crate::consensus::MempoolSocket;
use crate::infu_collection::Collection;
use crate::types::Event;
use crate::{
ApplicationInterface,
ArchiveInterface,
Expand Down Expand Up @@ -51,4 +53,6 @@ pub trait RpcInterface<C: Collection>:
signer: &C::SignerInterface,
archive_socket: Option<ArchiveSocket<C>>,
) -> anyhow::Result<Self>;

fn event_tx(&self) -> mpsc::Sender<Vec<Event>>;
}
4 changes: 3 additions & 1 deletion core/mock/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use axum::extract::State;
use axum::routing::post;
use axum::{Json, Router};
use lightning_interfaces::infu_collection::{c, Collection};
use lightning_interfaces::types::{Block, TransactionRequest};
use lightning_interfaces::types::{Block, Event, TransactionRequest};
use lightning_interfaces::{
ApplicationInterface,
BroadcastInterface,
Expand Down Expand Up @@ -184,6 +184,8 @@ impl<C: Collection> ConsensusInterface<C> for MockConsensus<C> {
fn mempool(&self) -> MempoolSocket {
self.mempool.clone()
}

fn set_event_tx(&mut self, _tx: mpsc::Sender<Vec<Event>>) {}
}

impl<C: Collection> ConfigConsumer for MockConsensus<C> {
Expand Down
1 change: 1 addition & 0 deletions core/origin-demuxer/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ async fn create_app_state(test_name: String) -> AppState {
transactions_to_lose: HashSet::new(),
new_block_interval: Duration::from_secs(5),
};

let consensus = MockConsensus::<TestBinding>::init(
consensus_config,
&signer,
Expand Down
1 change: 1 addition & 0 deletions core/origin-http/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ async fn create_app_state(test_name: String) -> AppState {
transactions_to_lose: HashSet::new(),
new_block_interval: Duration::from_secs(5),
};

let consensus = MockConsensus::<TestBinding>::init(
consensus_config,
&signer,
Expand Down
1 change: 1 addition & 0 deletions core/origin-ipfs/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ async fn create_app_state(test_name: String) -> AppState {
transactions_to_lose: HashSet::new(),
new_block_interval: Duration::from_secs(5),
};

let consensus = MockConsensus::<TestBinding>::init(
consensus_config,
&signer,
Expand Down
1 change: 1 addition & 0 deletions core/pinger/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ async fn init_pinger() -> Pinger<TestBinding> {
transactions_to_lose: HashSet::new(),
new_block_interval: Duration::from_secs(5),
};

let consensus = MockConsensus::<TestBinding>::init(
consensus_config,
&signer,
Expand Down
2 changes: 2 additions & 0 deletions core/rep-collector/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ async fn test_query() {
transactions_to_lose: HashSet::new(),
new_block_interval: Duration::from_secs(5),
};

let consensus = MockConsensus::<TestBinding>::init(
consensus_config,
&signer,
Expand Down Expand Up @@ -515,6 +516,7 @@ async fn test_reputation_calculation_and_query() {
&notifier1,
)
.unwrap();

let consensus2 = MockConsensus::<TestBinding>::init(
consensus_config,
&signer2,
Expand Down
4 changes: 2 additions & 2 deletions core/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ lightning-openrpc = { path = "../rpc-openrpc" }
lightning-openrpc-macros = { path = "../rpc-openrpc-macros" }
lightning-utils = { path = "../utils" }
alloy-primitives = "0.5.2"
axum.workspace = true
hyper = "0.14.27"
hyper = { version = "0.14.27", features = ["server", "full"] }
futures.workspace = true


[dev-dependencies]
Expand Down
7 changes: 6 additions & 1 deletion core/rpc/src/api/flk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ use std::time::Duration;

use fleek_crypto::{EthAddress, NodePublicKey};
use hp_fixed::unsigned::HpUfixed;
use jsonrpsee::core::RpcResult;
use jsonrpsee::core::{RpcResult, SubscriptionResult};
use jsonrpsee::proc_macros::rpc;
use lightning_interfaces::types::{
AccountInfo,
Blake3Hash,
Epoch,
EpochInfo,
Event,
EventType,
NodeIndex,
NodeInfo,
NodeInfoWithIndex,
Expand Down Expand Up @@ -191,4 +193,7 @@ pub trait FleekApi {

#[method(name = "metrics")]
async fn metrics(&self) -> RpcResult<String>;

#[subscription(name = "subscribe", item = Event)]
async fn handle_subscription(&self, event_type: Option<EventType>) -> SubscriptionResult;
}
Loading

0 comments on commit 1cdf1a5

Please sign in to comment.