dekaf: Detect and handle requests for documents in a limited range directly before the write-head of a collection #1701

Merged: 8 commits, Oct 18, 2024
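
At the core of the read.rs changes below is one rule: when Dekaf serves documents from the limited range directly before a collection's write-head, it reads a bounded number of documents and rewrites their Kafka offsets from a caller-supplied base, instead of deriving them from journal byte offsets. A minimal sketch of that offset rule follows; the standalone helper is purely illustrative, as the real logic lives inline in `next_batch`:

// Illustrative helper mirroring the inline logic in `next_batch`.
// `rewrite_offsets_from`, `next_offset`, and the count of records already in the
// batch are the same values the diff below works with.
fn kafka_offset(rewrite_offsets_from: Option<i64>, next_offset: i64, records_in_batch: usize) -> i64 {
    match rewrite_offsets_from {
        // Rewriting: assign offsets sequentially from the requested base.
        Some(rewrite_from) => rewrite_from + records_in_batch as i64,
        // Normal reads: derive the Kafka offset from the journal read offset.
        None => next_offset - 1,
    }
}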
1 change: 1 addition & 0 deletions crates/dekaf/src/lib.rs
@@ -194,6 +194,7 @@ async fn handle_api(
// https://github.com/confluentinc/librdkafka/blob/e03d3bb91ed92a38f38d9806b8d8deffe78a1de5/src/rdkafka_request.c#L2823
let (header, request) = dec_request(frame, version)?;
tracing::debug!(client_id=?header.client_id, "Got client ID!");
session.client_id = header.client_id.clone().map(|id| id.to_string());
Ok(enc_resp(out, &header, session.api_versions(request).await?))
}
ApiKey::SaslHandshakeKey => {
42 changes: 32 additions & 10 deletions crates/dekaf/src/read.rs
@@ -5,7 +5,7 @@ use doc::AsNode;
use futures::StreamExt;
use gazette::journal::{ReadJsonLine, ReadJsonLines};
use gazette::{broker, journal, uuid};
use kafka_protocol::records::Compression;
use kafka_protocol::records::{Compression, TimestampType};
use lz4_flex::frame::BlockMode;
use std::time::{Duration, Instant};

@@ -28,6 +28,8 @@ pub struct Read {

// Keep these details around so we can create a new ReadRequest if we need to skip forward
journal_name: String,

pub(crate) rewrite_offsets_from: Option<i64>,
}

pub enum BatchResult {
@@ -39,6 +41,12 @@ pub enum BatchResult {
TimeoutNoData,
}

#[derive(Copy, Clone)]
pub enum ReadTarget {
Bytes(usize),
Docs(usize),
}
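// Illustrative note: `Bytes` preserves the existing byte-size-bounded behavior
// (the former `target_bytes` argument of `next_batch`), while `Docs` stops the
// read loop after an exact number of documents, which is what a bounded read of
// the range directly before the write-head needs.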

impl Read {
pub fn new(
client: journal::Client,
@@ -47,6 +55,7 @@
offset: i64,
key_schema_id: u32,
value_schema_id: u32,
rewrite_offsets_from: Option<i64>,
) -> Self {
let (not_before_sec, _) = collection.not_before.to_unix();

@@ -79,17 +88,18 @@
value_schema_id,

journal_name: partition.spec.name.clone(),
rewrite_offsets_from,
}
}

#[tracing::instrument(skip_all,fields(journal_name=self.journal_name))]
pub async fn next_batch(
mut self,
target_bytes: usize,
target: ReadTarget,
timeout: Instant,
) -> anyhow::Result<(Self, BatchResult)> {
use kafka_protocol::records::{
Compression, Record, RecordBatchEncoder, RecordEncodeOptions, TimestampType,
Compression, Record, RecordBatchEncoder, RecordEncodeOptions,
};

let mut records: Vec<Record> = Vec::new();
@@ -109,7 +119,10 @@

let mut did_timeout = false;

while records_bytes < target_bytes {
while match target {
ReadTarget::Bytes(target_bytes) => records_bytes < target_bytes,
ReadTarget::Docs(target_docs) => records.len() < target_docs,
} {
let read = match tokio::select! {
biased; // Attempt to read before yielding.

@@ -169,6 +182,8 @@
ReadJsonLine::Doc { root, next_offset } => (root, next_offset),
};

let mut record_bytes: usize = 0;

let Some(doc::ArchivedNode::String(uuid)) = self.uuid_ptr.query(root.get()) else {
let serialized_doc = root.get().to_debug_json_value();
anyhow::bail!(
@@ -215,14 +230,14 @@
buf.put_i16(9999);
// ControlMessageType: unused: i16
buf.put_i16(9999);
records_bytes += 4;
record_bytes += 4;
Some(buf.split().freeze())
} else {
tmp.push(0);
tmp.extend(self.key_schema_id.to_be_bytes());
() = avro::encode_key(&mut tmp, &self.key_schema, root.get(), &self.key_ptr)?;

records_bytes += tmp.len();
record_bytes += tmp.len();
buf.extend_from_slice(&tmp);
tmp.clear();
Some(buf.split().freeze())
@@ -236,7 +251,7 @@
tmp.extend(self.value_schema_id.to_be_bytes());
() = avro::encode(&mut tmp, &self.value_schema, root.get())?;

records_bytes += tmp.len();
record_bytes += tmp.len();
buf.extend_from_slice(&tmp);
tmp.clear();
Some(buf.split().freeze())
@@ -257,7 +272,11 @@
//
// Note that sequence must increment at the same rate
// as offset for efficient record batch packing.
let kafka_offset = next_offset - 1;
let kafka_offset = if let Some(rewrite_from) = self.rewrite_offsets_from {
rewrite_from + records.len() as i64
} else {
next_offset - 1
};
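// Worked example with illustrative numbers: if `rewrite_offsets_from` is
// Some(100) and three records are already in the batch, this record is assigned
// Kafka offset 103; without rewriting it keeps `next_offset - 1`, one less than
// the journal offset at which the next document begins.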

records.push(Record {
control: is_control,
@@ -273,6 +292,7 @@
transactional: false,
value,
});
records_bytes += record_bytes;
}

let opts = RecordEncodeOptions {
@@ -297,12 +317,14 @@
metrics::counter!("dekaf_bytes_read", "journal_name" => self.journal_name.to_owned())
.increment(records_bytes as u64);

let frozen = buf.freeze();

Ok((
self,
match (records.len() > 0, did_timeout) {
(false, true) => BatchResult::TimeoutNoData,
(true, true) => BatchResult::TimeoutExceededBeforeTarget(buf.freeze()),
(true, false) => BatchResult::TargetExceededBeforeTimeout(buf.freeze()),
(true, true) => BatchResult::TimeoutExceededBeforeTarget(frozen),
(true, false) => BatchResult::TargetExceededBeforeTimeout(frozen),
(false, false) => {
unreachable!("shouldn't be able to see no documents and also not time out")
}
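
For orientation, a hypothetical call-site sketch of the new `next_batch` signature; `read`, `timeout`, and the concrete limits are assumptions, and only the signature and the `ReadTarget` variants come from the diff above.

// Hypothetical usage sketch; `read` and `timeout` would come from the surrounding
// fetch handler, and the limits shown are made up for illustration.
async fn example(read: Read, timeout: std::time::Instant) -> anyhow::Result<()> {
    // Ordinary fetch: stop once roughly 1 MiB of records has been encoded.
    let (read, _batch) = read.next_batch(ReadTarget::Bytes(1 << 20), timeout).await?;

    // Limited-range read just before the write-head: stop after exactly 10
    // documents, with Kafka offsets rewritten from the base passed via
    // `rewrite_offsets_from` when the `Read` was constructed.
    let (_read, _batch) = read.next_batch(ReadTarget::Docs(10), timeout).await?;

    Ok(())
}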