diff --git a/Cargo.lock b/Cargo.lock
index e7c709f..8706b29 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -177,6 +177,12 @@ version = "1.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be"
 
+[[package]]
+name = "cargo-husky"
+version = "1.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7b02b629252fe8ef6460461409564e2c21d0c8e77e0944f3d189ff06c4e932ad"
+
 [[package]]
 name = "cast"
 version = "0.3.0"
@@ -217,6 +223,7 @@ dependencies = [
 name = "chronos"
 version = "0.1.0"
 dependencies = [
+ "cargo-husky",
  "criterion",
 ]
 
diff --git a/Cargo.toml b/Cargo.toml
index d9b5c09..a07c5ac 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -55,3 +55,8 @@ criterion = "0.4.0"
 name="consumer_bench"
 harness=false
 
+[dev-dependencies.cargo-husky]
+version = "1"
+default-features = false # Disable features which are enabled by default
+features = ["user-hooks"]
+
diff --git a/benches/consumer_bench.rs b/benches/consumer_bench.rs
index a794765..b34ba29 100644
--- a/benches/consumer_bench.rs
+++ b/benches/consumer_bench.rs
@@ -1,5 +1,5 @@
 #![allow(unused)]
-use chronos_bin::runner::Runner;
+// use chronos_bin::runner::Runner;
 // use criterion::{black_box, criterion_group, criterion_main, Criterion};
 // // use hello_cargo::fibonacci;
 //
@@ -14,8 +14,8 @@ use chronos_bin::runner::Runner;
 use criterion::*;
 
 async fn my_function() {
-    let runner = Runner {};
-    runner.run().await;
+    // let runner = Runner {};
+    // runner.run().await;
 }
 
 fn bench(c: &mut Criterion) {
@@ -23,8 +23,8 @@ fn bench(c: &mut Criterion) {
     // Configure Criterion.rs to detect smaller differences and increase sample size to improve
    // precision and counteract the resulting noise.
     group.significance_level(0.1).sample_size(500);
-    group.bench_function("my-function", |b| b.iter(|| my_function()));
-    group.finish();
+    // group.bench_function("my-function", |b| b.iter(|| my_function()));
+    // group.finish();
 }
 
 criterion_group!(benches, bench);
diff --git a/chronos_bin/src/core.rs b/chronos_bin/src/core.rs
index 7cb4bb4..26f7c43 100644
--- a/chronos_bin/src/core.rs
+++ b/chronos_bin/src/core.rs
@@ -37,22 +37,10 @@ pub trait MessageProducer {
 
 #[async_trait]
 pub trait DataStore {
-    async fn insert(
-        &self,
-        message: ChronosDeliveryMessage,
-    ) -> Result;
-    async fn delete(
-        &self,
-        message: ChronosDeliveryMessage,
-    ) -> Result;
-    async fn move_to_initial_state(
-        &self,
-        message: ChronosDeliveryMessage,
-    ) -> Result;
-    async fn move_to_ready_state(
-        &self,
-        message: ChronosDeliveryMessage,
-    ) -> Result;
+    async fn insert(&self, message: ChronosDeliveryMessage) -> Result;
+    async fn delete(&self, message: ChronosDeliveryMessage) -> Result;
+    async fn move_to_initial_state(&self, message: ChronosDeliveryMessage) -> Result;
+    async fn move_to_ready_state(&self, message: ChronosDeliveryMessage) -> Result;
     async fn get_messages(
         &self,
         status: ChronosMessageStatus,
diff --git a/chronos_bin/src/kafka/config.rs b/chronos_bin/src/kafka/config.rs
index 2b67ce9..d64b406 100644
--- a/chronos_bin/src/kafka/config.rs
+++ b/chronos_bin/src/kafka/config.rs
@@ -1,6 +1,6 @@
+use crate::env_var;
 use rdkafka::ClientConfig;
 use std::collections::HashMap;
-use crate::env_var;
 
 #[derive(Debug)]
 pub struct KafkaConfig {
diff --git a/chronos_bin/src/kafka/consumer.rs b/chronos_bin/src/kafka/consumer.rs
index 787bd54..55f4983 100644
--- a/chronos_bin/src/kafka/consumer.rs
+++ b/chronos_bin/src/kafka/consumer.rs
@@ -1,16 +1,16 @@
 use std::{num::TryFromIntError, time::Duration};
+use crate::core::MessageConsumer;
+use crate::kafka::errors::KafkaAdapterError;
 use async_trait::async_trait;
 use log::{debug, info};
+use rdkafka::message::BorrowedMessage;
 use rdkafka::{
     consumer::{Consumer, StreamConsumer},
     Message, TopicPartitionList,
 };
-use rdkafka::message::BorrowedMessage;
-use crate::kafka::errors::KafkaAdapterError;
-use crate::core::MessageConsumer;
-use super::{config::KafkaConfig};
+use super::config::KafkaConfig;
 
 // Kafka Consumer Client
 //
 #[derive(Debug, Clone)]
@@ -24,21 +24,13 @@ impl KafkaConsumer {
         let consumer = config.build_consumer_config().create().expect("Failed to create consumer");
         let topic = config.in_topic.clone();
-        Self {
-            consumer,
-            topic,
-        }
+        Self { consumer, topic }
     }
 
     pub(crate) async fn subscribe(&self) {
-
-        let _ = &self
-            .consumer
-            .subscribe(&[&self.topic])
-            .expect("consumer Subscribe to topic failed");
+        let _ = &self.consumer.subscribe(&[&self.topic]).expect("consumer Subscribe to topic failed");
     }
 
-    pub(crate) async fn consume_message(&self) ->Result<BorrowedMessage<'_>, KafkaAdapterError> {
-
+    pub(crate) async fn consume_message(&self) -> Result<BorrowedMessage<'_>, KafkaAdapterError> {
         self.consumer.recv().await.map_err(|e| KafkaAdapterError::ReceiveMessage(e))
     }
 }
diff --git a/chronos_bin/src/kafka/kafka_deploy.rs b/chronos_bin/src/kafka/kafka_deploy.rs
index 27e7ddc..7d0128d 100644
--- a/chronos_bin/src/kafka/kafka_deploy.rs
+++ b/chronos_bin/src/kafka/kafka_deploy.rs
@@ -1,5 +1,6 @@
 use std::time::Duration;
 
+use crate::kafka::config::KafkaConfig;
 use rdkafka::{
     admin::{AdminClient, AdminOptions, NewTopic, TopicReplication},
     client::DefaultClientContext,
@@ -8,7 +9,6 @@ use rdkafka::{
     types::RDKafkaErrorCode,
 };
 use thiserror::Error as ThisError;
-use crate::kafka::config::KafkaConfig;
 
 pub enum KafkaDeployStatus {
     TopicExists,
     TopicCreated,
 }
@@ -27,18 +27,20 @@ pub async fn create_topics(replication_factor: i32, num_of_partitions: i32) -> R
     let kafka_config = KafkaConfig::from_env();
 
     println!("kafka configs received from env... {kafka_config:#?}");
-    let _ = create_topic(&kafka_config, kafka_config.in_topic.as_str(),replication_factor, num_of_partitions).await?;
-    let _ = create_topic(&kafka_config, kafka_config.out_topic.as_str(),replication_factor, num_of_partitions).await?;
+    let _ = create_topic(&kafka_config, kafka_config.in_topic.as_str(), replication_factor, num_of_partitions).await?;
+    let _ = create_topic(&kafka_config, kafka_config.out_topic.as_str(), replication_factor, num_of_partitions).await?;
     Ok(())
 }
 
-async fn create_topic(kafka_config: &KafkaConfig, topic_name: &str, replication_factor: i32, num_of_partitions: i32) -> Result {
-
+async fn create_topic(
+    kafka_config: &KafkaConfig,
+    topic_name: &str,
+    replication_factor: i32,
+    num_of_partitions: i32,
+) -> Result {
     let consumer: StreamConsumer = kafka_config.build_consumer_config().create()?;
     let timeout = Duration::from_secs(1);
-    let metadata = consumer
-        .fetch_metadata(Some(topic_name), timeout)
-        .expect("Fetching topic metadata failed");
+    let metadata = consumer.fetch_metadata(Some(topic_name), timeout).expect("Fetching topic metadata failed");
 
     if !metadata.topics().is_empty() && !metadata.topics()[0].partitions().is_empty() {
         println!("Topic {topic_name} exists");
@@ -58,9 +60,7 @@ async fn create_topic(kafka_config: &KafkaConfig, topic_name: &str, replication_
     let results = admin.create_topics(&[topic], &opts).await?;
 
-    results[0]
-        .as_ref()
-        .map_err(|e| KafkaDeployError::TopicCreation(topic_name.to_string(), e.1))?;
+    results[0].as_ref().map_err(|e| KafkaDeployError::TopicCreation(topic_name.to_string(), e.1))?;
     println!("Topic : {topic_name} created");
     Ok(KafkaDeployStatus::TopicCreated)
 }
diff --git a/chronos_bin/src/kafka/producer.rs b/chronos_bin/src/kafka/producer.rs
index f22362a..5684d59 100644
--- a/chronos_bin/src/kafka/producer.rs
+++ b/chronos_bin/src/kafka/producer.rs
@@ -1,13 +1,12 @@
 use std::collections::HashMap;
 use std::time::Duration;
 
+use crate::kafka::errors::KafkaAdapterError;
+use crate::utils::util::into_headers;
 use async_trait::async_trait;
 use log::debug;
-use rdkafka::producer::{BaseRecord, DefaultProducerContext, FutureProducer, FutureRecord, ThreadedProducer};
 use rdkafka::producer::future_producer::OwnedDeliveryResult;
-use crate::kafka::errors::KafkaAdapterError;
-use crate::utils::util::into_headers;
-
+use rdkafka::producer::{BaseRecord, DefaultProducerContext, FutureProducer, FutureRecord, ThreadedProducer};
 
 use super::config::KafkaConfig;
@@ -30,13 +29,14 @@ impl KafkaProducer {
         message: String,
         headers: Option<HashMap<String, String>>,
         key: String,
-        id: String
+        id: String,
     ) -> Result<String, KafkaAdapterError> {
         let o_header = into_headers(&headers.unwrap());
         // println!("headers {:?}", o_header);
         // println!("headers {:?} headers--{:?}", &headers["chronosId)"].to_string(), &headers["chronosDeadline)"].to_string());
 
-        let delivery_status = self.producer
+        let delivery_status = self
+            .producer
             .send(
                 FutureRecord::to(&self.topic.as_str())
                     .payload(message.as_str())
@@ -49,4 +49,3 @@ impl KafkaProducer {
         Ok(id)
     }
 }
-
diff --git a/chronos_bin/src/message_processor.rs b/chronos_bin/src/message_processor.rs
index 93a6226..e52f36d 100644
--- a/chronos_bin/src/message_processor.rs
+++ b/chronos_bin/src/message_processor.rs
@@ -1,16 +1,15 @@
+use crate::kafka::producer::KafkaProducer;
+use crate::postgres::config::PgConfig;
+use crate::postgres::errors::PgError;
+use crate::postgres::pg::{GetReady, Pg, TableRow};
+use chrono::Utc;
+use serde_json::{json, Value};
 use std::collections::HashMap;
 use std::sync::Arc;
 use std::thread;
-use chrono::Utc;
 use std::time::Duration;
+use tokio_postgres::Row;
 use uuid::Uuid;
-use crate::kafka::producer::KafkaProducer;
-use tokio_postgres::{Row};
-use serde_json::{json, Value};
-use crate::postgres::pg::{GetReady, Pg, TableRow};
-use crate::postgres::config::PgConfig;
-use crate::postgres::errors::PgError;
-
 
 pub struct MessageProcessor {
     pub(crate) data_store: Arc<Box<Pg>>,
@@ -21,7 +20,6 @@
 impl MessageProcessor {
     pub async fn run(&self) {
         println!("MessageProcessor ON!");
-
         loop {
             tokio::time::sleep(Duration::from_millis(10)).await;
             // println!("MessageProcessor");
@@ -39,7 +37,7 @@
             let mut ready_params = Vec::new();
             ready_params.push(param);
 
-            match &self.data_store.ready_to_fire( &ready_params).await {
+            match &self.data_store.ready_to_fire(&ready_params).await {
                 Ok(publish_rows) => {
                     let mut ids: Vec<String> = Vec::with_capacity(publish_rows.len());
                     let mut publish_futures = Vec::with_capacity(publish_rows.len());
@@ -54,24 +52,22 @@
                             message_value: row.get("message_value"),
                         };
 
-                        let headers: HashMap<String, String> =
-                            match serde_json::from_str(&updated_row.message_headers.to_string()) {
-                                Ok(T) => T,
-                                Err(E) => {
-                                    println!("error occurred while parsing");
-                                    HashMap::new()
-                                }
-                            };
+                        let headers: HashMap<String, String> = match serde_json::from_str(&updated_row.message_headers.to_string()) {
+                            Ok(T) => T,
+                            Err(E) => {
+                                println!("error occurred while parsing");
+                                HashMap::new()
+                            }
+                        };
                         //TODO: handle empty headers
                         // println!("checking {:?}",headers);
 
-                        publish_futures.push(self.producer
-                            .publish(
-                                updated_row.message_value.to_string(),
-                                Some(headers),
-                                updated_row.message_key.to_string(),
-                                updated_row.id.to_string()
-                            ))
+                        publish_futures.push(self.producer.publish(
+                            updated_row.message_value.to_string(),
+                            Some(headers),
+                            updated_row.message_key.to_string(),
+                            updated_row.id.to_string(),
+                        ))
                     }
                     let results = futures::future::join_all(publish_futures).await;
                     for result in results {
@@ -86,9 +82,8 @@
                         }
                     }
-
                     if ids.len() > 0 {
-                        if let Err(outcome_error) = &self.data_store.delete_fired( &ids).await {
+                        if let Err(outcome_error) = &self.data_store.delete_fired(&ids).await {
                             println!("error occurred in message processor delete_fired {}", outcome_error);
                         }
                     }
@@ -96,10 +91,7 @@
                 Err(e) => {
                     println!("error occurred in message processor {}", e);
                 }
-
             }
-
-
         }
     }
 }
diff --git a/chronos_bin/src/message_receiver.rs b/chronos_bin/src/message_receiver.rs
index d5228b0..995c296 100644
--- a/chronos_bin/src/message_receiver.rs
+++ b/chronos_bin/src/message_receiver.rs
@@ -4,34 +4,30 @@ use chrono::{DateTime, Utc};
 use log::{error, info, warn};
 use serde_json::json;
-use std::str::{from_utf8, FromStr};
-use std::sync::Arc;
-use std::time::Instant;
-use rdkafka::message::{BorrowedMessage, Message};
 use crate::kafka::consumer::KafkaConsumer;
 use crate::kafka::producer::KafkaProducer;
 use crate::postgres::pg::{Pg, TableInsertRow};
-use crate::utils::util::{headers_check, required_headers, CHRONOS_ID, DEADLINE, get_payload_utf8, get_message_key};
+use crate::utils::util::{get_message_key, get_payload_utf8, headers_check, required_headers, CHRONOS_ID, DEADLINE};
+use rdkafka::message::{BorrowedMessage, Message};
+use std::str::{from_utf8, FromStr};
+use std::sync::Arc;
+use std::time::Instant;
 
 pub struct MessageReceiver {
-    pub(crate) consumer: Arc< Box<KafkaConsumer>>,
+    pub(crate) consumer: Arc<Box<KafkaConsumer>>,
     pub(crate) producer: Arc<Box<KafkaProducer>>,
     pub(crate) data_store: Arc<Box<Pg>>,
 }
-
 impl MessageReceiver {
-    pub fn new(consumer: Arc< Box<KafkaConsumer>>,
-               producer: Arc<Box<KafkaProducer>>,
-               data_store: Arc<Box<Pg>>) -> Self {
+    pub fn new(consumer: Arc<Box<KafkaConsumer>>, producer: Arc<Box<KafkaProducer>>, data_store: Arc<Box<Pg>>) -> Self {
         Self {
             consumer,
             producer,
-            data_store
+            data_store,
         }
     }
 
     pub async fn run(&self) {
-
         println!("Receiver ON!");
         &self.consumer.subscribe().await;
         // for _n in 0..100 {
@@ -40,18 +36,16 @@
         let mut db_insert_count = 0;
         loop {
             if let Ok(message) = &self.consumer.consume_message().await {
-                total_count = total_count+1;
+                total_count = total_count + 1;
                 if headers_check(&message.headers().unwrap()) {
                     let new_message = &message;
                     let headers = required_headers(&new_message).expect("parsing headers failed");
-                    let message_deadline: DateTime<Utc> =
-                        DateTime::<Utc>::from_str(&headers[DEADLINE])
-                            .expect("String date parsing failed");
+                    let message_deadline: DateTime<Utc> = DateTime::<Utc>::from_str(&headers[DEADLINE]).expect("String date parsing failed");
                     if message_deadline <= Utc::now() {
                         direct_sent_count = direct_sent_count + 1;
                         let string_payload = String::from_utf8_lossy(get_payload_utf8(new_message)).to_string();
-                        let message_key = get_message_key(new_message) ;
+                        let message_key = get_message_key(new_message);
                         let _outcome = &self
                             .producer
                             .publish(string_payload, Some(headers), message_key, "same id".to_string())
                             .await;
@@ -61,7 +55,7 @@
                         db_insert_count = db_insert_count + 1;
                         let chronos_message_id = &headers[CHRONOS_ID];
 
-                        let payload =get_payload_utf8(new_message);
+                        let payload = get_payload_utf8(new_message);
                         let message_key = get_message_key(new_message);
 
@@ -75,13 +69,12 @@
                         let insert_time = Instant::now();
                         self.data_store.insert_to_delay(&params).await.expect("insert to db failed");
                         // println!("insert took: {:?}", insert_time.elapsed())
-
                     }
                 } else {
                     warn!("message with improper headers on inbox.topic ");
                     //TODO: ignore
                 }
-                //  println!("{direct_sent_count} messages sent directly and {db_insert_count} added to db from total of {total_count} ");
+                // println!("{direct_sent_count} messages sent directly and {db_insert_count} added to db from total of {total_count} ");
             }
 
             // println!("commit received message {:?}", new_message);
diff --git a/chronos_bin/src/postgres/errors.rs b/chronos_bin/src/postgres/errors.rs
index 72c44de..b2e4a86 100644
--- a/chronos_bin/src/postgres/errors.rs
+++ b/chronos_bin/src/postgres/errors.rs
@@ -13,6 +13,4 @@ pub enum PgError {
     // Tokio-postgres errors
     #[error("Unknown exception")]
     UnknownException(#[from] TokioPostgresError),
-
-
 }
diff --git a/chronos_bin/src/postgres/pg.rs b/chronos_bin/src/postgres/pg.rs
index e553c88..b68af89 100644
--- a/chronos_bin/src/postgres/pg.rs
+++ b/chronos_bin/src/postgres/pg.rs
@@ -1,18 +1,17 @@
-use std::time::{Duration, Instant};
 use async_trait::async_trait;
 use chrono::{DateTime, Utc};
 use deadpool_postgres::{Config, GenericClient, ManagerConfig, Object, Pool, PoolConfig, Runtime};
 use log::error;
 use log::kv::Source;
 use serde_json::{json, Value};
-use tokio_postgres::{Client, NoTls, Row};
+use std::time::{Duration, Instant};
 use tokio_postgres::types::ToSql;
+use tokio_postgres::{Client, NoTls, Row};
 use uuid::Uuid;
 
 use crate::postgres::config::PgConfig;
 use crate::postgres::errors::PgError;
 
-
 #[derive(Clone)]
 pub struct Pg {
     pub pool: Pool,
 }
@@ -74,7 +73,7 @@ impl Pg {
         //test connection
         let _ = pool.get().await.map_err(PgError::GetClientFromPool)?;
-         println!("pool.status: {:?}", pool.status());
+        println!("pool.status: {:?}", pool.status());
         Ok(Pg { pool })
     }
 
@@ -92,7 +91,6 @@
         // });
         // Ok(client)
-
        match self.pool.get().await {
             Err(e) => {
                 error!("error::: {:?}", e);
@@ -107,10 +105,9 @@ impl Pg {
     pub(crate) async fn insert_to_delay(&self, params: &TableInsertRow<'_>) -> Result<u64, PgError> {
-         let get_client_instant = Instant::now();
-         let pg_client = self.get_client().await?;
-         let insert_query =
-             "INSERT INTO hanger (id, deadline, message_headers, message_key, message_value)
+        let get_client_instant = Instant::now();
+        let pg_client = self.get_client().await?;
+        let insert_query = "INSERT INTO hanger (id, deadline, message_headers, message_key, message_value)
                  VALUES ($1, $2 ,$3, $4, $5 )";
         let query_execute_instant = Instant::now();
         let stmt = pg_client.prepare(insert_query).await?;
@@ -125,32 +122,30 @@
                 &params.message_value,
             ],
         )
-        .await.expect("insert failed");
+        .await
+        .expect("insert failed");
         let time_elapsed = query_execute_instant.elapsed();
-         if time_elapsed > Duration::from_millis(100) {
+        if time_elapsed > Duration::from_millis(100) {
             println!("insert_to_delay query_execute_instant: {:?} ", time_elapsed);
         }
         Ok(outcome)
     }
 
-     pub(crate) async fn delete_fired(&self, ids: &Vec<String>) -> Result<u64, PgError> {
+    pub(crate) async fn delete_fired(&self, ids: &Vec<String>) -> Result<u64, PgError> {
         let query_execute_instant = Instant::now();
         let pg_client = self.get_client().await?;
-         // let transaction = pg_client.transaction().await;
+        // let transaction = pg_client.transaction().await;
         // let delete_ids = ids.join(",");
         // let delete_ids = ids.strip_suffix(",").unwrap().to_string();
         // println!("delete ids {:?}", ids);
-        let values_as_slice: Vec<_> = ids
-            .iter()
-            .map(|x| x as &(dyn ToSql + Sync))
-            .collect();
+        let values_as_slice: Vec<_> = ids.iter().map(|x| x as &(dyn ToSql + Sync)).collect();
 
         let mut query: String = "DELETE FROM hanger WHERE id IN (".to_owned();
         for i in 0..ids.len() {
-            query = query + "$" + (i+1).to_string().as_str() + ",";
+            query = query + "$" + (i + 1).to_string().as_str() + ",";
         }
         query = query.strip_suffix(",").unwrap().to_string();
         query = query + ")";
@@ -159,20 +154,18 @@
         let stmt = pg_client.prepare(query.as_str()).await?;
         let response = pg_client.execute(&stmt, &values_as_slice[..]).await?;
         let time_elapsed = query_execute_instant.elapsed();
-         if time_elapsed > Duration::from_millis(100) {
+        if time_elapsed > Duration::from_millis(100) {
             println!(" delete_fired query_execute_instant: {:?} ", time_elapsed);
         }
         Ok(response)
     }
 
     pub(crate) async fn ready_to_fire(&self, params: &Vec<GetReady>) -> Result<Vec<Row>, PgError> {
-
         let pg_client = self.get_client().await?;
 
         // println!("readying_update DB");
         let param = &params[0];
-
         // let ready_query = format!( "UPDATE hanger SET readied_at = '{}', readied_by= '{}' where id IN\
         //  (SELECT ID FROM hanger WHERE deadline <= '{}' AND readied_at IS NULL LIMIT {})\
         //  RETURNING id, deadline, readied_at, readied_by, message_headers, message_key, message_value;",&param.readied_at,
@@ -188,7 +181,7 @@
             .await
             .expect("update failed");
         let time_elapsed = query_execute_instant.elapsed();
-         if time_elapsed > Duration::from_millis(100) {
+        if time_elapsed > Duration::from_millis(100) {
             println!(" ready_to_fire query_execute_instant: {:?} params: {:?}", time_elapsed, param);
         }
         // println!("redying success {:?}", &response);
@@ -201,13 +194,10 @@
         let query_execute_instant = Instant::now();
         let pg_client = self.get_client().await?;
 
-         let get_query = "SELECT * from hanger where readied_at > $1";
-         let response = pg_client
-             .query(get_query, &[&delay_time])
-             .await
-             .expect("get delayed messages query failed");
+        let get_query = "SELECT * from hanger where readied_at > $1";
+        let response = pg_client.query(get_query, &[&delay_time]).await.expect("get delayed messages query failed");
         let time_elapsed = query_execute_instant.elapsed();
-         if time_elapsed > Duration::from_millis(100) {
+        if time_elapsed > Duration::from_millis(100) {
             println!(" failed_to_fire query_execute_instant: {:?} ", time_elapsed);
         }
         Ok(response)
@@ -215,7 +205,7 @@
     pub(crate) async fn reset_to_init(&self, to_init_list: &Vec<Row>) -> Result<Vec<String>, PgError> {
         let query_execute_instant = Instant::now();
-        println!("to_init_list: {}",to_init_list.len());
+        println!("to_init_list: {}", to_init_list.len());
         let mut id_list = Vec::<String>::new();
         for row in to_init_list {
             // let updated_row = TableRow {
@@ -239,14 +229,11 @@
         //     ids_list
         // );
 
-        let values_as_slice: Vec<_> = id_list
-            .iter()
-            .map(|x| x as &(dyn ToSql + Sync))
-            .collect();
+        let values_as_slice: Vec<_> = id_list.iter().map(|x| x as &(dyn ToSql + Sync)).collect();
 
         let mut query: String = "UPDATE hanger SET readied_at=null , readied_by=null WHERE id IN (".to_owned();
         for i in 0..id_list.len() {
-            query = query + "$" + (i+1).to_string().as_str() + ",";
+            query = query + "$" + (i + 1).to_string().as_str() + ",";
         }
         query = query.strip_suffix(",").unwrap().to_string();
         query = query + ")";
@@ -255,9 +242,9 @@
         pg_client.execute(&stmt, &values_as_slice[..]).await.expect("reset to init query failed");
 
         let time_elapsed = query_execute_instant.elapsed();
-         if time_elapsed > Duration::from_millis(100) {
+        if time_elapsed > Duration::from_millis(100) {
             println!(" reset_to_init query_execute_instant: {:?} ", time_elapsed);
         }
         Ok(id_list)
     }
-}
\ No newline at end of file
+}
diff --git a/chronos_bin/src/runner.rs b/chronos_bin/src/runner.rs
index c78c7d8..7a41931 100644
--- a/chronos_bin/src/runner.rs
+++ b/chronos_bin/src/runner.rs
@@ -1,24 +1,22 @@
-use log::{debug, error, info};
-use std::sync::Arc;
-use std::thread;
 use crate::core::{MessageConsumer, MessageProducer};
 use crate::kafka::consumer::KafkaConsumer;
 use crate::kafka::producer::KafkaProducer;
+use log::{debug, error, info};
+use std::sync::Arc;
+use std::thread;
 
 use crate::message_processor::MessageProcessor;
 use crate::message_receiver::MessageReceiver;
 use crate::monitor::FailureDetector;
 use crate::postgres::pg::Pg;
 
-
 pub struct Runner {
     pub consumer: Arc<Box<KafkaConsumer>>,
     pub producer: Arc<Box<KafkaProducer>>,
-    pub data_store: Arc<Box<Pg>>
+    pub data_store: Arc<Box<Pg>>,
 }
 
 impl Runner {
-
     pub async fn run(&self) {
         debug!("Chronos Runner");
 
@@ -31,14 +29,11 @@
         let receiver_prod = self.producer.clone();
         let receiver_consumer = self.consumer.clone();
 
-        let monitor_handler = tokio::task::spawn(async {
-            let monitor = FailureDetector {
-                data_store: monitor_ds
-            };
-            monitor.run().await;
+        let monitor_handler = tokio::task::spawn(async {
+            let monitor = FailureDetector { data_store: monitor_ds };
+            monitor.run().await;
         });
         let message_processor_handler = tokio::task::spawn(async {
-
             let message_processor = MessageProcessor {
                 producer: process_producer,
                 data_store: process_ds,
@@ -46,21 +41,15 @@
             message_processor.run().await;
         });
         let message_receiver_handler = tokio::spawn(async {
-
             let message_receiver = MessageReceiver {
                 consumer: receiver_consumer,
                 producer: receiver_prod,
                 data_store: receiver_ds,
             };
-            message_receiver.run().await;
+            message_receiver.run().await;
         });
 
-        futures::future::join_all([
-            monitor_handler,
-            message_processor_handler,
-            message_receiver_handler,
-        ])
-        .await;
+        futures::future::join_all([monitor_handler, message_processor_handler, message_receiver_handler]).await;
     }
 }
diff --git a/chronos_bin/src/utils/util.rs b/chronos_bin/src/utils/util.rs
index 23ba1ed..9523f0f 100644
--- a/chronos_bin/src/utils/util.rs
+++ b/chronos_bin/src/utils/util.rs
@@ -8,18 +8,13 @@ pub static DEADLINE: &str = "chronosDeadline";
 //TODO check correctness for two headers in this method
 pub fn required_headers(message: &BorrowedMessage) -> Option<HashMap<String, String>> {
     if let Some(headers) = message.headers() {
-        let reqd_headers =
-            headers
-                .iter()
-                .fold(HashMap::<String, String>::new(), |mut acc, header| {
-                    let key: String = header.key.parse().unwrap();
-                    let value: String = String::from_utf8_lossy(
-                        header.value.expect("utf8 parsing for header value failed")
-                    ).into_owned();
-
-                    acc.insert(key, value);
-                    acc
-                });
+        let reqd_headers = headers.iter().fold(HashMap::<String, String>::new(), |mut acc, header| {
+            let key: String = header.key.parse().unwrap();
+            let value: String = String::from_utf8_lossy(header.value.expect("utf8 parsing for header value failed")).into_owned();
+
+            acc.insert(key, value);
+            acc
+        });
         return Some(reqd_headers);
     }
     return None;
@@ -28,10 +23,7 @@ pub fn required_headers(message: &BorrowedMessage) -> Option<HashMap<String, Str
 pub fn into_headers(headers: &HashMap<String, String>) -> OwnedHeaders {
     headers.iter().fold(OwnedHeaders::new(), |acc, header| {
         let (key, value) = header;
-        let o_header = Header {
-            key,
-            value: Some(value),
-        };
+        let o_header = Header { key, value: Some(value) };
         acc.insert(o_header)
     })
 }
@@ -49,14 +41,11 @@ pub fn headers_check(headers: &BorrowedHeaders) -> bool {
     return outcome;
 }
 
-pub fn get_payload_utf8<'a>(message:&'a BorrowedMessage)->&'a [u8]{
-    message
-        .payload()
-        .expect("parsing payload failed")
+pub fn get_payload_utf8<'a>(message: &'a BorrowedMessage) -> &'a [u8] {
+    message.payload().expect("parsing payload failed")
 }
 
-pub fn get_message_key(message: &BorrowedMessage)->String{
-    let key = String::from_utf8_lossy(message.key().expect("No key found for message"))
-        .to_string();
+pub fn get_message_key(message: &BorrowedMessage) -> String {
+    let key = String::from_utf8_lossy(message.key().expect("No key found for message")).to_string();
     key
 }