chore: minor fixes for rust source and sink (#2201)
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
Co-authored-by: Vigith Maurice <[email protected]>
yhl25 and vigith authored Nov 4, 2024
1 parent 9a89fd4 commit 83dd070
Showing 6 changed files with 78 additions and 62 deletions.
2 changes: 0 additions & 2 deletions rust/numaflow-core/src/config/pipeline.rs
@@ -353,8 +353,6 @@ mod tests {
reader_config: BufferReaderConfig {
partitions: 1,
streams: vec![("default-simple-pipeline-out-0".into(), 0)],
batch_size: 500,
read_timeout: Duration::from_secs(1),
wip_ack_interval: Duration::from_secs(1),
},
partitions: 0,
8 changes: 0 additions & 8 deletions rust/numaflow-core/src/config/pipeline/isb.rs
@@ -3,15 +3,13 @@ use std::fmt;
use std::time::Duration;

const DEFAULT_PARTITION_IDX: u16 = 0;
const DEFAULT_BATCH_SIZE: usize = 500;
const DEFAULT_PARTITIONS: u16 = 1;
const DEFAULT_MAX_LENGTH: usize = 30000;
const DEFAULT_USAGE_LIMIT: f64 = 0.8;
const DEFAULT_REFRESH_INTERVAL_SECS: u64 = 1;
const DEFAULT_BUFFER_FULL_STRATEGY: BufferFullStrategy = BufferFullStrategy::RetryUntilSuccess;
const DEFAULT_RETRY_INTERVAL_MILLIS: u64 = 10;
const DEFAULT_WIP_ACK_INTERVAL_MILLIS: u64 = 1000;
const DEFAULT_READ_TIMEOUT_MILLIS: u64 = 1000;

pub(crate) mod jetstream {
const DEFAULT_URL: &str = "localhost:4222";
@@ -78,8 +76,6 @@ impl fmt::Display for BufferFullStrategy {
pub(crate) struct BufferReaderConfig {
pub(crate) partitions: u16,
pub(crate) streams: Vec<(String, u16)>,
pub(crate) batch_size: usize,
pub(crate) read_timeout: Duration,
pub(crate) wip_ack_interval: Duration,
}

@@ -88,9 +84,7 @@ impl Default for BufferReaderConfig {
BufferReaderConfig {
partitions: DEFAULT_PARTITIONS,
streams: vec![("default-0".to_string(), DEFAULT_PARTITION_IDX)],
batch_size: DEFAULT_BATCH_SIZE,
wip_ack_interval: Duration::from_millis(DEFAULT_WIP_ACK_INTERVAL_MILLIS),
read_timeout: Duration::from_millis(DEFAULT_READ_TIMEOUT_MILLIS),
}
}
}
@@ -145,9 +139,7 @@ mod tests {
let expected = BufferReaderConfig {
partitions: DEFAULT_PARTITIONS,
streams: vec![("default-0".to_string(), DEFAULT_PARTITION_IDX)],
batch_size: DEFAULT_BATCH_SIZE,
wip_ack_interval: Duration::from_millis(DEFAULT_WIP_ACK_INTERVAL_MILLIS),
read_timeout: Duration::from_millis(DEFAULT_READ_TIMEOUT_MILLIS),
};
let config = BufferReaderConfig::default();
assert_eq!(config, expected);
33 changes: 18 additions & 15 deletions rust/numaflow-core/src/pipeline.rs
@@ -1,11 +1,11 @@
use std::collections::HashMap;

use async_nats::jetstream;
use async_nats::jetstream::Context;
use async_nats::{jetstream, ConnectOptions};
use futures::future::try_join_all;
use numaflow_pb::clients::sink::sink_client::SinkClient;
use numaflow_pb::clients::source::source_client::SourceClient;
use numaflow_pb::clients::sourcetransformer::source_transform_client::SourceTransformClient;
use std::collections::HashMap;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use tonic::transport::Channel;

@@ -289,17 +289,22 @@ async fn create_transformer(

/// Creates a jetstream context based on the provided configuration
async fn create_js_context(config: pipeline::isb::jetstream::ClientConfig) -> Result<Context> {
let js_client = match (config.user, config.password) {
(Some(user), Some(password)) => {
async_nats::connect_with_options(
config.url,
async_nats::ConnectOptions::with_user_and_password(user, password),
)
.await
}
_ => async_nats::connect(config.url).await,
// TODO: make these configurable. Today this is hardcoded in the Golang code too.
let mut opts = ConnectOptions::new()
.max_reconnects(None) // -1 for unlimited reconnects
.ping_interval(Duration::from_secs(3))
.retry_on_initial_connect();

if let (Some(user), Some(password)) = (config.user, config.password) {
opts = opts.user_and_password(user, password);
}
.map_err(|e| error::Error::Connection(e.to_string()))?;

let js_client = async_nats::connect_with_options(&config.url, opts)
.await
.map_err(|e| error::Error::Connection(e.to_string()))?;

Ok(jetstream::new(js_client))
}
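As a point of reference, the connection options used in the rewritten `create_js_context` can be exercised standalone. The sketch below is a minimal, hedged example assuming a local NATS server at `localhost:4222` and placeholder credentials; it only mirrors the `async_nats` builder calls visible in the hunk above and is not part of this commit:

```rust
use std::time::Duration;

use async_nats::{jetstream, ConnectOptions};

#[tokio::main]
async fn main() -> Result<(), async_nats::ConnectError> {
    // Placeholder values; in the pipeline these come from the ISB ClientConfig.
    let url = "localhost:4222";
    let credentials: Option<(String, String)> = None;

    let mut opts = ConnectOptions::new()
        .max_reconnects(None) // no cap on reconnect attempts
        .ping_interval(Duration::from_secs(3))
        .retry_on_initial_connect();

    if let Some((user, password)) = credentials {
        opts = opts.user_and_password(user, password);
    }

    // retry_on_initial_connect() keeps retrying the first connection in the
    // background instead of failing fast, complementing the reconnect-forever
    // policy set by max_reconnects(None).
    let client = async_nats::connect_with_options(url, opts).await?;
    let _js = jetstream::new(client);
    Ok(())
}
```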

@@ -562,8 +567,6 @@ mod tests {
.enumerate()
.map(|(i, key)| (key.to_string(), i as u16))
.collect(),
batch_size: 500,
read_timeout: Duration::from_secs(1),
wip_ack_interval: Duration::from_secs(1),
},
partitions: 0,
6 changes: 6 additions & 0 deletions rust/numaflow-core/src/pipeline/forwarder/source_forwarder.rs
@@ -162,6 +162,7 @@ impl Forwarder {

/// Writes messages to the jetstream; it writes to all the downstream buffers.
async fn write_to_jetstream(&mut self, messages: Vec<Message>) -> Result<(), Error> {
let start_time = tokio::time::Instant::now();
if messages.is_empty() {
return Ok(());
}
@@ -186,6 +187,11 @@
.await
.map_err(|e| Error::Forwarder(format!("Failed to write to jetstream {:?}", e)))??;
}
debug!(
len = messages.len(),
elapsed_ms = start_time.elapsed().as_millis(),
"Wrote messages to jetstream",
);
Ok(())
}
}
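The `debug!` added above records `len` and `elapsed_ms` as structured `tracing` fields rather than interpolating them into the message. A minimal sketch of the same pattern, assuming the common `tracing-subscriber` fmt setup at DEBUG level (the subscriber configuration is not part of this diff):

```rust
use std::time::Instant;

use tracing::debug;

fn main() {
    // Assumed setup: a fmt subscriber at DEBUG level; any tracing subscriber works.
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .init();

    let start_time = Instant::now();
    let messages = vec!["m1", "m2", "m3"];

    // len and elapsed_ms are recorded as key/value fields on the event, so log
    // processors can filter or aggregate on them directly.
    debug!(
        len = messages.len(),
        elapsed_ms = start_time.elapsed().as_millis(),
        "Wrote messages to jetstream",
    );
}
```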
81 changes: 47 additions & 34 deletions rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs
@@ -3,13 +3,14 @@ use std::time::Duration;
use async_nats::jetstream::{
consumer::PullConsumer, AckKind, Context, Message as JetstreamMessage,
};

use tokio::sync::mpsc::Receiver;
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio::time::{self, Instant};
use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
use tracing::{error, warn};
use tracing::{debug, error, info, warn};

use crate::config::pipeline::isb::BufferReaderConfig;
use crate::config::pipeline::PipelineConfig;
@@ -72,7 +73,8 @@ impl JetstreamReader {
cancel_token: CancellationToken,
pipeline_config: &PipelineConfig,
) -> Result<(Receiver<ReadMessage>, JoinHandle<Result<()>>)> {
let (messages_tx, messages_rx) = mpsc::channel(2 * self.config.batch_size);
// FIXME: factor of 2 should be configurable, at the least a const
let (messages_tx, messages_rx) = mpsc::channel(2 * pipeline_config.batch_size);

let handle: JoinHandle<Result<()>> = tokio::spawn({
let this = self.clone();
@@ -104,41 +106,44 @@ impl JetstreamReader {
.messages()
.await
.unwrap()
.chunks_timeout(this.config.batch_size, this.config.read_timeout);
.chunks_timeout(pipeline_config.batch_size, pipeline_config.read_timeout);

tokio::pin!(chunk_stream);

// The .next() call will not return if there is no data even if read_timeout is
// reached.
let mut total_messages = 0;
let mut chunk_time = Instant::now();
let mut start_time = Instant::now();
while let Some(messages) = chunk_stream.next().await {
debug!(
len = messages.len(),
elapsed_ms = chunk_time.elapsed().as_millis(),
"Received messages from Jetstream",
);
total_messages += messages.len();
for message in messages {
let jetstream_message = match message {
Ok(message) => message,
Err(e) => {
error!(?e, "Failed to fetch messages from the Jetstream");
continue;
}
};

let msg_info = match jetstream_message.info() {
Ok(info) => info,
Err(e) => {
error!(?e, "Failed to get message info from Jetstream");
continue;
}
};
let jetstream_message = message.map_err(|e| {
Error::ISB(format!(
"Error while fetching message from Jetstream: {:?}",
e
))
})?;

let msg_info = jetstream_message.info().map_err(|e| {
Error::ISB(format!(
"Error while fetching message info from Jetstream: {:?}",
e
))
})?;

let mut message: Message =
match jetstream_message.payload.clone().try_into() {
Ok(message) => message,
Err(e) => {
error!(
?e,
"Failed to parse message payload received from Jetstream"
);
continue;
}
};
jetstream_message.payload.clone().try_into().map_err(|e| {
Error::ISB(format!(
"Error while converting Jetstream message to Message: {:?}",
e
))
})?;

message.offset = Some(Offset::Int(IntOffset::new(
msg_info.stream_sequence,
@@ -158,21 +163,31 @@ impl JetstreamReader {
ack: ack_tx,
};

if messages_tx.send(read_message).await.is_err() {
error!("Failed to send message to the channel");
return Ok(());
}
messages_tx.send(read_message).await.map_err(|e| {
Error::ISB(format!("Error while sending message to channel: {:?}", e))
})?;

forward_pipeline_metrics()
.forwarder
.data_read
.get_or_create(labels)
.inc();

if start_time.elapsed() >= Duration::from_millis(1000) {
info!(
len = total_messages,
elapsed_ms = start_time.elapsed().as_millis(),
"Total messages read from Jetstream"
);
start_time = Instant::now();
total_messages = 0;
}
}
if cancel_token.is_cancelled() {
warn!("Cancellation token is cancelled. Exiting JetstreamReader");
break;
}
chunk_time = Instant::now();
}
Ok(())
}
@@ -279,8 +294,6 @@ mod tests {
let buf_reader_config = BufferReaderConfig {
partitions: 0,
streams: vec![],
batch_size: 2,
read_timeout: Duration::from_millis(1000),
wip_ack_interval: Duration::from_millis(5),
};
let js_reader = JetstreamReader::new(
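The reader batches messages with `chunks_timeout` and, as the comment in the hunk above notes, `.next()` does not yield an empty chunk when `read_timeout` elapses with no data. The standalone sketch below illustrates that batching behavior; the channel, the batch size of 500, and the 100 ms timeout are illustrative assumptions rather than values from this pipeline, and it assumes tokio-stream's `time` feature:

```rust
use std::time::Duration;

use tokio_stream::{wrappers::ReceiverStream, StreamExt};

#[tokio::main]
async fn main() {
    let (tx, rx) = tokio::sync::mpsc::channel(16);

    // Producer: two quick items, a long pause, then one more item before closing.
    tokio::spawn(async move {
        tx.send(1).await.unwrap();
        tx.send(2).await.unwrap();
        tokio::time::sleep(Duration::from_secs(2)).await;
        tx.send(3).await.unwrap();
    });

    // Emit a chunk when 500 items have been collected or 100 ms have elapsed,
    // whichever comes first; empty chunks are never yielded.
    let chunk_stream = ReceiverStream::new(rx).chunks_timeout(500, Duration::from_millis(100));
    tokio::pin!(chunk_stream);

    // Prints [1, 2] after ~100 ms, then waits through the 2 s pause with no
    // output -- the timeout alone never wakes .next() with an empty batch --
    // and finally prints [3] once it arrives and the sender closes.
    while let Some(batch) = chunk_stream.next().await {
        println!("batch: {:?}", batch);
    }
}
```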
10 changes: 7 additions & 3 deletions rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs
@@ -12,7 +12,7 @@ use tokio::sync::mpsc::Receiver;
use tokio::sync::{mpsc, oneshot};
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
use tracing::{debug, error, info, warn};

use crate::config::pipeline::isb::BufferWriterConfig;
use crate::error::Error;
@@ -207,7 +207,7 @@ impl JetstreamWriter {
/// an error, it means it is a fatal, non-retryable error.
pub(super) async fn blocking_write(&self, payload: Vec<u8>) -> Result<PublishAck> {
let js_ctx = self.js_ctx.clone();

let start_time = tokio::time::Instant::now();
loop {
match js_ctx
.publish(self.stream_name.clone(), Bytes::from(payload.clone()))
@@ -219,8 +219,12 @@
// should we return an error here? Because duplicate messages are not fatal
// But it can mess up the watermark progression because the offset will be
// same as the previous message offset
warn!("Duplicate message detected, ignoring {:?}", ack);
warn!(ack = ?ack, "Duplicate message detected, ignoring");
}
debug!(
elapsed_ms = start_time.elapsed().as_millis(),
"Blocking write successful in",
);
return Ok(ack);
}
Err(e) => {
