Skip to content

Commit

Permalink
chore: add tags
Browse files Browse the repository at this point in the history
  • Loading branch information
Amninder Kaur committed Sep 5, 2023
1 parent 9e880dd commit bc9b0b9
Show file tree
Hide file tree
Showing 7 changed files with 20 additions and 24 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ lint:
$(call pp,lint rust...)
cargo check
cargo fmt -- --check
cargo clippy --all-targets -- -D warnings
cargo clippy --all-targets

## test.unit: 🧪 Runs unit tests
test.unit:
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[![Build Status](https://img.shields.io/github/actions/workflow/status/kindredgroup/chronos?branch=master)](https://github.com/kindredgroup/chronos/actions/workflows/build.yml)
[![Build Status](https://img.shields.io/github/actions/workflow/status/kindredgroup/chronos/build.yml?branch=master)](https://github.com/kindredgroup/chronos/actions/workflows/build.yml)
[![codecov](https://img.shields.io/codecov/c/github/kindredgroup/chronos/master?style=flat-square&logo=codecov)](https://app.codecov.io/gh/kindredgroup/chronos)
# Chronos
Chronos (the personification of time in early Greek philosophy) is a persistent time-delay scheduler. It's generic enough to schedule any type of task with a user-specified time delay.
Expand Down
2 changes: 1 addition & 1 deletion chronos_bin/src/message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl MessageProcessor {

let headers: HashMap<String, String> = match serde_json::from_str(&updated_row.message_headers.to_string()) {
Ok(t) => t,
Err(e) => {
Err(_e) => {
println!("error occurred while parsing");
HashMap::new()
}
Expand Down
18 changes: 9 additions & 9 deletions chronos_bin/src/message_receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ pub struct MessageReceiver {
}

impl MessageReceiver {
pub fn new(consumer: Arc<KafkaConsumer>, producer: Arc<KafkaProducer>, data_store: Arc<Pg>) -> Self {
Self {
consumer,
producer,
data_store,
}
}
// pub fn new(consumer: Arc<KafkaConsumer>, producer: Arc<KafkaProducer>, data_store: Arc<Pg>) -> Self {
// Self {
// consumer,
// producer,
// data_store,
// }
// }
pub async fn run(&self) {
println!("Receiver ON!");
let _ = &self.consumer.subscribe().await;
Expand All @@ -35,9 +35,9 @@ impl MessageReceiver {
loop {
if let Ok(message) = &self.consumer.consume_message().await {
total_count += 1;
if headers_check(&message.headers().unwrap()) {
if headers_check(message.headers().unwrap()) {
let new_message = &message;
let headers = required_headers(&new_message).expect("parsing headers failed");
let headers = required_headers(new_message).expect("parsing headers failed");
let message_deadline: DateTime<Utc> = DateTime::<Utc>::from_str(&headers[DEADLINE]).expect("String date parsing failed");

if message_deadline <= Utc::now() {
Expand Down
2 changes: 1 addition & 1 deletion chronos_bin/src/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ impl FailureDetector {

match &self.data_store.failed_to_fire(Utc::now() - chrono_duration::seconds(10)).await {
Ok(fetched_rows) => {
if fetched_rows.len() > 0 {
if !fetched_rows.is_empty() {
if let Err(e) = &self.data_store.reset_to_init(fetched_rows).await {
println!("error in monitor reset_to_init {}", e);
}
Expand Down
16 changes: 6 additions & 10 deletions chronos_bin/src/postgres/pg.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use deadpool_postgres::{Config, GenericClient, ManagerConfig, Object, Pool, PoolConfig, Runtime};
use log::error;
use log::kv::Source;
use serde_json::{json, Value};
use std::time::{Duration, Instant};
use tokio_postgres::types::ToSql;
use tokio_postgres::{Client, NoTls, Row};
use tokio_postgres::{NoTls, Row};
use uuid::Uuid;

use crate::postgres::config::PgConfig;
Expand Down Expand Up @@ -105,7 +102,6 @@ impl Pg {

impl Pg {
pub(crate) async fn insert_to_delay(&self, params: &TableInsertRow<'_>) -> Result<u64, PgError> {
let get_client_instant = Instant::now();
let pg_client = self.get_client().await?;
let insert_query = "INSERT INTO hanger (id, deadline, message_headers, message_key, message_value)
VALUES ($1, $2 ,$3, $4, $5 )";
Expand Down Expand Up @@ -147,8 +143,8 @@ impl Pg {
for i in 0..ids.len() {
query = query + "$" + (i + 1).to_string().as_str() + ",";
}
query = query.strip_suffix(",").unwrap().to_string();
query = query + ")";
query = query.strip_suffix(',').unwrap().to_string();
query += ")";
// println!("query {}", query);
//let sql = format!("DELETE FROM hanger WHERE id IN ({})", ids);
let stmt = pg_client.prepare(query.as_str()).await?;
Expand All @@ -160,7 +156,7 @@ impl Pg {
Ok(response)
}

pub(crate) async fn ready_to_fire(&self, params: &Vec<GetReady>) -> Result<Vec<Row>, PgError> {
pub(crate) async fn ready_to_fire(&self, params: &[GetReady]) -> Result<Vec<Row>, PgError> {
let pg_client = self.get_client().await?;

// println!("readying_update DB");
Expand Down Expand Up @@ -235,8 +231,8 @@ impl Pg {
for i in 0..id_list.len() {
query = query + "$" + (i + 1).to_string().as_str() + ",";
}
query = query.strip_suffix(",").unwrap().to_string();
query = query + ")";
query = query.strip_suffix(',').unwrap().to_string();
query += ")";
// println!("reset query {}", query);
let stmt = pg_client.prepare(query.as_str()).await?;

Expand Down
2 changes: 1 addition & 1 deletion scripts/pre-commit-checks.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/bin/bash
set -e

# make withenv RECIPE=lint
make withenv RECIPE=lint
make withenv RECIPE=test.unit


0 comments on commit bc9b0b9

Please sign in to comment.