Skip to content

Commit

Permalink
dekaf: Track latest committed offset by group and journal
Browse files Browse the repository at this point in the history
Looks like this:

```
dekaf_committed_offset{group_id="joseph-dekaf-testing-1100-1",journal_name="demo/bots-humans-edits/pivot=00"} 73173569244
dekaf_committed_offset{group_id="joseph-dekaf-testing-1100-0",journal_name="demo/wikipedia/recentchange/pivot=00"} 2036875838205
```
  • Loading branch information
jshearer committed Oct 17, 2024
1 parent d44376b commit 8c398b3
Showing 1 changed file with 44 additions and 0 deletions.
44 changes: 44 additions & 0 deletions crates/dekaf/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -847,6 +847,14 @@ impl Session {
.connect_to_group_coordinator(req.group_id.as_str())
.await?;

let flow_client = self
.auth
.as_mut()
.ok_or(anyhow::anyhow!("Session not authenticated"))?
.authenticated_client()
.await?
.clone();

client
.ensure_topics(
mutated_req
Expand All @@ -861,6 +869,42 @@ impl Session {

for topic in resp.topics.iter_mut() {
topic.name = self.decrypt_topic_name(topic.name.to_owned());

let collection_partitions = Collection::new(&flow_client, topic.name.as_str())
.await?
.context(format!("unable to look up partitions for {:?}", topic.name))?
.partitions;

for partition in &topic.partitions {
if let Some(error) = partition.error_code.err() {
tracing::warn!(topic=?topic.name,partition=partition.partition_index,?error,"Got error from upstream Kafka when trying to commit offsets");
} else {
let journal_name = collection_partitions
.get(partition.partition_index as usize)
.context(format!(
"unable to find partition {} in collection {:?}",
partition.partition_index, topic.name
))?
.spec
.name
.to_owned();

let committed_offset = req
.topics
.iter()
.find(|req_topic| req_topic.name == topic.name)
.context(format!("unable to find topic in request {:?}", topic.name))?
.partitions
.get(partition.partition_index as usize)
.context(format!(
"unable to find partition {}",
partition.partition_index
))?
.committed_offset;

metrics::gauge!("dekaf_committed_offset", "group_id"=>req.group_id.to_string(),"journal_name"=>journal_name).set(committed_offset as f64);
}
}
}

Ok(resp)
Expand Down

0 comments on commit 8c398b3

Please sign in to comment.