Skip to content

Commit

Permalink
chore(derive): [Holocene] Drain previous channel in one iteration (#583)
Browse files Browse the repository at this point in the history
* fix(derive): Holocene frame queue drain

* lint

* fix: bad rebase

* fix: bad rebase

---------

Co-authored-by: refcell <[email protected]>
  • Loading branch information
clabby and refcell authored Sep 30, 2024
1 parent d5e7038 commit 3b5499c
Showing 1 changed file with 51 additions and 23 deletions.
74 changes: 51 additions & 23 deletions crates/derive/src/stages/frame_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,36 +67,43 @@ where

let mut i = 0;
while i < self.queue.len() - 1 {
let frame = &self.queue[i];
let prev_frame = &self.queue[i];
let next_frame = &self.queue[i + 1];

// If the frames are in the same channel, and the current is last, drop the next frame.
if frame.id == next_frame.id && frame.is_last {
self.queue.remove(i + 1);
continue;
}
let extends_channel = prev_frame.id == next_frame.id;

// If the frames are in the same channel, and the frame numbers are not sequential,
// drop the next frame.
if frame.id == next_frame.id && frame.number + 1 != next_frame.number {
if extends_channel && prev_frame.number + 1 != next_frame.number {
self.queue.remove(i + 1);
continue;
}

// If the frames are in different channels, and the current channel is not last, walk
// back the channel and drop all prev frames.
if frame.id != next_frame.id && !frame.is_last && next_frame.number == 0 {
self.queue.remove(i);
i = i.saturating_sub(1);
// If the frames are in the same channel, and the previous is last, drop the next frame.
if extends_channel && prev_frame.is_last {
self.queue.remove(i + 1);
continue;
}

// If the frames are in different channels, the next frame must be first.
if frame.id != next_frame.id && next_frame.number != 0 {
if !extends_channel && next_frame.number != 0 {
self.queue.remove(i + 1);
continue;
}

// If the frames are in different channels, and the current channel is not last, walk
// back the channel and drop all prev frames.
if !extends_channel && !prev_frame.is_last && next_frame.number == 0 {
// Find the index of the first frame in the queue with the same channel ID
// as the previous frame.
let first_frame =
self.queue.iter().position(|f| f.id == prev_frame.id).expect("infallible");

// Drain all frames from the previous channel.
let drained = self.queue.drain(first_frame..=i);
i = i.saturating_sub(drained.len());
continue;
}

i += 1;
}
}
Expand Down Expand Up @@ -125,21 +132,15 @@ where
return Ok(());
};

// Frame parsing should return an error if no frames were parsed
// and the above branch should be hit. Either way, return early here
// so as to not re-apply the pruning rules against all frames.
if frames.is_empty() {
error!(target: "frame-queue", "Frames should not be empty after parsing.");
return Ok(());
}

crate::inc!(DERIVED_FRAMES_COUNT, frames.len() as f64, &["success"]);
// Optimistically extend the queue with the new frames.
self.queue.extend(frames);

// Prune frames if Holocene is active.
let origin = self.origin().ok_or(PipelineError::MissingOrigin.crit())?;
self.prune(origin);

crate::inc!(DERIVED_FRAMES_COUNT, self.queue.len() as f64, &["success"]);

Ok(())
}
}
Expand Down Expand Up @@ -476,6 +477,33 @@ pub(crate) mod tests {
assert_eq!(err, PipelineError::Eof.temp());
}

#[tokio::test]
async fn test_holocene_replace_channel() {
let frames = vec![
// -- First Channel - VALID & CLOSED --
Frame { id: [0xDD; 16], number: 0, data: vec![0xDD; 50], is_last: false },
Frame { id: [0xDD; 16], number: 1, data: vec![0xDD; 50], is_last: true },
// -- Second Channel - VALID & NOT CLOSED / DROPPED --
Frame { id: [0xEE; 16], number: 0, data: vec![0xDD; 50], is_last: false },
Frame { id: [0xEE; 16], number: 1, data: vec![0xDD; 50], is_last: false },
// -- Third Channel - VALID & CLOSED / REPLACES CHANNEL #2 --
Frame { id: [0xFF; 16], number: 0, data: vec![0xDD; 50], is_last: false },
Frame { id: [0xFF; 16], number: 1, data: vec![0xDD; 50], is_last: true },
];
let encoded = encode_frames(frames.clone());
let config = RollupConfig { holocene_time: Some(0), ..Default::default() };
let mut mock = MockFrameQueueProvider::new(vec![Ok(encoded)]);
mock.set_origin(BlockInfo::default());
let mut frame_queue = FrameQueue::new(mock, Arc::new(config));
assert!(frame_queue.is_holocene_active(BlockInfo::default()));
for frame in frames.iter().filter(|f| f.id != [0xEE; 16]) {
let frame_decoded = frame_queue.next_frame().await.unwrap();
assert_eq!(frame_decoded, *frame);
}
let err = frame_queue.next_frame().await.unwrap_err();
assert_eq!(err, PipelineError::Eof.temp());
}

#[tokio::test]
async fn test_holocene_interleaved_invalid_channel() {
let frames = vec![
Expand Down

0 comments on commit 3b5499c

Please sign in to comment.