From 00f7c0597fa5e63b857575e4f99dd9c2aca474b1 Mon Sep 17 00:00:00 2001
From: Amninder Kaur
Date: Tue, 24 Oct 2023 11:06:43 +1100
Subject: [PATCH] fix: refactor processor and receiver modules

---
 .env.example                         |   3 -
 .gitignore                           |   4 -
 How-to.md                            |  48 ++----
 chronos_bin/src/bin/chronos.rs       |   2 +-
 chronos_bin/src/kafka/producer.rs    |   4 +-
 chronos_bin/src/message_processor.rs | 224 +++++++++++++--------------
 chronos_bin/src/message_receiver.rs  | 167 ++++++++++----------
 chronos_bin/src/monitor.rs           |  38 ++---
 chronos_bin/src/postgres/pg.rs       |  73 +++++----
 chronos_bin/src/runner.rs            |  29 ++--
 chronos_bin/src/utils/util.rs        |  57 +++----
 docker-compose.yml                   | 126 +++++++--------
 infra/otelcol-config.yml             |   6 +-
 13 files changed, 365 insertions(+), 416 deletions(-)

diff --git a/.env.example b/.env.example
index 799e49c..40a1135 100644
--- a/.env.example
+++ b/.env.example
@@ -57,10 +57,7 @@ TIMING_ADVANCE=0
 FAIL_DETECT_INTERVAL=500
 MAX_RETRIES=3
 PROCESSOR_DB_POLL=10
-<<<<<<< HEAD
 # TRACING
 OTEL_SERVICE_NAME=chronos
 OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://localhost:4318/v1/traces
-=======
->>>>>>> 6107c18 (fix: handle retry and graceful close for all threads if one is stopped)

diff --git a/.gitignore b/.gitignore
index b2204e5..7567c49 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,10 +1,6 @@
 /target
-<<<<<<< HEAD
 .env
-
-=======
 /heathcheck
->>>>>>> a5a05c4 (fix: add health check file with 0 or 1)
 
 ### Linux ###

diff --git a/How-to.md b/How-to.md
index c96fee4..a7cd568 100644
--- a/How-to.md
+++ b/How-to.md
@@ -23,25 +23,24 @@ Use `make withenv RECIPE=docker.up`
 
 ## ENV vars
 All the required configuration for Chronos can be passed in the environment variables listed below.
-<<<<<<< HEAD
 ### Required Vars
 |Env Var|Example Value|
 |----|----|
 |KAFKA_HOST|"localhost"
 |KAFKA_PORT|9093
-| KAFKA_CLIENT_ID|"chronos"
-| KAFKA_GROUP_ID|"chronos"
-| KAFKA_IN_TOPIC|"chronos.in"
-| KAFKA_OUT_TOPIC|"chronos.out"
-| KAFKA_USERNAME|
-| KAFKA_PASSWORD|
-| PG_HOST|localhost
-| PG_PORT|5432
-| PG_USER|admin
-| PG_PASSWORD|admin
-| PG_DATABASE|chronos_db
-| PG_POOL_SIZE|50
+|KAFKA_CLIENT_ID|"chronos"
+|KAFKA_GROUP_ID|"chronos"
+|KAFKA_IN_TOPIC|"chronos.in"
+|KAFKA_OUT_TOPIC|"chronos.out"
+|KAFKA_USERNAME|
+|KAFKA_PASSWORD|
+|PG_HOST|localhost
+|PG_PORT|5432
+|PG_USER|admin
+|PG_PASSWORD|admin
+|PG_DATABASE|chronos_db
+|PG_POOL_SIZE|50
 
 ### Optional Vars
 These values can be set to fine-tune Chronos performance when needed; refer to [Chronos](./README.md)
@@ -51,28 +50,7 @@ These values can be set to fine-tune Chronos performance when needed; refer to [Chronos]
 | PROCESSOR_DB_POLL|5 milli sec
 | TIMING_ADVANCE|0 sec
 | FAIL_DETECT_INTERVAL|10 sec
-=======
-|Env Var|Example Value| Required|
-|----|----|----|
-|KAFKA_BROKERS|"localhost:9093"|True
-| KAFKA_CLIENT_ID|"chronos"|True
-| KAFKA_GROUP_ID|"chronos"|True
-| KAFKA_IN_TOPIC|"chronos.in"|True
-| KAFKA_OUT_TOPIC|"chronos.out"|True
-| KAFKA_USERNAME||True
-| KAFKA_PASSWORD||True
-| PG_HOST|localhost|True
-| PG_PORT|5432|True
-| PG_USER|admin|True
-| PG_PASSWORD|admin|True
-| PG_DATABASE|chronos_db|True
-| PG_POOL_SIZE|50|True
-| DELAY_TIME|0|False
-| RANDOMNESS_DELAY|100|False
-| MONITOR_DB_POLL|5|False
-| TIMING_ADVANCE|0|False
-| FAIL_DETECT_INTERVAL|500|False
->>>>>>> 6107c18 (fix: handle retry and graceful close for all threads if one is stopped)
+| HEALTHCHECK_FILE|healthcheck/chronos_healthcheck
 
 ## Observability
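Chronos reads all of the variables above from the process environment at startup. For reference, a minimal sketch of a fail-fast loader in the same spirit; the `required` helper and `PgEnv` struct are illustrative only, not the crate's actual config API:

```rust
use std::env;

// Illustrative fail-fast reader: panic with a clear message when a
// required variable is absent, instead of failing later mid-request.
fn required(key: &str) -> String {
    env::var(key).unwrap_or_else(|_| panic!("missing required env var {key}"))
}

// Hypothetical struct standing in for the crate's Postgres settings.
struct PgEnv {
    host: String,
    port: u16,
    pool_size: u32,
}

fn pg_env() -> PgEnv {
    PgEnv {
        host: required("PG_HOST"),
        port: required("PG_PORT").parse().expect("PG_PORT must be a number"),
        pool_size: required("PG_POOL_SIZE").parse().expect("PG_POOL_SIZE must be a number"),
    }
}
```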
diff --git a/chronos_bin/src/bin/chronos.rs b/chronos_bin/src/bin/chronos.rs
index 1e6cb15..0d9a36b 100644
--- a/chronos_bin/src/bin/chronos.rs
+++ b/chronos_bin/src/bin/chronos.rs
@@ -14,7 +14,7 @@ async fn main() {
     env_logger::init();
     dotenvy::dotenv().ok();
 
-    let protocol = std::env::var("TELEMETRY_PROTOCOL").unwrap_or_else(|_| "http/json".to_string());
+    let protocol = std::env::var("OTEL_EXPORTER_OTLP_PROTOCOL").unwrap_or_else(|_| "http/json".to_string());
     let tracing_opentelemetry = TelemetryCollector::new(protocol, TelemetryCollectorType::Otlp);
     tracing_opentelemetry.register_traces();
 
diff --git a/chronos_bin/src/kafka/producer.rs b/chronos_bin/src/kafka/producer.rs
index 3b94743..3fc3b94 100644
--- a/chronos_bin/src/kafka/producer.rs
+++ b/chronos_bin/src/kafka/producer.rs
@@ -1,8 +1,8 @@
 use std::collections::HashMap;
 use std::time::Duration;
 
-use crate::kafka::errors::KafkaAdapterError;
 use crate::utils::util::into_headers;
+use crate::{kafka::errors::KafkaAdapterError, utils::util::CHRONOS_ID};
 use rdkafka::producer::{FutureProducer, FutureRecord};
 
 use super::config::KafkaConfig;
@@ -44,6 +44,6 @@ impl KafkaProducer {
         )
         .await
         .map_err(|(kafka_error, _record)| KafkaAdapterError::PublishMessage(kafka_error, "message publishing failed".to_string()))?;
-        Ok(unwrap_header["chronosId"].to_string())
+        Ok(unwrap_header[CHRONOS_ID].to_string())
    }
 }
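The producer change swaps the stray `"chronosId"` literal for the shared `CHRONOS_ID` constant (`"chronosMessageId"` in `utils::util`), which is the key the receiver actually sets. A small sketch of the benefit, assuming only the two constants from this patch:

```rust
use std::collections::HashMap;

// Mirrors chronos_bin/src/utils/util.rs.
pub static CHRONOS_ID: &str = "chronosMessageId";
pub static DEADLINE: &str = "chronosDeadline";

fn main() {
    let mut headers: HashMap<String, String> = HashMap::new();
    headers.insert(CHRONOS_ID.to_string(), "some-uuid".to_string());
    // Writing and reading through one constant: a mismatched literal such
    // as "chronosId" would panic here at lookup, which is exactly the bug
    // this hunk removes.
    assert_eq!(headers[CHRONOS_ID], "some-uuid");
}
```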
diff --git a/chronos_bin/src/message_processor.rs b/chronos_bin/src/message_processor.rs
index e2cac4b..8a4ced7 100644
--- a/chronos_bin/src/message_processor.rs
+++ b/chronos_bin/src/message_processor.rs
@@ -6,6 +6,7 @@ use chrono::Utc;
 use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::Duration;
+use tokio_postgres::Row;
 use uuid::Uuid;
 
 pub struct MessageProcessor {
@@ -14,8 +15,7 @@ pub struct MessageProcessor {
 }
 
 impl MessageProcessor {
-    pub async fn run(&self) {
-        //Get UUID for the node that deployed this thread
+    fn assign_node_id() -> Uuid {
         let node_id: Uuid = match std::env::var("NODE_ID") {
             Ok(val) => Uuid::parse_str(&val).unwrap_or_else(|_e| {
                 let uuid = uuid::Uuid::new_v4();
@@ -27,15 +27,88 @@ impl MessageProcessor {
                 uuid::Uuid::new_v4()
             }
         };
-        log::info!("node_id {}", node_id);
-        let mut delay_controller = DelayController::new(100);
+        node_id
+    }
+
+    fn gather_ids(result: Result<String, String>) -> String {
+        match result {
+            Ok(m) => m,
+            Err(e) => {
+                log::error!("Error: delayed message publish failed {:?}", e);
+                "".to_string()
+            }
+        }
+    }
+
+    #[tracing::instrument(skip_all, fields(correlationId))]
+    async fn prepare_to_publish(&self, row: Row) -> Result<String, String> {
+        let updated_row = TableRow {
+            id: row.get("id"),
+            deadline: row.get("deadline"),
+            readied_at: row.get("readied_at"),
+            readied_by: row.get("readied_by"),
+            message_headers: row.get("message_headers"),
+            message_key: row.get("message_key"),
+            message_value: row.get("message_value"),
+        };
+        let mut headers: HashMap<String, String> = match serde_json::from_str(&updated_row.message_headers.to_string()) {
+            Ok(t) => t,
+            Err(_e) => {
+                log::error!("error occurred while parsing message headers");
+                HashMap::new()
+            }
+        };
+
+        let readied_by_column = Some(updated_row.readied_by.to_string());
+        tracing::Span::current().record("correlationId", &readied_by_column);
+
+        match readied_by_column {
+            Some(id) => {
+                headers.insert("readied_by".to_string(), id);
+                if let Ok(id) = self
+                    .producer
+                    .kafka_publish(updated_row.message_value.to_string(), Some(headers), updated_row.message_key.to_string())
+                    .await
+                {
+                    Ok(id)
+                } else {
+                    Err("error occurred while publishing".to_string())
+                }
+            }
+            None => {
+                log::error!("Error: readied_by not found in db row {:?}", updated_row);
+                Err("error occurred while publishing".to_string())
+            }
+        }
+    }
+
+    #[tracing::instrument(skip_all, fields(deleted_ids))]
+    async fn delete_fired_records_from_db(&self, ids: &Vec<String>) {
+        //retry loop
+        let max_retry_count = 3;
+        let mut retry_count = 0;
+        while let Err(outcome_error) = &self.data_store.delete_fired(ids).await {
+            log::error!("Error: error occurred in message processor {}", outcome_error);
+            log::debug!("retrying");
+            retry_count += 1;
+            if retry_count == max_retry_count {
+                log::error!("Error: max retry count {} reached for deleting fired ids {:?}", max_retry_count, ids);
+                break;
+            }
+        }
+    }
+
+    #[tracing::instrument(skip_all)]
+    async fn processor_message_ready(&self, node_id: Uuid) {
         loop {
-            log::debug!("MessageProcessor loop");
-            tokio::time::sleep(Duration::from_millis(10)).await;
-            // tokio::time::sleep(Duration::from_millis(ChronosConfig::from_env().db_poll_interval)).await;
+            log::debug!("retry loop");
+            // thread::sleep(Duration::from_millis(100));
+            let max_retry_count = 3;
+            let mut retry_count = 0;
+
             let deadline = Utc::now() - Duration::from_secs(ChronosConfig::from_env().time_advance);
 
-            let params = GetReady {
+            let param = GetReady {
                 readied_at: deadline,
                 readied_by: node_id,
                 deadline,
@@ -43,137 +116,56 @@ impl MessageProcessor {
                 // order: "asc",
             };
 
-            //retry loop
-            let _ = &self.processor_message_ready(&params).await;
-
-            delay_controller.sleep().await;
-        }
-    }
-
-    #[tracing::instrument(skip_all, fields(node_id, correlationId, is_published, error))]
-    async fn processor_message_ready(&self, params: &GetReady) {
-        loop {
-            let max_retry_count = 3;
-            let mut retry_count = 0;
+            let readied_by_column: Option<String> = None;
+            let resp: Result<Vec<Row>, String> = self.data_store.ready_to_fire_db(&param).await;
+            match resp {
+                Ok(ready_to_publish_rows) => {
+                    if ready_to_publish_rows.is_empty() {
+                        log::debug!("no rows ready to fire for deadline {}", deadline);
+                        break;
+                    } else {
+                        let publish_futures = ready_to_publish_rows.into_iter().map(|row| self.prepare_to_publish(row));
 
-            match &self.data_store.ready_to_fire_db(params).await {
-                Ok(publish_rows) => {
-                    let rdy_to_pblsh_count = publish_rows.len();
-                    if rdy_to_pblsh_count > 0 {
-                        let mut ids: Vec<String> = Vec::with_capacity(rdy_to_pblsh_count);
-                        let mut publish_futures = Vec::with_capacity(rdy_to_pblsh_count);
-                        for row in publish_rows {
-                            let updated_row = TableRow {
-                                id: row.get("id"),
-                                deadline: row.get("deadline"),
-                                readied_at: row.get("readied_at"),
-                                readied_by: row.get("readied_by"),
-                                message_headers: row.get("message_headers"),
-                                message_key: row.get("message_key"),
-                                message_value: row.get("message_value"),
-                            };
-                            let mut headers: HashMap<String, String> = match serde_json::from_str(&updated_row.message_headers.to_string()) {
-                                Ok(t) => t,
-                                Err(_e) => {
-                                    println!("error occurred while parsing");
-                                    HashMap::new()
-                                }
-                            };
-                            //TODO: handle empty headers
-
-                            let readied_by = updated_row.readied_by.to_string();
-                            tracing::Span::current().record("node_id", &readied_by);
-                            headers.insert("readied_by".to_string(), readied_by);
-
-                            tracing::Span::current().record("correlationId", updated_row.id.to_string());
-
-                            publish_futures.push(self.producer.kafka_publish(
-                                updated_row.message_value.to_string(),
-                                Some(headers),
-                                updated_row.message_key.to_string(),
-                                // updated_row.id.to_string(),
-                            ))
-                        }
                         let results = futures::future::join_all(publish_futures).await;
-                        for result in results {
-                            match result {
-                                Ok(m) => {
-                                    tracing::Span::current().record("is_published", "true");
-                                    ids.push(m);
-                                }
-                                Err(e) => {
-                                    tracing::Span::current().record("is_published", "false");
-                                    tracing::Span::current().record("error", &e.to_string());
-
-                                    log::error!("Error: delayed message publish failed {:?}", e);
-                                    break;
-                                    // failure detection needs to pick
-                                }
-                            }
-                        }
+
+                        let ids: Vec<String> = results.into_iter().map(Self::gather_ids).collect();
                         if !ids.is_empty() {
-                            if let Err(outcome_error) = &self.data_store.delete_fired(&ids).await {
-                                println!("Error: error occurred in message processor delete_fired {}", outcome_error);
-                                //add retry logic here
-                            }
-                            // println!("delete ids {:?} and break", ids);
+                            let _ = self.delete_fired_records_from_db(&ids).await;
+                            log::debug!("number of rows published successfully and deleted from DB {}", ids.len());
                             break;
                         }
-                        log::debug!("number of rows published successfully and deleted from DB {}", ids.len());
-                    } else {
-                        log::debug!("no rows ready to fire for dealine ");
-                        break;
                     }
                 }
                 Err(e) => {
                     if e.contains("could not serialize access due to concurrent update") && retry_count < max_retry_count {
                         //retry goes here
-                        eprintln!("retrying");
+                        log::debug!("retrying");
                         retry_count += 1;
                         if retry_count == max_retry_count {
-                            log::error!(
-                                "Error: max retry count {} reached by node {} for row ",
-                                max_retry_count,
-                                "node_id_option.unwrap()",
-                                // row_id.unwrap()
-                            );
+                            log::error!("Error: max retry count {} reached by node {:?} for row ", max_retry_count, readied_by_column);
                             break;
                         }
-                        // &self.process_db_rows(&params).await;
                     }
                     log::error!("Error: error occurred in message processor while publishing {}", e);
-                    break;
                 }
             }
         }
-
-        // let node_id_option: Option<String> = node_id.clone().into();
-        // let mut row_id: Option<String> = None;
     }
+
+    pub async fn run(&self) {
+        log::info!("MessageProcessor ON!");
 
-    #[tracing::instrument(skip_all, fields(correlationId))]
-    async fn clean_db(&self, ids: Vec<String>) {
-        //rety in case delete fails
-        let max_retries = 3;
-        let mut retry_count = 0;
+        //Get UUID for the node that deployed this thread
+        let node_id = Self::assign_node_id();
+
+        log::info!("node_id {}", node_id);
+        let mut delay_controller = DelayController::new(100);
         loop {
-            if retry_count < max_retries {
-                match &self.data_store.delete_fired(&ids).await {
-                    Ok(_) => {
-                        tracing::Span::current().record("correlationId", ids.join(","));
-                        break;
-                    }
-                    Err(e) => {
-                        println!("Error: error occurred in message processor delete_fired {}", e);
-                        retry_count += 1;
-                        continue;
-                    }
-                }
-            } else {
-                log::error!("Error: max retry count {} reached by node {} for row ", max_retries, "node_id_option.unwrap()",);
-                break;
-            }
+            log::debug!("MessageProcessor loop");
+            tokio::time::sleep(Duration::from_millis(10)).await;
+            self.processor_message_ready(node_id).await;
+
+            delay_controller.sleep().await;
         }
     }
 }
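`delete_fired_records_from_db` retries the delete a bounded number of times instead of looping forever. A standalone sketch of that capped-retry shape, assuming the `tokio` and `log` crates already used by the project; `flaky_op` is a hypothetical stand-in for the data-store call:

```rust
// Hypothetical fallible operation: fails twice, then succeeds.
async fn flaky_op(attempt: u32) -> Result<(), String> {
    if attempt < 2 {
        Err("transient failure".to_string())
    } else {
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let max_retry_count = 3;
    let mut retry_count = 0;
    // Same shape as the processor: retry on error, give up after the cap.
    while let Err(e) = flaky_op(retry_count).await {
        log::error!("operation failed: {}", e);
        retry_count += 1;
        if retry_count == max_retry_count {
            log::error!("max retry count {} reached, giving up", max_retry_count);
            break;
        }
    }
}
```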
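`handle_message` now routes purely on the deadline header: a past-due deadline publishes straight to the out-topic, a future one parks the row in the delay table. A minimal sketch of that decision in isolation; the `Route` enum is illustrative, not part of the crate:

```rust
use chrono::{DateTime, Duration, Utc};

// Illustrative routing outcome for a received message.
enum Route {
    PublishNow,
    Delay(DateTime<Utc>),
}

fn route(deadline: DateTime<Utc>, now: DateTime<Utc>) -> Route {
    if deadline <= now {
        Route::PublishNow
    } else {
        Route::Delay(deadline)
    }
}

fn main() {
    let now = Utc::now();
    assert!(matches!(route(now - Duration::seconds(5), now), Route::PublishNow));
    assert!(matches!(route(now + Duration::seconds(60), now), Route::Delay(_)));
}
```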
diff --git a/chronos_bin/src/message_receiver.rs b/chronos_bin/src/message_receiver.rs
index 9d8caa1..93c0ea8 100644
--- a/chronos_bin/src/message_receiver.rs
+++ b/chronos_bin/src/message_receiver.rs
@@ -1,16 +1,13 @@
 use chrono::{DateTime, Utc};
-use log::{debug, error, info, warn};
 use serde_json::json;
 use tracing::instrument;
 
 use crate::kafka::consumer::KafkaConsumer;
 use crate::kafka::producer::KafkaProducer;
 use crate::postgres::pg::{Pg, TableInsertRow};
-use crate::utils::util::{get_message_key, get_payload_utf8, headers_check, required_headers, CHRONOS_ID, DEADLINE};
-use rdkafka::message::{BorrowedMessage, Message};
-use std::collections::HashMap;
-use std::str::FromStr;
-use std::sync::Arc;
+use crate::utils::util::{get_message_key, get_payload_utf8, required_headers, CHRONOS_ID, DEADLINE};
+use rdkafka::message::BorrowedMessage;
+use std::{collections::HashMap, str::FromStr, sync::Arc};
 
 pub struct MessageReceiver {
     pub(crate) consumer: Arc<KafkaConsumer>,
@@ -20,110 +17,102 @@ pub struct MessageReceiver {
 
 impl MessageReceiver {
     #[instrument(skip_all, fields(correlationId))]
-    pub async fn receiver_publish_to_kafka(&self, new_message: &BorrowedMessage<'_>, headers: HashMap<String, String>) {
-        let string_payload = String::from_utf8_lossy(get_payload_utf8(new_message)).to_string();
-        let message_key = get_message_key(new_message);
-        tracing::Span::current().record("correlationId", &message_key);
-        let outcome = &self.producer.kafka_publish(string_payload, Some(headers), message_key.to_string()).await;
-        match outcome {
-            Ok(_) => {
-                debug!("Published message to Kafka {}", &message_key);
-            }
-            Err(e) => {
-                error!("Failed to publish message to Kafka: {:?}", e);
-                // TODO check if needs to retry publishing
+    async fn insert_into_db(
+        &self,
+        new_message: &BorrowedMessage<'_>,
+        reqd_headers: HashMap<String, String>,
+        message_deadline: DateTime<Utc>,
+    ) -> Option<String> {
+        let max_retry_count = 3;
+        let mut retry_count = 0;
+        //retry loop
+        loop {
+            if let Some(payload) = get_payload_utf8(new_message) {
+                if let Ok(message_value) = &serde_json::from_slice(payload) {
+                    if let Some(message_key) = get_message_key(new_message) {
+                        let params = TableInsertRow {
+                            id: &reqd_headers[CHRONOS_ID],
+                            deadline: message_deadline,
+                            message_headers: &json!(&reqd_headers),
+                            message_key: message_key.as_str(),
+                            message_value,
+                        };
+
+                        if let Err(e) = self.data_store.insert_to_delay_db(&params).await {
+                            log::error!("insert to delay failed {}", e);
+                            retry_count += 1;
+                            if retry_count == max_retry_count {
+                                return Some("max retry count reached for insert to delay query".to_string());
+                            }
+                            continue;
+                        }
+                        tracing::Span::current().record("correlationId", &message_key);
+                    }
+
+                    log::debug!("Message insert success {:?}", new_message);
+                    return None;
+                } else {
+                    return Some("json conversion of payload failed".to_string());
+                }
+            } else {
+                return Some("message payload is not utf8 encoded".to_string());
             }
         }
     }
 
     #[instrument(skip_all, fields(correlationId))]
-    pub async fn receiver_insert_to_db(&self, new_message: &BorrowedMessage<'_>, headers: HashMap<String, String>, deadline: DateTime<Utc>) {
-        let result_value = &serde_json::from_slice(get_payload_utf8(new_message));
-        let payload = match result_value {
-            Ok(payload) => payload,
-            Err(e) => {
-                error!("de-ser failed for payload: {:?}", e);
-                return;
-            }
-        };
-
-        let message_key = get_message_key(new_message);
-        tracing::Span::current().record("correlationId", &message_key);
-
-        let params = TableInsertRow {
-            id: &headers[CHRONOS_ID],
-            deadline,
-            message_headers: &json!(&headers),
-            message_key: message_key.as_str(),
-            message_value: payload,
-        };
-        let _insert_time = Instant::now();
-
-        //retry
-        let total_retry_count = 3;
-        let mut retry_count = 0;
-        loop {
-            match self.data_store.insert_to_delay_db(&params).await {
-                Ok(_) => {
-                    break;
-                }
-                Err(e) => {
-                    error!("insert_to_delay failed: {:?} retrying again", e);
-                    retry_count += 1;
-                    if retry_count == total_retry_count {
-                        error!("max retry count {} exceeded aborting insert_to_db for {}", total_retry_count, message_key);
-                        break;
+    async fn prepare_and_publish(&self, message: &BorrowedMessage<'_>, reqd_headers: HashMap<String, String>) -> Option<String> {
+        match get_payload_utf8(message) {
+            Some(string_payload) => {
+                if let Some(message_key) = get_message_key(message) {
+                    let string_payload = String::from_utf8_lossy(string_payload).to_string();
+                    tracing::Span::current().record("correlationId", &message_key);
+                    if let Err(e) = &self.producer.kafka_publish(string_payload, Some(reqd_headers.clone()), message_key).await {
+                        return Some(format!("publish failed for received message {:?} with error :: {}", message, e));
                     }
+                } else {
+                    return Some("message key not found".to_string());
                 }
             }
-        }
+            None => return None,
+        };
+        None
     }
 
-    #[tracing::instrument(name = "receiver_handle_message", skip_all, fields(correlationId))]
+    #[tracing::instrument(name = "receiver_handle_message", skip_all, fields(correlationId, error))]
     pub async fn handle_message(&self, message: &BorrowedMessage<'_>) {
-        if headers_check(message.headers().unwrap()) {
-            let new_message = &message;
-
-            if let Some(headers) = required_headers(new_message) {
-                tracing::Span::current().record("correlationId", &headers[CHRONOS_ID]);
-                let message_deadline: DateTime<Utc> = match DateTime::<Utc>::from_str(&headers[DEADLINE]) {
-                    Ok(d) => d,
-                    Err(e) => {
-                        error!("failed to parse deadline: {}", e);
-                        return;
-                    }
-                };
-
+        let new_message = &message;
+        if let Some(reqd_headers) = required_headers(new_message) {
+            tracing::Span::current().record("correlationId", &reqd_headers[CHRONOS_ID]);
+            if let Ok(message_deadline) = DateTime::<Utc>::from_str(&reqd_headers[DEADLINE]) {
                 if message_deadline <= Utc::now() {
-                    debug!("message deadline is in the past, sending directly to out_topic");
-                    // direct_sent_count += 1;
-                    self.receiver_publish_to_kafka(new_message, headers).await
-                } else {
-                    debug!("message deadline is in the future, sending to kafka");
-                    // db_insert_count += 1;
-
-                    self.receiver_insert_to_db(new_message, headers, message_deadline).await
-                    // println!("insert took: {:?}", insert_time.elapsed())
+                    if let Some(err) = self.prepare_and_publish(new_message, reqd_headers).await {
+                        log::error!("{}", err);
+                        tracing::Span::current().record("error", &err);
+                    }
+                } else if let Some(err_string) = self.insert_into_db(new_message, reqd_headers, message_deadline).await {
+                    log::error!("{}", err_string);
+                    tracing::Span::current().record("error", &err_string);
                 }
-            } else {
-                warn!("message with improper headers on inbox.topic ");
             }
-        } else {
-            warn!("message with improper headers on inbox.topic ");
         }
-        // println!("{direct_sent_count} messages sent directly and {db_insert_count} added to db from total of {total_count} ");
     }
 
     pub async fn run(&self) {
-        info!("MessageReceiver ON!");
+        log::info!("MessageReceiver ON!");
         let _ = &self.consumer.subscribe().await;
-        // let mut total_count = 0;
-        // let mut direct_sent_count = 0;
-        // let mut db_insert_count = 0;
 
         loop {
-            if let Ok(message) = &self.consumer.kafka_consume_message().await {
-                self.handle_message(message).await;
+            match &self.consumer.kafka_consume_message().await {
+                Ok(message) => {
+                    self.handle_message(message).await;
+                }
+                Err(e) => {
+                    log::error!("error while consuming message {:?}", e);
+                }
             }
+            // if let Ok(message) = &self.consumer.kafka_consume_message().await {
+            //     self.handle_message(message).await;
+            // }
         }
     }
 }
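The failure detector considers a row failed-to-fire when it was readied longer than `FAIL_DETECT_INTERVAL` seconds ago and resets it so another node can pick it up. A short sketch of just the cutoff computation used above (chrono `DateTime` minus a `std::time::Duration`, values illustrative):

```rust
use chrono::{DateTime, Utc};
use std::time::Duration;

// Rows with readied_at older than this cutoff are reset to their initial
// state by reset_to_init_db.
fn cutoff(fail_detect_interval_secs: u64) -> DateTime<Utc> {
    Utc::now() - Duration::from_secs(fail_detect_interval_secs)
}

fn main() {
    let c = cutoff(500); // e.g. FAIL_DETECT_INTERVAL=500
    println!("reset rows readied before {c}");
}
```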
diff --git a/chronos_bin/src/monitor.rs b/chronos_bin/src/monitor.rs
index 474451e..aaaffd3 100644
--- a/chronos_bin/src/monitor.rs
+++ b/chronos_bin/src/monitor.rs
@@ -9,40 +9,42 @@ pub struct FailureDetector {
     pub(crate) data_store: Arc<Pg>,
 }
 
-//Needs to accept the poll time
 impl FailureDetector {
-    // #[instrument]
     pub async fn run(&self) {
         log::info!("Monitoring On!");
         loop {
-            // TODO multiple rows are fetched, what to track in the monitor?
-            let _ = tokio::time::sleep(Duration::from_secs(ChronosConfig::from_env().monitor_db_poll)).await;
-            let _ = &self.monitor_failed().await;
+            let _ = &self.monitor_failed_fire_records().await;
+        }
+    }
+
+    #[tracing::instrument(skip_all, fields(error))]
+    async fn reset_to_init_db(&self, fetched_rows: &std::vec::Vec<Row>) {
+        if !fetched_rows.is_empty() {
+            if let Err(e) = &self.data_store.reset_to_init_db(fetched_rows).await {
+                tracing::Span::current().record("error", e);
+                log::error!("error in monitor reset_to_init {}", e);
+            } else {
+                log::debug!("reset_to_init_db success for {:?}", fetched_rows)
+            }
         }
     }
 
-    #[tracing::instrument(skip_all, fields(message_key, error, monitoring_len))]
-    async fn monitor_failed(&self) {
+    #[tracing::instrument(skip_all, fields(error, fail_to_fire_rows))]
+    async fn monitor_failed_fire_records(&self) {
         match &self
             .data_store
             .failed_to_fire_db(&(Utc::now() - Duration::from_secs(ChronosConfig::from_env().fail_detect_interval)))
             .await
         {
             Ok(fetched_rows) => {
-                if !fetched_rows.is_empty() {
-                    if let Err(e) = &self.data_store.reset_to_init_db(fetched_rows).await {
-                        tracing::Span::current().record("error", e);
-                        println!("error in monitor reset_to_init {}", e);
-                    }
-                    tracing::Span::current().record("monitoring_len", fetched_rows.len());
-                    // TODO Need to monitor the node that redied but never fired
-                } else {
-                    tracing::Span::current().record("monitoring_len", "empty");
-                }
+                tracing::Span::current().record("fail_to_fire_rows", fetched_rows.len());
+                self.reset_to_init_db(fetched_rows).await;
             }
             Err(e) => {
-                println!("error in monitor {}", e);
+                log::error!("error in monitor {}", e);
+                tracing::Span::current().record("error", e.to_string());
             }
         }
     }
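`ready_to_fire` claims due rows with a single `UPDATE ... RETURNING` inside a transaction, and keeps Postgres serialization conflicts as a distinct error string so the processor can retry them. A minimal sketch of that error split with `tokio_postgres`; the `classify` helper is hypothetical:

```rust
use tokio_postgres::error::{Error, SqlState};

// Retry only on serialization conflicts (two nodes readying the same
// rows); treat every other database error as fatal.
fn classify(e: &Error) -> &'static str {
    match e.code() {
        Some(code) if code == &SqlState::T_R_SERIALIZATION_FAILURE => "retryable",
        _ => "fatal",
    }
}
```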
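With the path now taken from `HEALTHCHECK_FILE`, the runner writes `1` to the file once its threads are up, so an external probe only needs to read a single byte. An illustrative probe under that convention (a sketch, not part of the patch):

```rust
use std::fs;

// Exit 0 when the healthcheck file reads "1", non-zero otherwise.
// The default path mirrors the runner's fallback value.
fn main() {
    let path = std::env::var("HEALTHCHECK_FILE").unwrap_or_else(|_| "healthcheck/chronos_healthcheck".to_string());
    match fs::read_to_string(&path) {
        Ok(contents) if contents.trim() == "1" => std::process::exit(0),
        _ => std::process::exit(1),
    }
}
```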
diff --git a/chronos_bin/src/postgres/pg.rs b/chronos_bin/src/postgres/pg.rs
index aa50e50..34318d0 100644
--- a/chronos_bin/src/postgres/pg.rs
+++ b/chronos_bin/src/postgres/pg.rs
@@ -1,6 +1,6 @@
 use chrono::{DateTime, Utc};
 use deadpool_postgres::{Config, GenericClient, ManagerConfig, Object, Pool, PoolConfig, Runtime, Transaction};
-use log::{error, info};
+use log::error;
 use std::time::{Duration, Instant};
 use tokio_postgres::error::SqlState;
 use tokio_postgres::types::ToSql;
@@ -179,7 +179,7 @@ impl Pg {
             log::warn!("insert_to_delay query_execute_instant: {:?} ", time_elapsed);
         }
 
-        if outcome.is_ok() {
+        if outcome > 0 {
             event!(tracing::Level::INFO, "insert_to_delay success");
             let cmt_rdy = pg_txn.txn.commit().await;
             if let Err(e) = cmt_rdy {
@@ -274,40 +274,45 @@ impl Pg {
         let ready_query = "UPDATE hanger SET readied_at = $1, readied_by = $2 where deadline < $3 AND readied_at IS NULL RETURNING id, deadline, readied_at, readied_by, message_headers, message_key, message_value";
 
-        if let Ok(stmt) = pg_txn.txn.prepare(ready_query).await {
-            let query_execute_instant = Instant::now();
-            let response = pg_txn.txn.query(&stmt, &[&param.readied_at, &param.readied_by, &param.deadline]).await;
-
-            match response {
-                Ok(resp) => {
-                    let cmt_rdy = pg_txn.txn.commit().await;
-                    if let Err(e) = cmt_rdy {
-                        error!("Unable to commit: {}. The original transaction updated: {:?} rows", e, resp);
-                        return Err(format!(
-                            "ready_to_fire: Unable to commit: {}. The original transaction updated: {:?} rows",
-                            e, resp
-                        ));
-                    }
-                    let time_elapsed = query_execute_instant.elapsed();
-                    if time_elapsed > Duration::from_millis(100) {
-                        log::warn!(" ready_to_fire query_execute_instant: {:?} params: {:?}", time_elapsed, param);
-                    }
-                    Ok(resp)
+        // println!("ready_query: {}", ready_query);
+        // if let Ok(stmt) = pg_txn.txn.prepare(ready_query).await {
+        let stmt = match pg_txn.txn.prepare(ready_query).await {
+            Ok(stmt) => stmt,
+            Err(e) => {
+                error!("ready_to_fire: Unable to prepare query: {}", e);
+                return Err(format!("ready_to_fire: Unable to prepare query: {}", e));
+            }
+        };
+
+        let query_execute_instant = Instant::now();
+        let response = pg_txn.txn.query(&stmt, &[&param.readied_at, &param.readied_by, &param.deadline]).await;
+
+        match response {
+            Ok(resp) => {
+                let cmt_rdy = pg_txn.txn.commit().await;
+                if let Err(e) = cmt_rdy {
+                    error!("Unable to commit: {}. The original transaction updated: {:?} rows", e, resp);
+                    return Err(format!(
+                        "ready_to_fire: Unable to commit: {}. The original transaction updated: {:?} rows",
+                        e, resp
+                    ));
                 }
-                Err(e) => {
-                    if let Some(err_code) = e.code() {
-                        if err_code == &SqlState::T_R_SERIALIZATION_FAILURE {
-                            error!("ready_to_fire: Unable to execute txn due to : {}", e);
-                            return Err(format!("ready_to_fire: Unable to execute txn due to : {}", e));
-                        }
+                let time_elapsed = query_execute_instant.elapsed();
+                if time_elapsed > Duration::from_millis(100) {
+                    log::warn!(" ready_to_fire query_execute_instant: {:?} params: {:?}", time_elapsed, param);
+                }
+                Ok(resp)
+            }
+            Err(e) => {
+                if let Some(err_code) = e.code() {
+                    if err_code == &SqlState::T_R_SERIALIZATION_FAILURE {
+                        error!("ready_to_fire: Unable to execute txn due to : {}", e);
+                        return Err(format!("ready_to_fire: Unable to execute txn due to : {}", e));
                     }
-                    error!("ready_to_fire: Unknow exception {:?}", e);
-                    Err(format!("ready_to_fire: Unknow exception {:?}", e))
                 }
+                error!("ready_to_fire: Unknown exception {:?}", e);
+                Err(format!("ready_to_fire: Unknown exception {:?}", e))
             }
-        } else {
-            error!("ready_to_fire: Unable to prepare query");
-            Err("ready_to_fire: Unable to prepare query".to_string())
         }
     }
 
@@ -318,7 +323,7 @@ impl Pg {
         let mut pg_access = PgAccess { client: pg_client };
         let pg_txn: PgTxn = pg_access.build_txn().await?;
 
-        log::info!("failed_to_fire delay_time: {:?}", delay_time);
+        log::debug!("failed_to_fire param delay_time: {:?}", delay_time);
         let get_query = "SELECT * from hanger where readied_at > $1 ORDER BY deadline DESC";
         let stmt = pg_txn.txn.prepare(get_query).await?;
 
@@ -370,6 +375,8 @@ impl Pg {
         };
         query += ")";
 
+        log::debug!("query: {}", query);
+
         let stmt = match pg_txn.txn.prepare(query.as_str()).await {
             Ok(stmt) => stmt,
             Err(e) => {
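`required_headers` now returns `Some` only when `headers_check` confirms both chronos headers are present with values, and `get_message_key` returns an `Option` instead of panicking on key-less messages. A simplified sketch of that contract over a plain map; rdkafka's header types are stubbed out here:

```rust
use std::collections::HashMap;

pub static CHRONOS_ID: &str = "chronosMessageId";
pub static DEADLINE: &str = "chronosDeadline";

// Stand-in for headers_check: both headers must exist for a message to
// be accepted by the receiver.
fn headers_ok(headers: &HashMap<String, String>) -> bool {
    [CHRONOS_ID, DEADLINE].iter().all(|k| headers.contains_key(*k))
}

fn main() {
    let mut h = HashMap::new();
    h.insert(CHRONOS_ID.to_string(), "id-1".to_string());
    assert!(!headers_ok(&h)); // deadline missing: rejected
    h.insert(DEADLINE.to_string(), "2023-10-24T00:06:43Z".to_string());
    assert!(headers_ok(&h));
}
```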
"healthcheck/chronos_healthcheck".to_string()); + let healthcheck_file_exists = read(&healthcheck_file).is_ok(); if healthcheck_file_exists { log::info!("healthcheck file exists"); - let write_resp = write(healthcheck_file, b"1"); + let write_resp = write(&healthcheck_file, b"1"); if write_resp.is_err() { log::error!("error while writing to healthcheck file {:?}", write_resp); } } else if create_dir("healthcheck").is_ok() { - let write_resp = write(healthcheck_file, b"1"); - if write_resp.is_err() { - log::error!("error while writing to healthcheck file {:?}", write_resp); - } - } - let future_tuple = futures::future::try_join3(monitor_handler, message_processor_handler, message_receiver_handler).await; - if future_tuple.is_err() { - log::error!("Chronos Stopping all threads {:?}", future_tuple); - let write_resp = write(healthcheck_file, b"0"); + let write_resp = write(&healthcheck_file, b"1"); if write_resp.is_err() { log::error!("error while writing to healthcheck file {:?}", write_resp); } } + message_receiver_handler.await.unwrap(); + // futures::future::join(monitor_handler); + // let future_tuple = futures::future::try_join3(monitor_handler, message_processor_handler, message_receiver_handler).await; + // if future_tuple.is_err() { + // log::error!("Chronos Stopping all threads {:?}", future_tuple); + // let write_resp = write(&healthcheck_file, b"0"); + // if write_resp.is_err() { + // log::error!("error while writing to healthcheck file {:?}", write_resp); + // } + // } } } diff --git a/chronos_bin/src/utils/util.rs b/chronos_bin/src/utils/util.rs index 7ef0150..b9375ae 100644 --- a/chronos_bin/src/utils/util.rs +++ b/chronos_bin/src/utils/util.rs @@ -5,23 +5,24 @@ use std::collections::HashMap; pub static CHRONOS_ID: &str = "chronosMessageId"; pub static DEADLINE: &str = "chronosDeadline"; -//TODO check correctness for two headers in this method pub fn required_headers(message: &BorrowedMessage) -> Option> { if let Some(headers) = message.headers() { - let reqd_headers = headers.iter().fold(HashMap::::new(), |mut acc, header| { - let key: String = match header.key.parse() { - Ok(key) => key, - Err(e) => { - log::error!("Error parsing header key: {}", e); - return acc; + if headers_check(headers) { + let reqd_headers = headers.iter().fold(HashMap::::new(), |mut acc, header| { + if let Ok(key) = header.key.parse() { + if let Some(value) = header.value { + let value: String = String::from_utf8_lossy(value).into_owned(); + acc.insert(key, value); + acc + } else { + acc + } + } else { + acc } - }; - let value: String = String::from_utf8_lossy(header.value.expect("utf8 parsing for header value failed")).into_owned(); - - acc.insert(key, value); - acc - }); - return Some(reqd_headers); + }); + return Some(reqd_headers); + } } None } @@ -35,38 +36,24 @@ pub fn into_headers(headers: &HashMap) -> OwnedHeaders { } pub fn headers_check(headers: &BorrowedHeaders) -> bool { + // println!("headers_check {:?}", headers); let outcome = headers .iter() .filter(|h| { let header_keys = [CHRONOS_ID, DEADLINE]; header_keys.contains(&h.key) && h.value.is_some() }) - .count() - == 2; + .count(); - outcome + outcome == 2 } pub fn get_payload_utf8<'a>(message: &'a BorrowedMessage) -> Option<&'a [u8]> { message.payload() } -pub fn get_message_key(message: &BorrowedMessage) -> String { - let key = String::from_utf8_lossy(message.key().expect("No key found for message")).to_string(); - key -} - -pub fn get_chronos_id(headers: &BorrowedHeaders) -> String { - let value = headers - .iter() - .find(|h| { - let 
diff --git a/chronos_bin/src/utils/util.rs b/chronos_bin/src/utils/util.rs
index 7ef0150..b9375ae 100644
--- a/chronos_bin/src/utils/util.rs
+++ b/chronos_bin/src/utils/util.rs
@@ -5,23 +5,24 @@ use std::collections::HashMap;
 
 pub static CHRONOS_ID: &str = "chronosMessageId";
 pub static DEADLINE: &str = "chronosDeadline";
 
-//TODO check correctness for two headers in this method
 pub fn required_headers(message: &BorrowedMessage) -> Option<HashMap<String, String>> {
     if let Some(headers) = message.headers() {
-        let reqd_headers = headers.iter().fold(HashMap::<String, String>::new(), |mut acc, header| {
-            let key: String = match header.key.parse() {
-                Ok(key) => key,
-                Err(e) => {
-                    log::error!("Error parsing header key: {}", e);
-                    return acc;
+        if headers_check(headers) {
+            let reqd_headers = headers.iter().fold(HashMap::<String, String>::new(), |mut acc, header| {
+                if let Ok(key) = header.key.parse() {
+                    if let Some(value) = header.value {
+                        let value: String = String::from_utf8_lossy(value).into_owned();
+                        acc.insert(key, value);
+                        acc
+                    } else {
+                        acc
+                    }
+                } else {
+                    acc
                 }
-            };
-            let value: String = String::from_utf8_lossy(header.value.expect("utf8 parsing for header value failed")).into_owned();
-
-            acc.insert(key, value);
-            acc
-        });
-        return Some(reqd_headers);
+            });
+            return Some(reqd_headers);
+        }
     }
     None
 }
@@ -35,38 +36,24 @@ pub fn into_headers(headers: &HashMap<String, String>) -> OwnedHeaders {
 }
 
 pub fn headers_check(headers: &BorrowedHeaders) -> bool {
+    // println!("headers_check {:?}", headers);
     let outcome = headers
         .iter()
         .filter(|h| {
             let header_keys = [CHRONOS_ID, DEADLINE];
             header_keys.contains(&h.key) && h.value.is_some()
         })
-        .count()
-        == 2;
+        .count();
 
-    outcome
+    outcome == 2
 }
 
 pub fn get_payload_utf8<'a>(message: &'a BorrowedMessage) -> Option<&'a [u8]> {
     message.payload()
 }
 
-pub fn get_message_key(message: &BorrowedMessage) -> String {
-    let key = String::from_utf8_lossy(message.key().expect("No key found for message")).to_string();
-    key
-}
-
-pub fn get_chronos_id(headers: &BorrowedHeaders) -> String {
-    let value = headers
-        .iter()
-        .find(|h| {
-            let header_keys = [CHRONOS_ID];
-            header_keys.contains(&h.key) && h.value.is_some()
-        })
-        .expect("No chronosId found for message")
-        .value
-        .expect("No chronosId found for message");
-
-    String::from_utf8_lossy(value).into_owned()
-    // return value;
+pub fn get_message_key(message: &BorrowedMessage) -> Option<String> {
+    message.key().map(|key| String::from_utf8_lossy(key).to_string())
 }
"admin-secret,admin-secret" + # ALLOW_PLAINTEXT_LISTENER: "yes" + # networks: + # - chronos + # depends_on: + # - zookeeper - kowl: - image: quay.io/cloudhut/kowl:master - ports: - - 9091:8080 - environment: - KAFKA_BROKERS: "kafka:9092" - networks: - - chronos - depends_on: - - kafka - - zookeeper + # kowl: + # image: quay.io/cloudhut/kowl:master + # ports: + # - 9091:8080 + # environment: + # KAFKA_BROKERS: "kafka:9092" + # networks: + # - chronos + # depends_on: + # - kafka + # - zookeeper # chronos # chronos-delay-scheduler: diff --git a/infra/otelcol-config.yml b/infra/otelcol-config.yml index bf3193e..41af551 100644 --- a/infra/otelcol-config.yml +++ b/infra/otelcol-config.yml @@ -41,11 +41,11 @@ extensions: health_check: pprof: endpoint: :1888 - zpages: - endpoint: :55679 + # zpages: + # endpoint: :55679 service: - extensions: [pprof, zpages, health_check] + extensions: [pprof, health_check] pipelines: traces: receivers: [otlp]