Skip to content

Commit

Permalink
fix: handle retry and stop for all threads if one is stopped (#9)
Browse files Browse the repository at this point in the history
* fix: handle retry and graceful close for all threads if one is stopped

* fix: add health check file with 0 or 1

* fix: refactor processor and receiver modules

* fix: updates to fail fast and capture the right ids

---------

Co-authored-by: Amninder Kaur <[email protected]>
  • Loading branch information
akaur13 and Amninder Kaur authored Oct 26, 2023
1 parent a097d9c commit a7eb56c
Show file tree
Hide file tree
Showing 20 changed files with 518 additions and 453 deletions.
24 changes: 23 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,17 @@ KAFKA_OUT_TOPIC="chronos.out"
KAFKA_USERNAME=
KAFKA_PASSWORD=

# KAFKA
# KAFKA_BROKERS=kb001.ksp-sbx.syd1.kc.thinkbig.local:9092,kb002.ksp-sbx.syd1.kc.thinkbig.local:9092,kb003.ksp-sbx.syd1.kc.thinkbig.local:9092,kb004.ksp-sbx.syd1.kc.thinkbig.local:9092,kb005.ksp-sbx.syd1.kc.thinkbig.local:9092
# KAFKA_CLIENT_ID="chronos-dev"
# KAFKA_GROUP_ID="chronos-dev"
# KAFKA_INPUT_TOPIC="sbx.ksp.bet.private.delay.in"
# KAFKA_OUTPUT_TOPIC="sbx.ksp.bet.private.delay.out"
# KAFKA_USERNAME="chimera-chronos"
# KAFKA_PASSWORD="<redacted — do not commit real credentials to .env.example>"
# KAFKA_SASL_MECH="SCRAM-SHA-512"
# KAFKA_SEC_PROTOCOL="SASL_PLAINTEXT"

# POSTGRES
# NB: `node-postgres` AND `node-pg-migrate` USE THE SAME ENVIRONMENT VARIABLES AS `libpq` TO CONNECT TO A POSTGRESQL SERVER
# NODE_PG_FORCE_NATIVE=1
Expand All @@ -24,6 +35,17 @@ PG_PASSWORD=admin
PG_DATABASE=chronos_db
PG_POOL_SIZE=50

# POSTGRES
# NB: `node-postgres` AND `node-pg-migrate` USE THE SAME ENVIRONMENT VARIABLES AS `libpq` TO CONNECT TO A POSTGRESQL SERVER
# NODE_PG_FORCE_NATIVE=1
# PG_HOST=pgsql-ksp-sbx.unibet.com.au
# PG_PORT=5432
# PG_USER=chimera-chronos
# PG_PASSWORD=<redacted — do not commit real credentials to .env.example>
# PG_DATABASE=chimera_chronos
# PG_POOL_SIZE=50


# CONFIG
RUST_LOG=info

Expand All @@ -38,4 +60,4 @@ PROCESSOR_DB_POLL=10

# TRACING
OTEL_SERVICE_NAME=chronos
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://localhost:4318/v1/traces
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://localhost:4318/v1/traces
18 changes: 9 additions & 9 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ name: release app binary on tag
on:
release:
types: [created]
# push:
# tags:
# # only build on tags that start with 'v'
# # having major, minor and path version numbers
# # along with alpha beta support
# # e.g. v1.0.0-alpha.1, v1.0.0-beta.1, v1.0.0
# - 'v[0-9]+.[0-9]+.[0-9]+(-[a-zA-Z0-9]+)+.[0-9]?'
# - 'v[0-9]+.[0-9]+.[0-9]'
# - 'v[0-9]+.[0-9]+.[0-9]+(-[a-zA-Z0-9]+)'
push:
tags:
# only build on tags that start with 'v'
# having major, minor and patch version numbers
# along with alpha beta support
# e.g. v1.0.0-alpha.1, v1.0.0-beta.1, v1.0.0
- 'v[0-9]+.[0-9]+.[0-9]+(-[a-zA-Z0-9]+)+.[0-9]?'
- 'v[0-9]+.[0-9]+.[0-9]'
- 'v[0-9]+.[0-9]+.[0-9]+(-[a-zA-Z0-9]+)'

jobs:
build:
Expand Down
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/target
.env

/heathcheck


### Linux ###
Expand Down
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 13 additions & 12 deletions How-to.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,18 @@ All the required configurations for Chronos can be passed in environment variabl
|----|----|
|KAFKA_HOST|"localhost"
|KAFKA_PORT|9093
| KAFKA_CLIENT_ID|"chronos"
| KAFKA_GROUP_ID|"chronos"
| KAFKA_IN_TOPIC|"chronos.in"
| KAFKA_OUT_TOPIC|"chronos.out"
| KAFKA_USERNAME|
| KAFKA_PASSWORD|
| PG_HOST|localhost
| PG_PORT|5432
| PG_USER|admin
| PG_PASSWORD|admin
| PG_DATABASE|chronos_db
| PG_POOL_SIZE|50
|KAFKA_CLIENT_ID|"chronos"
|KAFKA_GROUP_ID|"chronos"
|KAFKA_IN_TOPIC|"chronos.in"
|KAFKA_OUT_TOPIC|"chronos.out"
|KAFKA_USERNAME|
|KAFKA_PASSWORD|
|PG_HOST|localhost
|PG_PORT|5432
|PG_USER|admin
|PG_PASSWORD|admin
|PG_DATABASE|chronos_db
|PG_POOL_SIZE|50

### Optional Vars
These values can be set to fine-tune Chronos performance if needed; refer to [Chronos](./README.md)
Expand All @@ -50,6 +50,7 @@ These values are set to fine tune performance Chrono in need, refer to [Chronos]
| PROCESSOR_DB_POLL|5 milli sec
| TIMING_ADVANCE|0 sec
| FAIL_DETECT_INTERVAL|10 sec
| HEALTHCHECK_FILE|healthcheck/chronos_healthcheck


## Observability
Expand Down
2 changes: 1 addition & 1 deletion chronos_bin/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "chronos_bin"
version = "0.1.0"
version = "0.2.1"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand Down
22 changes: 19 additions & 3 deletions chronos_bin/src/bin/chronos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ use chronos_bin::postgres::config::PgConfig;
use chronos_bin::postgres::pg::Pg;
use chronos_bin::runner::Runner;
use chronos_bin::telemetry::register_telemetry::{TelemetryCollector, TelemetryCollectorType};
use log::debug;
use log::{debug, info};
use std::sync::Arc;
use std::time::Duration;

#[tokio::main]
async fn main() {
env_logger::init();
dotenvy::dotenv().ok();

let protocol = std::env::var("TELEMETRY_PROTOCOL").unwrap_or_else(|_| "http/json".to_string());
let protocol = std::env::var("OTEL_EXPORTER_OTLP_PROTOCOL").unwrap_or_else(|_| "http/json".to_string());

let tracing_opentelemetry = TelemetryCollector::new(protocol, TelemetryCollectorType::Otlp);
tracing_opentelemetry.register_traces();
Expand All @@ -23,8 +24,23 @@ async fn main() {

let kafka_consumer = KafkaConsumer::new(&kafka_config);
let kafka_producer = KafkaProducer::new(&kafka_config);
let data_store = Pg::new(pg_config).await.unwrap();
let data_store = match Pg::new(pg_config).await {
Ok(pg) => pg,
Err(e) => loop {
log::error!("couldnt connect to PG DB due to error::{} will retry ", e);
tokio::time::sleep(Duration::from_secs(10)).await;
let pg_config = PgConfig::from_env();
match Pg::new(pg_config).await {
Ok(pg) => pg,
Err(e) => {
log::error!("error while creating PG intance {}", e);
continue;
}
};
},
};

info!("starting chronos establish connections");
let r = Runner {
data_store: Arc::new(data_store),
producer: Arc::new(kafka_producer),
Expand Down
1 change: 0 additions & 1 deletion chronos_bin/src/kafka/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
pub mod config;
pub mod consumer;
pub mod errors;
pub mod kafka_deploy;
pub mod producer;
6 changes: 3 additions & 3 deletions chronos_bin/src/kafka/producer.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::collections::HashMap;
use std::time::Duration;

use crate::kafka::errors::KafkaAdapterError;
use crate::utils::util::into_headers;
use crate::{kafka::errors::KafkaAdapterError, utils::util::CHRONOS_ID};
use rdkafka::producer::{FutureProducer, FutureRecord};

use super::config::KafkaConfig;
Expand All @@ -18,7 +18,7 @@ pub struct KafkaProducer {

impl KafkaProducer {
pub fn new(config: &KafkaConfig) -> Self {
// Kafka Producer
// rdlibkafka goes infinitely trying to connect to kafka broker
let producer = config.build_producer_config().create().expect("Failed to create producer");
let topic = config.out_topic.to_owned();

Expand All @@ -44,6 +44,6 @@ impl KafkaProducer {
)
.await
.map_err(|(kafka_error, _record)| KafkaAdapterError::PublishMessage(kafka_error, "message publishing failed".to_string()))?;
Ok(unwrap_header["chronosId"].to_string())
Ok(unwrap_header[CHRONOS_ID].to_string())
}
}
Loading

0 comments on commit a7eb56c

Please sign in to comment.