diff --git a/crates/dekaf/src/main.rs b/crates/dekaf/src/main.rs index e4568239f3..20fac234ce 100644 --- a/crates/dekaf/src/main.rs +++ b/crates/dekaf/src/main.rs @@ -12,16 +12,12 @@ use futures::{FutureExt, TryStreamExt}; use rsasl::config::SASLConfig; use rustls::pki_types::CertificateDer; use std::{ - collections::HashMap, fs::File, io, path::{Path, PathBuf}, sync::Arc, }; -use tokio::{ - io::{AsyncRead, AsyncWrite, AsyncWriteExt}, - sync::RwLock, -}; +use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; use tracing_subscriber::{filter::LevelFilter, EnvFilter}; use url::Url; diff --git a/crates/dekaf/src/session.rs b/crates/dekaf/src/session.rs index 3e0591ebda..d68e0899e8 100644 --- a/crates/dekaf/src/session.rs +++ b/crates/dekaf/src/session.rs @@ -1224,7 +1224,9 @@ impl Session { .fetch_partition_offset(partition as usize, -1) .await? { - if latest_offset - fetch_offset < 13 { + // If fetch_offset is > latest_offset, this is a caught-up consumer + // polling for new documents, not a data preview request. + if fetch_offset <= latest_offset && latest_offset - fetch_offset < 13 { tracing::debug!( latest_offset, diff = latest_offset - fetch_offset,