fix: refactor processor and receiver modules
Amninder Kaur committed Oct 25, 2023
1 parent cbce134 commit 00f7c05
Showing 13 changed files with 365 additions and 416 deletions.
3 changes: 0 additions & 3 deletions .env.example
@@ -57,10 +57,7 @@ TIMING_ADVANCE=0
FAIL_DETECT_INTERVAL=500
MAX_RETRIES=3
PROCESSOR_DB_POLL=10
<<<<<<< HEAD

# TRACING
OTEL_SERVICE_NAME=chronos
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://localhost:4318/v1/traces
=======
>>>>>>> 6107c18 (fix: handle retry and graceful close for all threads if one is stopped)
4 changes: 0 additions & 4 deletions .gitignore
@@ -1,10 +1,6 @@
/target
<<<<<<< HEAD
.env

=======
/healthcheck
>>>>>>> a5a05c4 (fix: add health check file with 0 or 1)


### Linux ###
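The a5a05c4 conflict note above refers to a file-based health check: the process writes 0 or 1 to the path in HEALTHCHECK_FILE. A minimal sketch of that convention (the helper name is illustrative, not the repo's code):

```rust
use std::fs;

// Hypothetical helper: write "1" (healthy) or "0" (unhealthy) so an external
// probe can read process health from a file instead of an HTTP endpoint.
fn write_health(healthy: bool) -> std::io::Result<()> {
    let path = std::env::var("HEALTHCHECK_FILE")
        .unwrap_or_else(|_| "healthcheck/chronos_healthcheck".to_string());
    fs::write(path, if healthy { "1" } else { "0" })
}

fn main() {
    write_health(true).expect("failed to write health check file");
}
```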
48 changes: 13 additions & 35 deletions How-to.md
@@ -23,25 +23,24 @@ Use `make withenv RECIPE=docker.up`

## ENV vars
All the required configuration for Chronos can be passed via the environment variables listed below
<<<<<<< HEAD

### Required Vars
|Env Var|Example Value|
|----|----|
|KAFKA_HOST|"localhost"
|KAFKA_PORT|9093
| KAFKA_CLIENT_ID|"chronos"
| KAFKA_GROUP_ID|"chronos"
| KAFKA_IN_TOPIC|"chronos.in"
| KAFKA_OUT_TOPIC|"chronos.out"
| KAFKA_USERNAME|
| KAFKA_PASSWORD|
| PG_HOST|localhost
| PG_PORT|5432
| PG_USER|admin
| PG_PASSWORD|admin
| PG_DATABASE|chronos_db
| PG_POOL_SIZE|50
|KAFKA_CLIENT_ID|"chronos"
|KAFKA_GROUP_ID|"chronos"
|KAFKA_IN_TOPIC|"chronos.in"
|KAFKA_OUT_TOPIC|"chronos.out"
|KAFKA_USERNAME|
|KAFKA_PASSWORD|
|PG_HOST|localhost
|PG_PORT|5432
|PG_USER|admin
|PG_PASSWORD|admin
|PG_DATABASE|chronos_db
|PG_POOL_SIZE|50

### Optional Vars
These values can be set to fine-tune Chronos performance when needed; refer to [Chronos](./README.md)
@@ -51,28 +50,7 @@ These values are set to fine tune performance Chrono in need, refer to [Chronos]
| PROCESSOR_DB_POLL|5 milli sec
| TIMING_ADVANCE|0 sec
| FAIL_DETECT_INTERVAL|10 sec
=======
|Env Var|Example Value| Required|
|----|----|----|
|KAFKA_BROKERS|"localhost:9093"|True
| KAFKA_CLIENT_ID|"chronos"|True
| KAFKA_GROUP_ID|"chronos"|True
| KAFKA_IN_TOPIC|"chronos.in"|True
| KAFKA_OUT_TOPIC|"chronos.out"|True
| KAFKA_USERNAME||True
| KAFKA_PASSWORD||True
| PG_HOST|localhost|True
| PG_PORT|5432|True
| PG_USER|admin|True
| PG_PASSWORD|admin|True
| PG_DATABASE|chronos_db|True
| PG_POOL_SIZE|50|True
| DELAY_TIME|0|False
| RANDOMNESS_DELAY|100|False
| MONITOR_DB_POLL|5|False
| TIMING_ADVANCE|0|False
| FAIL_DETECT_INTERVAL|500|False
>>>>>>> 6107c18 (fix: handle retry and graceful close for all threads if one is stopped)
| HEALTHCHECK_FILE|healthcheck/chronos_healthcheck


## Observability
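For context on how these vars might be consumed, here is a minimal, self-contained sketch of required-versus-optional lookup (this is not the crate's actual config loader; the helper names are illustrative):

```rust
use std::env;

// Required var: fail fast with a clear message if it is missing.
fn required(name: &str) -> String {
    env::var(name).unwrap_or_else(|_| panic!("missing required env var {name}"))
}

// Optional var: fall back to a documented default.
fn optional(name: &str, default: &str) -> String {
    env::var(name).unwrap_or_else(|_| default.to_string())
}

fn main() {
    let kafka_host = required("KAFKA_HOST");
    let pg_pool_size: u32 = required("PG_POOL_SIZE").parse().expect("PG_POOL_SIZE must be an integer");
    let db_poll_ms: u64 = optional("PROCESSOR_DB_POLL", "5").parse().expect("PROCESSOR_DB_POLL must be an integer");
    println!("kafka={kafka_host} pool={pg_pool_size} poll={db_poll_ms}ms");
}
```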
2 changes: 1 addition & 1 deletion chronos_bin/src/bin/chronos.rs
@@ -14,7 +14,7 @@ async fn main() {
env_logger::init();
dotenvy::dotenv().ok();

let protocol = std::env::var("TELEMETRY_PROTOCOL").unwrap_or_else(|_| "http/json".to_string());
let protocol = std::env::var("OTEL_EXPORTER_OTLP_PROTOCOL").unwrap_or_else(|_| "http/json".to_string());

let tracing_opentelemetry = TelemetryCollector::new(protocol, TelemetryCollectorType::Otlp);
tracing_opentelemetry.register_traces();
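The diff above renames the lookup from TELEMETRY_PROTOCOL to the standard OTLP variable OTEL_EXPORTER_OTLP_PROTOCOL. A hedged sketch of what that selection amounts to (the enum here is a stand-in, not the repo's TelemetryCollectorType):

```rust
use std::env;

// Stand-in for the repo's protocol dispatch; "grpc" support is an assumption.
#[derive(Debug)]
enum OtlpProtocol {
    HttpJson,
    Grpc,
}

fn main() {
    // Same default the binary uses when the env var is unset.
    let protocol = env::var("OTEL_EXPORTER_OTLP_PROTOCOL").unwrap_or_else(|_| "http/json".to_string());
    let selected = match protocol.as_str() {
        "grpc" => OtlpProtocol::Grpc,
        _ => OtlpProtocol::HttpJson,
    };
    println!("exporting traces over {selected:?}");
}
```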
4 changes: 2 additions & 2 deletions chronos_bin/src/kafka/producer.rs
@@ -1,8 +1,8 @@
use std::collections::HashMap;
use std::time::Duration;

use crate::kafka::errors::KafkaAdapterError;
use crate::utils::util::into_headers;
use crate::{kafka::errors::KafkaAdapterError, utils::util::CHRONOS_ID};
use rdkafka::producer::{FutureProducer, FutureRecord};

use super::config::KafkaConfig;
@@ -44,6 +44,6 @@ impl KafkaProducer {
)
.await
.map_err(|(kafka_error, _record)| KafkaAdapterError::PublishMessage(kafka_error, "message publishing failed".to_string()))?;
Ok(unwrap_header["chronosId"].to_string())
Ok(unwrap_header[CHRONOS_ID].to_string())
}
}
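The change above swaps the string literal "chronosId" for the shared CHRONOS_ID constant, so the producer and any consumers agree on one header key. A minimal sketch of the idea (the constant's value is assumed from the old literal):

```rust
use std::collections::HashMap;

// Shared header key; the repo keeps this in utils::util. Value assumed from
// the literal it replaced.
pub const CHRONOS_ID: &str = "chronosId";

fn main() {
    let mut headers: HashMap<String, String> = HashMap::new();
    headers.insert(CHRONOS_ID.to_string(), "123e4567-e89b-12d3-a456-426614174000".to_string());
    // Reading back with the same constant avoids drift between literal spellings.
    println!("chronos id header: {}", headers[CHRONOS_ID]);
}
```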
224 changes: 108 additions & 116 deletions chronos_bin/src/message_processor.rs
@@ -6,6 +6,7 @@ use chrono::Utc;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio_postgres::Row;
use uuid::Uuid;

pub struct MessageProcessor {
@@ -14,8 +15,7 @@ pub struct MessageProcessor {
}

impl MessageProcessor {
pub async fn run(&self) {
//Get UUID for the node that deployed this thread
fn assign_node_id() -> Uuid {
let node_id: Uuid = match std::env::var("NODE_ID") {
Ok(val) => Uuid::parse_str(&val).unwrap_or_else(|_e| {
let uuid = uuid::Uuid::new_v4();
@@ -27,153 +27,145 @@ impl MessageProcessor {
uuid::Uuid::new_v4()
}
};
log::info!("node_id {}", node_id);
let mut delay_controller = DelayController::new(100);
node_id
}

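// gather_ids: collapses a publish result to its message id, logging failures
// and mapping them to an empty string.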
fn gather_ids(result: Result<String, String>) -> String {
match result {
Ok(m) => m,
Err(e) => {
log::error!("Error: delayed message publish failed {:?}", e);
"".to_string()
}
}
}

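// prepare_to_publish: maps a ready DB row into Kafka headers, stamps readied_by,
// and publishes the message, returning the published message id.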
#[tracing::instrument(skip_all, fields(correlationId))]
async fn prepare_to_publish(&self, row: Row) -> Result<String, String> {
let updated_row = TableRow {
id: row.get("id"),
deadline: row.get("deadline"),
readied_at: row.get("readied_at"),
readied_by: row.get("readied_by"),
message_headers: row.get("message_headers"),
message_key: row.get("message_key"),
message_value: row.get("message_value"),
};
let mut headers: HashMap<String, String> = match serde_json::from_str(&updated_row.message_headers.to_string()) {
Ok(t) => t,
Err(_e) => {
println!("error occurred while parsing");
HashMap::new()
}
};

let readied_by_column = Some(updated_row.readied_by.to_string());
tracing::Span::current().record("correlationId", &readied_by_column);

match readied_by_column {
Some(id) => {
headers.insert("readied_by".to_string(), id);
if let Ok(id) = self
.producer
.kafka_publish(updated_row.message_value.to_string(), Some(headers), updated_row.message_key.to_string())
.await
{
Ok(id)
} else {
Err("error occurred while publishing".to_string())
}
}
None => {
log::error!("Error: readied_by not found in db row {:?}", updated_row);
Err("error occurred while publishing".to_string())
}
}
}

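// delete_fired_records_from_db: deletes rows already published to Kafka,
// retrying up to a bounded number of attempts before giving up.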
#[tracing::instrument(skip_all, fields(deleted_ids))]
async fn delete_fired_records_from_db(&self, ids: &Vec<String>) {
//retry loop
let max_retry_count = 3;
let mut retry_count = 0;
while let Err(outcome_error) = &self.data_store.delete_fired(ids).await {
log::error!("Error: error occurred in message processor {}", outcome_error);
log::debug!("retrying");
retry_count += 1;
if retry_count == max_retry_count {
log::error!("Error: max retry count {} reached by node {:?} for deleting fired ids ", max_retry_count, ids);
break;
}
}
}

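// processor_message_ready: fetches rows whose deadline has passed, publishes
// them concurrently, and deletes the rows that published successfully.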
#[tracing::instrument(skip_all)]
async fn processor_message_ready(&self, node_id: Uuid) {
loop {
log::debug!("MessageProcessor loop");
tokio::time::sleep(Duration::from_millis(10)).await;
// tokio::time::sleep(Duration::from_millis(ChronosConfig::from_env().db_poll_interval)).await;
log::debug!("retry loop");
// thread::sleep(Duration::from_millis(100));
let max_retry_count = 3;
let mut retry_count = 0;

let deadline = Utc::now() - Duration::from_secs(ChronosConfig::from_env().time_advance);

let params = GetReady {
let param = GetReady {
readied_at: deadline,
readied_by: node_id,
deadline,
// limit: 1000,
// order: "asc",
};

//retry loop
let _ = &self.processor_message_ready(&params).await;

delay_controller.sleep().await;
}
}

#[tracing::instrument(skip_all, fields(node_id, correlationId, is_published, error))]
async fn processor_message_ready(&self, params: &GetReady) {
loop {
let max_retry_count = 3;
let mut retry_count = 0;
let readied_by_column: Option<String> = None;
let resp: Result<Vec<Row>, String> = self.data_store.ready_to_fire_db(&param).await;
match resp {
Ok(ready_to_publish_rows) => {
if ready_to_publish_rows.is_empty() {
log::debug!("no rows ready to fire for dealine {}", deadline);
break;
} else {
let publish_futures = ready_to_publish_rows.into_iter().map(|row| self.prepare_to_publish(row));

match &self.data_store.ready_to_fire_db(params).await {
Ok(publish_rows) => {
let rdy_to_pblsh_count = publish_rows.len();
if rdy_to_pblsh_count > 0 {
let mut ids: Vec<String> = Vec::with_capacity(rdy_to_pblsh_count);
let mut publish_futures = Vec::with_capacity(rdy_to_pblsh_count);
for row in publish_rows {
let updated_row = TableRow {
id: row.get("id"),
deadline: row.get("deadline"),
readied_at: row.get("readied_at"),
readied_by: row.get("readied_by"),
message_headers: row.get("message_headers"),
message_key: row.get("message_key"),
message_value: row.get("message_value"),
};
let mut headers: HashMap<String, String> = match serde_json::from_str(&updated_row.message_headers.to_string()) {
Ok(t) => t,
Err(_e) => {
println!("error occurred while parsing");
HashMap::new()
}
};
//TODO: handle empty headers

let readied_by = updated_row.readied_by.to_string();
tracing::Span::current().record("node_id", &readied_by);
headers.insert("readied_by".to_string(), readied_by);

tracing::Span::current().record("correlationId", updated_row.id.to_string());

publish_futures.push(self.producer.kafka_publish(
updated_row.message_value.to_string(),
Some(headers),
updated_row.message_key.to_string(),
// updated_row.id.to_string(),
))
}
let results = futures::future::join_all(publish_futures).await;
for result in results {
match result {
Ok(m) => {
tracing::Span::current().record("is_published", "true");
ids.push(m);
}
Err(e) => {
tracing::Span::current().record("is_published", "false");
tracing::Span::current().record("error", &e.to_string());

log::error!("Error: delayed message publish failed {:?}", e);
break;
// failure detection needs to pick
}
}
}

let ids: Vec<String> = results.into_iter().map(Self::gather_ids).collect();

if !ids.is_empty() {
if let Err(outcome_error) = &self.data_store.delete_fired(&ids).await {
println!("Error: error occurred in message processor delete_fired {}", outcome_error);
//add retry logic here
}
// println!("delete ids {:?} and break", ids);
let _ = self.delete_fired_records_from_db(&ids).await;
log::debug!("number of rows published successfully and deleted from DB {}", ids.len());
break;
}
log::debug!("number of rows published successfully and deleted from DB {}", ids.len());
} else {
log::debug!("no rows ready to fire for dealine ");
break;
}
}
Err(e) => {
if e.contains("could not serialize access due to concurrent update") && retry_count < max_retry_count {
//retry goes here
eprintln!("retrying");
log::debug!("retrying");
retry_count += 1;
if retry_count == max_retry_count {
log::error!(
"Error: max retry count {} reached by node {} for row ",
max_retry_count,
"node_id_option.unwrap()",
// row_id.unwrap()
);
log::error!("Error: max retry count {} reached by node {:?} for row ", max_retry_count, readied_by_column);
break;
}
// &self.process_db_rows(&params).await;
}
log::error!("Error: error occurred in message processor while publishing {}", e);
break;
}
}
}

// let node_id_option: Option<String> = node_id.clone().into();
// let mut row_id: Option<String> = None;
}
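// run: entry point for the processor thread; assigns a node id, then polls
// for ready messages with a delay between iterations.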
pub async fn run(&self) {
log::info!("MessageProcessor ON!");

#[tracing::instrument(skip_all, fields(correlationId))]
async fn clean_db(&self, ids: Vec<String>) {
//retry in case delete fails
let max_retries = 3;
let mut retry_count = 0;
//Get UUID for the node that deployed this thread
let node_id = Self::assign_node_id();

log::info!("node_id {}", node_id);
let mut delay_controller = DelayController::new(100);
loop {
if retry_count < max_retries {
match &self.data_store.delete_fired(&ids).await {
Ok(_) => {
tracing::Span::current().record("correlationId", ids.join(","));
break;
}
Err(e) => {
println!("Error: error occurred in message processor delete_fired {}", e);
retry_count += 1;
continue;
}
}
} else {
log::error!("Error: max retry count {} reached by node {} for row ", max_retries, "node_id_option.unwrap()",);
break;
}
log::debug!("MessageProcessor loop");
tokio::time::sleep(Duration::from_millis(10)).await;
self.processor_message_ready(node_id).await;

delay_controller.sleep().await;
}
}
}
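The recurring pattern in this refactor is the bounded retry loop (see delete_fired_records_from_db above). A self-contained sketch of that shape with a stubbed delete so it runs outside the repo (stub names are illustrative, not the repo's API):

```rust
use std::time::Duration;

// Stub standing in for data_store.delete_fired; fails on the first attempts.
async fn delete_fired(ids: &[String], attempt: u32) -> Result<(), String> {
    if attempt < 2 {
        Err(format!("could not delete {ids:?} on attempt {attempt}"))
    } else {
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let ids = vec!["a1".to_string(), "b2".to_string()];
    let max_retry_count = 3;
    let mut retry_count = 0;
    // Same shape as delete_fired_records_from_db: retry on error, stop at the cap.
    while let Err(e) = delete_fired(&ids, retry_count).await {
        eprintln!("delete failed: {e}; retrying");
        retry_count += 1;
        if retry_count == max_retry_count {
            eprintln!("max retry count {max_retry_count} reached for ids {ids:?}");
            break;
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
}
```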