fix: handle retry and stop for all threads if one is stopped #9

Merged · 4 commits · Oct 26, 2023
Changes from all commits
24 changes: 23 additions & 1 deletion .env.example
@@ -14,6 +14,17 @@ KAFKA_OUT_TOPIC="chronos.out"
KAFKA_USERNAME=
KAFKA_PASSWORD=

+# KAFKA
+# KAFKA_BROKERS=kb001.ksp-sbx.syd1.kc.thinkbig.local:9092,kb002.ksp-sbx.syd1.kc.thinkbig.local:9092,kb003.ksp-sbx.syd1.kc.thinkbig.local:9092,kb004.ksp-sbx.syd1.kc.thinkbig.local:9092,kb005.ksp-sbx.syd1.kc.thinkbig.local:9092
+# KAFKA_CLIENT_ID="chronos-dev"
+# KAFKA_GROUP_ID="chronos-dev"
+# KAFKA_INPUT_TOPIC="sbx.ksp.bet.private.delay.in"
+# KAFKA_OUTPUT_TOPIC="sbx.ksp.bet.private.delay.out"
+# KAFKA_USERNAME="chimera-chronos"
+# KAFKA_PASSWORD="Lbosg675kzTGkyXUw97r0Mt3gGiAfpa4"
+# KAFKA_SASL_MECH="SCRAM-SHA-512"
+# KAFKA_SEC_PROTOCOL="SASL_PLAINTEXT"
+
# POSTGRES
# NB: `node-postgres` AND `node-pg-migrate` USE THE SAME ENVIRONMENT VARIABLES AS `libpq` TO CONNECT TO A POSTGRESQL SERVER
# NODE_PG_FORCE_NATIVE=1
@@ -24,6 +35,17 @@ PG_PASSWORD=admin
PG_DATABASE=chronos_db
PG_POOL_SIZE=50

+# POSTGRES
+# NB: `node-postgres` AND `node-pg-migrate` USE THE SAME ENVIRONMENT VARIABLES AS `libpq` TO CONNECT TO A POSTGRESQL SERVER
+# NODE_PG_FORCE_NATIVE=1
+# PG_HOST=pgsql-ksp-sbx.unibet.com.au
+# PG_PORT=5432
+# PG_USER=chimera-chronos
+# PG_PASSWORD=Npr0QfoU4TJNb3BH7fe21vfwhPTVwB4Q
+# PG_DATABASE=chimera_chronos
+# PG_POOL_SIZE=50
+
+
# CONFIG
RUST_LOG=info

@@ -38,4 +60,4 @@ PROCESSOR_DB_POLL=10

# TRACING
OTEL_SERVICE_NAME=chronos
-OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://localhost:4318/v1/traces
\ No newline at end of file
+OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://localhost:4318/v1/traces
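These variables are read at startup via `PgConfig::from_env()` and the Kafka config (see `chronos.rs` below). As a rough illustration only — the field names, defaults, and error handling here are assumptions, not the crate's actual implementation — such an env-backed reader might look like:

```rust
use std::env;

// Hypothetical sketch of an env-backed config reader; the real
// PgConfig::from_env() in chronos_bin may differ in fields and defaults.
// Defaults mirror the values documented in How-to.md.
#[derive(Debug)]
pub struct PgConfig {
    pub host: String,
    pub port: u16,
    pub user: String,
    pub password: String,
    pub database: String,
    pub pool_size: u32,
}

impl PgConfig {
    pub fn from_env() -> Self {
        // Fall back to a default when the variable is unset.
        let var = |key: &str, default: &str| env::var(key).unwrap_or_else(|_| default.to_string());
        PgConfig {
            host: var("PG_HOST", "localhost"),
            port: var("PG_PORT", "5432").parse().unwrap_or(5432),
            user: var("PG_USER", "admin"),
            password: var("PG_PASSWORD", "admin"),
            database: var("PG_DATABASE", "chronos_db"),
            pool_size: var("PG_POOL_SIZE", "50").parse().unwrap_or(50),
        }
    }
}
```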
18 changes: 9 additions & 9 deletions .github/workflows/release.yml
@@ -6,15 +6,15 @@ name: release app binary on tag
on:
  release:
    types: [created]
-  # push:
-  #   tags:
-  #     # only build on tags that start with 'v'
-  #     # having major, minor and path version numbers
-  #     # along with alpha beta support
-  #     # e.g. v1.0.0-alpha.1, v1.0.0-beta.1, v1.0.0
-  #     - 'v[0-9]+.[0-9]+.[0-9]+(-[a-zA-Z0-9]+)+.[0-9]?'
-  #     - 'v[0-9]+.[0-9]+.[0-9]'
-  #     - 'v[0-9]+.[0-9]+.[0-9]+(-[a-zA-Z0-9]+)'
+  push:
+    tags:
+      # only build on tags that start with 'v'
+      # having major, minor and patch version numbers
+      # along with alpha beta support
+      # e.g. v1.0.0-alpha.1, v1.0.0-beta.1, v1.0.0
+      - 'v[0-9]+.[0-9]+.[0-9]+(-[a-zA-Z0-9]+)+.[0-9]?'
+      - 'v[0-9]+.[0-9]+.[0-9]'
+      - 'v[0-9]+.[0-9]+.[0-9]+(-[a-zA-Z0-9]+)'

jobs:
  build:
2 changes: 1 addition & 1 deletion .gitignore
@@ -1,6 +1,6 @@
/target
.env

/heathcheck


### Linux ###
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

25 changes: 13 additions & 12 deletions How-to.md
@@ -29,18 +29,18 @@ All the required configurations for Chronos can be passed in environment variables
|----|----|
|KAFKA_HOST|"localhost"
|KAFKA_PORT|9093
-| KAFKA_CLIENT_ID|"chronos"
-| KAFKA_GROUP_ID|"chronos"
-| KAFKA_IN_TOPIC|"chronos.in"
-| KAFKA_OUT_TOPIC|"chronos.out"
-| KAFKA_USERNAME|
-| KAFKA_PASSWORD|
-| PG_HOST|localhost
-| PG_PORT|5432
-| PG_USER|admin
-| PG_PASSWORD|admin
-| PG_DATABASE|chronos_db
-| PG_POOL_SIZE|50
+|KAFKA_CLIENT_ID|"chronos"
+|KAFKA_GROUP_ID|"chronos"
+|KAFKA_IN_TOPIC|"chronos.in"
+|KAFKA_OUT_TOPIC|"chronos.out"
+|KAFKA_USERNAME|
+|KAFKA_PASSWORD|
+|PG_HOST|localhost
+|PG_PORT|5432
+|PG_USER|admin
+|PG_PASSWORD|admin
+|PG_DATABASE|chronos_db
+|PG_POOL_SIZE|50

### Optional Vars
These values are set to fine tune performance Chrono in need, refer to [Chronos](./README.md)
@@ -50,6 +50,7 @@ These values are set to fine tune performance Chrono in need, refer to [Chronos]
| PROCESSOR_DB_POLL|5 milli sec
| TIMING_ADVANCE|0 sec
| FAIL_DETECT_INTERVAL|10 sec
+| HEALTHCHECK_FILE|healthcheck/chronos_healthcheck


## Observability
2 changes: 1 addition & 1 deletion chronos_bin/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "chronos_bin"
version = "0.1.0"
version = "0.2.1"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
22 changes: 19 additions & 3 deletions chronos_bin/src/bin/chronos.rs
@@ -5,15 +5,16 @@ use chronos_bin::postgres::config::PgConfig;
use chronos_bin::postgres::pg::Pg;
use chronos_bin::runner::Runner;
use chronos_bin::telemetry::register_telemetry::{TelemetryCollector, TelemetryCollectorType};
-use log::debug;
+use log::{debug, info};
use std::sync::Arc;
+use std::time::Duration;

#[tokio::main]
async fn main() {
    env_logger::init();
    dotenvy::dotenv().ok();

-    let protocol = std::env::var("TELEMETRY_PROTOCOL").unwrap_or_else(|_| "http/json".to_string());
+    let protocol = std::env::var("OTEL_EXPORTER_OTLP_PROTOCOL").unwrap_or_else(|_| "http/json".to_string());

    let tracing_opentelemetry = TelemetryCollector::new(protocol, TelemetryCollectorType::Otlp);
    tracing_opentelemetry.register_traces();
@@ -23,8 +24,23 @@ async fn main() {

    let kafka_consumer = KafkaConsumer::new(&kafka_config);
    let kafka_producer = KafkaProducer::new(&kafka_config);
-    let data_store = Pg::new(pg_config).await.unwrap();
+    let data_store = match Pg::new(pg_config).await {
+        Ok(pg) => pg,
+        Err(e) => loop {
+            log::error!("couldn't connect to PG DB due to error: {}; will retry", e);
+            tokio::time::sleep(Duration::from_secs(10)).await;
+            let pg_config = PgConfig::from_env();
+            match Pg::new(pg_config).await {
+                Ok(pg) => break pg,
+                Err(e) => {
+                    log::error!("error while creating PG instance: {}", e);
+                    continue;
+                }
+            }
+        },
+    };

+    info!("starting chronos, establishing connections");
    let r = Runner {
        data_store: Arc::new(data_store),
        producer: Arc::new(kafka_producer),
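The key to the retry arm above is that a Rust `loop` is an expression: `break pg` is what hands the established connection back out of the `Err` branch (without it, the loop would never terminate even after a successful reconnect). A minimal standalone sketch of the same connect-with-retry pattern, reusing the `Pg`/`PgConfig` names from this diff but otherwise hypothetical:

```rust
use std::time::Duration;

use chronos_bin::postgres::config::PgConfig;
use chronos_bin::postgres::pg::Pg;

// Sketch only: retry Pg::new until it succeeds, re-reading the
// environment on every attempt so corrected config gets picked up.
async fn connect_pg_with_retry(backoff: Duration) -> Pg {
    loop {
        let pg_config = PgConfig::from_env();
        match Pg::new(pg_config).await {
            // `break` with a value exits the loop expression and
            // becomes the function's return value.
            Ok(pg) => break pg,
            Err(e) => {
                log::error!("PG connect failed: {}; retrying in {:?}", e, backoff);
                tokio::time::sleep(backoff).await;
            }
        }
    }
}
```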
1 change: 0 additions & 1 deletion chronos_bin/src/kafka/mod.rs
@@ -1,5 +1,4 @@
pub mod config;
pub mod consumer;
pub mod errors;
pub mod kafka_deploy;
pub mod producer;
6 changes: 3 additions & 3 deletions chronos_bin/src/kafka/producer.rs
@@ -1,8 +1,8 @@
use std::collections::HashMap;
use std::time::Duration;

-use crate::kafka::errors::KafkaAdapterError;
use crate::utils::util::into_headers;
+use crate::{kafka::errors::KafkaAdapterError, utils::util::CHRONOS_ID};
use rdkafka::producer::{FutureProducer, FutureRecord};

use super::config::KafkaConfig;
@@ -18,7 +18,7 @@ pub struct KafkaProducer {

impl KafkaProducer {
    pub fn new(config: &KafkaConfig) -> Self {
-        // Kafka Producer
+        // librdkafka retries indefinitely when connecting to the Kafka broker
        let producer = config.build_producer_config().create().expect("Failed to create producer");
        let topic = config.out_topic.to_owned();

@@ -44,6 +44,6 @@ impl KafkaProducer {
        )
        .await
        .map_err(|(kafka_error, _record)| KafkaAdapterError::PublishMessage(kafka_error, "message publishing failed".to_string()))?;
-        Ok(unwrap_header["chronosId"].to_string())
+        Ok(unwrap_header[CHRONOS_ID].to_string())
    }
}
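The last hunk swaps the bare `"chronosId"` string for the shared `CHRONOS_ID` constant from `utils::util`. That constant's definition is not part of this diff, so the following is a plausible sketch rather than the crate's actual code:

```rust
// Hypothetical: in chronos_bin/src/utils/util.rs. The actual definition is
// not shown in this PR; "chronosId" is inferred from the line being replaced.
pub const CHRONOS_ID: &str = "chronosId";
```

Centralizing the header key in one constant keeps the producer and whatever code reads the header from silently drifting apart.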