fix: handle retry and stop for all threads if one is stopped #9

Merged
merged 4 commits into from
Oct 26, 2023

Conversation

akaur13
Collaborator

@akaur13 akaur13 commented Oct 12, 2023

No description provided.

@akaur13 akaur13 requested a review from anil0906 October 12, 2023 23:59
Comment on lines +21 to +29
let node_id: Uuid = match std::env::var("NODE_ID") {
    Ok(val) => Uuid::parse_str(&val).unwrap_or_else(|_e| {
        let uuid = uuid::Uuid::new_v4();
        log::info!("NODE_ID not found in env assigning {}", uuid);
        uuid
    }),
    Err(_e) => {
        log::info!("NODE_ID not found in env");
        uuid::Uuid::new_v4()
    }
};

This block can be moved to its own function / method.
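
A minimal sketch of what such an extracted function might look like. The name resolve_node_id and the parse / generate parameters are hypothetical: they stand in for Uuid::parse_str and Uuid::new_v4 so that the sketch compiles with the standard library alone, and they are not taken from the PR.

```rust
/// Picks the node id: use the configured value when it parses, otherwise
/// fall back to a freshly generated one.
///
/// `parse` and `generate` are hypothetical stand-ins for `Uuid::parse_str`
/// and `Uuid::new_v4`; `raw` would typically be `std::env::var("NODE_ID").ok()`.
fn resolve_node_id<T, E>(
    raw: Option<String>,
    parse: impl Fn(&str) -> Result<T, E>,
    generate: impl FnOnce() -> T,
) -> T {
    raw.and_then(|val| parse(val.as_str()).ok())
        // Reached both when the variable is unset and when its value does
        // not parse; real code would log which case occurred at this point.
        .unwrap_or_else(generate)
}
```

With the logic in one place, the caller reduces to a single line, and the fallback behaviour can be tested without touching the process environment.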


headers.insert("readied_by".to_string(), node_id.unwrap());
match node_id {

@fmarek-kindred fmarek-kindred Oct 23, 2023

  • We assign Some(...) to node_id and then check it with match. Maybe this variable does not need to be an Option at all?
  • Also, node_id does not appear to be used outside the match block that starts here. Does it need to be declared so early? Moving the node_id declaration into this loop looks like it would simplify the code.
  • There are two node_ids in this method: one is a Uuid and the other is an Option<String>. If they are different things, it is better to give the variables different names.
  • Inverting the condition if rdy_to_pblsh_count > 0 into an is-empty check and breaking early would reduce the nesting of the code.
  • Consider moving for row in publish_rows { into its own function. It looks to me as if this for loop is needed only to populate let mut publish_futures, creating one publishing task for each item in publish_rows. Maybe it can be changed to
       let publish_futures = prepare_publishing_tasks(publish_rows);
       let results = futures::future::join_all(publish_futures).await;

       or

       let publish_futures: Vec<_> = publish_rows.into_iter().map(|row| prepare_publishing_task(row)).collect();
       let results = futures::future::join_all(publish_futures).await;
    

Doing something like that will make the whole .run method simpler, testable, and easier to read.
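
The early exit and the one-task-per-row mapping suggested above could be sketched roughly as follows. This is a synchronous, std-only illustration: Row, prepare_publishing_task and publish_batch are hypothetical names, and plain closures stand in for the futures that the real code would hand to futures::future::join_all.

```rust
/// Hypothetical stand-in for a row fetched from the data store.
struct Row {
    id: String,
}

/// Hypothetical stand-in for building one publishing task per row.
/// In the PR this would produce a future; a closure keeps the sketch std-only.
fn prepare_publishing_task(row: Row) -> impl FnOnce() -> Result<String, String> {
    move || Ok(row.id)
}

/// Runs one task per row and returns each task's outcome.
fn publish_batch(publish_rows: Vec<Row>) -> Vec<Result<String, String>> {
    // Inverted condition: bail out early instead of nesting the whole body.
    if publish_rows.is_empty() {
        return Vec::new();
    }

    // One task per row, built by a dedicated function rather than an inline loop.
    let tasks: Vec<_> = publish_rows
        .into_iter()
        .map(prepare_publishing_task)
        .collect();

    // Stand-in for `join_all(tasks).await`: run every task and keep its result.
    tasks.into_iter().map(|task| task()).collect()
}
```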

@@ -112,20 +131,14 @@ impl MessageProcessor {
Err(e) => {
if e.contains("could not serialize access due to concurrent update") && retry_count < max_retry_count {
//retry goes here
eprintln!("retrying");
log::info!("retrying");

When this is sent to a log collector, it might not be very useful. Especially in a multi-threaded context, a log line without much context may look like random noise.

node_id.unwrap(),
// row_id.unwrap()
);
log::error!("Error: max retry count {} reached by node {:?} for row ", max_retry_count, node_id);

Which node_id are we printing here? The Option or the Uuid?

let string_payload = match get_payload_utf8(new_message) {
Some(string_payload) => String::from_utf8_lossy(string_payload).to_string(),
None => {
log::error!("message payload is not utf8");

When debugging this in production, you almost always want to see the original value that we failed to parse. Add new_message to this log line.
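
As a rough illustration of carrying the original value into the error, here is a std-only sketch. payload_to_string is a hypothetical helper, not the PR's get_payload_utf8, and the returned message is what one might pass to log::error!.

```rust
/// Converts a raw payload to text. On failure, the error message includes
/// the offending bytes so the log line is actionable on its own.
/// Hypothetical helper for illustration; not part of the PR.
fn payload_to_string(payload: &[u8]) -> Result<String, String> {
    match std::str::from_utf8(payload) {
        Ok(text) => Ok(text.to_owned()),
        Err(err) => Err(format!(
            "message payload is not utf8: {}; raw bytes: {:?}",
            err, payload
        )),
    }
}
```

For large payloads it may be worth truncating the bytes before logging them; that choice depends on the log collector's limits.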

};
let message_key = get_message_key(new_message);
match &self.producer.publish(string_payload, Some(headers), message_key, "same id".to_string()).await {
Ok(_) => {}

Is the hardcoded "same id" intentional?

Collaborator Author

This should be removed, it's redundant!

}
}

impl Pg {
pub async fn new(pg_config: PgConfig) -> Result<Self, PgError> {
let port = pg_config.port.parse::<u16>().unwrap_or(0); // make the connection fail and send pack PgError

typo 'pack' > 'back'

If the port is required but not configured, there is no point in retrying. I would treat it as a fatal error and fail.
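
One way to fail fast instead of substituting port 0 is sketched below. ConfigError and parse_port are hypothetical names introduced for illustration; the real code would presumably map the failure into its existing PgError.

```rust
/// Hypothetical error type for unrecoverable configuration problems.
#[derive(Debug, PartialEq)]
enum ConfigError {
    InvalidPort(String),
}

/// Parses the configured port, returning an error instead of a sentinel
/// value so the caller can stop immediately rather than retry a connection
/// that can never succeed.
fn parse_port(raw: &str) -> Result<u16, ConfigError> {
    raw.trim().parse::<u16>().map_err(|err| {
        ConfigError::InvalidPort(format!("invalid port {:?}: {}", raw, err))
    })
}
```

Inside Pg::new this would let the ? operator propagate the failure, provided a conversion from the hypothetical ConfigError into PgError exists.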

@@ -48,6 +48,28 @@ impl Runner {
message_receiver.run().await;
});

futures::future::join_all([monitor_handler, message_processor_handler, message_receiver_handler]).await;
// check if healthcheck file exists in healthcheck dir
let healthcheck_file = "healthcheck/chronos_healthcheck";

Should it be a config value instead?

}
}

let ids: Vec<String> = results.into_iter().map(Self::gather_ids).collect();

Please revisit this part. I am not sure about the consequences of gather_ids returning an empty string. Just by reading the code, I see that the empty string will then be fed into the data store delete method.

I would suggest thinking about whether your results list is allowed to have any Result::Err entries.

  • If no errors are allowed, then you want to validate the list and abort with an error message composed of all those errors.
  • If there could be some errors mixed with Result::Ok entries, then you had better filter out the errors using the filter() function on the iterator and then process only the Result::Ok entries. However, in this scenario I don't know what to do with the filtered-out errors; it depends on requirements. The current code appears to only log errors and do nothing with the faulty entries (just taking an empty string instead of an ID).
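
Both options can be sketched with the standard library alone. The functions split_ids and ids_or_abort are hypothetical, and the sketch assumes each result is a Result<String, String> holding either a row id or an error message, which may not match the PR's actual types.

```rust
/// Separates successful ids from error messages so that no placeholder
/// (such as an empty string) ever reaches the delete call.
fn split_ids(results: Vec<Result<String, String>>) -> (Vec<String>, Vec<String>) {
    let mut ids = Vec::new();
    let mut errors = Vec::new();
    for result in results {
        match result {
            Ok(id) => ids.push(id),
            Err(err) => errors.push(err),
        }
    }
    (ids, errors)
}

/// Strict variant: any error aborts, and the message lists all of them.
fn ids_or_abort(results: Vec<Result<String, String>>) -> Result<Vec<String>, String> {
    let (ids, errors) = split_ids(results);
    if errors.is_empty() {
        Ok(ids)
    } else {
        Err(errors.join("; "))
    }
}
```

In the tolerant case, the caller would delete only the ids returned by split_ids and decide separately, according to requirements, whether the failed rows are retried, reported, or left in place.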

}
}
Err(e) => {
if e.contains("could not serialize access due to concurrent update") && retry_count < max_retry_count {
//retry goes here
eprintln!("retrying");
log::debug!("retrying");

Suggestion: always use the "attempt N of M" pattern when logging bounded iterations.
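
A small sketch of such a log line, which also carries enough context to be useful when several threads log at once. retry_log_line and its parameters are hypothetical; in the PR the resulting string would be passed to a log macro such as log::info!.

```rust
/// Builds a retry message using the "attempt N of M" pattern, tagged with
/// the node and row involved. All parameter names are illustrative.
fn retry_log_line(
    node_id: &str,
    row_id: &str,
    attempt: u32,
    max_attempts: u32,
    reason: &str,
) -> String {
    format!(
        "retrying publish: attempt {} of {} (node_id={}, row_id={}): {}",
        attempt, max_attempts, node_id, row_id, reason
    )
}
```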

let mut headers: HashMap<String, String> = match serde_json::from_str(&updated_row.message_headers.to_string()) {
Ok(t) => t,
Err(_e) => {
println!("error occurred while parsing");

Better to use the logger.

@akaur13 akaur13 merged commit a7eb56c into master Oct 26, 2023
6 checks passed