Skip to content

Commit

Permalink
Add BlobSidecar events
Browse files Browse the repository at this point in the history
  • Loading branch information
Tumas committed Oct 30, 2024
1 parent 584d106 commit dfd04a9
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 6 deletions.
31 changes: 30 additions & 1 deletion fork_choice_control/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ use serde::Serialize;
use tap::Pipe as _;
use types::{
combined::{Attestation, BeaconState, SignedAggregateAndProof, SignedBeaconBlock},
deneb::containers::BlobIdentifier,
deneb::{
containers::{BlobIdentifier, BlobSidecar},
primitives::{BlobIndex, KzgCommitment, VersionedHash},
},
phase0::{
containers::Checkpoint,
primitives::{DepositIndex, Epoch, ExecutionBlockHash, Slot, ValidatorIndex, H256},
Expand Down Expand Up @@ -213,6 +216,7 @@ impl<P: Preset, W> ValidatorMessage<P, W> {
#[derive(Debug)]
pub enum ApiMessage<P: Preset> {
AttestationEvent(Arc<Attestation<P>>),
BlobSidecarEvent(BlobSidecarEvent),
BlockEvent(BlockEvent),
ChainReorgEvent(ChainReorgEvent),
FinalizedCheckpoint(FinalizedCheckpointEvent),
Expand Down Expand Up @@ -255,6 +259,31 @@ impl<P: Preset> SyncMessage<P> {
}
}

#[derive(Debug, Serialize)]
pub struct BlobSidecarEvent {
pub block_root: H256,
#[serde(with = "serde_utils::string_or_native")]
pub index: BlobIndex,
#[serde(with = "serde_utils::string_or_native")]
pub slot: Slot,
pub kzg_commitment: KzgCommitment,
pub versioned_hash: VersionedHash,
}

impl BlobSidecarEvent {
pub fn new<P: Preset>(block_root: H256, blob_sidecar: &BlobSidecar<P>) -> Self {
let kzg_commitment = blob_sidecar.kzg_commitment;

Self {
block_root,
index: blob_sidecar.index,
slot: blob_sidecar.slot(),
kzg_commitment,
versioned_hash: misc::kzg_commitment_to_versioned_hash(kzg_commitment),
}
}
}

#[derive(Debug, Serialize)]
pub struct BlockEvent {
#[serde(with = "serde_utils::string_or_native")]
Expand Down
14 changes: 9 additions & 5 deletions fork_choice_control/src/mutator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ use types::{
use crate::{
block_processor::BlockProcessor,
messages::{
AttestationVerifierMessage, MutatorMessage, P2pMessage, PoolMessage, SubnetMessage,
SyncMessage, ValidatorMessage,
AttestationVerifierMessage, BlobSidecarEvent, MutatorMessage, P2pMessage, PoolMessage,
SubnetMessage, SyncMessage, ValidatorMessage,
},
misc::{
Delayed, MutatorRejectionReason, PendingAggregateAndProof, PendingAttestation,
Expand Down Expand Up @@ -1055,7 +1055,7 @@ where

reply_to_http_api(sender, Ok(ValidationOutcome::Accept));

self.accept_blob_sidecar(&wait_group, blob_sidecar);
self.accept_blob_sidecar(&wait_group, &blob_sidecar);
}
Ok(BlobSidecarAction::Ignore(publishable)) => {
let (gossip_id, sender) = origin.split();
Expand Down Expand Up @@ -1609,19 +1609,23 @@ where
Ok(())
}

fn accept_blob_sidecar(&mut self, wait_group: &W, blob_sidecar: Arc<BlobSidecar<P>>) {
fn accept_blob_sidecar(&mut self, wait_group: &W, blob_sidecar: &Arc<BlobSidecar<P>>) {
let old_head = self.store.head().clone();
let head_was_optimistic = old_head.is_optimistic();
let block_root = blob_sidecar.signed_block_header.message.hash_tree_root();

self.store_mut().apply_blob_sidecar(blob_sidecar);
self.store_mut()
.apply_blob_sidecar(blob_sidecar.clone_arc());

self.update_store_snapshot();

if let Some(pending_block) = self.delayed_until_blobs.get(&block_root) {
self.retry_block(wait_group.clone(), pending_block.clone());
}

ApiMessage::BlobSidecarEvent(BlobSidecarEvent::new(block_root, blob_sidecar))
.send(&self.api_tx);

self.spawn(PersistBlobSidecarsTask {
store_snapshot: self.owned_store(),
storage: self.storage.clone_arc(),
Expand Down
4 changes: 4 additions & 0 deletions http_api/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use tokio::sync::broadcast::{self, Receiver, Sender};
pub enum Topic {
Attestation,
AttesterSlashing,
BlobSidecar,
Block,
BlsToExecutionChange,
ChainReorg,
Expand All @@ -28,6 +29,7 @@ impl Topic {
pub struct EventChannels {
pub attestations: Sender<Event>,
pub attester_slashings: Sender<Event>,
pub blob_sidecars: Sender<Event>,
pub blocks: Sender<Event>,
pub bls_to_execution_changes: Sender<Event>,
pub chain_reorgs: Sender<Event>,
Expand All @@ -43,6 +45,7 @@ impl EventChannels {
Self {
attestations: broadcast::channel(max_events).0,
attester_slashings: broadcast::channel(max_events).0,
blob_sidecars: broadcast::channel(max_events).0,
blocks: broadcast::channel(max_events).0,
bls_to_execution_changes: broadcast::channel(max_events).0,
chain_reorgs: broadcast::channel(max_events).0,
Expand All @@ -58,6 +61,7 @@ impl EventChannels {
match topic {
Topic::Attestation => &self.attestations,
Topic::AttesterSlashing => &self.attester_slashings,
Topic::BlobSidecar => &self.blob_sidecars,
Topic::Block => &self.blocks,
Topic::BlsToExecutionChange => &self.bls_to_execution_changes,
Topic::ChainReorg => &self.chain_reorgs,
Expand Down
5 changes: 5 additions & 0 deletions http_api/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ async fn handle_events<P: Preset>(
let EventChannels {
attestations,
attester_slashings,
blob_sidecars,
blocks,
bls_to_execution_changes,
chain_reorgs,
Expand Down Expand Up @@ -237,6 +238,10 @@ async fn handle_events<P: Preset>(
let event = Topic::Attestation.build(attestation)?;
attestations.send(event).unwrap_or_default()
}
ApiMessage::BlobSidecarEvent(blob_sidecar) => {
let event = Topic::BlobSidecar.build(blob_sidecar)?;
blob_sidecars.send(event).unwrap_or_default()
}
ApiMessage::BlockEvent(block_event) => {
let event = Topic::Block.build(block_event)?;
blocks.send(event).unwrap_or_default()
Expand Down

0 comments on commit dfd04a9

Please sign in to comment.