fix: enter env vars to the readme
Amninder Kaur committed Oct 19, 2023
1 parent 846f987 commit ab21777
Showing 7 changed files with 44 additions and 21 deletions.
2 changes: 2 additions & 0 deletions .env
@@ -39,3 +39,5 @@ PROCESSOR_DB_POLL=10
# TRACING
OTEL_SERVICE_NAME=chronos
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://localhost:4318/v1/traces
OTEL_EXPORTER_OTLP_PROTOCOL="http/json"
OTEL_TRACES_EXPORTER=otlp
13 changes: 10 additions & 3 deletions How-to.md
@@ -26,7 +26,7 @@ All the required configurations for Chronos can be passed in environment variabl

### Required Vars
|Env Var|Example Value|
|----|----|----|
|----|----|
|KAFKA_HOST|"localhost"
|KAFKA_PORT|9093
| KAFKA_CLIENT_ID|"chronos"
@@ -45,14 +45,21 @@ All the required configurations for Chronos can be passed in environment variabl
### Optional Vars
These values can be set to fine-tune Chronos performance when needed; refer to [Chronos](./README.md)
|Env Var| Default Value|
|----|----|----|
|----|----|
| MONITOR_DB_POLL|5 sec
| PROCESSOR_DB_POLL|5 milli sec
| TIMING_ADVANCE|0 sec
| FAIL_DETECT_INTERVAL|10 sec



## Observability
At this time, Chronos supports HTTP-based connectivity to the OTel collector. Provide the following env variables to connect to the OTel collector instance; traces will then appear under the configured service name (a short sketch of how these are consumed follows the table).
|Env var| Default Value|
|---|--|
| OTEL_SERVICE_NAME|Chronos|
| OTEL_TRACES_EXPORTER|otlp|
| OTEL_EXPORTER_OTLP_TRACES_ENDPOINT|"http://localhost:4318/v1/traces"
| OTEL_EXPORTER_OTLP_PROTOCOL|"http/json"
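
For orientation, the sketch below shows how Chronos wires these variables into trace registration; it is lifted from the `chronos_bin/src/bin/chronos.rs` hunk later in this commit and trimmed down, so treat it as illustrative rather than a drop-in snippet.

```rust
use chronos_bin::telemetry::register_telemetry::{TelemetryCollector, TelemetryCollectorType};

fn init_tracing() {
    // The protocol string is read from the environment, defaulting to "http/json" when unset.
    let protocol = std::env::var("TELEMETRY_PROTOCOL").unwrap_or_else(|_| "http/json".to_string());

    // Traces are exported over OTLP; the endpoint and service name are picked up from
    // OTEL_EXPORTER_OTLP_TRACES_ENDPOINT and OTEL_SERVICE_NAME by the collector itself.
    let tracing_opentelemetry = TelemetryCollector::new(protocol, TelemetryCollectorType::Otlp);
    tracing_opentelemetry.register_traces();
}
```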

## Chronos Images
Two images are published for each [RELEASE](https://github.com/kindredgroup/chronos/pkgs/container/chronos)
6 changes: 4 additions & 2 deletions chronos_bin/src/bin/chronos.rs
@@ -4,7 +4,7 @@ use chronos_bin::kafka::producer::KafkaProducer;
use chronos_bin::postgres::config::PgConfig;
use chronos_bin::postgres::pg::Pg;
use chronos_bin::runner::Runner;
use chronos_bin::telemetry::register_telemetry::TelemetryCollector;
use chronos_bin::telemetry::register_telemetry::{TelemetryCollector, TelemetryCollectorType};
use log::debug;
use std::sync::Arc;

@@ -13,7 +13,9 @@ async fn main() {
env_logger::init();
dotenvy::dotenv().ok();

let tracing_opentelemetry = TelemetryCollector::new();
let protocol = std::env::var("TELEMETRY_PROTOCOL").unwrap_or_else(|_| "http/json".to_string());

let tracing_opentelemetry = TelemetryCollector::new(protocol, TelemetryCollectorType::Otlp);
tracing_opentelemetry.register_traces();

let kafka_config = KafkaConfig::from_env();
8 changes: 4 additions & 4 deletions chronos_bin/src/message_processor.rs
@@ -41,7 +41,7 @@ impl MessageProcessor {
}
}

#[tracing::instrument(skip_all, fields(node_id, chronos_id, is_published, error))]
#[tracing::instrument(skip_all, fields(node_id, correlationId, is_published, error))]
async fn processor_message_ready(&self, params: &GetReady) {
loop {
let max_retry_count = 3;
@@ -76,7 +76,7 @@ impl MessageProcessor {
tracing::Span::current().record("node_id", &readied_by);
headers.insert("readied_by".to_string(), readied_by);

tracing::Span::current().record("chronos_id", updated_row.id.to_string());
tracing::Span::current().record("correlationId", updated_row.id.to_string());

publish_futures.push(self.producer.kafka_publish(
updated_row.message_value.to_string(),
@@ -143,7 +143,7 @@ impl MessageProcessor {
// let mut row_id: Option<String> = None;
}

#[tracing::instrument(skip_all, fields(chronos_ids_deleted))]
#[tracing::instrument(skip_all, fields(correlationId))]
async fn clean_db(&self, ids: Vec<String>) {
// retry in case the delete fails
let max_retries = 3;
@@ -152,7 +152,7 @@ impl MessageProcessor {
if retry_count < max_retries {
match &self.data_store.delete_fired_db(&ids).await {
Ok(_) => {
tracing::Span::current().record("chronos_ids_deleted", ids.join(","));
tracing::Span::current().record("correlationId", ids.join(","));
break;
}
Err(e) => {
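
As background for the field rename in the hunks above, here is a minimal sketch of the `tracing` pattern in play: declare an empty `correlationId` field on the instrumented span, then fill it at runtime. The function and variable names are illustrative, not taken from this commit.

```rust
use tracing::instrument;

#[instrument(skip_all, fields(correlationId))]
async fn handle_one(message_id: String) {
    // Fill the span field declared above so the id is attached to the exported trace.
    tracing::Span::current().record("correlationId", message_id.as_str());
    // ... actual processing ...
}
```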
12 changes: 6 additions & 6 deletions chronos_bin/src/message_receiver.rs
@@ -20,11 +20,11 @@ pub struct MessageReceiver {
}

impl MessageReceiver {
#[instrument(skip_all, fields(chronos_id))]
#[instrument(skip_all, fields(correlationId))]
pub async fn receiver_publish_to_kafka(&self, new_message: &BorrowedMessage<'_>, headers: HashMap<String, String>) {
let string_payload = String::from_utf8_lossy(get_payload_utf8(new_message)).to_string();
let message_key = get_message_key(new_message);
tracing::Span::current().record("chronos_id", &message_key);
tracing::Span::current().record("correlationId", &message_key);
let outcome = &self.producer.kafka_publish(string_payload, Some(headers), message_key.to_string()).await;
match outcome {
Ok(_) => {
@@ -37,7 +37,7 @@ impl MessageReceiver {
}
}

#[instrument(skip_all, fields(chronos_id))]
#[instrument(skip_all, fields(correlationId))]
pub async fn receiver_insert_to_db(&self, new_message: &BorrowedMessage<'_>, headers: HashMap<String, String>, deadline: DateTime<Utc>) {
let result_value = &serde_json::from_slice(get_payload_utf8(new_message));
let payload = match result_value {
@@ -49,7 +49,7 @@
};

let message_key = get_message_key(new_message);
tracing::Span::current().record("chronos_id", &message_key);
tracing::Span::current().record("correlationId", &message_key);

let params = TableInsertRow {
id: &headers[CHRONOS_ID],
@@ -80,13 +80,13 @@
}
}

#[tracing::instrument(name = "receiver_handle_message", skip_all, fields(chronos_id))]
#[tracing::instrument(name = "receiver_handle_message", skip_all, fields(correlationId))]
pub async fn handle_message(&self, message: &BorrowedMessage<'_>) {
if headers_check(message.headers().unwrap()) {
let new_message = &message;

if let Some(headers) = required_headers(new_message) {
tracing::Span::current().record("chronos_id", &headers[CHRONOS_ID]);
tracing::Span::current().record("correlationId", &headers[CHRONOS_ID]);
let message_deadline: DateTime<Utc> = match DateTime::<Utc>::from_str(&headers[DEADLINE]) {
Ok(d) => d,
Err(e) => {
2 changes: 2 additions & 0 deletions chronos_bin/src/telemetry/otlp_collector.rs
@@ -19,6 +19,8 @@ impl OtlpCollector {
}

pub fn http_collector_connect(&self, protocol: Protocol) -> Result<sdktrace::Tracer, TraceError> {
// service name will be picked up from the "OTEL_SERVICE_NAME" env variable

if let Ok(trace_exporter) = std::env::var("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT") {
global::set_text_map_propagator(TraceContextPropagator::new());
opentelemetry_otlp::new_pipeline()
22 changes: 16 additions & 6 deletions chronos_bin/src/telemetry/register_telemetry.rs
@@ -10,28 +10,38 @@ pub enum TelemetryCollectorType {

pub struct TelemetryCollector {
pub collector_type: TelemetryCollectorType,
pub protocol: Protocol,
}

impl Default for TelemetryCollector {
fn default() -> Self {
TelemetryCollector {
collector_type: TelemetryCollectorType::Otlp,
protocol: Protocol::HttpBinary,
}
}
}

impl TelemetryCollector {
pub fn new() -> Self {
TelemetryCollector::default()
pub fn new(env_protocol: String, collector_type: TelemetryCollectorType) -> Self {
let protocol = if env_protocol.to_lowercase().contains("grpc") {
Protocol::Grpc
} else {
Protocol::HttpBinary
};
TelemetryCollector { collector_type, protocol }
}

pub fn register_traces(self) {
let tracer = match &self.collector_type {
TelemetryCollectorType::Jaegar => instrument_jaegar_pipleline(),
TelemetryCollectorType::Otlp => {
let otlp_collector = OtlpCollector::new();
otlp_collector.http_collector_connect(Protocol::HttpBinary)
}
TelemetryCollectorType::Otlp => match self.protocol {
Protocol::Grpc => todo!(),
Protocol::HttpBinary => {
let otlp_collector = OtlpCollector::new();
otlp_collector.http_collector_connect(Protocol::HttpBinary)
}
},
};

match tracer {
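
A short usage sketch of the new constructor (the values here are examples only): a protocol string containing "grpc" selects `Protocol::Grpc`, anything else falls back to `Protocol::HttpBinary`. Note that the gRPC arm of `register_traces` is still a `todo!()` in this commit and would panic if exercised.

```rust
use chronos_bin::telemetry::register_telemetry::{TelemetryCollector, TelemetryCollectorType};

fn main() {
    // "http/json" (or any non-gRPC value) maps to Protocol::HttpBinary.
    let http_collector = TelemetryCollector::new("http/json".to_string(), TelemetryCollectorType::Otlp);
    http_collector.register_traces();

    // "grpc" maps to Protocol::Grpc, whose register_traces() path is still todo!().
    let _grpc_collector = TelemetryCollector::new("grpc".to_string(), TelemetryCollectorType::Otlp);
}
```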
