dekaf: Clean up and clarify some anonymous types, as well as ensuring that a session that has been used for data previews cannot subsequently be re-used for non-preview fetches
jshearer committed Oct 16, 2024
1 parent b20aac2 commit 77e821e
Showing 2 changed files with 173 additions and 96 deletions.
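In outline, the new guard is a three-state machine per session. A simplified sketch of the transitions follows (simplification mine; the real logic, which also tracks resolved offsets per partition, lives in the fetch handler in session.rs below):

// Sketch of the state machine this commit introduces; the real
// DataPreview variant also carries per-partition offsets.
enum SessionDataPreviewState {
    Unknown,        // no fetch classified yet
    NotDataPreview, // session serves regular reads
    DataPreview,    // session has served a data preview
}

fn on_fetch(
    state: SessionDataPreviewState,
    is_preview: bool,
) -> anyhow::Result<SessionDataPreviewState> {
    use SessionDataPreviewState::*;
    Ok(match (state, is_preview) {
        (Unknown, true) => DataPreview,
        (Unknown, false) => NotDataPreview,
        (NotDataPreview, _) => NotDataPreview,
        (DataPreview, true) => DataPreview,
        // The invariant this commit enforces:
        (DataPreview, false) => anyhow::bail!(
            "session was used for preview fetches; refusing a non-preview fetch"
        ),
    })
}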
237 changes: 150 additions & 87 deletions crates/dekaf/src/session.rs
@@ -1,10 +1,12 @@
use super::{App, Collection, Read};
use crate::{
from_downstream_topic_name, from_upstream_topic_name, read::BatchResult,
to_downstream_topic_name, to_upstream_topic_name, topology::fetch_all_collection_names,
from_downstream_topic_name, from_upstream_topic_name,
read::BatchResult,
to_downstream_topic_name, to_upstream_topic_name,
topology::{fetch_all_collection_names, PartitionOffset},
Authenticated,
};
use anyhow::Context;
use anyhow::{bail, Context};
use bytes::{BufMut, Bytes, BytesMut};
use kafka_protocol::{
error::{ParseResponseErrorCode, ResponseError},
@@ -18,10 +20,7 @@ use kafka_protocol::{
protocol::{buf::ByteBuf, Decodable, Encodable, Message, StrBytes},
};
use std::{
collections::{
hash_map::{Entry, OccupiedEntry},
HashMap,
},
collections::{hash_map::Entry, HashMap},
time::{SystemTime, UNIX_EPOCH},
};
use std::{sync::Arc, time::Duration};
@@ -33,12 +32,19 @@ struct PendingRead {
handle: tokio_util::task::AbortOnDropHandle<anyhow::Result<(Read, BatchResult)>>,
}

#[derive(Clone, Debug)]
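/// Whether this session has been used for data-preview fetches.
/// Once a session serves a preview, it must not be re-used for
/// regular (non-preview) fetches.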
enum SessionDataPreviewState {
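    /// No fetch on this session has been classified yet.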
Unknown,
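    /// This session serves regular reads and is never treated as a preview.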
NotDataPreview,
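    /// This session has served data previews; tracks the resolved
    /// offsets per (topic, partition).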
DataPreview(HashMap<(TopicName, i32), PartitionOffset>),
}

pub struct Session {
app: Arc<App>,
reads: HashMap<(TopicName, i32), PendingRead>,
secret: String,
auth: Option<Authenticated>,
data_preview_offsets: HashMap<(TopicName, i32), Option<(i64, i64)>>,
data_preview_state: SessionDataPreviewState,
pub client_id: Option<String>,
}

@@ -50,7 +56,7 @@ impl Session {
auth: None,
secret,
client_id: None,
data_preview_offsets: HashMap::new(),
data_preview_state: SessionDataPreviewState::Unknown,
}
}

@@ -271,48 +277,46 @@ impl Session {
.await?;

// Concurrently fetch Collection instances and offsets for all requested topics and partitions.
// Map each "topic" into Vec<(Partition Index, Option<(Journal Offset, Timestamp))>.
let collections: anyhow::Result<
Vec<(TopicName, Vec<(i32, i64, Option<(i64, i64, i64)>)>)>,
> = futures::future::try_join_all(request.topics.into_iter().map(|topic| async move {
let maybe_collection = Collection::new(
client,
from_downstream_topic_name(topic.name.clone()).as_str(),
)
.await?;
// Map each "topic" into Vec<(Partition Index, Option<PartitionOffset>)>.
let collections: anyhow::Result<Vec<(TopicName, Vec<(i32, Option<PartitionOffset>)>)>> =
futures::future::try_join_all(request.topics.into_iter().map(|topic| async move {
let maybe_collection = Collection::new(
client,
from_downstream_topic_name(topic.name.clone()).as_str(),
)
.await?;

let Some(collection) = maybe_collection else {
return Ok((
topic.name,
topic
.partitions
.iter()
.map(|p| (p.partition_index, p.timestamp, None))
.collect(),
));
};
let collection = &collection;

// Concurrently fetch requested offset for each named partition.
let offsets: anyhow::Result<_> = futures::future::try_join_all(
topic.partitions.into_iter().map(|partition| async move {
Ok((
partition.partition_index,
partition.timestamp,
collection
.fetch_partition_offset(
partition.partition_index as usize,
partition.timestamp, // In millis.
)
.await?,
))
}),
)
.await;
let Some(collection) = maybe_collection else {
return Ok((
topic.name,
topic
.partitions
.iter()
.map(|p| (p.partition_index, None))
.collect(),
));
};
let collection = &collection;

// Concurrently fetch requested offset for each named partition.
let offsets: anyhow::Result<_> = futures::future::try_join_all(
topic.partitions.into_iter().map(|partition| async move {
Ok((
partition.partition_index,
collection
.fetch_partition_offset(
partition.partition_index as usize,
partition.timestamp, // In millis.
)
.await?,
))
}),
)
.await;

Ok((topic.name, offsets?))
}))
.await;
Ok((topic.name, offsets?))
}))
.await;

let collections = collections?;

@@ -326,8 +330,13 @@ impl Session {
.map(|(topic_name, offsets)| {
let partitions = offsets
.into_iter()
.map(|(partition_index, request_timestamp, maybe_offset)| {
let Some((offset, fragment_start, timestamp)) = maybe_offset else {
.map(|(partition_index, maybe_offset)| {
let Some(PartitionOffset {
offset,
mod_time: timestamp,
..
}) = maybe_offset
else {
return ListOffsetsPartitionResponse::default()
.with_partition_index(partition_index)
.with_error_code(ResponseError::UnknownTopicOrPartition.code());
@@ -376,7 +385,8 @@
.as_mut()
.ok_or(anyhow::anyhow!("Session not authenticated"))?
.authenticated_client()
.await?;
.await?
.clone();

let timeout = std::time::Duration::from_millis(max_wait_ms as u64);

@@ -390,50 +400,48 @@ impl Session {
key.1 = partition_request.partition;
let fetch_offset = partition_request.fetch_offset;

let data_preview_params: Option<(i64, i64)> = match self
.data_preview_offsets
.entry(key.to_owned())
let data_preview_params: Option<PartitionOffset> = match self
.data_preview_state
.to_owned()
{
Entry::Occupied(entry) => match entry.get() {
Some((offset, fragment_start)) => {
tracing::debug!(collection=?key.0,partition=key.1, offset, fragment_start, fetch_offset, "Session already marked as data-preview for this partition");
Some((*offset, *fragment_start))
}
None => {
tracing::debug!(collection=?key.0,partition=key.1, "Session already marked as not data-preview for this partition");
None
}
},
Entry::Vacant(entry) => {
tracing::debug!(collection=?key.0,partition=key.1, fetch_offset,"Loading latest offset for this partition to check if session is data-preview");
let collection = Collection::new(&client, key.0.as_str())
.await?
.ok_or(anyhow::anyhow!("Collection {} not found", key.0.as_str()))?;

if let Some((latest_offset, fragment_start, _)) = collection
.fetch_partition_offset(key.1 as usize, -1)
SessionDataPreviewState::Unknown => {
if let Some(state) = self
.is_fetch_data_preview(key.0.to_string(), key.1, fetch_offset)
.await?
{
if latest_offset - fetch_offset < 13 {
tracing::debug!(collection=?key.0,partition=key.1, fetch_offset, latest_offset, diff=latest_offset - fetch_offset, "Marking session as data-preview for this partition");
let diff = Some((latest_offset, fragment_start));
entry.insert(diff);
diff
} else {
tracing::debug!(collection=?key.0,partition=key.1, fetch_offset, latest_offset, diff=latest_offset - fetch_offset, "Marking session as not data-preview for this partition");
entry.insert(None);
None
}
let mut data_preview_state = HashMap::new();
data_preview_state.insert(key.to_owned(), state);
self.data_preview_state =
SessionDataPreviewState::DataPreview(data_preview_state);
Some(state)
} else {
self.data_preview_state = SessionDataPreviewState::NotDataPreview;
None
}
}
SessionDataPreviewState::NotDataPreview => None,
SessionDataPreviewState::DataPreview(mut state) => {
match state.entry(key.to_owned()) {
Entry::Occupied(entry) => Some(entry.get().to_owned()),
Entry::Vacant(entry) => {
if let Some(state) = self
.is_fetch_data_preview(key.0.to_string(), key.1, fetch_offset)
.await?
{
entry.insert(state);
Some(state)
} else {
bail!("Session was used for fetching preview data, so it cannot be re-used for fetching non-preview data.")
}
}
}
}
};

if matches!(self.reads.get(&key), Some(pending) if pending.offset == fetch_offset) {
continue; // Common case: fetch is at the pending offset.
}
let Some(collection) = Collection::new(client, &key.0).await? else {
let Some(collection) = Collection::new(&client, &key.0).await? else {
tracing::debug!(collection = ?&key.0, "Collection doesn't exist!");
continue; // Collection doesn't exist.
};
Expand All @@ -452,9 +460,11 @@ impl Session {
last_write_head: fetch_offset,
handle: tokio_util::task::AbortOnDropHandle::new(match data_preview_params {
// Observed head distances of known preview clients: StarTree 0, Tinybird 12.
Some((latest_offset, fragment_start))
if latest_offset - fetch_offset <= 12 =>
{
Some(PartitionOffset {
fragment_start,
offset: latest_offset,
..
}) if latest_offset - fetch_offset <= 12 => {
let diff = latest_offset - fetch_offset;
tokio::spawn(
Read::new(
@@ -1165,4 +1175,57 @@ impl Session {
TopicName(StrBytes::from_string(name))
}
}

/// If the fetched offset is within a fixed number of offsets from the end of the journal,
/// return Some with a PartitionOffset containing the beginning and end of the latest fragment.
#[tracing::instrument(skip(self))]
async fn is_fetch_data_preview(
&mut self,
collection_name: String,
partition: i32,
fetch_offset: i64,
) -> anyhow::Result<Option<PartitionOffset>> {
let client = self
.auth
.as_mut()
.ok_or(anyhow::anyhow!("Session not authenticated"))?
.authenticated_client()
.await?;

tracing::debug!(
"Loading latest offset for this partition to check if session is data-preview"
);
let collection = Collection::new(&client, collection_name.as_str())
.await?
.ok_or(anyhow::anyhow!("Collection {} not found", collection_name))?;

if let Some(
partition_offset @ PartitionOffset {
offset: latest_offset,
..
},
) = collection
.fetch_partition_offset(partition as usize, -1)
.await?
{
if latest_offset - fetch_offset < 13 {
tracing::debug!(
latest_offset,
diff = latest_offset - fetch_offset,
"Marking session as data-preview for this partition"
);
Ok(Some(partition_offset))
} else {
tracing::debug!(
fetch_offset,
latest_offset,
diff = latest_offset - fetch_offset,
"Marking session as not data-preview for this partition"
);
Ok(None)
}
} else {
Ok(None)
}
}
}
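The preview heuristic itself is a distance check against the journal head. Restated as a minimal sketch (the boundary constants are taken verbatim from the diff above):

// A fetch within a dozen offsets of the journal head is treated as a
// data preview, matching `latest_offset - fetch_offset < 13` in
// is_fetch_data_preview and the `<= 12` guard in the fetch arm.
fn looks_like_data_preview(latest_offset: i64, fetch_offset: i64) -> bool {
    latest_offset - fetch_offset < 13
}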
32 changes: 23 additions & 9 deletions crates/dekaf/src/topology.rs
@@ -49,6 +49,13 @@ pub struct Partition {
pub route: broker::Route,
}

#[derive(Clone, Copy, Default, Debug)]
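/// Offsets resolved for one partition: the start of the fragment that
/// contains `offset`, the fetch-able offset itself, and the fragment's
/// modification time.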
pub struct PartitionOffset {
pub fragment_start: i64,
pub offset: i64,
pub mod_time: i64,
}

impl Collection {
/// Build a Collection by fetching its spec, an authenticated data-plane access token, and its partitions.
pub async fn new(
@@ -185,7 +192,7 @@ impl Collection {
&self,
partition_index: usize,
timestamp_millis: i64,
) -> anyhow::Result<Option<(i64, i64, i64)>> {
) -> anyhow::Result<Option<PartitionOffset>> {
let Some(partition) = self.partitions.get(partition_index) else {
return Ok(None);
};
@@ -212,30 +219,37 @@
};
let response = self.journal_client.list_fragments(request).await?;

let (offset, fragment_start, mod_time) = match response.fragments.get(0) {
let offset_data = match response.fragments.get(0) {
Some(broker::fragments_response::Fragment {
spec: Some(spec), ..
}) => {
if timestamp_millis == -1 {
// Subtract one to reflect the largest fetch-able offset of the fragment.
(spec.end - 1, spec.begin, spec.mod_time)
PartitionOffset {
fragment_start: spec.begin,
// Subtract one to reflect the largest fetch-able offset of the fragment.
offset: spec.end - 1,
mod_time: spec.mod_time,
}
} else {
(spec.begin, spec.begin, spec.mod_time)
PartitionOffset {
fragment_start: spec.begin,
offset: spec.begin,
mod_time: spec.mod_time,
}
}
}
_ => (0, 0, 0),
_ => PartitionOffset::default(),
};

tracing::debug!(
collection = self.spec.name,
mod_time,
offset,
?offset_data,
partition_index,
timestamp_millis,
"fetched offset"
);

Ok(Some((offset, fragment_start, mod_time)))
Ok(Some(offset_data))
}

/// Build a journal client by resolving the collections data-plane gateway and an access token.

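For reference, a hypothetical caller of the reworked fetch_partition_offset; the `collection` value and partition index 0 are assumptions for illustration. Per the diff, timestamp_millis == -1 resolves the largest fetch-able offset of the latest fragment:

use crate::topology::{Collection, PartitionOffset};

// Hypothetical helper; `collection` is an already-constructed Collection.
async fn log_head_offset(collection: &Collection) -> anyhow::Result<()> {
    // -1 asks for the journal head rather than a timestamp lookup.
    if let Some(PartitionOffset {
        fragment_start,
        offset,
        mod_time,
    }) = collection.fetch_partition_offset(0, -1).await?
    {
        tracing::debug!(fragment_start, offset, mod_time, "resolved head offset");
    }
    Ok(())
}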