Skip to content

Commit

Permalink
Subscribe Events (#41)
Browse files Browse the repository at this point in the history
* Add subscribe_events api

* Add Event types

* Setup the channel

* Receive the events in this commit

* Display events

* Fix the display issue

* Success display events

* Code clean

* Code clean 2

* Self review
  • Loading branch information
boundless-forest authored Oct 18, 2023
1 parent 501ad49 commit 16b7fd9
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 9 deletions.
110 changes: 106 additions & 4 deletions src/handler/dashboard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,15 @@ use ratatui::{
style::Stylize,
widgets::*,
};
use scale_info::{Path, PortableType, Type, TypeDefSequence};
use scale_value::{scale::decode_as_type, Composite, Value, ValueDef};
use sp_core::Encode;
use sp_runtime::{
traits::{Block as BlockT, Hash, Header as HeaderT},
DigestItem,
};
use sp_storage::StorageData;
use subxt_metadata::Metadata;
use tokio::sync::mpsc::UnboundedReceiver;
// this crate
use crate::{
Expand All @@ -23,12 +27,16 @@ use crate::{
};

const BLOCKS_MAX_LIMIT: usize = 30;
const EVENTS_MAX_LIMIT: usize = 5;

pub(crate) struct DashBoard<CI: ChainInfo> {
pub metadata: Metadata,
pub system_pane_info: SystemPaneInfo,
pub blocks_rev: UnboundedReceiver<HeaderForChain<CI>>,
pub blocks: StatefulList<BlockForChain<CI>>,
pub selected_block: Option<BlockForChain<CI>>,
pub events_rev: UnboundedReceiver<Vec<StorageData>>,
pub events: StatefulList<Value<u32>>,
pub tab_titles: Vec<String>,
pub index: usize,
}
Expand All @@ -37,12 +45,17 @@ impl<CI: ChainInfo> DashBoard<CI> {
pub(crate) fn new(
system_pane_info: SystemPaneInfo,
blocks_rev: UnboundedReceiver<HeaderForChain<CI>>,
events_rev: UnboundedReceiver<Vec<StorageData>>,
metadata: Metadata,
) -> DashBoard<CI> {
DashBoard {
metadata,
system_pane_info,
blocks_rev,
events_rev,
selected_block: None,
blocks: StatefulList::with_items(VecDeque::with_capacity(BLOCKS_MAX_LIMIT)),
events: StatefulList::with_items(VecDeque::with_capacity(EVENTS_MAX_LIMIT)),
tab_titles: vec![String::from("Blocks"), String::from("Events")],
index: 0,
}
Expand Down Expand Up @@ -84,6 +97,37 @@ where
B: Backend,
CI: ChainInfo,
{
fn vec_event_records_type_id(metadata: &mut Metadata) -> Option<u32> {
let event_records_type_id = metadata
.types()
.types
.iter()
.find(|ty| {
ty.ty.path
== Path::from_segments_unchecked(vec![
"frame_system".to_string(),
"EventRecord".to_string(),
])
})
.map(|ty| ty.id)
.unwrap();

let ty_mut = metadata.types_mut();
let vec_event_records_ty = Type::new(
Path::default(),
vec![],
TypeDefSequence::new(event_records_type_id.into()),
vec![],
);
let vec_event_records_type_id = ty_mut.types.len() as u32;
ty_mut
.types
.push(PortableType { id: vec_event_records_type_id, ty: vec_event_records_ty });

Some(vec_event_records_type_id)
}

let vec_event_records_type_id = vec_event_records_type_id(&mut app.metadata).unwrap();
loop {
terminal.draw(|f| ui(f, &mut app))?;

Expand All @@ -96,6 +140,51 @@ where
}
}

if let Ok(storage_data) = app.events_rev.try_recv() {
for data in storage_data {
let value = decode_as_type(
&mut data.0.as_ref(),
vec_event_records_type_id,
app.metadata.types(),
);

if let Ok(event_records) = value {
match event_records.value {
ValueDef::Composite(event_records) => match event_records {
Composite::Named(_) => continue,
Composite::Unnamed(event_records) =>
for record in event_records {
match record.value {
ValueDef::Composite(inner) => match inner {
Composite::Named(v) => {
let event_values: Vec<Value<u32>> = v
.into_iter()
.filter(|d| d.0 == "event")
.map(|d| d.1)
.collect();

for event in event_values {
if app.events.items.len()
== app.events.items.capacity()
{
app.events.items.pop_front();
} else {
app.events.items.push_back(event);
}
}
},
Composite::Unnamed(_) => continue,
},
_ => continue,
}
},
},
_ => continue,
}
}
}
}

if let Event::Key(key) = read()? {
if key.kind == KeyEventKind::Press {
match key.code {
Expand Down Expand Up @@ -280,14 +369,27 @@ where
}
}

fn draw_events_tab<B, CI>(f: &mut Frame<B>, _app: &mut DashBoard<CI>, area: Rect)
fn draw_events_tab<B, CI>(f: &mut Frame<B>, app: &mut DashBoard<CI>, area: Rect)
where
B: Backend,
CI: ChainInfo,
{
let text = vec![Line::from("Event Page")];
let paragraph = Paragraph::new(text);
f.render_widget(paragraph, area);
let mut text = "".to_string();
for e in &app.events.items {
text.push_str(&format!(
"{}\n",
serde_json::to_string(e).unwrap_or("Decode Error Occurred.".to_string())
));
}
let l = Paragraph::new(text)
.wrap(Wrap { trim: true })
.block(
Block::default()
.borders(Borders::ALL)
.title(format!("Latest {} Events", EVENTS_MAX_LIMIT)),
)
.style(Style::default().fg(Color::Yellow));
f.render_widget(l, area);
}

pub struct StatefulList<T> {
Expand Down
28 changes: 25 additions & 3 deletions src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,9 @@ impl<CI: ChainInfo> Handler<CI> {
let mut terminal = Terminal::new(backend)?;

let (blocks_tx, blocks_rx) = mpsc::unbounded_channel();
let system_pane_info = self.client.system_pane_info().await?;
let mut headers_subs = self.client.subscribe_finalized_heads().await.unwrap();
let (events_tx, events_rx) = mpsc::unbounded_channel();

let mut headers_subs = self.client.subscribe_finalized_heads().await.unwrap();
tokio::spawn(async move {
while let Some(header) = headers_subs.next().await {
if let Ok(header) = header {
Expand All @@ -119,7 +119,29 @@ impl<CI: ChainInfo> Handler<CI> {
}
});

let dashboard = DashBoard::new(system_pane_info, blocks_rx);
let mut events_subs = self.client.subscribe_events().await.unwrap();
tokio::spawn(async move {
while let Some(storage_set) = events_subs.next().await {
if let Ok(storage) = storage_set {
let data = storage
.changes
.into_iter()
.filter_map(|(_k, data)| data)
.collect();
if events_tx.send(data).is_err() {
break;
}
}
}
});

let system_pane_info = self.client.system_pane_info().await?;
let dashboard = DashBoard::new(
system_pane_info,
blocks_rx,
events_rx,
self.metadata.clone(),
);
run_dashboard(self.client.clone(), &mut terminal, dashboard).await?;

// restore terminal
Expand Down
1 change: 1 addition & 0 deletions src/networks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub trait ChainInfo: Sync + Send {
/// The hash type of the chain
type Hash: Serialize
+ DeserializeOwned
+ 'static
+ Send
+ FromStr
+ From<H256>
Expand Down
18 changes: 16 additions & 2 deletions src/rpc/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ use jsonrpsee::{
rpc_params,
};
use serde::de::DeserializeOwned;
use sp_core::Decode;
use sp_core::{twox_128, Decode};
use sp_runtime::generic::SignedBlock;
use sp_storage::StorageKey;
use sp_storage::{StorageChangeSet, StorageKey};
use sp_version::RuntimeVersion;
use subxt_metadata::Metadata;
// this crate
Expand Down Expand Up @@ -122,4 +122,18 @@ pub trait SubscribeApi {
)
.await
}

async fn subscribe_events(
&self,
) -> RpcResult<Subscription<StorageChangeSet<HashForChain<Self::ChainInfo>>>> {
let mut key = twox_128("System".as_bytes()).to_vec();
key.extend(twox_128("Events".as_bytes()));

self.subscribe(
"state_subscribeStorage",
rpc_params![vec![StorageKey(key)]],
"state_unsubscribeStorage",
)
.await
}
}

0 comments on commit 16b7fd9

Please sign in to comment.