Skip to content

Commit

Permalink
remove builder event types
Browse files Browse the repository at this point in the history
  • Loading branch information
imabdulbasit committed Jul 16, 2024
1 parent d6db092 commit 70291d7
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 95 deletions.
106 changes: 22 additions & 84 deletions src/events_source.rs
Original file line number Diff line number Diff line change
@@ -1,64 +1,20 @@
use async_broadcast::{broadcast, InactiveReceiver, Sender as BroadcastSender};
use async_trait::async_trait;
use futures::future::BoxFuture;
use futures::stream::{self, BoxStream, Stream, StreamExt};
use futures::stream::{BoxStream, Stream, StreamExt};
use hotshot_types::{
data::{DaProposal, QuorumProposal},
error::HotShotError,
event::{error_adaptor, Event, EventType},
message::Proposal,
traits::node_implementation::{ConsensusTime, NodeType},
traits::node_implementation::NodeType,
PeerConfig,
};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tide_disco::method::ReadState;
const RETAINED_EVENTS_COUNT: usize = 4096;

/// A builder event
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(bound(deserialize = "Types: NodeType"))]
pub struct BuilderEvent<Types: NodeType> {
/// The view number that this event originates from
pub view_number: Types::Time,

/// The underlying event
pub event: BuilderEventType<Types>,
}

// impl From event to builder event
impl<Types: NodeType> From<Event<Types>> for BuilderEvent<Types> {
fn from(event: Event<Types>) -> Self {
BuilderEvent {
view_number: event.view_number,
event: match event.event {
EventType::Error { error } => BuilderEventType::HotshotError { error },
EventType::Transactions { transactions } => {
BuilderEventType::HotshotTransactions { transactions }
}
EventType::Decide {
leaf_chain,
block_size,
..
} => {
let latest_decide_view_num = leaf_chain[0].leaf.view_number();
BuilderEventType::HotshotDecide {
latest_decide_view_num,
block_size,
}
}
EventType::DaProposal { proposal, sender } => {
BuilderEventType::HotshotDaProposal { proposal, sender }
}
EventType::QuorumProposal { proposal, sender } => {
BuilderEventType::HotshotQuorumProposal { proposal, sender }
}
_ => BuilderEventType::Unknown,
},
}
}
}

#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(bound(deserialize = "Types: NodeType"))]
pub enum BuilderEventType<Types: NodeType> {
Expand Down Expand Up @@ -107,10 +63,10 @@ pub trait EventsSource<Types>
where
Types: NodeType,
{
type EventStream: Stream<Item = Arc<BuilderEvent<Types>>> + Unpin + Send + 'static;
type EventStream: Stream<Item = Arc<Event<Types>>> + Unpin + Send + 'static;
async fn get_event_stream(&self) -> Self::EventStream;

async fn subscribe_events(&self) -> BoxStream<'static, Arc<BuilderEvent<Types>>> {
async fn subscribe_events(&self) -> BoxStream<'static, Arc<Event<Types>>> {
self.get_event_stream().await.boxed()
}
}
Expand All @@ -126,14 +82,24 @@ where
#[derive(Debug)]
pub struct EventsStreamer<Types: NodeType> {
// required for api subscription
inactive_to_subscribe_clone_recv: InactiveReceiver<Arc<BuilderEvent<Types>>>,
subscriber_send_channel: BroadcastSender<Arc<BuilderEvent<Types>>>,
inactive_to_subscribe_clone_recv: InactiveReceiver<Arc<Event<Types>>>,
subscriber_send_channel: BroadcastSender<Arc<Event<Types>>>,

// required for sending startup info
known_nodes_with_stake: Vec<PeerConfig<Types::SignatureKey>>,
non_staked_node_count: usize,
}

impl<Types: NodeType> EventsStreamer<Types> {
pub fn known_node_with_stake(&self) -> Vec<PeerConfig<Types::SignatureKey>> {
self.known_nodes_with_stake.clone()
}

pub fn non_staked_node_count(&self) -> usize {
self.non_staked_node_count
}
}

#[async_trait]
impl<Types: NodeType> EventConsumer<Types> for EventsStreamer<Types> {
async fn handle_event(&mut self, event: Event<Types>) {
Expand All @@ -157,38 +123,19 @@ impl<Types: NodeType> EventConsumer<Types> for EventsStreamer<Types> {
Event { .. } => false,
};
if filter {
let builder_event = Arc::new(BuilderEvent::from(event));
let _status = self.subscriber_send_channel.broadcast(builder_event).await;
let _status = self.subscriber_send_channel.broadcast(event.into()).await;
}
}
}

#[async_trait]
impl<Types: NodeType> EventsSource<Types> for EventsStreamer<Types> {
type EventStream = BoxStream<'static, Arc<BuilderEvent<Types>>>;
type EventStream = BoxStream<'static, Arc<Event<Types>>>;

async fn get_event_stream(&self) -> Self::EventStream {
let recv_channel = self.inactive_to_subscribe_clone_recv.activate_cloned();
let startup_event_initialized = false;
let startup_event = self.get_startup_event().clone();
stream::unfold(
(recv_channel, startup_event, startup_event_initialized),
|(mut recv_channel, startup_event, mut startup_event_initialized)| async move {
let event_res = if startup_event_initialized {
recv_channel.recv().await.ok()
} else {
startup_event_initialized = true;
Some(Arc::new(startup_event.clone()))
};
event_res.map(|event| {
(
event,
(recv_channel, startup_event, startup_event_initialized),
)
})
},
)
.boxed()
self.inactive_to_subscribe_clone_recv
.activate_cloned()
.boxed()
}
}
impl<Types: NodeType> EventsStreamer<Types> {
Expand All @@ -197,7 +144,7 @@ impl<Types: NodeType> EventsStreamer<Types> {
non_staked_node_count: usize,
) -> Self {
let (mut subscriber_send_channel, to_subscribe_clone_recv) =
broadcast::<Arc<BuilderEvent<Types>>>(RETAINED_EVENTS_COUNT);
broadcast::<Arc<Event<Types>>>(RETAINED_EVENTS_COUNT);
// set the overflow to true to drop older messages from the channel
subscriber_send_channel.set_overflow(true);
// set the await active to false to not block the sender
Expand All @@ -210,15 +157,6 @@ impl<Types: NodeType> EventsStreamer<Types> {
non_staked_node_count,
}
}
pub fn get_startup_event(&self) -> BuilderEvent<Types> {
BuilderEvent {
view_number: Types::Time::genesis(),
event: BuilderEventType::StartupInfo {
known_node_with_stake: self.known_nodes_with_stake.clone(),
non_staked_node_count: self.non_staked_node_count,
},
}
}
}

#[async_trait]
Expand Down
22 changes: 11 additions & 11 deletions src/test.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
#[cfg(test)]
mod tests {
use crate::events_source::{BuilderEvent, EventConsumer, EventsStreamer}; // EventsUpdater};
//use crate::fetch::Fetch;
use crate::events_source::{EventConsumer, EventsStreamer}; // EventsUpdater};
//use crate::fetch::Fetch;
use crate::events::{define_api, Error, Options};
use async_compatibility_layer::art::async_spawn;
use async_compatibility_layer::logging::{setup_backtrace, setup_logging};
use async_std::sync::RwLock;
use futures::stream::StreamExt;
use hotshot_types::{
constants::{Version01, STATIC_VER_0_1},
data::ViewNumber,
event::{Event, EventType},
traits::node_implementation::{ConsensusTime, NodeType},
Expand All @@ -19,6 +18,7 @@ mod tests {
use std::time::Duration;
use surf_disco::Client;
use tide_disco::{App, Url};
use vbs::version::{StaticVersion, StaticVersionType};

// return a empty transaction event
fn generate_event<Types: NodeType<Time = ViewNumber>>(view_number: u64) -> Event<Types> {
Expand Down Expand Up @@ -49,15 +49,15 @@ mod tests {
let mut app = App::<_, Error>::with_state(events_streamer.clone());

let hotshot_events_api =
define_api::<Arc<RwLock<EventsStreamer<TestTypes>>>, TestTypes, Version01>(
define_api::<Arc<RwLock<EventsStreamer<TestTypes>>>, TestTypes, StaticVersion<0, 1>>(
&Options::default(),
)
.expect("Failed to define hotshot eventsAPI");

app.register_module("hotshot_events", hotshot_events_api)
.expect("Failed to register hotshot events API");

async_spawn(app.serve(api_url, STATIC_VER_0_1));
async_spawn(app.serve(api_url, StaticVersion::<0, 1>::instance()));
let total_count = 5;
let send_handle = async_spawn(async move {
let mut send_count = 0;
Expand Down Expand Up @@ -100,18 +100,18 @@ mod tests {
let mut app = App::<_, Error>::with_state(events_streamer.clone());

let hotshot_events_api =
define_api::<Arc<RwLock<EventsStreamer<TestTypes>>>, TestTypes, Version01>(
define_api::<Arc<RwLock<EventsStreamer<TestTypes>>>, TestTypes, StaticVersion<0, 1>>(
&Options::default(),
)
.expect("Failed to define hotshot eventsAPI");

app.register_module("hotshot_events", hotshot_events_api)
.expect("Failed to register hotshot events API");

async_spawn(app.serve(api_url, STATIC_VER_0_1));
async_spawn(app.serve(api_url, StaticVersion::<0, 1>::instance()));

// Start Client 1
let client_1 = Client::<Error, Version01>::new(
let client_1 = Client::<Error, StaticVersion<0, 1>>::new(
format!("http://localhost:{}/hotshot_events", port)
.parse()
.unwrap(),
Expand All @@ -123,14 +123,14 @@ mod tests {
// client 1 subscribe to hotshot events
let mut events_1 = client_1
.socket("events")
.subscribe::<BuilderEvent<TestTypes>>()
.subscribe::<Event<TestTypes>>()
.await
.unwrap();

tracing::info!("Client 1 Subscribed to events");

// Start Client 2
let client_2 = Client::<Error, Version01>::new(
let client_2 = Client::<Error, StaticVersion<0, 1>>::new(
format!("http://localhost:{}/hotshot_events", port)
.parse()
.unwrap(),
Expand All @@ -142,7 +142,7 @@ mod tests {
// client 2 subscrive to hotshot events
let mut events_2 = client_2
.socket("events")
.subscribe::<BuilderEvent<TestTypes>>()
.subscribe::<Event<TestTypes>>()
.await
.unwrap();

Expand Down

0 comments on commit 70291d7

Please sign in to comment.