From 16b7fd9c676f61f9552b0bd5e76a873ee743678b Mon Sep 17 00:00:00 2001 From: Bear Wang Date: Wed, 18 Oct 2023 17:50:23 +0800 Subject: [PATCH] Subscribe Events (#41) * Add subscribe_events api * Add Event types * Setup the channel * Receive the events in this commit * Display events * Fix the display issue * Success display events * Code clean * Code clean 2 * Self review --- src/handler/dashboard.rs | 110 +++++++++++++++++++++++++++++++++++++-- src/handler/mod.rs | 28 ++++++++-- src/networks/mod.rs | 1 + src/rpc/api.rs | 18 ++++++- 4 files changed, 148 insertions(+), 9 deletions(-) diff --git a/src/handler/dashboard.rs b/src/handler/dashboard.rs index 05f7c5a..e0e977c 100644 --- a/src/handler/dashboard.rs +++ b/src/handler/dashboard.rs @@ -10,11 +10,15 @@ use ratatui::{ style::Stylize, widgets::*, }; +use scale_info::{Path, PortableType, Type, TypeDefSequence}; +use scale_value::{scale::decode_as_type, Composite, Value, ValueDef}; use sp_core::Encode; use sp_runtime::{ traits::{Block as BlockT, Hash, Header as HeaderT}, DigestItem, }; +use sp_storage::StorageData; +use subxt_metadata::Metadata; use tokio::sync::mpsc::UnboundedReceiver; // this crate use crate::{ @@ -23,12 +27,16 @@ use crate::{ }; const BLOCKS_MAX_LIMIT: usize = 30; +const EVENTS_MAX_LIMIT: usize = 5; pub(crate) struct DashBoard { + pub metadata: Metadata, pub system_pane_info: SystemPaneInfo, pub blocks_rev: UnboundedReceiver>, pub blocks: StatefulList>, pub selected_block: Option>, + pub events_rev: UnboundedReceiver>, + pub events: StatefulList>, pub tab_titles: Vec, pub index: usize, } @@ -37,12 +45,17 @@ impl DashBoard { pub(crate) fn new( system_pane_info: SystemPaneInfo, blocks_rev: UnboundedReceiver>, + events_rev: UnboundedReceiver>, + metadata: Metadata, ) -> DashBoard { DashBoard { + metadata, system_pane_info, blocks_rev, + events_rev, selected_block: None, blocks: StatefulList::with_items(VecDeque::with_capacity(BLOCKS_MAX_LIMIT)), + events: StatefulList::with_items(VecDeque::with_capacity(EVENTS_MAX_LIMIT)), tab_titles: vec![String::from("Blocks"), String::from("Events")], index: 0, } @@ -84,6 +97,37 @@ where B: Backend, CI: ChainInfo, { + fn vec_event_records_type_id(metadata: &mut Metadata) -> Option { + let event_records_type_id = metadata + .types() + .types + .iter() + .find(|ty| { + ty.ty.path + == Path::from_segments_unchecked(vec![ + "frame_system".to_string(), + "EventRecord".to_string(), + ]) + }) + .map(|ty| ty.id) + .unwrap(); + + let ty_mut = metadata.types_mut(); + let vec_event_records_ty = Type::new( + Path::default(), + vec![], + TypeDefSequence::new(event_records_type_id.into()), + vec![], + ); + let vec_event_records_type_id = ty_mut.types.len() as u32; + ty_mut + .types + .push(PortableType { id: vec_event_records_type_id, ty: vec_event_records_ty }); + + Some(vec_event_records_type_id) + } + + let vec_event_records_type_id = vec_event_records_type_id(&mut app.metadata).unwrap(); loop { terminal.draw(|f| ui(f, &mut app))?; @@ -96,6 +140,51 @@ where } } + if let Ok(storage_data) = app.events_rev.try_recv() { + for data in storage_data { + let value = decode_as_type( + &mut data.0.as_ref(), + vec_event_records_type_id, + app.metadata.types(), + ); + + if let Ok(event_records) = value { + match event_records.value { + ValueDef::Composite(event_records) => match event_records { + Composite::Named(_) => continue, + Composite::Unnamed(event_records) => + for record in event_records { + match record.value { + ValueDef::Composite(inner) => match inner { + Composite::Named(v) => { + let event_values: Vec> = v + .into_iter() + .filter(|d| d.0 == "event") + .map(|d| d.1) + .collect(); + + for event in event_values { + if app.events.items.len() + == app.events.items.capacity() + { + app.events.items.pop_front(); + } else { + app.events.items.push_back(event); + } + } + }, + Composite::Unnamed(_) => continue, + }, + _ => continue, + } + }, + }, + _ => continue, + } + } + } + } + if let Event::Key(key) = read()? { if key.kind == KeyEventKind::Press { match key.code { @@ -280,14 +369,27 @@ where } } -fn draw_events_tab(f: &mut Frame, _app: &mut DashBoard, area: Rect) +fn draw_events_tab(f: &mut Frame, app: &mut DashBoard, area: Rect) where B: Backend, CI: ChainInfo, { - let text = vec![Line::from("Event Page")]; - let paragraph = Paragraph::new(text); - f.render_widget(paragraph, area); + let mut text = "".to_string(); + for e in &app.events.items { + text.push_str(&format!( + "{}\n", + serde_json::to_string(e).unwrap_or("Decode Error Occurred.".to_string()) + )); + } + let l = Paragraph::new(text) + .wrap(Wrap { trim: true }) + .block( + Block::default() + .borders(Borders::ALL) + .title(format!("Latest {} Events", EVENTS_MAX_LIMIT)), + ) + .style(Style::default().fg(Color::Yellow)); + f.render_widget(l, area); } pub struct StatefulList { diff --git a/src/handler/mod.rs b/src/handler/mod.rs index cebb4c1..8e0e2a0 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -106,9 +106,9 @@ impl Handler { let mut terminal = Terminal::new(backend)?; let (blocks_tx, blocks_rx) = mpsc::unbounded_channel(); - let system_pane_info = self.client.system_pane_info().await?; - let mut headers_subs = self.client.subscribe_finalized_heads().await.unwrap(); + let (events_tx, events_rx) = mpsc::unbounded_channel(); + let mut headers_subs = self.client.subscribe_finalized_heads().await.unwrap(); tokio::spawn(async move { while let Some(header) = headers_subs.next().await { if let Ok(header) = header { @@ -119,7 +119,29 @@ impl Handler { } }); - let dashboard = DashBoard::new(system_pane_info, blocks_rx); + let mut events_subs = self.client.subscribe_events().await.unwrap(); + tokio::spawn(async move { + while let Some(storage_set) = events_subs.next().await { + if let Ok(storage) = storage_set { + let data = storage + .changes + .into_iter() + .filter_map(|(_k, data)| data) + .collect(); + if events_tx.send(data).is_err() { + break; + } + } + } + }); + + let system_pane_info = self.client.system_pane_info().await?; + let dashboard = DashBoard::new( + system_pane_info, + blocks_rx, + events_rx, + self.metadata.clone(), + ); run_dashboard(self.client.clone(), &mut terminal, dashboard).await?; // restore terminal diff --git a/src/networks/mod.rs b/src/networks/mod.rs index 44b0523..e690589 100644 --- a/src/networks/mod.rs +++ b/src/networks/mod.rs @@ -35,6 +35,7 @@ pub trait ChainInfo: Sync + Send { /// The hash type of the chain type Hash: Serialize + DeserializeOwned + + 'static + Send + FromStr + From diff --git a/src/rpc/api.rs b/src/rpc/api.rs index 9d84de0..0ebd286 100644 --- a/src/rpc/api.rs +++ b/src/rpc/api.rs @@ -5,9 +5,9 @@ use jsonrpsee::{ rpc_params, }; use serde::de::DeserializeOwned; -use sp_core::Decode; +use sp_core::{twox_128, Decode}; use sp_runtime::generic::SignedBlock; -use sp_storage::StorageKey; +use sp_storage::{StorageChangeSet, StorageKey}; use sp_version::RuntimeVersion; use subxt_metadata::Metadata; // this crate @@ -122,4 +122,18 @@ pub trait SubscribeApi { ) .await } + + async fn subscribe_events( + &self, + ) -> RpcResult>>> { + let mut key = twox_128("System".as_bytes()).to_vec(); + key.extend(twox_128("Events".as_bytes())); + + self.subscribe( + "state_subscribeStorage", + rpc_params![vec![StorageKey(key)]], + "state_unsubscribeStorage", + ) + .await + } }