Skip to content

Commit

Permalink
crash handling
Browse files Browse the repository at this point in the history
  • Loading branch information
anil0906 authored and Amninder Kaur committed Jul 11, 2023
1 parent ea4e897 commit 60a4ae7
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 88 deletions.
123 changes: 51 additions & 72 deletions src/message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,88 +39,67 @@ impl MessageProcessor {
let mut ready_params = Vec::new();
ready_params.push(param);

let publish_rows = &self.data_store.ready_to_fire( &ready_params).await.unwrap();
match &self.data_store.ready_to_fire( &ready_params).await {
Ok(publish_rows) => {
let mut ids: Vec<String> = Vec::with_capacity(publish_rows.len());
let mut publish_futures = Vec::with_capacity(publish_rows.len());
for row in publish_rows {
let updated_row = TableRow {
id: row.get("id"),
deadline: row.get("deadline"),
readied_at: row.get("readied_at"),
readied_by: row.get("readied_by"),
message_headers: row.get("message_headers"),
message_key: row.get("message_key"),
message_value: row.get("message_value"),
};

// println!(
// "Rows Needs Readying:: {:?} @ {:?}",
// publish_rows.len(),
// Utc::now()
// );
let headers: HashMap<String, String> =
match serde_json::from_str(&updated_row.message_headers.to_string()) {
Ok(T) => T,
Err(E) => {
println!("error occurred while parsing");
HashMap::new()
}
};
//TODO: handle empty headers
// println!("checking {:?}",headers);

let mut ids: Vec<String> = Vec::with_capacity(publish_rows.len());
let mut publish_futures = Vec::with_capacity(publish_rows.len());
for row in publish_rows {
let updated_row = TableRow {
id: row.get("id"),
deadline: row.get("deadline"),
readied_at: row.get("readied_at"),
readied_by: row.get("readied_by"),
message_headers: row.get("message_headers"),
message_key: row.get("message_key"),
message_value: row.get("message_value"),
};

let headers: HashMap<String, String> =
match serde_json::from_str(&updated_row.message_headers.to_string()) {
Ok(T) => T,
Err(E) => {
println!("error occurred while parsing");
HashMap::new()
publish_futures.push(self.producer
.publish(
updated_row.message_value.to_string(),
Some(headers),
updated_row.message_key.to_string(),
updated_row.id.to_string()
))
}
let results = futures::future::join_all(publish_futures).await;
for result in results {
match result {
Ok(m) => {
ids.push(m);
}
Err(e) => {
println!("publish failed {:?}", e);
// failure detection needs to pick
}
}
};
//TODO: handle empty headers
// println!("checking {:?}",headers);

publish_futures.push(self.producer
.publish(
updated_row.message_value.to_string(),
Some(headers),
updated_row.message_key.to_string(),
updated_row.id.to_string()
))
}
let results = futures::future::join_all(publish_futures).await;
for result in results {
match result {
Ok(m) => {
ids.push(m);
// println!(
// "insert success with number changed {:?} @{:?}",
// m,
// Utc::now()
// );
}
Err(e) => {
println!("publish failed {:?}", e);
// failure detection needs to pick


if ids.len() > 0 {
if let Err(outcome_error) = &self.data_store.delete_fired( &ids).await {
println!("error occurred in message processor delete_fired {}", outcome_error);
}
}
}
}
Err(e) => {
println!("error occurred in message processor {}", e);
}

// println!("finished the loop for publish now delete published from DB");
if ids.len() > 0 {
let outcome = &self.data_store.delete_fired( &ids).await.unwrap();
// println!("delete fired id {:?} @{:?} outcome {outcome}", &ids, Utc::now());
}

//println!("after the delete statement");

// if let Ok(messages) =
// {
// for message in messages {
// self.data_store
// .move_to_ready_state(message)
// .await
// .expect("mart to ready state failed");
// self.producer
// .produce_message(message.clone())
// .await
// .expect("produce message failed");
// self.delete_record(message)
// .await
// .expect("delete record failed");
// }
// }
}
}
}
2 changes: 1 addition & 1 deletion src/message_receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl MessageReceiver {
message_value: &serde_json::from_slice(&payload).expect("de-ser failed for payload"),
};
let insert_time = Instant::now();
self.data_store.insert_to_delay(&params).await.expect("TODO: panic message");
self.data_store.insert_to_delay(&params).await.expect("insert to db failed");
// println!("insert took: {:?}", insert_time.elapsed())

}
Expand Down
26 changes: 13 additions & 13 deletions src/monitor.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use std::sync::Arc;
use std::thread;
use crate::postgres::pg::Pg;
use chrono::{Duration as chrono_duration, Utc};
use log::{error, info};
use std::sync::Arc;
use std::time::Duration;
use crate::postgres::pg::Pg;

pub struct FailureDetector {
pub(crate) data_store: Arc<Box<Pg>>,
Expand All @@ -16,16 +14,18 @@ impl FailureDetector {
loop {
let _ = tokio::time::sleep(Duration::from_secs(10)).await; // sleep for 10sec


let fetched_rows = &self.data_store.failed_to_fire(
Utc::now() - chrono_duration::seconds(10)
)
.await.unwrap();
if fetched_rows.len() > 0 {
let _id_list = &self.data_store.reset_to_init( fetched_rows).await.unwrap();
match &self.data_store.failed_to_fire(Utc::now() - chrono_duration::seconds(10)).await {
Ok(fetched_rows) => {
if fetched_rows.len() > 0 {
if let Err(e) = &self.data_store.reset_to_init(fetched_rows).await {
println!("error in monitor reset_to_init {}", e);
}
}
}
Err(e) => {
println!("error in monitor {}", e);
}
}
//TODO Log the list of id's that failed to fire and were re-sent to the init state

}
}
}
4 changes: 2 additions & 2 deletions src/postgres/pg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl Pg {

impl Pg {
pub(crate) async fn insert_to_delay(&self, params: &TableInsertRow<'_>) -> Result<u64, PgError> {
let get_client_instant = Instant::now();
let get_client_instant = Instant::now();
let pg_client = self.get_client().await?;
let insert_query =
"INSERT INTO hanger (id, deadline, message_headers, message_key, message_value)
Expand Down Expand Up @@ -201,7 +201,7 @@ impl Pg {
let query_execute_instant = Instant::now();
let pg_client = self.get_client().await?;

let get_query = "SELECT * from hanger where readied_at > $1";
let get_query = "SELECT * from hanger where readied_at > $1";
let response = pg_client
.query(get_query, &[&delay_time])
.await
Expand Down

0 comments on commit 60a4ae7

Please sign in to comment.