Skip to content

Commit

Permalink
dekaf: Fix data-preview session bound to allow for caught-up consumer…
Browse files Browse the repository at this point in the history
…s polling for more records
  • Loading branch information
jshearer committed Oct 17, 2024
1 parent 58dffbb commit d5ddea2
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 6 deletions.
6 changes: 1 addition & 5 deletions crates/dekaf/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,12 @@ use futures::{FutureExt, TryStreamExt};
use rsasl::config::SASLConfig;
use rustls::pki_types::CertificateDer;
use std::{
collections::HashMap,
fs::File,
io,
path::{Path, PathBuf},
sync::Arc,
};
use tokio::{
io::{AsyncRead, AsyncWrite, AsyncWriteExt},
sync::RwLock,
};
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tracing_subscriber::{filter::LevelFilter, EnvFilter};
use url::Url;

Expand Down
4 changes: 3 additions & 1 deletion crates/dekaf/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1224,7 +1224,9 @@ impl Session {
.fetch_partition_offset(partition as usize, -1)
.await?
{
if latest_offset - fetch_offset < 13 {
// If fetch_offset is > latest_offset, this is a caught-up consumer
// polling for new documents, not a data preview request.
if fetch_offset <= latest_offset && latest_offset - fetch_offset < 13 {
tracing::debug!(
latest_offset,
diff = latest_offset - fetch_offset,
Expand Down

0 comments on commit d5ddea2

Please sign in to comment.