Skip to content

Commit

Permalink
fix: add husky as ddependency and formatted files
Browse files Browse the repository at this point in the history
  • Loading branch information
Amninder Kaur committed Aug 21, 2023
1 parent 8cc486e commit 8838f01
Show file tree
Hide file tree
Showing 14 changed files with 125 additions and 186 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,8 @@ criterion = "0.4.0"
name="consumer_bench"
harness=false

[dev-dependencies.cargo-husky]
version = "1"
default-features = false # Disable features which are enabled by default
features = ["user-hooks"]

10 changes: 5 additions & 5 deletions benches/consumer_bench.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#![allow(unused)]
use chronos_bin::runner::Runner;
// use chronos_bin::runner::Runner;
// use criterion::{black_box, criterion_group, criterion_main, Criterion};
// // use hello_cargo::fibonacci;
//
Expand All @@ -14,17 +14,17 @@ use chronos_bin::runner::Runner;
use criterion::*;

async fn my_function() {
let runner = Runner {};
runner.run().await;
// let runner = Runner {};
// runner.run().await;
}

fn bench(c: &mut Criterion) {
let mut group = c.benchmark_group("sample-size-example");
// Configure Criterion.rs to detect smaller differences and increase sample size to improve
// precision and counteract the resulting noise.
group.significance_level(0.1).sample_size(500);
group.bench_function("my-function", |b| b.iter(|| my_function()));
group.finish();
// group.bench_function("my-function", |b| b.iter(|| my_function()));
// group.finish();
}

criterion_group!(benches, bench);
Expand Down
20 changes: 4 additions & 16 deletions chronos_bin/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,10 @@ pub trait MessageProducer {

#[async_trait]
pub trait DataStore {
async fn insert(
&self,
message: ChronosDeliveryMessage,
) -> Result<ChronosDeliveryMessage, ChronosError>;
async fn delete(
&self,
message: ChronosDeliveryMessage,
) -> Result<ChronosDeliveryMessage, ChronosError>;
async fn move_to_initial_state(
&self,
message: ChronosDeliveryMessage,
) -> Result<ChronosDeliveryMessage, ChronosError>;
async fn move_to_ready_state(
&self,
message: ChronosDeliveryMessage,
) -> Result<ChronosDeliveryMessage, ChronosError>;
async fn insert(&self, message: ChronosDeliveryMessage) -> Result<ChronosDeliveryMessage, ChronosError>;
async fn delete(&self, message: ChronosDeliveryMessage) -> Result<ChronosDeliveryMessage, ChronosError>;
async fn move_to_initial_state(&self, message: ChronosDeliveryMessage) -> Result<ChronosDeliveryMessage, ChronosError>;
async fn move_to_ready_state(&self, message: ChronosDeliveryMessage) -> Result<ChronosDeliveryMessage, ChronosError>;
async fn get_messages(
&self,
status: ChronosMessageStatus,
Expand Down
2 changes: 1 addition & 1 deletion chronos_bin/src/kafka/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::env_var;
use rdkafka::ClientConfig;
use std::collections::HashMap;
use crate::env_var;

#[derive(Debug)]
pub struct KafkaConfig {
Expand Down
22 changes: 7 additions & 15 deletions chronos_bin/src/kafka/consumer.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
use std::{num::TryFromIntError, time::Duration};

use crate::core::MessageConsumer;
use crate::kafka::errors::KafkaAdapterError;
use async_trait::async_trait;
use log::{debug, info};
use rdkafka::message::BorrowedMessage;
use rdkafka::{
consumer::{Consumer, StreamConsumer},
Message, TopicPartitionList,
};
use rdkafka::message::BorrowedMessage;
use crate::kafka::errors::KafkaAdapterError;
use crate::core::MessageConsumer;

use super::{config::KafkaConfig};
use super::config::KafkaConfig;

// Kafka Consumer Client
// #[derive(Debug, Clone)]
Expand All @@ -24,21 +24,13 @@ impl KafkaConsumer {
let consumer = config.build_consumer_config().create().expect("Failed to create consumer");

let topic = config.in_topic.clone();
Self {
consumer,
topic,
}
Self { consumer, topic }
}

pub(crate) async fn subscribe(&self) {

let _ = &self
.consumer
.subscribe(&[&self.topic])
.expect("consumer Subscribe to topic failed");
let _ = &self.consumer.subscribe(&[&self.topic]).expect("consumer Subscribe to topic failed");
}
pub(crate) async fn consume_message(&self) ->Result<BorrowedMessage, KafkaAdapterError> {

pub(crate) async fn consume_message(&self) -> Result<BorrowedMessage, KafkaAdapterError> {
self.consumer.recv().await.map_err(|e| KafkaAdapterError::ReceiveMessage(e))
}
}
22 changes: 11 additions & 11 deletions chronos_bin/src/kafka/kafka_deploy.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::time::Duration;

use crate::kafka::config::KafkaConfig;
use rdkafka::{
admin::{AdminClient, AdminOptions, NewTopic, TopicReplication},
client::DefaultClientContext,
Expand All @@ -8,7 +9,6 @@ use rdkafka::{
types::RDKafkaErrorCode,
};
use thiserror::Error as ThisError;
use crate::kafka::config::KafkaConfig;

pub enum KafkaDeployStatus {
TopicExists,
Expand All @@ -27,18 +27,20 @@ pub async fn create_topics(replication_factor: i32, num_of_partitions: i32) -> R
let kafka_config = KafkaConfig::from_env();
println!("kafka configs received from env... {kafka_config:#?}");

let _ = create_topic(&kafka_config, kafka_config.in_topic.as_str(),replication_factor, num_of_partitions).await?;
let _ = create_topic(&kafka_config, kafka_config.out_topic.as_str(),replication_factor, num_of_partitions).await?;
let _ = create_topic(&kafka_config, kafka_config.in_topic.as_str(), replication_factor, num_of_partitions).await?;
let _ = create_topic(&kafka_config, kafka_config.out_topic.as_str(), replication_factor, num_of_partitions).await?;
Ok(())
}

async fn create_topic(kafka_config: &KafkaConfig, topic_name: &str, replication_factor: i32, num_of_partitions: i32) -> Result<KafkaDeployStatus, KafkaDeployError> {

async fn create_topic(
kafka_config: &KafkaConfig,
topic_name: &str,
replication_factor: i32,
num_of_partitions: i32,
) -> Result<KafkaDeployStatus, KafkaDeployError> {
let consumer: StreamConsumer = kafka_config.build_consumer_config().create()?;
let timeout = Duration::from_secs(1);
let metadata = consumer
.fetch_metadata(Some(topic_name), timeout)
.expect("Fetching topic metadata failed");
let metadata = consumer.fetch_metadata(Some(topic_name), timeout).expect("Fetching topic metadata failed");

if !metadata.topics().is_empty() && !metadata.topics()[0].partitions().is_empty() {
println!("Topic {topic_name} exists");
Expand All @@ -58,9 +60,7 @@ async fn create_topic(kafka_config: &KafkaConfig, topic_name: &str, replication_

let results = admin.create_topics(&[topic], &opts).await?;

results[0]
.as_ref()
.map_err(|e| KafkaDeployError::TopicCreation(topic_name.to_string(), e.1))?;
results[0].as_ref().map_err(|e| KafkaDeployError::TopicCreation(topic_name.to_string(), e.1))?;
println!("Topic : {topic_name} created");
Ok(KafkaDeployStatus::TopicCreated)
}
Expand Down
13 changes: 6 additions & 7 deletions chronos_bin/src/kafka/producer.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use std::collections::HashMap;
use std::time::Duration;

use crate::kafka::errors::KafkaAdapterError;
use crate::utils::util::into_headers;
use async_trait::async_trait;
use log::debug;
use rdkafka::producer::{BaseRecord, DefaultProducerContext, FutureProducer, FutureRecord, ThreadedProducer};
use rdkafka::producer::future_producer::OwnedDeliveryResult;
use crate::kafka::errors::KafkaAdapterError;
use crate::utils::util::into_headers;

use rdkafka::producer::{BaseRecord, DefaultProducerContext, FutureProducer, FutureRecord, ThreadedProducer};

use super::config::KafkaConfig;

Expand All @@ -30,13 +29,14 @@ impl KafkaProducer {
message: String,
headers: Option<HashMap<String, String>>,
key: String,
id: String
id: String,
) -> Result<String, KafkaAdapterError> {
let o_header = into_headers(&headers.unwrap());
// println!("headers {:?}", o_header);
// println!("headers {:?} headers--{:?}", &headers["chronosId)"].to_string(), &headers["chronosDeadline)"].to_string());

let delivery_status = self.producer
let delivery_status = self
.producer
.send(
FutureRecord::to(&self.topic.as_str())
.payload(message.as_str())
Expand All @@ -49,4 +49,3 @@ impl KafkaProducer {
Ok(id)
}
}

52 changes: 22 additions & 30 deletions chronos_bin/src/message_processor.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
use crate::kafka::producer::KafkaProducer;
use crate::postgres::config::PgConfig;
use crate::postgres::errors::PgError;
use crate::postgres::pg::{GetReady, Pg, TableRow};
use chrono::Utc;
use serde_json::{json, Value};
use std::collections::HashMap;
use std::sync::Arc;
use std::thread;
use chrono::Utc;
use std::time::Duration;
use tokio_postgres::Row;
use uuid::Uuid;
use crate::kafka::producer::KafkaProducer;
use tokio_postgres::{Row};
use serde_json::{json, Value};
use crate::postgres::pg::{GetReady, Pg, TableRow};
use crate::postgres::config::PgConfig;
use crate::postgres::errors::PgError;


pub struct MessageProcessor {
pub(crate) data_store: Arc<Box<Pg>>,
Expand All @@ -21,7 +20,6 @@ impl MessageProcessor {
pub async fn run(&self) {
println!("MessageProcessor ON!");


loop {
tokio::time::sleep(Duration::from_millis(10)).await;
// println!("MessageProcessor");
Expand All @@ -39,7 +37,7 @@ impl MessageProcessor {
let mut ready_params = Vec::new();
ready_params.push(param);

match &self.data_store.ready_to_fire( &ready_params).await {
match &self.data_store.ready_to_fire(&ready_params).await {
Ok(publish_rows) => {
let mut ids: Vec<String> = Vec::with_capacity(publish_rows.len());
let mut publish_futures = Vec::with_capacity(publish_rows.len());
Expand All @@ -54,24 +52,22 @@ impl MessageProcessor {
message_value: row.get("message_value"),
};

let headers: HashMap<String, String> =
match serde_json::from_str(&updated_row.message_headers.to_string()) {
Ok(T) => T,
Err(E) => {
println!("error occurred while parsing");
HashMap::new()
}
};
let headers: HashMap<String, String> = match serde_json::from_str(&updated_row.message_headers.to_string()) {
Ok(T) => T,
Err(E) => {
println!("error occurred while parsing");
HashMap::new()
}
};
//TODO: handle empty headers
// println!("checking {:?}",headers);

publish_futures.push(self.producer
.publish(
updated_row.message_value.to_string(),
Some(headers),
updated_row.message_key.to_string(),
updated_row.id.to_string()
))
publish_futures.push(self.producer.publish(
updated_row.message_value.to_string(),
Some(headers),
updated_row.message_key.to_string(),
updated_row.id.to_string(),
))
}
let results = futures::future::join_all(publish_futures).await;
for result in results {
Expand All @@ -86,20 +82,16 @@ impl MessageProcessor {
}
}


if ids.len() > 0 {
if let Err(outcome_error) = &self.data_store.delete_fired( &ids).await {
if let Err(outcome_error) = &self.data_store.delete_fired(&ids).await {
println!("error occurred in message processor delete_fired {}", outcome_error);
}
}
}
Err(e) => {
println!("error occurred in message processor {}", e);
}

}


}
}
}
Loading

0 comments on commit 8838f01

Please sign in to comment.