From 9cd7a8db7307907df80ed9d988b898764a0218be Mon Sep 17 00:00:00 2001 From: clabby Date: Wed, 16 Oct 2024 20:50:32 -0400 Subject: [PATCH] chore(derive): Add tracing to `ChannelAssembler` --- .../src/stages/channel/channel_assembler.rs | 51 ++++++++++++++++--- 1 file changed, 43 insertions(+), 8 deletions(-) diff --git a/crates/derive/src/stages/channel/channel_assembler.rs b/crates/derive/src/stages/channel/channel_assembler.rs index a4564ef60..49f615e93 100644 --- a/crates/derive/src/stages/channel/channel_assembler.rs +++ b/crates/derive/src/stages/channel/channel_assembler.rs @@ -6,11 +6,12 @@ use crate::{ prelude::{OriginProvider, PipelineError}, }; use alloc::{boxed::Box, sync::Arc}; -use alloy_primitives::Bytes; +use alloy_primitives::{hex, Bytes}; use async_trait::async_trait; use core::fmt::Debug; use op_alloy_genesis::RollupConfig; use op_alloy_protocol::{BlockInfo, Channel}; +use tracing::{debug, error, info, warn}; /// The [ChannelAssembler] stage is responsible for assembling the [Frame]s from the [FrameQueue] /// stage into a raw compressed [Channel]. @@ -70,14 +71,23 @@ where let origin = self.origin().ok_or(PipelineError::MissingOrigin.crit())?; // Time out the channel if it has timed out. - if self.channel.is_some() && self.is_timed_out()? { - #[cfg(feature = "metrics")] - { - let open_block_number = - self.channel.as_ref().map(|c| c.open_block_number()).unwrap_or_default(); - crate::observe!(CHANNEL_TIMEOUTS, (origin.number - open_block_number) as f64); + if let Some(channel) = self.channel.as_ref() { + if self.is_timed_out()? { + #[cfg(feature = "metrics")] + { + let open_block_number = + self.channel.as_ref().map(|c| c.open_block_number()).unwrap_or_default(); + crate::observe!(CHANNEL_TIMEOUTS, (origin.number - open_block_number) as f64); + } + warn!( + target: "channel-assembler", + "Channel (ID: {}) timed at L1 origin #{}, open block #{}. Discarding channel.", + hex::encode(channel.id()), + origin.number, + channel.open_block_number() + ); + self.channel = None; } - self.channel = None; } // Grab the next frame from the previous stage. @@ -85,13 +95,32 @@ where // Start a new channel if the frame number is 0. if next_frame.number == 0 { + info!( + target: "channel-assembler", + "Starting new channel (ID: {}) at L1 origin #{}", + hex::encode(next_frame.id), + origin.number + ); self.channel = Some(Channel::new(next_frame.id, origin)); } if let Some(channel) = self.channel.as_mut() { // Add the frame to the channel. If this fails, return NotEnoughData and discard the // frame. + debug!( + target: "channel-assembler", + "Adding frame #{} to channel (ID: {}) at L1 origin #{}", + next_frame.number, + hex::encode(channel.id()), + origin.number + ); if channel.add_frame(next_frame, origin).is_err() { + error!( + target: "channel-assembler", + "Failed to add frame to channel (ID: {}) at L1 origin #{}", + hex::encode(channel.id()), + origin.number + ); return Err(PipelineError::NotEnoughData.temp()); } @@ -100,6 +129,12 @@ where let channel_bytes = channel.frame_data().ok_or(PipelineError::ChannelNotFound.crit())?; + info!( + target: "channel-assembler", + "Channel (ID: {}) ready for decompression.", + hex::encode(channel.id()), + ); + // Reset the channel and return the compressed bytes. self.channel = None; return Ok(Some(channel_bytes));