fix: updates to fail fast and capture the right ids
Amninder Kaur committed Oct 25, 2023
1 parent 00f7c05 commit d4ce18b
Showing 3 changed files with 15 additions and 26 deletions.
21 changes: 6 additions & 15 deletions chronos_bin/src/message_processor.rs
@@ -30,16 +30,6 @@ impl MessageProcessor {
         node_id
     }

-    fn gather_ids(result: Result<String, String>) -> String {
-        match result {
-            Ok(m) => m,
-            Err(e) => {
-                log::error!("Error: delayed message publish failed {:?}", e);
-                "".to_string()
-            }
-        }
-    }
-
     #[tracing::instrument(skip_all, fields(correlationId))]
     async fn prepare_to_publish(&self, row: Row) -> Result<String, String> {
         let updated_row = TableRow {
@@ -54,8 +44,8 @@ impl MessageProcessor {
         let mut headers: HashMap<String, String> = match serde_json::from_str(&updated_row.message_headers.to_string()) {
             Ok(t) => t,
             Err(_e) => {
-                println!("error occurred while parsing");
-                HashMap::new()
+                log::error!("error occurred while parsing");
+                return Err("error occurred while parsing headers field".to_string());
             }
         };

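The hunk above carries the fail-fast half of the commit: a malformed message_headers field used to be logged with println! and silently replaced by an empty HashMap, whereas prepare_to_publish now returns an Err so the row is not published with missing headers. A minimal standalone sketch of the same early-return pattern, assuming the serde_json dependency already used by the crate; the parse_headers helper and the sample inputs are illustrative, not code from the repository:

use std::collections::HashMap;

// Hypothetical helper mirroring the fail-fast parse in prepare_to_publish.
fn parse_headers(raw: &str) -> Result<HashMap<String, String>, String> {
    match serde_json::from_str::<HashMap<String, String>>(raw) {
        Ok(headers) => Ok(headers),
        Err(e) => {
            // Before the fix the error was swallowed and an empty map was used;
            // now the caller sees the failure immediately.
            Err(format!("error occurred while parsing headers field: {e}"))
        }
    }
}

fn main() {
    assert!(parse_headers(r#"{"correlationId":"abc"}"#).is_ok());
    assert!(parse_headers("not json").is_err());
}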
@@ -75,6 +65,7 @@ impl MessageProcessor {
                         Err("error occurred while publishing".to_string())
                     }
                 }
+
             None => {
                 log::error!("Error: readied_by not found in db row {:?}", updated_row);
                 Err("error occurred while publishing".to_string())
@@ -89,7 +80,6 @@ impl MessageProcessor {
         let mut retry_count = 0;
         while let Err(outcome_error) = &self.data_store.delete_fired(ids).await {
             log::error!("Error: error occurred in message processor {}", outcome_error);
-            log::debug!("retrying");
             retry_count += 1;
             if retry_count == max_retry_count {
                 log::error!("Error: max retry count {} reached by node {:?} for deleting fired ids ", max_retry_count, ids);
@@ -128,7 +118,9 @@ impl MessageProcessor {

         let results = futures::future::join_all(publish_futures).await;

-        let ids: Vec<String> = results.into_iter().map(Self::gather_ids).collect();
+        // closure to gather ids from results vector and ignore error from result
+
+        let ids: Vec<String> = results.into_iter().filter_map(|result| result.ok()).collect();

         if !ids.is_empty() {
             let _ = self.delete_fired_records_from_db(&ids).await;
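This hunk is the "capture the right ids" half of the commit: the removed gather_ids helper mapped every failed publish to an empty string, so blank entries could reach delete_fired_records_from_db, while filter_map keeps only the ids that were actually published. A small self-contained comparison of the two behaviours, with made-up ids rather than real Kafka results:

fn main() {
    let results: Vec<Result<String, String>> = vec![
        Ok("id-1".to_string()),
        Err("publish failed".to_string()),
        Ok("id-2".to_string()),
    ];

    // Old behaviour: errors collapsed to "", which still ended up in the delete list.
    let with_blanks: Vec<String> = results
        .clone()
        .into_iter()
        .map(|r| r.unwrap_or_default())
        .collect();
    assert_eq!(with_blanks, vec!["id-1", "", "id-2"]);

    // New behaviour: only successfully published ids survive.
    let only_ok: Vec<String> = results.into_iter().filter_map(|r| r.ok()).collect();
    assert_eq!(only_ok, vec!["id-1", "id-2"]);
}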
@@ -140,7 +132,6 @@ impl MessageProcessor {
                 Err(e) => {
                     if e.contains("could not serialize access due to concurrent update") && retry_count < max_retry_count {
                         //retry goes here
-                        log::debug!("retrying");
                         retry_count += 1;
                         if retry_count == max_retry_count {
                             log::error!("Error: max retry count {} reached by node {:?} for row ", max_retry_count, readied_by_column);
2 changes: 1 addition & 1 deletion chronos_bin/src/postgres/pg.rs
@@ -82,7 +82,7 @@ impl PgAccess {

 impl Pg {
     pub async fn new(pg_config: PgConfig) -> Result<Self, PgError> {
-        let port = pg_config.port.parse::<u16>().unwrap_or(0); // make the connection fail and send pack PgError
+        let port = pg_config.port.parse::<u16>().unwrap(); // make the connection fail
         let mut config = Config::new();
         config.dbname = Some(pg_config.database);
         config.user = Some(pg_config.user);
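The port change matches the "fail fast" part of the commit title: unwrap_or(0) let a bad port value degrade to 0 and only surface later as a connection error, while unwrap() panics at startup. A sketch of the difference, using a stand-in struct rather than the crate's PgConfig:

struct FakePgConfig {
    port: String,
}

fn main() {
    let cfg = FakePgConfig { port: "not-a-port".to_string() };

    // Old: invalid input degrades to port 0 and the process keeps going until the connection fails.
    let lenient = cfg.port.parse::<u16>().unwrap_or(0);
    assert_eq!(lenient, 0);

    // New: invalid input is an immediate error; calling unwrap() on it would panic at startup,
    // stopping the service before it ever tries to connect.
    let strict = cfg.port.parse::<u16>();
    assert!(strict.is_err());
}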
18 changes: 8 additions & 10 deletions chronos_bin/src/runner.rs
@@ -60,15 +60,13 @@ impl Runner {
                 log::error!("error while writing to healthcheck file {:?}", write_resp);
             }
         }
-        message_receiver_handler.await.unwrap();
-        // futures::future::join(monitor_handler);
-        // let future_tuple = futures::future::try_join3(monitor_handler, message_processor_handler, message_receiver_handler).await;
-        // if future_tuple.is_err() {
-        //     log::error!("Chronos Stopping all threads {:?}", future_tuple);
-        //     let write_resp = write(&healthcheck_file, b"0");
-        //     if write_resp.is_err() {
-        //         log::error!("error while writing to healthcheck file {:?}", write_resp);
-        //     }
-        // }
+        let future_tuple = futures::future::try_join3(monitor_handler, message_processor_handler, message_receiver_handler).await;
+        if future_tuple.is_err() {
+            log::error!("Chronos Stopping all threads {:?}", future_tuple);
+            let write_resp = write(&healthcheck_file, b"0");
+            if write_resp.is_err() {
+                log::error!("error while writing to healthcheck file {:?}", write_resp);
+            }
+        }
     }
 }
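Previously only message_receiver_handler was awaited (and unwrapped), so a failure in the monitor or processor task could go unnoticed; try_join3 awaits all three handles and resolves to the first error, which is what lets the Runner flip the healthcheck file to "0" when any thread stops. A hedged sketch of that short-circuit behaviour, using plain async functions in place of the real task handles and assuming the futures crate already used by the source:

use futures::executor::block_on;
use futures::future::try_join3;

// Stand-ins for the monitor, processor, and receiver tasks; these only illustrate
// how try_join3 short-circuits, not the real Runner internals.
async fn ok_task(name: &str) -> Result<String, String> {
    Ok(format!("{name} finished"))
}

async fn failing_task(name: &str) -> Result<String, String> {
    Err(format!("{name} crashed"))
}

fn main() {
    // If every future resolves Ok, the tuple of results comes back.
    let all_ok = block_on(try_join3(ok_task("monitor"), ok_task("processor"), ok_task("receiver")));
    assert!(all_ok.is_ok());

    // If any future fails, try_join3 resolves to that Err, which the Runner uses
    // to log the shutdown and rewrite the healthcheck file.
    let one_failed = block_on(try_join3(ok_task("monitor"), failing_task("processor"), ok_task("receiver")));
    assert_eq!(one_failed.unwrap_err(), "processor crashed");
}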
