diff --git a/crates/derive/src/stages/channel_reader.rs b/crates/derive/src/stages/channel_reader.rs index 4831c086..c6189b4e 100644 --- a/crates/derive/src/stages/channel_reader.rs +++ b/crates/derive/src/stages/channel_reader.rs @@ -176,8 +176,8 @@ impl BatchReader { } let compression_type = data[0]; - if (compression_type & 0x0F) == ZLIB_DEFLATE_COMPRESSION_METHOD - || (compression_type & 0x0F) == ZLIB_RESERVED_COMPRESSION_METHOD + if (compression_type & 0x0F) == ZLIB_DEFLATE_COMPRESSION_METHOD || + (compression_type & 0x0F) == ZLIB_RESERVED_COMPRESSION_METHOD { self.decompressed = decompress_to_vec_zlib(&data).ok()?; } else if compression_type == CHANNEL_VERSION_BROTLI { @@ -246,7 +246,7 @@ mod test { async fn test_next_batch_batch_reader_no_data() { let mock = MockChannelReaderProvider::new(vec![Ok(None)]); let mut reader = ChannelReader::new(mock, Arc::new(RollupConfig::default())); - assert_eq!(reader.next_batch().await, Err(StageError::NoChannel)); + assert!(matches!(reader.next_batch().await.unwrap_err(), StageError::Temporary(_))); assert!(reader.next_batch.is_none()); }