Commit

chore: changes for transactions
Amninder Kaur committed Sep 7, 2023
1 parent 7b9c7b9 commit dd59fe4
Showing 5 changed files with 293 additions and 156 deletions.
1 change: 0 additions & 1 deletion .env
@@ -24,7 +24,6 @@ PG_PASSWORD=admin
 PG_DATABASE=chronos
 PG_POOL_SIZE=25
 
-
 # CONFIG
 RUST_LOG=info
 
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

144 changes: 92 additions & 52 deletions chronos_bin/src/message_processor.rs
@@ -1,5 +1,6 @@
 use crate::kafka::producer::KafkaProducer;
 use crate::postgres::pg::{GetReady, Pg, TableRow};
+use crate::utils::delay_controller::DelayController;
 use chrono::Utc;
 use std::collections::HashMap;
 use std::sync::Arc;
@@ -13,80 +14,119 @@ pub struct MessageProcessor {
 
 impl MessageProcessor {
     pub async fn run(&self) {
-        println!("MessageProcessor ON!");
+        log::info!("MessageProcessor ON!");
 
+        //Get UUID for the node that deployed this thread
+        let node_id: String = std::env::var("NODE_ID").unwrap_or_else(|_| uuid::Uuid::new_v4().to_string());
+        // log::info!("node_id {}", node_id);
+        let mut delay_controller = DelayController::new(100);
         loop {
             tokio::time::sleep(Duration::from_millis(10)).await;
-            // println!("MessageProcessor");
-            // tokio::time::sleep(Duration::from_millis(ChronosConfig::from_env().db_poll_interval)).await;
             let deadline = Utc::now();
-            let uuid = Uuid::new_v4();
 
             let param = GetReady {
                 readied_at: deadline,
-                readied_by: uuid,
+                readied_by: Uuid::parse_str(&node_id).unwrap(),
                 deadline,
-                limit: 1000,
+                // limit: 1000,
                 // order: "asc",
             };
 
-            let mut ready_params = Vec::new();
-            ready_params.push(param);
+            //retry loop
+            loop {
+                // thread::sleep(Duration::from_millis(100));
+                let max_retry_count = 3;
+                let mut retry_count = 0;
 
-            match &self.data_store.ready_to_fire(&ready_params).await {
-                Ok(publish_rows) => {
-                    let mut ids: Vec<String> = Vec::with_capacity(publish_rows.len());
-                    let mut publish_futures = Vec::with_capacity(publish_rows.len());
-                    for row in publish_rows {
-                        let updated_row = TableRow {
-                            id: row.get("id"),
-                            deadline: row.get("deadline"),
-                            readied_at: row.get("readied_at"),
-                            readied_by: row.get("readied_by"),
-                            message_headers: row.get("message_headers"),
-                            message_key: row.get("message_key"),
-                            message_value: row.get("message_value"),
-                        };
+                let mut node_id: Option<String> = None;
+                // let mut row_id: Option<String> = None;
+                match &self.data_store.ready_to_fire(&param).await {
+                    Ok(publish_rows) => {
+                        let rdy_to_pblsh_count = publish_rows.len();
+                        if rdy_to_pblsh_count > 0 {
+                            let mut ids: Vec<String> = Vec::with_capacity(rdy_to_pblsh_count);
+                            let mut publish_futures = Vec::with_capacity(rdy_to_pblsh_count);
+                            for row in publish_rows {
+                                let updated_row = TableRow {
+                                    id: row.get("id"),
+                                    deadline: row.get("deadline"),
+                                    readied_at: row.get("readied_at"),
+                                    readied_by: row.get("readied_by"),
+                                    message_headers: row.get("message_headers"),
+                                    message_key: row.get("message_key"),
+                                    message_value: row.get("message_value"),
+                                };
 
-                        let headers: HashMap<String, String> = match serde_json::from_str(&updated_row.message_headers.to_string()) {
-                            Ok(t) => t,
-                            Err(e) => {
-                                println!("error occurred while parsing");
-                                HashMap::new()
-                            }
-                        };
-                        //TODO: handle empty headers
-                        // println!("checking {:?}",headers);
+                                let mut headers: HashMap<String, String> = match serde_json::from_str(&updated_row.message_headers.to_string()) {
+                                    Ok(ser_headers) => ser_headers,
+                                    Err(err) => {
+                                        log::error!("error occurred while parsing {}", err);
+                                        HashMap::new()
+                                    }
+                                };
 
-                        publish_futures.push(self.producer.publish(
-                            updated_row.message_value.to_string(),
-                            Some(headers),
-                            updated_row.message_key.to_string(),
-                            updated_row.id.to_string(),
-                        ))
-                    }
-                    let results = futures::future::join_all(publish_futures).await;
-                    for result in results {
-                        match result {
-                            Ok(m) => {
-                                ids.push(m);
-                            }
-                            Err(e) => {
-                                println!("publish failed {:?}", e);
-                                // failure detection needs to pick
-                            }
-                        }
-                    }
+                                node_id = Some(updated_row.readied_by.to_string());
+                                // row_id = Some(updated_row.id.to_string());
+
+                                headers.insert("readied_by".to_string(), node_id.unwrap());
+
+                                publish_futures.push(self.producer.publish(
+                                    updated_row.message_value.to_string(),
+                                    Some(headers),
+                                    updated_row.message_key.to_string(),
+                                    updated_row.id.to_string(),
+                                ))
+                            }
+                            let results = futures::future::join_all(publish_futures).await;
+                            for result in results {
+                                match result {
+                                    Ok(m) => {
+                                        ids.push(m);
+                                    }
+                                    Err(e) => {
+                                        log::error!("Error: delayed message publish failed {:?}", e);
+                                        break;
+                                        // failure detection needs to pick
+                                    }
+                                }
+                            }
 
-                    if !ids.is_empty() {
-                        if let Err(outcome_error) = &self.data_store.delete_fired(&ids).await {
-                            println!("error occurred in message processor delete_fired {}", outcome_error);
-                        }
-                    }
-                }
-                Err(e) => {
-                    println!("error occurred in message processor {}", e);
-                }
-            }
+                            if !ids.is_empty() {
+                                if let Err(outcome_error) = &self.data_store.delete_fired(&ids).await {
+                                    println!("Error: error occurred in message processor delete_fired {}", outcome_error);
+                                    //add retry logic here
+                                }
+                                println!("delete ids {:?} and break", ids);
+                                break;
+                            }
+                            log::debug!("number of rows published successfully and deleted from DB {}", ids.len());
+                        } else {
+                            log::debug!("no rows ready to fire for dealine {}", deadline);
+                            break;
+                        }
+                    }
+                    Err(e) => {
+                        if e.contains("could not serialize access due to concurrent update") && retry_count < max_retry_count {
+                            //retry goes here
+                            eprintln!("retrying");
+                            retry_count += 1;
+                            if retry_count == max_retry_count {
+                                log::error!(
+                                    "Error: max retry count {} reached by node {} for row ",
+                                    max_retry_count,
+                                    node_id.unwrap(),
+                                    // row_id.unwrap()
+                                );
+                                break;
+                            }
+                        }
+                        log::error!("Error: error occurred in message processor while publishing {}", e);
                        break;
+                    }
+                }
+            }
+            delay_controller.sleep().await;
         }
     }
 }
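
Note: the new import, crate::utils::delay_controller::DelayController, is only referenced here via DelayController::new(100) and delay_controller.sleep().await; its implementation is not part of the rendered diff. A minimal sketch of what such a helper might look like, assuming an exponential backoff that starts at the constructor's millisecond value and grows to an arbitrary cap (both the cap and the doubling strategy are assumptions, not taken from this commit):

// Hypothetical sketch only: the real DelayController lives in
// chronos_bin/src/utils/delay_controller.rs and is not shown in this commit.
// Assumed behaviour: sleep for the current delay, then double it up to a cap.
pub struct DelayController {
    min_delay_ms: u64,
    current_delay_ms: u64,
    max_delay_ms: u64,
}

impl DelayController {
    pub fn new(min_delay_ms: u64) -> Self {
        Self {
            min_delay_ms,
            current_delay_ms: min_delay_ms,
            max_delay_ms: 30_000, // assumed cap, not taken from the commit
        }
    }

    // Sleep for the current delay, then back off exponentially.
    pub async fn sleep(&mut self) {
        tokio::time::sleep(std::time::Duration::from_millis(self.current_delay_ms)).await;
        self.current_delay_ms = (self.current_delay_ms * 2).min(self.max_delay_ms);
    }

    // Reset to the minimum delay, e.g. after a poll that found rows to publish.
    pub fn reset(&mut self) {
        self.current_delay_ms = self.min_delay_ms;
    }
}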
4 changes: 2 additions & 2 deletions chronos_bin/src/monitor.rs
@@ -14,9 +14,9 @@ impl FailureDetector {
         loop {
             let _ = tokio::time::sleep(Duration::from_secs(10)).await; // sleep for 10sec
 
-            match &self.data_store.failed_to_fire(Utc::now() - chrono_duration::seconds(10)).await {
+            match &self.data_store.failed_to_fire(&(Utc::now() - chrono_duration::seconds(10))).await {
                 Ok(fetched_rows) => {
-                    if fetched_rows.len() > 0 {
+                    if !fetched_rows.is_empty() {
                         if let Err(e) = &self.data_store.reset_to_init(fetched_rows).await {
                             println!("error in monitor reset_to_init {}", e);
                         }
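
Note: the commit title, "chore: changes for transactions", together with the new retry branch in the message processor (matching on "could not serialize access due to concurrent update"), suggests the data store now claims rows inside a stricter PostgreSQL transaction: that error text is what PostgreSQL raises when a REPEATABLE READ or SERIALIZABLE transaction loses a race with a concurrent write, and the caller is expected to retry. The data-store side of this commit is not rendered on this page, so the following is only a rough sketch of that claim-and-retry shape, assuming tokio-postgres, a hypothetical delay table whose columns mirror the TableRow fields in the diff, and an UPDATE ... RETURNING claim query; none of these names are taken from the commit.

use tokio_postgres::{Client, Error, IsolationLevel, Row};
use uuid::Uuid;

// Hypothetical sketch: claim due rows for this node inside a REPEATABLE READ
// transaction. If two nodes update the same rows concurrently, one transaction
// fails with SQLSTATE 40001 ("could not serialize access due to concurrent
// update") and the caller's retry loop runs it again.
// Assumes tokio-postgres with its uuid feature; table name and WHERE clause are invented.
async fn claim_ready_rows(client: &mut Client, node_id: Uuid) -> Result<Vec<Row>, Error> {
    let tx = client
        .build_transaction()
        .isolation_level(IsolationLevel::RepeatableRead)
        .start()
        .await?;

    let rows = tx
        .query(
            "UPDATE delay \
             SET readied_at = now(), readied_by = $1 \
             WHERE deadline <= now() AND readied_at IS NULL \
             RETURNING id, deadline, readied_at, readied_by, \
                       message_headers, message_key, message_value",
            &[&node_id],
        )
        .await?;

    tx.commit().await?;
    Ok(rows)
}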
