diff --git a/crates/dekaf/src/session.rs b/crates/dekaf/src/session.rs index 4a33e2c853..a28e7a5fd6 100644 --- a/crates/dekaf/src/session.rs +++ b/crates/dekaf/src/session.rs @@ -19,11 +19,11 @@ use kafka_protocol::{ }, protocol::{buf::ByteBuf, Decodable, Encodable, Message, StrBytes}, }; +use std::{cmp::max, sync::Arc, time::Duration}; use std::{ collections::{hash_map::Entry, HashMap}, time::{SystemTime, UNIX_EPOCH}, }; -use std::{sync::Arc, time::Duration, cmp::max}; use tracing::instrument; struct PendingRead { @@ -430,7 +430,9 @@ impl Session { // so long as the request is still a data preview request. If not, bail out Entry::Occupied(entry) => { let data_preview_state = entry.get(); - if data_preview_state.offset - fetch_offset > 12 { + if fetch_offset > data_preview_state.offset + || data_preview_state.offset - fetch_offset > 12 + { bail!("Session was used for fetching preview data, cannot be used for fetching non-preview data.") } Some(data_preview_state.to_owned())