Skip to content

Commit

Permalink
Merge pull request #146 from Concordium/indexer-support
Browse files Browse the repository at this point in the history
Indexer support
  • Loading branch information
abizjak authored Dec 8, 2023
2 parents 1dad853 + cf1be36 commit b6168da
Show file tree
Hide file tree
Showing 14 changed files with 848 additions and 208 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
- Rename `find_first_finalized_block_no_later_than` into
`find_first_finalized_block_no_earlier_than` since that correctly reflects its
semantics with respect to time and is much clearer.a
- Make the `Client::new` method slightly more general by accepting a
`TryInto<Endpoint>`. This allows passing URLs as strings directly.
- Add a new `indexer` module that provides boilerplate for robustly traversing
the chain.

## 3.2.0

Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ tokio-stream = "0.1"
concordium_base = { version = "3.2.0", path = "./concordium-base/rust-src/concordium_base/", features = ["encryption"] }
concordium-smart-contract-engine = { version = "3.0", path = "./concordium-base/smart-contracts/wasm-chain-integration/", default-features = false, features = ["async"]}
aes-gcm = { version = "0.10", features = ["std"] }
tracing = "0.1"

[features]
postgres = ["tokio-postgres"]
Expand Down
1 change: 0 additions & 1 deletion examples/balance-summary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ async fn main() -> anyhow::Result<()> {
move |acc| {
let mut client = closure_client.clone();
async move {
let block = block;
let info = client.get_account_info(&acc.into(), &block).await?.response;
let additional_stake = info
.account_stake
Expand Down
1 change: 0 additions & 1 deletion examples/list-account-balances.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ async fn main() -> anyhow::Result<()> {
is_baker,
acc_type,
};
if row.is_baker {}
acc_balances.push(row);
}
}
Expand Down
98 changes: 29 additions & 69 deletions examples/list-account-creations.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
//! List all account creations in a given time span.
use anyhow::Context;
use clap::AppSettings;
use concordium_rust_sdk::{
indexer::{TransactionIndexer, TraverseConfig},
types::{AbsoluteBlockHeight, BlockItemSummary, BlockItemSummaryDetails, CredentialType},
v2,
};
use futures::TryStreamExt;
use structopt::StructOpt;

#[derive(StructOpt)]
Expand All @@ -21,7 +20,7 @@ struct App {
help = "Number of parallel queries to make.",
default_value = "4"
)]
num: u64,
num: usize,
#[structopt(long = "from", help = "Starting time. Defaults to genesis time.")]
from: Option<chrono::DateTime<chrono::Utc>>,
#[structopt(long = "to", help = "End time. Defaults to infinity.")]
Expand All @@ -36,9 +35,9 @@ async fn main() -> anyhow::Result<()> {
App::from_clap(&matches)
};

let mut client = v2::Client::new(app.endpoint).await?;
let mut client = v2::Client::new(app.endpoint.clone()).await?;

let mut h = if let Some(start_time) = app.from {
let h = if let Some(start_time) = app.from {
let start = client
.find_first_finalized_block_no_earlier_than(.., start_time)
.await?;
Expand All @@ -47,75 +46,36 @@ async fn main() -> anyhow::Result<()> {
AbsoluteBlockHeight::from(0u64)
};

let mut block_stream = client.get_finalized_blocks_from(h).await?;
let (sender, mut receiver) = tokio::sync::mpsc::channel(2 * app.num);
let handle = tokio::spawn(
TraverseConfig::new_single(app.endpoint, h)
.set_max_parallel(app.num)
.traverse(TransactionIndexer, sender),
);

while let Ok(chunk) = block_stream.next_chunk(app.num as usize).await {
let mut handles = Vec::with_capacity(chunk.len());

for (_h, block) in (u64::from(h)..).zip(chunk) {
let cc = client.clone();
handles.push(tokio::spawn(async move {
let mut cc = cc.clone();
let bi = cc
.get_block_info(block.block_hash)
.await
.context("Block info.")?
.response;
if bi.transaction_count != 0 {
let summary = cc
.get_block_transaction_events(block.block_hash)
.await
.context("Block summary.")?
.response
.try_collect::<Vec<_>>()
.await?;
Ok(Some((bi, Some(summary))))
} else {
Ok::<_, anyhow::Error>(Some((bi, None)))
}
}))
}
let mut success = true;
for res in futures::future::join_all(handles).await {
if let Some((bi, summary)) = res?? {
if let Some(end) = app.to.as_ref() {
if end <= &bi.block_slot_time {
return Ok(());
}
}
h = h.next();
if let Some(summary) = summary {
for BlockItemSummary {
index: _,
energy_cost: _,
hash: _,
details,
} in summary
{
match details {
BlockItemSummaryDetails::AccountTransaction(_) => {}
BlockItemSummaryDetails::AccountCreation(x) => {
let acc_type = match x.credential_type {
CredentialType::Initial => "initial",
CredentialType::Normal => "normal",
};
println!(
"{}, {}, {}, {}",
x.address, bi.block_hash, bi.block_slot_time, acc_type
);
}
BlockItemSummaryDetails::Update(_) => (),
}
}
}
} else {
success = false;
while let Some((bi, summaries)) = receiver.recv().await {
if let Some(to) = app.to {
if to < bi.block_slot_time {
break;
}
}
if !success {
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
for BlockItemSummary { details, .. } in summaries {
match details {
BlockItemSummaryDetails::AccountTransaction(_) => {}
BlockItemSummaryDetails::AccountCreation(x) => {
let acc_type = match x.credential_type {
CredentialType::Initial => "initial",
CredentialType::Normal => "normal",
};
println!(
"{}, {}, {}, {}",
x.address, bi.block_hash, bi.block_slot_time, acc_type
);
}
BlockItemSummaryDetails::Update(_) => (),
}
}
}
handle.abort();
Ok(())
}
103 changes: 31 additions & 72 deletions examples/list-all-transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ use anyhow::Context;
use chrono::Utc;
use clap::AppSettings;
use concordium_rust_sdk::{
indexer::{TransactionIndexer, TraverseConfig},
types::{
AbsoluteBlockHeight, AccountTransactionEffects, BlockItemSummaryDetails, TransactionType,
},
v2,
};
use futures::TryStreamExt;
use std::collections::HashSet;
use structopt::StructOpt;

Expand All @@ -26,7 +26,7 @@ struct App {
help = "How many queries to make in parallel.",
default_value = "1"
)]
num: u64,
num: usize,
#[structopt(long = "from", help = "Starting time. Defaults to genesis time.")]
from: Option<chrono::DateTime<chrono::Utc>>,
#[structopt(long = "to", help = "End time. Defaults to infinity.")]
Expand Down Expand Up @@ -57,12 +57,10 @@ async fn main() -> anyhow::Result<()> {
.collect::<anyhow::Result<HashSet<TransactionType>>>()
.context("Could not read transaction types.")?;

let mut client = v2::Client::new(app.endpoint).await?;

let _cs = client.get_consensus_info().await?;
let mut client = v2::Client::new(app.endpoint.clone()).await?;

// Find the block to start at.
let mut h = if let Some(start_time) = app.from {
let h = if let Some(start_time) = app.from {
let start = client
.find_first_finalized_block_no_earlier_than(.., start_time)
.await?;
Expand All @@ -71,75 +69,36 @@ async fn main() -> anyhow::Result<()> {
AbsoluteBlockHeight::from(0u64)
};
// Query blocks by increasing height.
let mut block_stream = client.get_finalized_blocks_from(h).await?;
loop {
let Ok((error, chunk)) = block_stream
.next_chunk_timeout(app.num as usize, std::time::Duration::from_millis(500))
.await
else {
// if we failed and end time is not yet here, then wait a bit
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
continue;
};
let mut handles = Vec::with_capacity(app.num as usize);
for block in chunk {
let mut cc = client.clone();
handles.push(tokio::spawn(async move {
let bi = cc
.get_block_info(block.block_hash)
.await
.context("Block info.")?
.response;
if bi.transaction_count != 0 {
let summary = cc
.get_block_transaction_events(block.block_hash)
.await
.context("Block summary.")?
.response
.try_collect::<Vec<_>>()
.await?;
Ok((bi, Some(summary)))
} else {
Ok::<_, anyhow::Error>((bi, None))
}
}))
}
for res in futures::future::join_all(handles).await {
let (bi, summary) = res??;
if let Some(end) = app.to.as_ref() {
if end <= &bi.block_slot_time {
return Ok(());
}
let (sender, mut receiver) = tokio::sync::mpsc::channel(10);
tokio::spawn(
TraverseConfig::new_single(app.endpoint, h)
.set_max_parallel(app.num)
.traverse(TransactionIndexer, sender),
);
while let Some((bi, summary)) = receiver.recv().await {
if let Some(end) = app.to.as_ref() {
if end <= &bi.block_slot_time {
return Ok(());
}
h = h.next();
if let Some(summary) = summary {
for bisummary in summary {
if let BlockItemSummaryDetails::AccountTransaction(at) = &bisummary.details {
if types.is_empty()
|| at
.transaction_type()
.map_or(false, |tt| types.contains(&tt))
{
let is_success =
!matches!(&at.effects, AccountTransactionEffects::None { .. });
let type_string = at
.transaction_type()
.map_or_else(|| "N/A".into(), |tt| tt.to_string());
println!(
"{}, {}, {}, {}, {}",
bi.block_slot_time,
bi.block_hash,
bisummary.hash,
is_success,
type_string
)
}
}
}
for bisummary in summary {
if let BlockItemSummaryDetails::AccountTransaction(at) = &bisummary.details {
if types.is_empty()
|| at
.transaction_type()
.map_or(false, |tt| types.contains(&tt))
{
let is_success = !matches!(&at.effects, AccountTransactionEffects::None { .. });
let type_string = at
.transaction_type()
.map_or_else(|| "N/A".into(), |tt| tt.to_string());
println!(
"{}, {}, {}, {}, {}",
bi.block_slot_time, bi.block_hash, bisummary.hash, is_success, type_string
)
}
}
}
if error {
anyhow::bail!("Finalized blocks vanished.")
}
}
Ok(())
}
Loading

0 comments on commit b6168da

Please sign in to comment.