This repository showcases a functional base Rust project that consumes a Substreams .spkg (local or remote file).
To run:
SUBSTREAMS_API_TOKEN="<StreamingFast API Token>" cargo run -- https://mainnet.eth.streamingfast.io:443 https://github.com/streamingfast/substreams-eth-block-meta/releases/download/v0.5.1/substreams-eth-block-meta-v0.5.1.spkg db_out
The presented Rust project contains a SubstreamsStream wrapper that handles automatic reconnection in case of error. It is implemented as a Rust TryStream, which makes it easy to consume the retryable stream using standard Rust syntax:
let mut stream = SubstreamsStream::new(...);
loop {
    match stream.next().await {
        None => break, // Stream completed, reached end block
        Some(Ok(BlockResponse::New(data))) => { /* Got a BlockScopedData message */ },
        Some(Err(err)) => { /* Fatal error or retry limit reached */ },
    }
}
The main.rs file accepts three arguments: the Substreams endpoint (in the form http(s)?://<url>:<port>), the location of the .spkg file to use for the request, and the name of the output module to stream from.
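The three positional arguments described above could be validated along the following lines. This is a minimal sketch using only the standard library; the `Args` struct, its field names, and the `parse_args` function are hypothetical names chosen for illustration and are not taken from main.rs.

```rust
/// Hypothetical holder for the three positional arguments.
#[derive(Debug, PartialEq)]
struct Args {
    endpoint_url: String,
    package_file: String,
    module_name: String,
}

/// Parses `<endpoint> <spkg> <module>` from an argument list whose first
/// element is the program name (as with `std::env::args()`).
fn parse_args<I: IntoIterator<Item = String>>(args: I) -> Result<Args, String> {
    let args: Vec<String> = args.into_iter().skip(1).collect();
    match args.as_slice() {
        [endpoint_url, package_file, module_name] => {
            // The endpoint is expected in the form http(s)://<url>:<port>.
            if !(endpoint_url.starts_with("http://") || endpoint_url.starts_with("https://")) {
                return Err(format!("endpoint must start with http:// or https://, got {endpoint_url}"));
            }
            Ok(Args {
                endpoint_url: endpoint_url.clone(),
                package_file: package_file.clone(),
                module_name: module_name.clone(),
            })
        }
        _ => Err("usage: <endpoint> <spkg> <module>".to_string()),
    }
}
```

A caller would typically pass `std::env::args()` and exit with the returned message on error.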
For now, the cursor is not properly loaded from or saved to a database, something that would be required on a production system to ensure the stream resumes at the right location and that a block is never missed. This should be implemented in main.rs, in the persist_cursor and load_persisted_cursor functions.
Warning: If you don't implement cursor persistence and your process restarts, it will start back from the specified start_block (currently hard-coded to 0).
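As an illustration of what cursor persistence might look like, here is a minimal file-based sketch. It is an assumption for demonstration only: the signatures below (taking a `Path` and returning `io::Result`) are hypothetical and do not match the stubs in main.rs, and a production system would more likely store the cursor in the same database transaction as the block's data.

```rust
use std::fs;
use std::io;
use std::path::Path;

/// Saves the cursor by writing a temporary file and renaming it over the
/// target, so a crash mid-write does not leave a truncated cursor behind.
fn persist_cursor(path: &Path, cursor: &str) -> io::Result<()> {
    let tmp = path.with_extension("tmp");
    fs::write(&tmp, cursor)?;
    fs::rename(&tmp, path)
}

/// Loads the last saved cursor, or `None` when nothing was saved yet
/// (in which case the stream would begin at the configured start block).
fn load_persisted_cursor(path: &Path) -> io::Result<Option<String>> {
    match fs::read_to_string(path) {
        Ok(contents) => {
            let cursor = contents.trim();
            if cursor.is_empty() {
                Ok(None)
            } else {
                Ok(Some(cursor.to_string()))
            }
        }
        Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(None),
        Err(err) => Err(err),
    }
}
```

The cursor would be saved after each block has been fully processed, and loaded once at startup before the stream is created.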
We use println statements within main.rs and substreams_stream.rs to demonstrate what is happening within the codebase. Those, especially in substreams_stream.rs, should be replaced with calls to a logging system. More places could also be instrumented to log extra details.
A BlockUndoSignal must be treated as "delete all data recorded after the block height specified by the block in the BlockUndoSignal". In the example above, this means you must delete the changes made by Block #7b and Block #6b. The exact details depend on your own logic. For example, if every record you add contains a block number, a simple approach is to delete all records where block_num > 5, 5 being the block number received in the BlockUndoSignal (this holds for append-only records, that is, when only INSERT operations are allowed).
How to handle this is left for you to implement.
Warning: Undo handling is currently a call to the unimplemented! macro, which will panic if an undo signal is received, so be warned when running on a production system. Undo signals on Ethereum Mainnet happen around 5-10 times a day, sometimes even less, so you might miss the fact that they exist when testing.
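The append-only strategy described above (delete every record whose block_num is greater than the block received in the undo signal) can be sketched against an in-memory store. The `Record` struct and `apply_undo` function are hypothetical names for illustration; a real sink would run the equivalent DELETE against its database.

```rust
/// Hypothetical append-only record tagged with the block that produced it.
#[derive(Debug, Clone, PartialEq)]
struct Record {
    block_num: u64,
    payload: String,
}

/// Drops every record written after `last_valid_block_num` and returns how
/// many were removed. Equivalent in spirit to:
/// DELETE FROM records WHERE block_num > last_valid_block_num
fn apply_undo(records: &mut Vec<Record>, last_valid_block_num: u64) -> usize {
    let before = records.len();
    records.retain(|record| record.block_num <= last_valid_block_num);
    before - records.len()
}
```

With records for blocks 4, 5, 6 and 7 and an undo signal pointing at block 5, the records for blocks 6 and 7 are removed and those for blocks 4 and 5 are kept. This only works when records are never updated in place, as noted above.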
Protobuf generation is done using buf which can be installed with:
brew install bufbuild/buf/buf
Note: See the other installation methods if you are not on Mac/Linux (brew can be installed on Linux).
If you have an .spkg that already contains your Protobuf definitions, you can use it directly. It also contains the Substreams system Protobuf definitions, so you can generate everything in one shot:
buf generate --exclude-path="google" <path/to/substream.spkg>#format=bin
Note: An .spkg recursively contains all Proto definitions, including some you may not want. You can exclude some of them from generation via the --exclude-path="google" flag. You can specify several paths, separated by commas.
You can also generate against the published Substreams Buf module:
buf generate buf.build/streamingfast/substreams --include-imports
This includes only the Substreams system Protobufs needed to decode packages and perform RPC operations.