Skip to content

Commit

Permalink
chore: merge source_{reader,acker} impl into one (#2102)
Browse files Browse the repository at this point in the history
Signed-off-by: Vigith Maurice <[email protected]>
  • Loading branch information
vigith committed Sep 28, 2024
1 parent 7402596 commit 66c8613
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 120 deletions.
6 changes: 3 additions & 3 deletions rust/numaflow-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use tracing::error;

/// Custom Error handling.
mod error;
pub(crate) use crate::error::Result;
pub(crate) use crate::error::Error;
pub(crate) use crate::error::Result;

/// MonoVertex is a simplified version of the [Pipeline] spec which is ideal for high TPS, low latency
/// use-cases which do not require [ISB].
Expand All @@ -18,6 +18,8 @@ mod config;

/// Internal message structure that is passed around.
mod message;
/// Shared entities that can be used orthogonal to different modules.
mod shared;
/// [Sink] serves as the endpoint for processed data that has been outputted from the platform,
/// which is then sent to an external system or application.
///
Expand All @@ -32,5 +34,3 @@ mod source;
///
/// [Transformer]: https://numaflow.numaproj.io/user-guide/sources/transformer/overview/
mod transformer;
/// Shared entities that can be used orthogonal to different modules.
mod shared;
8 changes: 3 additions & 5 deletions rust/numaflow-core/src/monovertex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::error;
use crate::shared::utils;
use crate::shared::utils::create_rpc_channel;
use crate::sink::user_defined::SinkWriter;
use crate::source::user_defined::{SourceAcker, SourceReader};
use crate::source::user_defined::Source;
use crate::transformer::user_defined::SourceTransformer;
use forwarder::ForwarderBuilder;
use metrics::MetricsState;
Expand Down Expand Up @@ -164,12 +164,10 @@ async fn start_forwarder(cln_token: CancellationToken, sdk_config: SDKConfig) ->
lag_reader.start().await;

// build the forwarder
let source_reader = SourceReader::new(source_grpc_client.clone()).await?;
let source_acker = SourceAcker::new(source_grpc_client.clone()).await?;
let source_reader = Source::new(source_grpc_client.clone()).await?;
let sink_writer = SinkWriter::new(sink_grpc_client.clone()).await?;

let mut forwarder_builder =
ForwarderBuilder::new(source_reader, source_acker, sink_writer, cln_token);
let mut forwarder_builder = ForwarderBuilder::new(source_reader, sink_writer, cln_token);

// add transformer if exists
if let Some(transformer_grpc_client) = transformer_grpc_client {
Expand Down
65 changes: 19 additions & 46 deletions rust/numaflow-core/src/monovertex/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::monovertex::metrics;
use crate::monovertex::metrics::forward_metrics;
use crate::monovertex::sink_pb::Status::{Failure, Fallback, Success};
use crate::sink::user_defined::SinkWriter;
use crate::source::user_defined::{SourceAcker, SourceReader};
use crate::source::user_defined::Source;
use crate::transformer::user_defined::SourceTransformer;
use chrono::Utc;
use log::warn;
Expand All @@ -20,8 +20,7 @@ use tracing::{debug, info};
/// transformer is present, writing the messages to the sink, and then acknowledging the messages
/// back to the source.
pub(crate) struct Forwarder {
source_reader: SourceReader,
source_acker: SourceAcker,
source: Source,
sink_writer: SinkWriter,
source_transformer: Option<SourceTransformer>,
fb_sink_writer: Option<SinkWriter>,
Expand All @@ -31,8 +30,7 @@ pub(crate) struct Forwarder {

/// ForwarderBuilder is used to build a Forwarder instance with optional fields.
pub(crate) struct ForwarderBuilder {
source_reader: SourceReader,
source_acker: SourceAcker,
source: Source,
sink_writer: SinkWriter,
cln_token: CancellationToken,
source_transformer: Option<SourceTransformer>,
Expand All @@ -42,14 +40,12 @@ pub(crate) struct ForwarderBuilder {
impl ForwarderBuilder {
/// Create a new builder with mandatory fields
pub(crate) fn new(
source_reader: SourceReader,
source_acker: SourceAcker,
source: Source,
sink_writer: SinkWriter,
cln_token: CancellationToken,
) -> Self {
Self {
source_reader,
source_acker,
source,
sink_writer,
cln_token,
source_transformer: None,
Expand All @@ -74,8 +70,7 @@ impl ForwarderBuilder {
pub(crate) fn build(self) -> Forwarder {
let common_labels = metrics::forward_metrics_labels().clone();
Forwarder {
source_reader: self.source_reader,
source_acker: self.source_acker,
source: self.source,
sink_writer: self.sink_writer,
source_transformer: self.source_transformer,
fb_sink_writer: self.fb_sink_writer,
Expand Down Expand Up @@ -127,7 +122,7 @@ impl Forwarder {
async fn read_and_process_messages(&mut self) -> error::Result<usize> {
let start_time = tokio::time::Instant::now();
let messages = self
.source_reader
.source
.read(config().batch_size, config().timeout_in_ms)
.await
.map_err(|e| {
Expand Down Expand Up @@ -538,7 +533,7 @@ impl Forwarder {
let n = offsets.len();
let start_time = tokio::time::Instant::now();

self.source_acker.ack(offsets).await?;
self.source.ack(offsets).await?;

debug!("Ack latency - {}ms", start_time.elapsed().as_millis());

Expand Down Expand Up @@ -572,7 +567,7 @@ mod tests {
use crate::monovertex::sourcetransform_pb::source_transform_client::SourceTransformClient;
use crate::shared::utils::create_rpc_channel;
use crate::sink::user_defined::SinkWriter;
use crate::source::user_defined::{SourceAcker, SourceReader};
use crate::source::user_defined::Source;
use crate::transformer::user_defined::SourceTransformer;

struct SimpleSource {
Expand Down Expand Up @@ -752,18 +747,12 @@ mod tests {

let cln_token = CancellationToken::new();

let source_reader = SourceReader::new(SourceClient::new(
let source = Source::new(SourceClient::new(
create_rpc_channel(source_sock_file.clone()).await.unwrap(),
))
.await
.expect("failed to connect to source server");

let source_acker = SourceAcker::new(SourceClient::new(
create_rpc_channel(source_sock_file).await.unwrap(),
))
.await
.expect("failed to connect to source server");

let sink_writer = SinkWriter::new(SinkClient::new(
create_rpc_channel(sink_sock_file).await.unwrap(),
))
Expand All @@ -776,10 +765,9 @@ mod tests {
.await
.expect("failed to connect to transformer server");

let mut forwarder =
ForwarderBuilder::new(source_reader, source_acker, sink_writer, cln_token.clone())
.source_transformer(transformer_client)
.build();
let mut forwarder = ForwarderBuilder::new(source, sink_writer, cln_token.clone())
.source_transformer(transformer_client)
.build();

// Assert the received message in a different task
let assert_handle = tokio::spawn(async move {
Expand Down Expand Up @@ -881,27 +869,19 @@ mod tests {

let cln_token = CancellationToken::new();

let source_reader = SourceReader::new(SourceClient::new(
let source = Source::new(SourceClient::new(
create_rpc_channel(source_sock_file.clone()).await.unwrap(),
))
.await
.expect("failed to connect to source server");

let source_acker = SourceAcker::new(SourceClient::new(
create_rpc_channel(source_sock_file).await.unwrap(),
))
.await
.expect("failed to connect to source server");

let sink_writer = SinkWriter::new(SinkClient::new(
create_rpc_channel(sink_sock_file).await.unwrap(),
))
.await
.expect("failed to connect to sink server");

let mut forwarder =
ForwarderBuilder::new(source_reader, source_acker, sink_writer, cln_token.clone())
.build();
let mut forwarder = ForwarderBuilder::new(source, sink_writer, cln_token.clone()).build();

let cancel_handle = tokio::spawn(async move {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
Expand Down Expand Up @@ -1003,18 +983,12 @@ mod tests {

let cln_token = CancellationToken::new();

let source_reader = SourceReader::new(SourceClient::new(
let source = Source::new(SourceClient::new(
create_rpc_channel(source_sock_file.clone()).await.unwrap(),
))
.await
.expect("failed to connect to source server");

let source_acker = SourceAcker::new(SourceClient::new(
create_rpc_channel(source_sock_file).await.unwrap(),
))
.await
.expect("failed to connect to source server");

let sink_writer = SinkWriter::new(SinkClient::new(
create_rpc_channel(sink_sock_file).await.unwrap(),
))
Expand All @@ -1027,10 +1001,9 @@ mod tests {
.await
.expect("failed to connect to fb sink server");

let mut forwarder =
ForwarderBuilder::new(source_reader, source_acker, sink_writer, cln_token.clone())
.fallback_sink_writer(fb_sink_writer)
.build();
let mut forwarder = ForwarderBuilder::new(source, sink_writer, cln_token.clone())
.fallback_sink_writer(fb_sink_writer)
.build();

let assert_handle = tokio::spawn(async move {
let received_message = sink_rx.recv().await.unwrap();
Expand Down
Loading

0 comments on commit 66c8613

Please sign in to comment.