Skip to content

Commit

Permalink
dekaf: Add support for reading backwards in order to properly handle …
Browse files Browse the repository at this point in the history
…data preview UIs
  • Loading branch information
jshearer committed Oct 15, 2024
1 parent e6a78da commit 8be16c9
Show file tree
Hide file tree
Showing 4 changed files with 432 additions and 122 deletions.
1 change: 1 addition & 0 deletions crates/dekaf/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ async fn handle_api(
// https://github.com/confluentinc/librdkafka/blob/e03d3bb91ed92a38f38d9806b8d8deffe78a1de5/src/rdkafka_request.c#L2823
let (header, request) = dec_request(frame, version)?;
tracing::debug!(client_id=?header.client_id, "Got client ID!");
session.client_id = header.client_id.clone().map(|id| id.to_string());
Ok(enc_resp(out, &header, session.api_versions(request).await?))
}
ApiKey::SaslHandshakeKey => {
Expand Down
12 changes: 9 additions & 3 deletions crates/dekaf/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@ use futures::{FutureExt, TryStreamExt};
use rsasl::config::SASLConfig;
use rustls::pki_types::CertificateDer;
use std::{
collections::HashMap,
fs::File,
io,
path::{Path, PathBuf},
sync::Arc,
};
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio::{
io::{split, AsyncRead, AsyncWrite, AsyncWriteExt},
sync::RwLock,
};
use tracing_subscriber::{filter::LevelFilter, EnvFilter};
use url::Url;

Expand Down Expand Up @@ -113,6 +117,8 @@ async fn main() -> anyhow::Result<()> {
let cli = Cli::parse();
tracing::info!("Starting dekaf");

let offset_map = Arc::new(RwLock::new(HashMap::new()));

let (api_endpoint, api_key) = if cli.local {
(LOCAL_PG_URL.to_owned(), LOCAL_PG_PUBLIC_TOKEN.to_string())
} else {
Expand Down Expand Up @@ -219,7 +225,7 @@ async fn main() -> anyhow::Result<()> {
continue
};

tokio::spawn(serve(Session::new(app.clone(), cli.encryption_secret.to_owned()), socket, addr, stop.clone()));
tokio::spawn(serve(Session::new(app.clone(), cli.encryption_secret.to_owned(), offset_map.clone()), socket, addr, stop.clone()));
}
_ = &mut stop => break,
}
Expand All @@ -240,7 +246,7 @@ async fn main() -> anyhow::Result<()> {
};
socket.set_nodelay(true)?;

tokio::spawn(serve(Session::new(app.clone(), cli.encryption_secret.to_owned()), socket, addr, stop.clone()));
tokio::spawn(serve(Session::new(app.clone(), cli.encryption_secret.to_owned(), offset_map.clone()), socket, addr, stop.clone()));
}
_ = &mut stop => break,
}
Expand Down
Loading

0 comments on commit 8be16c9

Please sign in to comment.