Skip to content

Commit

Permalink
fix: update queries
Browse files Browse the repository at this point in the history
  • Loading branch information
anil0906 committed May 16, 2023
1 parent e3a9be3 commit ea4e897
Show file tree
Hide file tree
Showing 8 changed files with 192 additions and 118 deletions.
4 changes: 2 additions & 2 deletions .env
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# WHEN DEVELOPING LOCALLY, WE NEED TO ACCESS THE HOST NETWORK FROM K8S (FOR POSTGRES/KAFKA/ELASTIC/ETC)
LOCAL_HOST_IP=$(ifconfig en0 | grep inet | grep -v inet6 | awk '{print $2}')
# LOCAL_HOST_IP=$(ifconfig en0 | grep inet | grep -v inet6 | awk '{print $2}')

# RUST version
RUST_VERSION=stable
Expand Down Expand Up @@ -28,4 +28,4 @@ ADMIN_PG_PASSWORD=admin
ADMIN_PG_DATABASE=postgres

# CONFIG
RUST_LOG=debug
RUST_LOG=info
5 changes: 3 additions & 2 deletions src/kafka/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ impl KafkaProducer {
message: String,
headers: Option<HashMap<String, String>>,
key: String,
) -> Result<(), KafkaAdapterError> {
id: String
) -> Result<String, KafkaAdapterError> {
let o_header = into_headers(&headers.unwrap());
// println!("headers {:?}", o_header);
// println!("headers {:?} headers--{:?}", &headers["chronosId)"].to_string(), &headers["chronosDeadline)"].to_string());
Expand All @@ -45,7 +46,7 @@ impl KafkaProducer {
)
.await
.map_err(|(kafka_error, record)| KafkaAdapterError::PublishMessage(kafka_error, "message publishing failed".to_string()))?;
Ok(())
Ok(id)
}
}

57 changes: 29 additions & 28 deletions src/message_processor.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::thread;
use chrono::Utc;
use std::time::Duration;
use uuid::Uuid;
Expand All @@ -18,21 +19,20 @@ pub struct MessageProcessor {

impl MessageProcessor {
pub async fn run(&self) {

println!("MessageProcessor ON!");


loop {
tokio::time::sleep(Duration::from_secs(1)).await;

//TODO: fine tune this 1sec duration
let deadline = Utc::now() + chrono::Duration::seconds(1);
tokio::time::sleep(Duration::from_millis(10)).await;
// println!("MessageProcessor");
let deadline = Utc::now();
let uuid = Uuid::new_v4();

let param = GetReady {
readied_at: deadline,
readied_by: uuid,
deadline,
limit: 10,
limit: 1000,
// order: "asc",
};

Expand All @@ -41,14 +41,14 @@ impl MessageProcessor {

let publish_rows = &self.data_store.ready_to_fire( &ready_params).await.unwrap();

println!(
"Rows Needs Readying:: {:?} @ {:?}",
publish_rows.len(),
Utc::now()
);

let mut ids: Vec<&str> = Vec::new();
// println!(
// "Rows Needs Readying:: {:?} @ {:?}",
// publish_rows.len(),
// Utc::now()
// );

let mut ids: Vec<String> = Vec::with_capacity(publish_rows.len());
let mut publish_futures = Vec::with_capacity(publish_rows.len());
for row in publish_rows {
let updated_row = TableRow {
id: row.get("id"),
Expand All @@ -71,21 +71,24 @@ impl MessageProcessor {
//TODO: handle empty headers
// println!("checking {:?}",headers);

let result = self.producer
publish_futures.push(self.producer
.publish(
updated_row.message_value.to_string(),
Some(headers),
updated_row.message_key.to_string(),
)
.await;
updated_row.id.to_string()
))
}
let results = futures::future::join_all(publish_futures).await;
for result in results {
match result {
Ok(m) => {
ids.push(&updated_row.id);
println!(
"insert success with number changed {:?} @{:?}",
m,
Utc::now()
);
ids.push(m);
// println!(
// "insert success with number changed {:?} @{:?}",
// m,
// Utc::now()
// );
}
Err(e) => {
println!("publish failed {:?}", e);
Expand All @@ -94,15 +97,13 @@ impl MessageProcessor {
}
}

println!("finished the loop for publish now delete published from DB");
// println!("finished the loop for publish now delete published from DB");
if ids.len() > 0 {
let outcome = &self.data_store.delete_fired( &ids.join(",")).await.unwrap();
println!("delete fired id {:?} @{:?} outcome {outcome}", &ids, Utc::now());
} else {
println!("no more processing {}", Utc::now().to_rfc3339());
let outcome = &self.data_store.delete_fired( &ids).await.unwrap();
// println!("delete fired id {:?} @{:?} outcome {outcome}", &ids, Utc::now());
}

println!("after the delete statement");
//println!("after the delete statement");

// if let Ok(messages) =
// {
Expand Down
42 changes: 19 additions & 23 deletions src/message_receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ use serde_json::json;

use std::str::{from_utf8, FromStr};
use std::sync::Arc;
use std::time::Instant;
use rdkafka::message::{BorrowedMessage, Message};
use crate::kafka::consumer::KafkaConsumer;
use crate::kafka::producer::KafkaProducer;
use crate::postgres::pg::{Pg, TableInsertRow};
use crate::utils::util::{headers_check, required_headers, CHRONOS_ID, DEADLINE};
use crate::utils::util::{headers_check, required_headers, CHRONOS_ID, DEADLINE, get_payload_utf8, get_message_key};

pub struct MessageReceiver {
pub(crate) consumer: Arc< Box<KafkaConsumer>>,
Expand All @@ -34,8 +35,12 @@ impl MessageReceiver {
println!("Receiver ON!");
&self.consumer.subscribe().await;
// for _n in 0..100 {
let mut total_count = 0;
let mut direct_sent_count = 0;
let mut db_insert_count = 0;
loop {
if let Ok(message) = &self.consumer.consume_message().await {
total_count = total_count+1;
if headers_check(&message.headers().unwrap()) {
let new_message = &message;
let headers = required_headers(&new_message).expect("parsing headers failed");
Expand All @@ -44,48 +49,39 @@ impl MessageReceiver {
.expect("String date parsing failed");

if message_deadline <= Utc::now() {
//TODO: missing check the DB is the entry is present and mark it readied


let payload = new_message
.payload_view::<str>()
.expect("parsing payload failed")
.unwrap()
.to_string();
let message_key =
from_utf8(new_message.key().expect("no message Key found"))
.unwrap()
.to_string();
direct_sent_count = direct_sent_count + 1;
let string_payload = String::from_utf8_lossy(get_payload_utf8(new_message)).to_string();
let message_key = get_message_key(new_message) ;
let _outcome = &self
.producer
.publish(payload, Some(headers), message_key)
.publish(string_payload, Some(headers), message_key, "same id".to_string())
.await
.expect("Publish failed for received message");
} else {
db_insert_count = db_insert_count + 1;
let chronos_message_id = &headers[CHRONOS_ID];

let payload = new_message
.payload_view::<str>()
.expect("parsing payload failed")
.unwrap()
.to_string();
let message_key =
from_utf8(new_message.key().expect("no message Key found")).unwrap();
let payload =get_payload_utf8(new_message);

let message_key = get_message_key(new_message);

let params = TableInsertRow {
id: &*chronos_message_id,
deadline: message_deadline,
message_headers: &json!(&headers),
message_key,
message_value: &json!(&payload),
message_key: message_key.as_str(),
message_value: &serde_json::from_slice(&payload).expect("de-ser failed for payload"),
};
let insert_time = Instant::now();
self.data_store.insert_to_delay(&params).await.expect("TODO: panic message");
// println!("insert took: {:?}", insert_time.elapsed())

}
} else {
warn!("message with improper headers on inbox.topic ");
//TODO: ignore
}
// println!("{direct_sent_count} messages sent directly and {db_insert_count} added to db from total of {total_count} ");
}

// println!("commit received message {:?}", new_message);
Expand Down
9 changes: 5 additions & 4 deletions src/monitor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::sync::Arc;
use std::thread;
use chrono::{Duration as chrono_duration, Utc};
use log::{error, info};
use std::time::Duration;
Expand All @@ -17,12 +18,12 @@ impl FailureDetector {


let fetched_rows = &self.data_store.failed_to_fire(
Utc::now() + chrono_duration::seconds(100)
Utc::now() - chrono_duration::seconds(10)
)
.await.unwrap();

let _id_list = &self.data_store.reset_to_init( fetched_rows);

if fetched_rows.len() > 0 {
let _id_list = &self.data_store.reset_to_init( fetched_rows).await.unwrap();
}
//TODO Log the list of id's that failed to fire and were re-sent to the init state

}
Expand Down
Loading

0 comments on commit ea4e897

Please sign in to comment.