Skip to content

Commit

Permalink
otel examples and chronos example
Browse files Browse the repository at this point in the history
  • Loading branch information
Amninder Kaur committed Sep 26, 2023
1 parent 77d02db commit be60ac0
Show file tree
Hide file tree
Showing 17 changed files with 880 additions and 112 deletions.
288 changes: 285 additions & 3 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,11 @@ clippy = "0.0.302"
rand = "0.8.5"
serial_test = "2.0.0"

#tracing
tracing = "0.1"
tracing-subscriber = "0.3"
tracing-opentelemetry = "0.21.0"
opentelemetry = { version = "0.20.0", features = ["rt-tokio", "trace"]}
opentelemetry-jaeger = {version="0.19.0", features=["rt-tokio"]}
opentelemetry-stdout = { version = "0.1.0", features = ["trace"] }

7 changes: 7 additions & 0 deletions chronos_bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ uuid = { version="1.3.0", features = [
rand.workspace = true
openssl = "0.10.57"

#tracing
tracing.workspace = true
tracing-subscriber.workspace = true
tracing-opentelemetry.workspace = true
opentelemetry.workspace = true
opentelemetry-jaeger.workspace = true

[dev-dependencies]
serial_test.workspace = true

Expand Down
16 changes: 14 additions & 2 deletions chronos_bin/src/kafka/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ use rdkafka::message::BorrowedMessage;

use super::config::KafkaConfig;

use tracing::{info_span, instrument, span, trace, warn};

// Kafka Consumer Client
// #[derive(Debug, Clone)]
pub struct KafkaConsumer {
pub consumer: StreamConsumer,
pub topic: String,
Expand All @@ -18,8 +19,15 @@ impl KafkaConsumer {
pub fn new(config: &KafkaConfig) -> Self {
// let consumer = config.build_consumer_config().create().expect("Failed to create consumer");
let consumer = match config.build_consumer_config().create() {
Ok(consumer) => consumer,
Ok(consumer) => {
//TODO add more information to the trace, which broker, which topic, etc
trace!("connected to kafka");
consumer
}
Err(e) => {
// Add error trace to the span
warn!("error creating consumer {:?}", e);

log::error!("error creating consumer {:?}", e);
//retry
log::info!("retrying in 5 seconds");
Expand All @@ -46,6 +54,8 @@ impl KafkaConsumer {
}

pub(crate) async fn subscribe(&self) {
let consumer_span = info_span!("consumer_subscribe");
let _ = consumer_span.enter();
match &self.consumer.subscribe(&[&self.topic]) {
Ok(_) => {
info!("subscribed to topic {}", &self.topic);
Expand All @@ -57,6 +67,8 @@ impl KafkaConsumer {
};
}
pub(crate) async fn consume_message(&self) -> Result<BorrowedMessage, KafkaAdapterError> {
let consumer_span = info_span!("consume_message");
let _ = consumer_span.enter();
self.consumer.recv().await.map_err(KafkaAdapterError::ReceiveMessage)
}
}
21 changes: 12 additions & 9 deletions chronos_bin/src/kafka/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use rdkafka::producer::{FutureProducer, FutureRecord};

use super::config::KafkaConfig;

use tracing::{instrument, span, trace, warn, Level};

// Kafka Producer
// #[derive(Clone)]
pub struct KafkaProducer {
Expand All @@ -16,19 +18,20 @@ pub struct KafkaProducer {

impl KafkaProducer {
/// Builds a `KafkaProducer` from the given Kafka configuration.
///
/// # Panics
/// Panics with "Failed to create producer" when the client cannot be created
/// from `config.build_producer_config()`.
pub fn new(config: &KafkaConfig) -> Self {
    Self {
        // Client created from the producer-side configuration.
        producer: config.build_producer_config().create().expect("Failed to create producer"),
        // Owned copy of the configured `out_topic`.
        topic: config.out_topic.to_owned(),
    }
}
pub(crate) async fn publish(
&self,
message: String,
headers: Option<HashMap<String, String>>,
key: String,
id: String,
) -> Result<String, KafkaAdapterError> {
let o_header = into_headers(&headers.unwrap());
pub(crate) async fn publish(&self, message: String, headers: Option<HashMap<String, String>>, key: String) -> Result<String, KafkaAdapterError> {
// Span for kafka publish
let producer_span = span!(Level::INFO, "publish_span");
let _ = producer_span.enter();

let unwrap_header = &headers.unwrap();

let o_header = into_headers(unwrap_header);
// println!("headers {:?}", o_header);
// println!("headers {:?} headers--{:?}", &headers["chronosId)"].to_string(), &headers["chronosDeadline)"].to_string());

Expand All @@ -43,6 +46,6 @@ impl KafkaProducer {
)
.await
.map_err(|(kafka_error, _record)| KafkaAdapterError::PublishMessage(kafka_error, "message publishing failed".to_string()))?;
Ok(id)
Ok(unwrap_header["chronosId"].to_string())
}
}
34 changes: 22 additions & 12 deletions chronos_bin/src/message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use chrono::Utc;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tracing::{event, field, info_span, span, trace, Level};
use uuid::Uuid;

pub struct MessageProcessor {
Expand All @@ -15,15 +16,16 @@ pub struct MessageProcessor {

impl MessageProcessor {
pub async fn run(&self) {
log::info!("MessageProcessor ON!");
// log::info!("MessageProcessor ON!");
event!(tracing::Level::INFO, "Chronos Processor On!");

//Get UUID for the node that deployed this thread
let node_id: String = std::env::var("NODE_ID").unwrap_or_else(|_| uuid::Uuid::new_v4().to_string());
// log::info!("node_id {}", node_id);

let mut delay_controller = DelayController::new(100);
loop {
tokio::time::sleep(Duration::from_millis(10)).await;
// tokio::time::sleep(Duration::from_millis(ChronosConfig::from_env().db_poll_interval)).await;
tokio::time::sleep(Duration::from_millis(ChronosConfig::from_env().processor_db_poll)).await;

let deadline = Utc::now() - Duration::from_secs(ChronosConfig::from_env().time_advance);

let param = GetReady {
Expand All @@ -40,12 +42,16 @@ impl MessageProcessor {
let max_retry_count = 3;
let mut retry_count = 0;

let mut node_id: Option<String> = None;
let node_id_option: Option<String> = node_id.clone().into();
// let mut row_id: Option<String> = None;
let monitor_span = info_span!("processor_picked", node_id = field::Empty, errors = field::Empty);
let _ = monitor_span.enter();
match &self.data_store.ready_to_fire(&param).await {
Ok(publish_rows) => {
let rdy_to_pblsh_count = publish_rows.len();
if rdy_to_pblsh_count > 0 {
monitor_span.record("node_id", &node_id);
trace!("ready_to_publish_count {}", rdy_to_pblsh_count);
let mut ids: Vec<String> = Vec::with_capacity(rdy_to_pblsh_count);
let mut publish_futures = Vec::with_capacity(rdy_to_pblsh_count);
for row in publish_rows {
Expand All @@ -67,27 +73,31 @@ impl MessageProcessor {
}
};
//TODO: handle empty headers
// println!("checking {:?}",headers);

node_id = Some(updated_row.readied_by.to_string());
// row_id = Some(updated_row.id.to_string());
let readied_by = updated_row.readied_by.to_string();

headers.insert("readied_by".to_string(), node_id.unwrap());
headers.insert("readied_by".to_string(), readied_by);

publish_futures.push(self.producer.publish(
updated_row.message_value.to_string(),
Some(headers),
updated_row.message_key.to_string(),
updated_row.id.to_string(),
// updated_row.id.to_string(),
))
}
let publish_kafka = info_span!("publish_kafka", node_id = &node_id, errors = field::Empty, published = field::Empty);
let _ = publish_kafka.enter();
let results = futures::future::join_all(publish_futures).await;
for result in results {
match result {
Ok(m) => {
publish_kafka.record("published", "success");
ids.push(m);
}
Err(e) => {
publish_kafka.record("published", "failure");
publish_kafka.record("error", &e.to_string());

log::error!("Error: delayed message publish failed {:?}", e);
break;
// failure detection needs to pick
Expand All @@ -100,7 +110,7 @@ impl MessageProcessor {
println!("Error: error occurred in message processor delete_fired {}", outcome_error);
//add retry logic here
}
println!("delete ids {:?} and break", ids);
// println!("delete ids {:?} and break", ids);
break;
}
log::debug!("number of rows published successfully and deleted from DB {}", ids.len());
Expand All @@ -118,7 +128,7 @@ impl MessageProcessor {
log::error!(
"Error: max retry count {} reached by node {} for row ",
max_retry_count,
node_id.unwrap(),
node_id_option.unwrap(),
// row_id.unwrap()
);
break;
Expand Down
18 changes: 16 additions & 2 deletions chronos_bin/src/message_receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use rdkafka::message::Message;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Instant;
use tracing::{event, field, info_span, trace_span};
use tracing_subscriber::fmt::FormatFields;

pub struct MessageReceiver {
pub(crate) consumer: Arc<KafkaConsumer>,
Expand All @@ -25,31 +27,42 @@ impl MessageReceiver {
// data_store,
// }
// }

// #[tracing::instrument]
pub async fn run(&self) {
println!("Receiver ON!");
event!(tracing::Level::INFO, "Chronos Receiver On!");
let _ = &self.consumer.subscribe().await;
// for _n in 0..100 {
let mut total_count = 0;
let mut direct_sent_count = 0;
let mut db_insert_count = 0;
loop {
if let Ok(message) = &self.consumer.consume_message().await {
let receiver_span = info_span!(
"message_received",
errors = field::Empty,
message_key = get_message_key(message),
flow = field::Empty
);
let _ = receiver_span.enter();
total_count += 1;
if headers_check(message.headers().unwrap()) {
let new_message = &message;
let headers = required_headers(new_message).expect("parsing headers failed");
let message_deadline: DateTime<Utc> = DateTime::<Utc>::from_str(&headers[DEADLINE]).expect("String date parsing failed");

if message_deadline <= Utc::now() {
receiver_span.record("flow", "deadline passed, publish message!");
direct_sent_count += 1;
let string_payload = String::from_utf8_lossy(get_payload_utf8(new_message)).to_string();
let message_key = get_message_key(new_message);
let _outcome = &self
.producer
.publish(string_payload, Some(headers), message_key, "same id".to_string())
.publish(string_payload, Some(headers), message_key)
.await
.expect("Publish failed for received message");
} else {
receiver_span.record("flow", "deadline not passed, insert to db!");
db_insert_count += 1;
let chronos_message_id = &headers[CHRONOS_ID];

Expand All @@ -70,6 +83,7 @@ impl MessageReceiver {
}
} else {
warn!("message with improper headers on inbox.topic ");
receiver_span.record("flow", "improper headers, ignore!");
//TODO: ignore
}
// println!("{direct_sent_count} messages sent directly and {db_insert_count} added to db from total of {total_count} ");
Expand Down
15 changes: 14 additions & 1 deletion chronos_bin/src/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,27 @@ use crate::utils::config::ChronosConfig;
use chrono::Utc;
use std::sync::Arc;
use std::time::Duration;
use tracing::{error_span, event, field, info_span, instrument, span, trace, Level};

/// Background monitor whose `run` loop asks the data store for rows that
/// failed to fire and resets them to their initial state.
#[derive(Debug)]
pub struct FailureDetector {
// Shared handle to the `Pg` data store that `run` calls
// `failed_to_fire` and `reset_to_init` on.
pub(crate) data_store: Arc<Pg>,
}

//Needs to accept the poll time
impl FailureDetector {
// #[instrument]
pub async fn run(&self) {
println!("Monitoring On!");
// println!("Monitoring On!");
trace!("Monitoring On!");
loop {
// TODO multiple rows are fetched, what to track in the monitor?
let monitor_span = info_span!("failure_detector", records_len = field::Empty, exception = field::Empty);
let _ = monitor_span.enter();

let _ = tokio::time::sleep(Duration::from_secs(ChronosConfig::from_env().monitor_db_poll)).await; // sleep for 10sec

trace!("failed_to_fire On!");
match &self
.data_store
.failed_to_fire(&(Utc::now() - Duration::from_secs(ChronosConfig::from_env().fail_detect_interval)))
Expand All @@ -23,8 +32,12 @@ impl FailureDetector {
Ok(fetched_rows) => {
if !fetched_rows.is_empty() {
if let Err(e) = &self.data_store.reset_to_init(fetched_rows).await {
monitor_span.record("exception", e);
println!("error in monitor reset_to_init {}", e);
}
monitor_span.record("records_len", fetched_rows.len());
} else {
monitor_span.record("records_len", "empty");
}
}
Err(e) => {
Expand Down
Loading

0 comments on commit be60ac0

Please sign in to comment.