Skip to content

Commit

Permalink
applied comments
Browse files Browse the repository at this point in the history
  • Loading branch information
pompon0 committed Jan 9, 2024
1 parent 14e963f commit b750173
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 109 deletions.
119 changes: 56 additions & 63 deletions node/actors/sync_blocks/src/tests/end_to_end.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,96 +16,97 @@ type NetworkDispatcherPipe =
pipe::DispatcherPipe<network::io::InputMessage, network::io::OutputMessage>;

#[derive(Debug)]
struct NodeHandle {
struct Node {
store: Arc<BlockStore>,
test_validators: Arc<TestValidators>,
switch_on_sender: Option<oneshot::Sender<()>>,
_switch_off_sender: oneshot::Sender<()>,
}

impl NodeHandle {
fn switch_on(&mut self) {
self.switch_on_sender.take();
}

async fn put_block(&self, ctx: &ctx::Ctx, block_number: BlockNumber) {
tracing::trace!(%block_number, "Storing new block");
let block = &self.test_validators.final_blocks[block_number.0 as usize];
self.store.queue_block(ctx, block.clone()).await.unwrap();
}
}

struct Node {
network: NetworkInstance,
store: Arc<BlockStore>,
store_runner: BlockStoreRunner,
test_validators: Arc<TestValidators>,
switch_on_receiver: oneshot::Receiver<()>,
switch_off_receiver: oneshot::Receiver<()>,
}

impl fmt::Debug for Node {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
formatter
.debug_struct("Node")
.field("key", &self.key())
.finish()
}
}

fn to_sync_state(state: BlockStoreState) -> SyncState {
SyncState {
first_stored_block: state.first,
last_stored_block: state.last,
}
}

impl Node {
async fn new_network(
ctx: &ctx::Ctx,
node_count: usize,
gossip_peers: usize,
) -> (Vec<NodeHandle>, Vec<Node>) {
) -> (Vec<Node>, Vec<NodeRunner>) {
let rng = &mut ctx.rng();
let test_validators = Arc::new(TestValidators::new(rng, 4, 20));
let mut nodes = vec![];
let mut node_handles = vec![];
let mut runners = vec![];
for net in NetworkInstance::new(rng, node_count, gossip_peers) {
let (nh, n) = Node::new(ctx, net, test_validators.clone()).await;
let (n, r) = Node::new(ctx, net, test_validators.clone()).await;
nodes.push(n);
node_handles.push(nh);
runners.push(r);
}
(node_handles, nodes)
(nodes, runners)
}

async fn new(
ctx: &ctx::Ctx,
mut network: NetworkInstance,
test_validators: Arc<TestValidators>,
) -> (NodeHandle, Node) {
) -> (Self, NodeRunner) {
let (store, store_runner) = make_store(ctx, test_validators.final_blocks[0].clone()).await;
let (switch_on_sender, switch_on_receiver) = oneshot::channel();
let (switch_off_sender, switch_off_receiver) = oneshot::channel();

network.disable_gossip_pings();

let this = Self {
let runner = NodeRunner {
network,
store: store.clone(),
store_runner,
test_validators: test_validators.clone(),
switch_on_receiver,
switch_off_receiver,
};
let handle = NodeHandle {
let this = Self {
store,
test_validators,
switch_on_sender: Some(switch_on_sender),
_switch_off_sender: switch_off_sender,
};
(handle, this)
(this, runner)
}

fn switch_on(&mut self) {
self.switch_on_sender.take();
}

async fn put_block(&self, ctx: &ctx::Ctx, block_number: BlockNumber) {
tracing::trace!(%block_number, "Storing new block");
let block = &self.test_validators.final_blocks[block_number.0 as usize];
self.store.queue_block(ctx, block.clone()).await.unwrap();
}
}

#[must_use]
struct NodeRunner {
network: NetworkInstance,
store: Arc<BlockStore>,
store_runner: BlockStoreRunner,
test_validators: Arc<TestValidators>,
switch_on_receiver: oneshot::Receiver<()>,
switch_off_receiver: oneshot::Receiver<()>,
}

impl fmt::Debug for NodeRunner {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
formatter
.debug_struct("NodeRunner")
.field("key", &self.key())
.finish()
}
}

fn to_sync_state(state: BlockStoreState) -> SyncState {
SyncState {
first_stored_block: state.first,
last_stored_block: state.last,
}
}

impl NodeRunner {
fn key(&self) -> node::PublicKey {
self.network.gossip_config().key.public()
}
Expand Down Expand Up @@ -196,7 +197,7 @@ impl Node {
trait GossipNetworkTest: fmt::Debug + Send {
/// Returns the number of nodes in the gossip network and number of peers for each node.
fn network_params(&self) -> (usize, usize);
async fn test(self, ctx: &ctx::Ctx, network: Vec<NodeHandle>) -> anyhow::Result<()>;
async fn test(self, ctx: &ctx::Ctx, network: Vec<Node>) -> anyhow::Result<()>;
}

#[instrument(level = "trace")]
Expand All @@ -208,20 +209,12 @@ async fn test_sync_blocks<T: GossipNetworkTest>(test: T) {
let ctx = &ctx::test_root(&ctx::AffineClock::new(CLOCK_SPEEDUP as f64))
.with_timeout(TEST_TIMEOUT * CLOCK_SPEEDUP);
let (node_count, gossip_peers) = test.network_params();
let (network, nodes) = Node::new_network(ctx, node_count, gossip_peers).await;
let (nodes, runners) = Node::new_network(ctx, node_count, gossip_peers).await;
scope::run!(ctx, |ctx, s| async {
for (i, node) in nodes.into_iter().enumerate() {
s.spawn_bg(
async {
let key = node.key();
node.run(ctx).await?;
tracing::trace!(?key, "Node task completed");
Ok(())
}
.instrument(tracing::info_span!("node", i)),
);
for (i, runner) in runners.into_iter().enumerate() {
s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i)));
}
test.test(ctx, network).await
test.test(ctx, nodes).await
})
.await
.unwrap();
Expand All @@ -239,7 +232,7 @@ impl GossipNetworkTest for BasicSynchronization {
(self.node_count, self.gossip_peers)
}

async fn test(self, ctx: &ctx::Ctx, mut node_handles: Vec<NodeHandle>) -> anyhow::Result<()> {
async fn test(self, ctx: &ctx::Ctx, mut node_handles: Vec<Node>) -> anyhow::Result<()> {
let rng = &mut ctx.rng();

// Check initial node states.
Expand Down Expand Up @@ -314,7 +307,7 @@ impl GossipNetworkTest for SwitchingOffNodes {
(self.node_count, self.node_count / 2)
}

async fn test(self, ctx: &ctx::Ctx, mut node_handles: Vec<NodeHandle>) -> anyhow::Result<()> {
async fn test(self, ctx: &ctx::Ctx, mut node_handles: Vec<Node>) -> anyhow::Result<()> {
let rng = &mut ctx.rng();

for node_handle in &mut node_handles {
Expand Down Expand Up @@ -363,7 +356,7 @@ impl GossipNetworkTest for SwitchingOnNodes {
(self.node_count, self.node_count / 2)
}

async fn test(self, ctx: &ctx::Ctx, mut node_handles: Vec<NodeHandle>) -> anyhow::Result<()> {
async fn test(self, ctx: &ctx::Ctx, mut node_handles: Vec<Node>) -> anyhow::Result<()> {
let rng = &mut ctx.rng();

let mut switched_on_nodes = Vec::with_capacity(self.node_count);
Expand Down
9 changes: 7 additions & 2 deletions node/libs/storage/src/block_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,16 @@ impl BlockStoreState {
#[async_trait::async_trait]
pub trait PersistentBlockStore: fmt::Debug + Send + Sync {
/// Range of blocks avaliable in storage.
/// PersistentBlockStore is expected to always contain at least 1 block.
/// PersistentBlockStore is expected to always contain at least 1 block,
/// and be append-only storage (never delete blocks).
/// Consensus code calls this method only once and then tracks the
/// range of avaliable blocks internally.
async fn state(&self, ctx: &ctx::Ctx) -> ctx::Result<BlockStoreState>;

/// Gets a block by its number.
/// Returns error if block is missing.
/// Caller is expected to know the state (by calling `state()`)
/// and only request the blocks contained in the state.
async fn block(
&self,
ctx: &ctx::Ctx,
Expand Down Expand Up @@ -154,8 +157,10 @@ impl BlockStore {
return Ok(None);
}
if !inner.persisted_state.contains(number) {
// Subtraction is safe, because we know that the block
// is in inner.queue at this point.
let idx = number.0 - inner.persisted_state.next().0;
return Ok(Some(inner.queue[idx as usize].clone()));
return Ok(inner.queue.get(idx as usize).cloned());
}
}
let t = metrics::PERSISTENT_BLOCK_STORE.block_latency.start();
Expand Down
9 changes: 6 additions & 3 deletions node/libs/storage/src/testonly/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ impl PersistentBlockStore for BlockStore {
) -> ctx::Result<validator::FinalBlock> {
let blocks = self.0.lock().unwrap();
let front = blocks.front().context("not found")?;
let idx = number.0 - front.header().number.0;
let idx = number
.0
.checked_sub(front.header().number.0)
.context("not found")?;
Ok(blocks.get(idx as usize).context("not found")?.clone())
}

Expand All @@ -51,8 +54,8 @@ impl PersistentBlockStore for BlockStore {
) -> ctx::Result<()> {
let mut blocks = self.0.lock().unwrap();
let got = block.header().number;
if !blocks.is_empty() {
let want = blocks.back().unwrap().header().number.next();
if let Some(last) = blocks.back() {
let want = last.header().number.next();
if got != want {
return Err(anyhow::anyhow!("got block {got:?}, while expected {want:?}").into());
}
Expand Down
37 changes: 0 additions & 37 deletions node/libs/storage/src/traits.rs

This file was deleted.

5 changes: 1 addition & 4 deletions node/tools/src/store.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
//! This module contains the methods to handle an append-only database of finalized blocks. Since we only store finalized blocks, this forms a
//! chain of blocks, not a tree (assuming we have all blocks and not have any gap). It allows for basic functionality like inserting a block,
//! getting a block, checking if a block is contained in the DB. We also store the head of the chain. Storing it explicitly allows us to fetch
//! the current head quickly.
//! RocksDB-based implementation of PersistentBlockStore and ReplicaStore.
use anyhow::Context as _;
use rocksdb::{Direction, IteratorMode, ReadOptions};
use std::{
Expand Down

0 comments on commit b750173

Please sign in to comment.