Skip to content

Commit

Permalink
chore: From trait for AckRequest
Browse files Browse the repository at this point in the history
Signed-off-by: Vigith Maurice <[email protected]>
  • Loading branch information
vigith committed Sep 21, 2024
1 parent 499b33d commit 6f6a48d
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 17 deletions.
19 changes: 18 additions & 1 deletion rust/monovertex/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use chrono::{DateTime, Utc};
use crate::error::Error;
use crate::shared::{prost_timestamp_from_utc, utc_from_timestamp};
use crate::sink_pb::SinkRequest;
use crate::source_pb::read_response;
use crate::source_pb;
use crate::source_pb::{AckRequest, read_response};
use crate::sourcetransform_pb::SourceTransformRequest;

/// A message that is sent from the source to the sink.
Expand Down Expand Up @@ -36,6 +37,22 @@ pub(crate) struct Offset {
pub(crate) partition_id: i32,
}

impl From<Offset> for AckRequest {
fn from(offset: Offset) -> Self {
Self {
request: Some(source_pb::ack_request::Request {
offset: Some(source_pb::Offset {
offset: BASE64_STANDARD
.decode(offset.offset)
.expect("we control the encoding, so this should never fail"),
partition_id: offset.partition_id,
}),
}),
handshake: None,
}
}
}

/// Convert the [`Message`] to [`SourceTransformRequest`]
impl From<Message> for SourceTransformRequest {
fn from(message: Message) -> Self {
Expand Down
27 changes: 11 additions & 16 deletions rust/monovertex/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@ use crate::message::{Message, Offset};
use crate::source_pb;
use crate::source_pb::source_client::SourceClient;
use crate::source_pb::{
ack_request, ack_response, read_request, AckRequest, AckResponse, ReadRequest, ReadResponse,
ack_response, read_request, AckRequest, AckResponse, ReadRequest, ReadResponse,
};
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use log::info;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
Expand Down Expand Up @@ -47,6 +45,7 @@ impl SourceReader {
let handshake_response = resp_stream.message().await?.ok_or(SourceError(
"failed to receive handshake response".to_string(),
))?;
// TODO(explain): why will this be None and why is None okay?
if handshake_response.handshake.map_or(true, |h| !h.sot) {
return Err(SourceError("invalid handshake response".to_string()));
}
Expand Down Expand Up @@ -78,7 +77,7 @@ impl SourceReader {
let mut messages = Vec::with_capacity(num_records as usize);

while let Some(response) = self.resp_stream.message().await? {
if response.status.as_ref().map_or(false, |status| status.eot) {
if response.status.map_or(false, |status| status.eot) {
break;
}

Expand Down Expand Up @@ -121,6 +120,7 @@ impl SourceAcker {
let ack_handshake_response = ack_resp_stream.message().await?.ok_or(SourceError(
"failed to receive ack handshake response".to_string(),
))?;
// TODO(explain): why will this be None and why is None okay?
if ack_handshake_response.handshake.map_or(true, |h| !h.sot) {
return Err(SourceError("invalid ack handshake response".to_string()));
}
Expand All @@ -132,26 +132,19 @@ impl SourceAcker {
}

pub(crate) async fn ack(&mut self, offsets: Vec<Offset>) -> Result<AckResponse> {
let start = std::time::Instant::now();
let start = tokio::time::Instant::now();
let n = offsets.len();

// send n ack requests
for offset in offsets {
let request = AckRequest {
request: Some(ack_request::Request {
offset: Some(source_pb::Offset {
offset: BASE64_STANDARD
.decode(offset.offset)
.expect("we control the encoding, so this should never fail"),
partition_id: offset.partition_id,
}),
}),
handshake: None,
};
let request = offset.into();
self.ack_tx
.send(request)
.await
.map_err(|e| SourceError(e.to_string()))?;
}

// make sure we get n responses for the n requests.
for _ in 0..n {
let _ = self
.ack_resp_stream
Expand All @@ -160,7 +153,9 @@ impl SourceAcker {
.ok_or(SourceError("failed to receive ack response".to_string()))?;
}

// TODO: emit latency metrics.
info!("acked {} messages in {:?}", n, start.elapsed().as_millis());

Ok(AckResponse {
result: Some(ack_response::Result { success: Some(()) }),
handshake: None,
Expand Down

0 comments on commit 6f6a48d

Please sign in to comment.