Skip to content

Commit

Permalink
fix: handle retry and graceful close for all threads if one is stopped
Browse files Browse the repository at this point in the history
  • Loading branch information
Amninder Kaur committed Oct 12, 2023
1 parent b20560a commit 6107c18
Show file tree
Hide file tree
Showing 18 changed files with 322 additions and 229 deletions.
26 changes: 24 additions & 2 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,17 @@ KAFKA_OUT_TOPIC="chronos.out"
KAFKA_USERNAME=
KAFKA_PASSWORD=

# KAFKA
# KAFKA_BROKERS=kb001.ksp-sbx.syd1.kc.thinkbig.local:9092,kb002.ksp-sbx.syd1.kc.thinkbig.local:9092,kb003.ksp-sbx.syd1.kc.thinkbig.local:9092,kb004.ksp-sbx.syd1.kc.thinkbig.local:9092,kb005.ksp-sbx.syd1.kc.thinkbig.local:9092
# KAFKA_CLIENT_ID="chronos-dev"
# KAFKA_GROUP_ID="chronos-dev"
# KAFKA_INPUT_TOPIC="sbx.ksp.bet.private.delay.in"
# KAFKA_OUTPUT_TOPIC="sbx.ksp.bet.private.delay.out"
# KAFKA_USERNAME="chimera-chronos"
# KAFKA_PASSWORD="<redacted>"  # NOTE(review): a real credential was committed here — rotate it and never put secrets in .env.example
# KAFKA_SASL_MECH="SCRAM-SHA-512"
# KAFKA_SEC_PROTOCOL="SASL_PLAINTEXT"

# POSTGRES
# NB: `node-postgres` AND `node-pg-migrate` USE THE SAME ENVIRONMENT VARIABLES AS `libpq` TO CONNECT TO A POSTGRESQL SERVER
# NODE_PG_FORCE_NATIVE=1
Expand All @@ -24,8 +35,19 @@ PG_PASSWORD=admin
PG_DATABASE=chronos_db
PG_POOL_SIZE=50

# POSTGRES
# NB: `node-postgres` AND `node-pg-migrate` USE THE SAME ENVIRONMENT VARIABLES AS `libpq` TO CONNECT TO A POSTGRESQL SERVER
# NODE_PG_FORCE_NATIVE=1
# PG_HOST=pgsql-ksp-sbx.unibet.com.au
# PG_PORT=5432
# PG_USER=chimera-chronos
# PG_PASSWORD=<redacted>  # NOTE(review): a real credential was committed here — rotate it and never put secrets in .env.example
# PG_DATABASE=chimera_chronos
# PG_POOL_SIZE=50


# CONFIG
RUST_LOG=debug
RUST_LOG=info

#APP
DELAY_TIME=0
Expand All @@ -34,4 +56,4 @@ MONITOR_DB_POLL=5
TIMING_ADVANCE=0
FAIL_DETECT_INTERVAL=500
MAX_RETRIES=3
PROCESSOR_DB_POLL=10
PROCESSOR_DB_POLL=10
18 changes: 9 additions & 9 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ name: release app binary on tag
on:
release:
types: [created]
# push:
# tags:
# # only build on tags that start with 'v'
# # having major, minor and path version numbers
# # along with alpha beta support
# # e.g. v1.0.0-alpha.1, v1.0.0-beta.1, v1.0.0
# - 'v[0-9]+.[0-9]+.[0-9]+(-[a-zA-Z0-9]+)+.[0-9]?'
# - 'v[0-9]+.[0-9]+.[0-9]'
# - 'v[0-9]+.[0-9]+.[0-9]+(-[a-zA-Z0-9]+)'
push:
tags:
# only build on tags that start with 'v'
# having major, minor and patch version numbers
# along with alpha beta support
# e.g. v1.0.0-alpha.1, v1.0.0-beta.1, v1.0.0
- 'v[0-9]+.[0-9]+.[0-9]+(-[a-zA-Z0-9]+)+.[0-9]?'
- 'v[0-9]+.[0-9]+.[0-9]'
- 'v[0-9]+.[0-9]+.[0-9]+(-[a-zA-Z0-9]+)'

jobs:
build:
Expand Down
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions How-to.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ All the required configurations for Chronos can be passed in environment variabl
| PG_PASSWORD|admin|True
| PG_DATABASE|chronos_db|True
| PG_POOL_SIZE|50|True
|NODE_ID|UUID|False
| DELAY_TIME|0|False
| RANDOMNESS_DELAY|100|False
| MONITOR_DB_POLL|5|False
Expand Down
2 changes: 1 addition & 1 deletion chronos_bin/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "chronos_bin"
version = "0.1.0"
version = "0.2.1"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand Down
20 changes: 18 additions & 2 deletions chronos_bin/src/bin/chronos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ use chronos_bin::kafka::producer::KafkaProducer;
use chronos_bin::postgres::config::PgConfig;
use chronos_bin::postgres::pg::Pg;
use chronos_bin::runner::Runner;
use log::debug;
use log::{debug, info};
use std::sync::Arc;
use std::time::Duration;

#[tokio::main]
async fn main() {
Expand All @@ -17,8 +18,23 @@ async fn main() {

let kafka_consumer = KafkaConsumer::new(&kafka_config);
let kafka_producer = KafkaProducer::new(&kafka_config);
let data_store = Pg::new(pg_config).await.unwrap();
let data_store = match Pg::new(pg_config).await {
Ok(pg) => pg,
Err(e) => loop {
log::error!("couldnt connect to PG DB due to error::{} will retry ", e);
tokio::time::sleep(Duration::from_secs(10)).await;
let pg_config = PgConfig::from_env();
match Pg::new(pg_config).await {
Ok(pg) => pg,
Err(e) => {
log::error!("error while creating PG intance {}", e);
continue;
}
};
},
};

info!("starting chronos establish connections");
let r = Runner {
data_store: Arc::new(data_store),
producer: Arc::new(kafka_producer),
Expand Down
27 changes: 2 additions & 25 deletions chronos_bin/src/kafka/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,8 @@ pub struct KafkaConsumer {

impl KafkaConsumer {
pub fn new(config: &KafkaConfig) -> Self {
// let consumer = config.build_consumer_config().create().expect("Failed to create consumer");
let consumer = match config.build_consumer_config().create() {
Ok(consumer) => consumer,
Err(e) => {
log::error!("error creating consumer {:?}", e);
//retry
log::info!("retrying in 5 seconds");
std::thread::sleep(Duration::from_secs(5));
loop {
match config.build_consumer_config().create() {
Ok(consumer) => {
log::info!("connected to kafka");
break consumer;
}
Err(e) => {
log::error!("error creating consumer {:?}", e);
//retry
log::info!("retrying in 5 seconds");
std::thread::sleep(Duration::from_secs(5));
}
}
}
}
};

// rdlibkafka goes infinitely trying to connect to kafka broker
let consumer = config.build_consumer_config().create().expect("Failed to create consumer");
let topic = config.in_topic.clone();
Self { consumer, topic }
}
Expand Down
1 change: 0 additions & 1 deletion chronos_bin/src/kafka/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
pub mod config;
pub mod consumer;
pub mod errors;
pub mod kafka_deploy;
pub mod producer;
1 change: 1 addition & 0 deletions chronos_bin/src/kafka/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub struct KafkaProducer {

impl KafkaProducer {
pub fn new(config: &KafkaConfig) -> Self {
// rdlibkafka goes infinitely trying to connect to kafka broker
let producer = config.build_producer_config().create().expect("Failed to create producer");
let topic = config.out_topic.to_owned();

Expand Down
49 changes: 31 additions & 18 deletions chronos_bin/src/message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,41 @@ impl MessageProcessor {
log::info!("MessageProcessor ON!");

//Get UUID for the node that deployed this thread
let node_id: String = std::env::var("NODE_ID").unwrap_or_else(|_| uuid::Uuid::new_v4().to_string());
// log::info!("node_id {}", node_id);
let node_id: Uuid = match std::env::var("NODE_ID") {
Ok(val) => Uuid::parse_str(&val).unwrap_or_else(|_e| {
let uuid = uuid::Uuid::new_v4();
log::info!("NODE_ID not found in env assigning {}", uuid);
uuid
}),
Err(_e) => {
log::info!("NODE_ID not found in env");
uuid::Uuid::new_v4()
}
};
log::info!("node_id {}", node_id);
let mut delay_controller = DelayController::new(100);
loop {
log::debug!("MessageProcessor loop");
tokio::time::sleep(Duration::from_millis(10)).await;
// tokio::time::sleep(Duration::from_millis(ChronosConfig::from_env().db_poll_interval)).await;
let deadline = Utc::now() - Duration::from_secs(ChronosConfig::from_env().time_advance);

let param = GetReady {
readied_at: deadline,
readied_by: Uuid::parse_str(&node_id).unwrap(),
readied_by: node_id,
deadline,
// limit: 1000,
// order: "asc",
};

//retry loop
loop {
log::debug!("retry loop");
// thread::sleep(Duration::from_millis(100));
let max_retry_count = 3;
let mut retry_count = 0;

let mut node_id: Option<String> = None;
// let mut row_id: Option<String> = None;
match &self.data_store.ready_to_fire(&param).await {
Ok(publish_rows) => {
let rdy_to_pblsh_count = publish_rows.len();
Expand All @@ -66,13 +77,18 @@ impl MessageProcessor {
HashMap::new()
}
};
//TODO: handle empty headers
// println!("checking {:?}",headers);

node_id = Some(updated_row.readied_by.to_string());
// row_id = Some(updated_row.id.to_string());

headers.insert("readied_by".to_string(), node_id.unwrap());
match node_id {
Some(id) => {
headers.insert("readied_by".to_string(), id);
}
None => {
log::error!("Error: readied_by not found in db row {:?}", updated_row);
break;
}
}

publish_futures.push(self.producer.publish(
updated_row.message_value.to_string(),
Expand All @@ -85,6 +101,7 @@ impl MessageProcessor {
for result in results {
match result {
Ok(m) => {
log::info!("message published successfully {:?}", m);
ids.push(m);
}
Err(e) => {
Expand All @@ -97,13 +114,15 @@ impl MessageProcessor {

if !ids.is_empty() {
if let Err(outcome_error) = &self.data_store.delete_fired(&ids).await {
println!("Error: error occurred in message processor delete_fired {}", outcome_error);
log::error!("Error: error occurred in message processor {}", outcome_error);
break;
//add retry logic here
}
println!("delete ids {:?} and break", ids);
log::info!("delete ids {:?} and break", ids);
break;
}
log::debug!("number of rows published successfully and deleted from DB {}", ids.len());
break;
} else {
log::debug!("no rows ready to fire for dealine {}", deadline);
break;
Expand All @@ -112,20 +131,14 @@ impl MessageProcessor {
Err(e) => {
if e.contains("could not serialize access due to concurrent update") && retry_count < max_retry_count {
//retry goes here
eprintln!("retrying");
log::info!("retrying");
retry_count += 1;
if retry_count == max_retry_count {
log::error!(
"Error: max retry count {} reached by node {} for row ",
max_retry_count,
node_id.unwrap(),
// row_id.unwrap()
);
log::error!("Error: max retry count {} reached by node {:?} for row ", max_retry_count, node_id);
break;
}
}
log::error!("Error: error occurred in message processor while publishing {}", e);
break;
}
}
}
Expand Down
Loading

0 comments on commit 6107c18

Please sign in to comment.