Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: update retry loop #2032

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
293 changes: 203 additions & 90 deletions rust/monovertex/src/forwarder.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
use std::collections::HashMap;

use chrono::Utc;
use tokio::task::JoinSet;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
use tracing::log::warn;
use tracing::{debug, info};

use crate::config::{config, OnFailureStrategy};
use crate::error::{Error, Result};
use crate::message::{Message, Offset};
Expand All @@ -8,12 +15,6 @@ use crate::metrics::forward_metrics;
use crate::sink::{proto, SinkClient};
use crate::source::SourceClient;
use crate::transformer::TransformerClient;
use chrono::Utc;
use tokio::task::JoinSet;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
use tracing::log::warn;
use tracing::{debug, info};

/// Forwarder is responsible for reading messages from the source, applying transformation if
/// transformer is present, writing the messages to the sink, and then acknowledging the messages
Expand Down Expand Up @@ -223,50 +224,52 @@ impl Forwarder {
// we will overwrite this vec with failed messages and will keep retrying.
let mut messages_to_send = messages;

// check what is the failure strategy in the config
let strategy = config().sink_retry_on_fail_strategy.clone();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not clone in new since it is literally one time and can be reused?


// only breaks out of this loop based on the retry strategy unless all the messages have been written to sink
// successfully.
loop {
while attempts < config().sink_max_retry_attempts {
let status = self
.write_to_sink_once(&mut error_map, &mut fallback_msgs, &mut messages_to_send)
.await;
match status {
Ok(true) => break,
Ok(false) => {
attempts += 1;
warn!(
"Retry attempt {} due to retryable error. Errors: {:?}",
attempts, error_map
);
}
Err(e) => Err(e)?,
}
}

// If after the retries we still have messages to process, handle the post retry failures
let need_retry = self.handle_sink_post_retry(
&mut attempts,
&mut error_map,
&mut fallback_msgs,
&mut messages_to_send,
);
match need_retry {
// if we are done with the messages, break the loop
Ok(false) => break,
// if we need to retry, reset the attempts and error_map
Ok(true) => {
attempts = 0;
error_map.clear();
while attempts < config().sink_max_retry_attempts || strategy == OnFailureStrategy::Retry {
let status = self
.write_to_sink_once(&mut error_map, &mut fallback_msgs, &mut messages_to_send)
.await;
match status {
Ok(true) => break,
Ok(false) => {
attempts += 1;
warn!(
"Retry attempt {} due to retryable error. Errors: {:?}",
attempts, error_map
);
}
Err(e) => Err(e)?,
}
}

// If after the retries we still have messages to process, handle the post retry failures
let need_retry = handle_sink_post_retry(
&mut attempts,
&mut error_map,
&mut fallback_msgs,
&mut messages_to_send,
strategy,
self.common_labels.clone(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not pass reference?

);

// if we need to retry, return with error as we have exhausted the retries
if need_retry? {
return Err(Error::SinkError(format!(
"Failed to write messages to sink after {} attempts. Errors: {:?}",
attempts, error_map
)));
}

// If there are fallback messages, write them to the fallback sink
if !fallback_msgs.is_empty() {
self.handle_fallback_messages(fallback_msgs).await?;
}

// update the metric for the end to end time taken to write to the sink
forward_metrics()
.sink_time
.get_or_create(&self.common_labels)
Expand All @@ -281,58 +284,6 @@ impl Forwarder {
Ok(())
}

/// Handles the post retry failures based on the configured strategy,
/// returns true if we need to retry, else false.
fn handle_sink_post_retry(
&mut self,
attempts: &mut u16,
error_map: &mut HashMap<String, i32>,
fallback_msgs: &mut Vec<Message>,
messages_to_send: &mut Vec<Message>,
) -> Result<bool> {
// if we are done with the messages, break the loop
if messages_to_send.is_empty() {
return Ok(false);
}
// check what is the failure strategy in the config
let strategy = config().sink_retry_on_fail_strategy.clone();
match strategy {
// if we need to retry, return true
OnFailureStrategy::Retry => {
warn!(
"Using onFailure Retry, Retry attempts {} completed",
attempts
);
return Ok(true);
}
// if we need to drop the messages, log and return false
OnFailureStrategy::Drop => {
// log that we are dropping the messages as requested
warn!(
"Dropping messages after {} attempts. Errors: {:?}",
attempts, error_map
);
// update the metrics
forward_metrics()
.dropped_total
.get_or_create(&self.common_labels)
.inc_by(messages_to_send.len() as u64);
}
// if we need to move the messages to the fallback, return false
OnFailureStrategy::Fallback => {
// log that we are moving the messages to the fallback as requested
warn!(
"Moving messages to fallback after {} attempts. Errors: {:?}",
attempts, error_map
);
// move the messages to the fallback messages
fallback_msgs.append(messages_to_send);
}
}
// if we are done with the messages, break the loop
Ok(false)
}

/// Writes to sink once and will return true if successful, else false. Please note that it
/// mutates is incoming fields.
async fn write_to_sink_once(
Expand Down Expand Up @@ -514,6 +465,57 @@ impl Forwarder {
}
}

/// Handles the post retry failures based on the configured strategy,
/// returns true if we need to retry, else false.
fn handle_sink_post_retry(
attempts: &mut u16,
error_map: &mut HashMap<String, i32>,
fallback_msgs: &mut Vec<Message>,
messages_to_send: &mut Vec<Message>,
strategy: OnFailureStrategy,
common_labels: Vec<(String, String)>,
) -> Result<bool> {
// if we are done with the messages, break the loop
if messages_to_send.is_empty() {
return Ok(false);
}
match strategy {
// if we need to retry, return true
OnFailureStrategy::Retry => {
warn!(
"Using onFailure Retry, Retry attempts {} completed",
attempts
);
return Ok(true);
}
// if we need to drop the messages, log and return false
OnFailureStrategy::Drop => {
// log that we are dropping the messages as requested
warn!(
"Dropping messages after {} attempts. Errors: {:?}",
attempts, error_map
);
// update the metrics
forward_metrics()
.dropped_total
.get_or_create(&common_labels)
.inc_by(messages_to_send.len() as u64);
}
// if we need to move the messages to the fallback, return false
OnFailureStrategy::Fallback => {
// log that we are moving the messages to the fallback as requested
warn!(
"Moving messages to fallback after {} attempts. Errors: {:?}",
attempts, error_map
);
// move the messages to the fallback messages
fallback_msgs.append(messages_to_send);
}
}
// if we are done with the messages, break the loop
Ok(false)
}

#[cfg(test)]
mod tests {
use std::collections::HashSet;
Expand Down Expand Up @@ -1038,3 +1040,114 @@ mod tests {
.expect("failed to join fb sink server task");
}
}

#[cfg(test)]
mod tests_retry {
use super::*;

impl Message {
// dummy method to test the retry logic
pub(crate) fn dummy() -> Self {
Self {
keys: vec![],
value: vec![],
offset: Offset {
offset: "".to_string(),
partition_id: 0,
},
event_time: Utc::now(),
id: "".to_string(),
headers: HashMap::new(),
}
}
}

#[test]
fn test_handle_sink_post_retry_empty_messages() {
let mut attempts = 0;
let mut error_map = HashMap::new();
let mut fallback_msgs = vec![];
let mut messages_to_send = vec![];
let common_labels = vec![];

let result = handle_sink_post_retry(
&mut attempts,
&mut error_map,
&mut fallback_msgs,
&mut messages_to_send,
OnFailureStrategy::Retry,
common_labels,
)
.unwrap();
assert_eq!(result, false);
}

#[test]
fn test_handle_sink_post_retry_retry() {
let mut attempts = 0;
let mut error_map = HashMap::new();
let mut fallback_msgs = vec![];
let mut messages_to_send = vec![Message::dummy()];

let result = handle_sink_post_retry(
&mut attempts,
&mut error_map,
&mut fallback_msgs,
&mut messages_to_send,
OnFailureStrategy::Retry,
vec![],
)
.unwrap();
assert_eq!(result, true);
}

#[test]
fn test_handle_sink_post_retry_drop() {
let mut attempts = 0;
let mut error_map = HashMap::new();
let mut fallback_msgs = vec![];
let mut messages_to_send = vec![Message::dummy()];

// check the metric before the drop
let val = forward_metrics().dropped_total.get_or_create(&vec![]).get();
assert_eq!(val, 0);
let result = handle_sink_post_retry(
&mut attempts,
&mut error_map,
&mut fallback_msgs,
&mut messages_to_send,
OnFailureStrategy::Drop,
vec![],
)
.unwrap();
assert_eq!(result, false);
// check if the metric is updated
let val = forward_metrics().dropped_total.get_or_create(&vec![]).get();
assert_eq!(val, 1)
}

#[test]
fn test_handle_sink_post_retry_fallback() {
let mut attempts = 0;
let mut error_map = HashMap::new();
let mut fallback_msgs = vec![];
let mut messages_to_send = vec![Message::dummy()];

// check if the fallback messages are updated
assert_eq!(fallback_msgs.len(), 0);

let result = handle_sink_post_retry(
&mut attempts,
&mut error_map,
&mut fallback_msgs,
&mut messages_to_send,
OnFailureStrategy::Fallback,
vec![],
)
.unwrap();
assert_eq!(result, false);
assert!(messages_to_send.is_empty());
// check if the fallback messages are updated
assert_eq!(fallback_msgs.len(), 1);
}
}
Loading