From ecad3c24f3931873855825eb2b0eb9be8ac839ce Mon Sep 17 00:00:00 2001 From: Joseph Shearer Date: Thu, 17 Oct 2024 15:49:11 -0400 Subject: [PATCH] dekaf: respond with correct high_water_mark/last_stable_offset when serving data preview responses --- crates/dekaf/src/session.rs | 59 +++++++++++++++++++++++-------------- 1 file changed, 37 insertions(+), 22 deletions(-) diff --git a/crates/dekaf/src/session.rs b/crates/dekaf/src/session.rs index 5cdc1321c6..a7584512d6 100644 --- a/crates/dekaf/src/session.rs +++ b/crates/dekaf/src/session.rs @@ -569,30 +569,45 @@ impl Session { hit_timeout = true } - partition_responses.push( - PartitionData::default() - .with_partition_index(partition_request.partition) - // `kafka-protocol` encodes None here using a length of -1, but librdkafka client library - // complains with: `Protocol parse failure for Fetch v11 ... invalid MessageSetSize -1` - // An empty Bytes will get encoded with a length of 0, which works fine. - .with_records(batch.or(Some(Bytes::new())).to_owned()) - .with_high_watermark(pending.last_write_head) // Map to kafka cursor. - .with_last_stable_offset(pending.last_write_head), - ); - - if read.rewrite_offsets_from.is_none() { - pending.offset = read.offset; - pending.last_write_head = read.last_write_head; - pending.handle = - tokio_util::task::AbortOnDropHandle::new(tokio::spawn(read.next_batch( - crate::read::ReadTarget::Bytes( - partition_request.partition_max_bytes as usize, + let mut partition_data = PartitionData::default() + .with_partition_index(partition_request.partition) + // `kafka-protocol` encodes None here using a length of -1, but librdkafka client library + // complains with: `Protocol parse failure for Fetch v11 ... invalid MessageSetSize -1` + // An empty Bytes will get encoded with a length of 0, which works fine. + .with_records(batch.or(Some(Bytes::new())).to_owned()); + + match &self.data_preview_state { + SessionDataPreviewState::Unknown => { + unreachable!("Must have already determined data-preview status of session") + } + SessionDataPreviewState::NotDataPreview => { + partition_data = partition_data + .with_high_watermark(pending.last_write_head) // Map to kafka cursor. + .with_last_stable_offset(pending.last_write_head); + + pending.offset = read.offset; + pending.last_write_head = read.last_write_head; + pending.handle = tokio_util::task::AbortOnDropHandle::new(tokio::spawn( + read.next_batch( + crate::read::ReadTarget::Bytes( + partition_request.partition_max_bytes as usize, + ), + std::time::Instant::now() + timeout, ), - std::time::Instant::now() + timeout, - ))); - } else { - self.reads.remove(&key); + )); + } + SessionDataPreviewState::DataPreview(data_preview_states) => { + let data_preview_state = data_preview_states + .get(&key) + .expect("should be able to find data preview state by this point"); + partition_data = partition_data + .with_high_watermark(data_preview_state.offset) // Map to kafka cursor. + .with_last_stable_offset(data_preview_state.offset); + self.reads.remove(&key); + } } + + partition_responses.push(partition_data); } topic_responses.push(