-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: handle retry and stop for all threads if one is stopped #9
Conversation
let node_id: Uuid = match std::env::var("NODE_ID") { | ||
Ok(val) => Uuid::parse_str(&val).unwrap_or_else(|_e| { | ||
let uuid = uuid::Uuid::new_v4(); | ||
log::info!("NODE_ID not found in env assigning {}", uuid); | ||
uuid | ||
}), | ||
Err(_e) => { | ||
log::info!("NODE_ID not found in env"); | ||
uuid::Uuid::new_v4() | ||
} | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This block can be moved to its own function / method.
chronos_bin/src/message_processor.rs
Outdated
|
||
headers.insert("readied_by".to_string(), node_id.unwrap()); | ||
match node_id { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- We assigned Some(...) to
node_id
and then checking it withmatch
. Maybe we don't need to have data type asOption
for this variable? - Also, it appears we are not using
node_id
outside of match block which starts here. Then do we need to have this variable declared so early? It looks like movingnode_id
declaration into this loop will simplify code. - There are two
node_id
s in this method. One isUuid
and another isOption<String>
. If they are different thing, it is better to name variables differently. - Inverting condition
if rdy_to_pblsh_count > 0
tois empty
and breaking early can reduce nesting of the code. - Consider moving
for row in publish_rows {
into its own function. It looks to me that this "for" loop is needed only to populate this variablelet mut publish_futures
, where for each item int thepublish_rows
we create one publishing task. Maybe it can be changed tolet publish_futures = prepare_publishing_tasks(publish_rows) let results = futures::future::join_all(publish_futures).await; or let publish_futures = publish_rows.map(|row| prepare_publishing_task(row)).collect() let results = futures::future::join_all(publish_futures).await;
Doing something like that will make the whole method .run
simpler, testable and easier to read.
chronos_bin/src/message_processor.rs
Outdated
@@ -112,20 +131,14 @@ impl MessageProcessor { | |||
Err(e) => { | |||
if e.contains("could not serialize access due to concurrent update") && retry_count < max_retry_count { | |||
//retry goes here | |||
eprintln!("retrying"); | |||
log::info!("retrying"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When this will be sent to log collector, it might not be very useful. Especially in the multi-threading context the log like without much context may appear as random noise.
chronos_bin/src/message_processor.rs
Outdated
node_id.unwrap(), | ||
// row_id.unwrap() | ||
); | ||
log::error!("Error: max retry count {} reached by node {:?} for row ", max_retry_count, node_id); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Which node_id we are printing here? The Option or Uuid?
chronos_bin/src/message_receiver.rs
Outdated
let string_payload = match get_payload_utf8(new_message) { | ||
Some(string_payload) => String::from_utf8_lossy(string_payload).to_string(), | ||
None => { | ||
log::error!("message payload is not utf8"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When debugging this in production you almost always want to see the original value which we failed to parse. Add new_message
to this log line.
chronos_bin/src/message_receiver.rs
Outdated
}; | ||
let message_key = get_message_key(new_message); | ||
match &self.producer.publish(string_payload, Some(headers), message_key, "same id".to_string()).await { | ||
Ok(_) => {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The hardcoded "same id"
is this intentional?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be removed, its redundant!
chronos_bin/src/postgres/pg.rs
Outdated
} | ||
} | ||
|
||
impl Pg { | ||
pub async fn new(pg_config: PgConfig) -> Result<Self, PgError> { | ||
let port = pg_config.port.parse::<u16>().unwrap_or(0); // make the connection fail and send pack PgError |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo 'pack' > 'back'
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If port is not configured while it is required, there is no point retrying. I would consider it as fatal error and fail.
chronos_bin/src/runner.rs
Outdated
@@ -48,6 +48,28 @@ impl Runner { | |||
message_receiver.run().await; | |||
}); | |||
|
|||
futures::future::join_all([monitor_handler, message_processor_handler, message_receiver_handler]).await; | |||
// check if healthcheck file exists in healthcheck dir | |||
let healthcheck_file = "healthcheck/chronos_healthcheck"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should it be a config value instead?
a5a05c4
to
00f7c05
Compare
chronos_bin/src/message_processor.rs
Outdated
} | ||
} | ||
|
||
let ids: Vec<String> = results.into_iter().map(Self::gather_ids).collect(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please re-visit this part. I am not sure the consequence of having gather_ids
to return an empty string. Just by reading the code I see that empty string will then be fed into data store delete method.
I would suggest to think if your results list is allowed to have any Result::Err entries.
- If no errors are allowed then you want to validate the list and abort with error message composed of all those errors.
- If there could be some errors mixed with Result::Ok entries then you better filter errors by using
filter()
function on the iterator. And then process only Results::Ok. However, in this scenario I don't know what to do with filtered out errors. It depends on requirements. The current code appears to only log errors but do nothing with faulty entries (just taking an empty string instead of ID).
chronos_bin/src/message_processor.rs
Outdated
} | ||
} | ||
Err(e) => { | ||
if e.contains("could not serialize access due to concurrent update") && retry_count < max_retry_count { | ||
//retry goes here | ||
eprintln!("retrying"); | ||
log::debug!("retrying"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion to always use attempt "N of M" pattern when printing bound iterations.
chronos_bin/src/message_processor.rs
Outdated
let mut headers: HashMap<String, String> = match serde_json::from_str(&updated_row.message_headers.to_string()) { | ||
Ok(t) => t, | ||
Err(_e) => { | ||
println!("error occurred while parsing"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better use logger.
No description provided.