Skip to content

Commit

Permalink
chore(derive): Add tracing to ChannelAssembler
Browse files Browse the repository at this point in the history
  • Loading branch information
clabby committed Oct 17, 2024
1 parent cfa563b commit 9cd7a8d
Showing 1 changed file with 43 additions and 8 deletions.
51 changes: 43 additions & 8 deletions crates/derive/src/stages/channel/channel_assembler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ use crate::{
prelude::{OriginProvider, PipelineError},
};
use alloc::{boxed::Box, sync::Arc};
use alloy_primitives::Bytes;
use alloy_primitives::{hex, Bytes};
use async_trait::async_trait;
use core::fmt::Debug;
use op_alloy_genesis::RollupConfig;
use op_alloy_protocol::{BlockInfo, Channel};
use tracing::{debug, error, info, warn};

/// The [ChannelAssembler] stage is responsible for assembling the [Frame]s from the [FrameQueue]
/// stage into a raw compressed [Channel].
Expand Down Expand Up @@ -70,28 +71,56 @@ where
let origin = self.origin().ok_or(PipelineError::MissingOrigin.crit())?;

// Time out the channel if it has timed out.
if self.channel.is_some() && self.is_timed_out()? {
#[cfg(feature = "metrics")]
{
let open_block_number =
self.channel.as_ref().map(|c| c.open_block_number()).unwrap_or_default();
crate::observe!(CHANNEL_TIMEOUTS, (origin.number - open_block_number) as f64);
if let Some(channel) = self.channel.as_ref() {
if self.is_timed_out()? {
#[cfg(feature = "metrics")]
{
let open_block_number =
self.channel.as_ref().map(|c| c.open_block_number()).unwrap_or_default();
crate::observe!(CHANNEL_TIMEOUTS, (origin.number - open_block_number) as f64);
}
warn!(
target: "channel-assembler",
"Channel (ID: {}) timed at L1 origin #{}, open block #{}. Discarding channel.",
hex::encode(channel.id()),
origin.number,
channel.open_block_number()

Check warning on line 87 in crates/derive/src/stages/channel/channel_assembler.rs

View check run for this annotation

Codecov / codecov/patch

crates/derive/src/stages/channel/channel_assembler.rs#L84-L87

Added lines #L84 - L87 were not covered by tests
);
self.channel = None;
}
self.channel = None;
}

// Grab the next frame from the previous stage.
let next_frame = self.prev.next_frame().await?;

// Start a new channel if the frame number is 0.
if next_frame.number == 0 {
info!(
target: "channel-assembler",
"Starting new channel (ID: {}) at L1 origin #{}",
hex::encode(next_frame.id),

Check warning on line 101 in crates/derive/src/stages/channel/channel_assembler.rs

View check run for this annotation

Codecov / codecov/patch

crates/derive/src/stages/channel/channel_assembler.rs#L100-L101

Added lines #L100 - L101 were not covered by tests
origin.number
);
self.channel = Some(Channel::new(next_frame.id, origin));
}

if let Some(channel) = self.channel.as_mut() {
// Add the frame to the channel. If this fails, return NotEnoughData and discard the
// frame.
debug!(
target: "channel-assembler",
"Adding frame #{} to channel (ID: {}) at L1 origin #{}",
next_frame.number,
hex::encode(channel.id()),

Check warning on line 114 in crates/derive/src/stages/channel/channel_assembler.rs

View check run for this annotation

Codecov / codecov/patch

crates/derive/src/stages/channel/channel_assembler.rs#L112-L114

Added lines #L112 - L114 were not covered by tests
origin.number
);
if channel.add_frame(next_frame, origin).is_err() {
error!(
target: "channel-assembler",
"Failed to add frame to channel (ID: {}) at L1 origin #{}",
hex::encode(channel.id()),

Check warning on line 121 in crates/derive/src/stages/channel/channel_assembler.rs

View check run for this annotation

Codecov / codecov/patch

crates/derive/src/stages/channel/channel_assembler.rs#L120-L121

Added lines #L120 - L121 were not covered by tests
origin.number
);
return Err(PipelineError::NotEnoughData.temp());
}

Expand All @@ -100,6 +129,12 @@ where
let channel_bytes =
channel.frame_data().ok_or(PipelineError::ChannelNotFound.crit())?;

info!(
target: "channel-assembler",
"Channel (ID: {}) ready for decompression.",
hex::encode(channel.id()),

Check warning on line 135 in crates/derive/src/stages/channel/channel_assembler.rs

View check run for this annotation

Codecov / codecov/patch

crates/derive/src/stages/channel/channel_assembler.rs#L134-L135

Added lines #L134 - L135 were not covered by tests
);

// Reset the channel and return the compressed bytes.
self.channel = None;
return Ok(Some(channel_bytes));
Expand Down

0 comments on commit 9cd7a8d

Please sign in to comment.