From 948d1e53ac32e3d97d4350ce3634a9174db53c84 Mon Sep 17 00:00:00 2001 From: Amninder Kaur Date: Wed, 18 Oct 2023 12:41:44 +1100 Subject: [PATCH] chore: add otlp support for chronos --- .env | 8 +- .env.example | 12 +- Cargo.lock | 390 +++++++++--------- Cargo.toml | 6 +- How-to.md | 49 ++- Makefile | 5 + chronos_bin/Cargo.toml | 5 + chronos_bin/src/bin/chronos.rs | 4 + chronos_bin/src/kafka/config.rs | 36 +- chronos_bin/src/kafka/consumer.rs | 10 +- chronos_bin/src/kafka/producer.rs | 12 +- chronos_bin/src/kafka/utils.rs | 141 ------- chronos_bin/src/lib.rs | 1 + chronos_bin/src/message_processor.rs | 217 +++++----- chronos_bin/src/message_receiver.rs | 169 ++++---- chronos_bin/src/monitor.rs | 48 +-- chronos_bin/src/postgres/pg.rs | 57 +-- chronos_bin/src/runner.rs | 4 - chronos_bin/src/telemetry/jaegar_backend.rs | 12 + chronos_bin/src/telemetry/mod.rs | 3 + chronos_bin/src/telemetry/otlp_collector.rs | 38 ++ .../src/telemetry/register_telemetry.rs | 55 +++ chronos_bin/src/utils/util.rs | 25 +- docker-compose.yml | 59 +-- examples/chronos_ex/Cargo.toml | 9 +- examples/chronos_ex/examples/chronos_ex.rs | 70 ++-- .../examples/telemetry_async_simple.rs | 75 ++++ .../chronos_ex/examples/telemetry_simple.rs | 109 ++++- otel-collector-config.yaml | 36 ++ 29 files changed, 923 insertions(+), 742 deletions(-) delete mode 100644 chronos_bin/src/kafka/utils.rs create mode 100644 chronos_bin/src/telemetry/jaegar_backend.rs create mode 100644 chronos_bin/src/telemetry/mod.rs create mode 100644 chronos_bin/src/telemetry/otlp_collector.rs create mode 100644 chronos_bin/src/telemetry/register_telemetry.rs create mode 100644 examples/chronos_ex/examples/telemetry_async_simple.rs create mode 100644 otel-collector-config.yaml diff --git a/.env b/.env index 713ff15..75f421f 100644 --- a/.env +++ b/.env @@ -5,8 +5,8 @@ RUST_VERSION=stable # KAFKA -# KAFKA_BROKERS="localhost:9093\,$LOCAL_HOST_IP:9094" -KAFKA_BROKERS="localhost:9092" +KAFKA_HOST="localhost\,$LOCAL_HOST_IP" 
+KAFKA_PORT=9093 KAFKA_CLIENT_ID="chronos" KAFKA_GROUP_ID="chronos" KAFKA_IN_TOPIC="chronos.in" @@ -35,3 +35,7 @@ TIMING_ADVANCE=0 FAIL_DETECT_INTERVAL=500 MAX_RETRIES=3 PROCESSOR_DB_POLL=10 + +# TRACING +OTEL_SERVICE_NAME=chronos +OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://localhost:4318/v1/traces diff --git a/.env.example b/.env.example index 1c5f647..e74c43a 100644 --- a/.env.example +++ b/.env.example @@ -5,8 +5,8 @@ LOCAL_HOST_IP=$(ifconfig en0 | grep inet | grep -v inet6 | awk '{print $2}') RUST_VERSION=stable # KAFKA -KAFKA_BROKERS="localhost:9093\,$LOCAL_HOST_IP:9094" -#KAFKA_BROKERS="localhost:9093" +KAFKA_HOST="localhost\,$LOCAL_HOST_IP" +KAFKA_PORT="9094" KAFKA_CLIENT_ID="chronos" KAFKA_GROUP_ID="chronos" KAFKA_IN_TOPIC="chronos.in" @@ -25,7 +25,7 @@ PG_DATABASE=chronos_db PG_POOL_SIZE=50 # CONFIG -RUST_LOG=debug +RUST_LOG=info #APP DELAY_TIME=0 @@ -34,4 +34,8 @@ MONITOR_DB_POLL=5 TIMING_ADVANCE=0 FAIL_DETECT_INTERVAL=500 MAX_RETRIES=3 -PROCESSOR_DB_POLL=10 \ No newline at end of file +PROCESSOR_DB_POLL=10 + +# TRACING +OTEL_SERVICE_NAME=chronos +OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://localhost:4318/v1/traces \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 111e852..b1ee408 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -19,9 +19,9 @@ checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" [[package]] name = "aho-corasick" -version = "1.0.5" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c378d78423fdad8089616f827526ee33c19f2fddbd5de1629152c9593ba4783" +checksum = "b2969dcb958b36655471fc61f7e416fa76033bdd4bfed0678d8fee1e2d07a1f0" dependencies = [ "memchr", ] @@ -43,9 +43,9 @@ dependencies = [ [[package]] name = "anstream" -version = "0.5.0" +version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1f58811cfac344940f1a400b6e6231ce35171f614f26439e80f8c1465c5cc0c" +checksum = 
"2ab91ebe16eb252986481c5b62f6098f3b698a45e34b5b98200cf20dd2484a44" dependencies = [ "anstyle", "anstyle-parse", @@ -57,15 +57,15 @@ dependencies = [ [[package]] name = "anstyle" -version = "1.0.2" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15c4c2c83f81532e5845a733998b6971faca23490340a418e9b72a3ec9de12ea" +checksum = "7079075b41f533b8c61d2a4d073c4676e1f8b249ff94a393b0595db304e0dd87" [[package]] name = "anstyle-parse" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "938874ff5980b03a87c5524b3ae5b59cf99b1d6bc836848df7bc5ada9643c333" +checksum = "317b9a89c1868f5ea6ff1d9539a69f45dffc21ce321ac1fd1160dfa48c8e2140" dependencies = [ "utf8parse", ] @@ -81,9 +81,9 @@ dependencies = [ [[package]] name = "anstyle-wincon" -version = "2.1.0" +version = "3.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58f54d10c6dfa51283a066ceab3ec1ab78d13fae00aa49243a45e4571fb79dfd" +checksum = "f0699d10d2f4d628a98ee7b57b289abbc98ff3bad977cb3152709d4bf2330628" dependencies = [ "anstyle", "windows-sys", @@ -109,13 +109,13 @@ checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" [[package]] name = "async-trait" -version = "0.1.73" +version = "0.1.74" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" +checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9" dependencies = [ "proc-macro2", "quote", - "syn 2.0.31", + "syn 2.0.38", ] [[package]] @@ -203,9 +203,9 @@ checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" [[package]] name = "base64" -version = "0.21.3" +version = "0.21.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "414dcefbc63d77c526a76b3afcf6fbb9b5e2791c19c3aa2297733208750c6e53" +checksum = 
"9ba43ea6f343b788c8764558649e08df62f86c6ef251fdaeb1ffd010a9ae50a2" [[package]] name = "bitflags" @@ -215,9 +215,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.4.0" +version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4682ae6287fcf752ecaabbfcc7b6f9b72aa33933dc23a554d853aea8eea8635" +checksum = "327762f6e5a765692301e5bb513e0d9fef63be86bbc14528052b1cd3e6f03e07" [[package]] name = "blake2b_simd" @@ -241,21 +241,21 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.13.0" +version = "3.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3e2c3daef883ecc1b5d58c15adae93470a91d425f3532ba1695849656af3fc1" +checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec" [[package]] name = "byteorder" -version = "1.4.3" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.4.0" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" +checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" [[package]] name = "cargo-husky" @@ -280,15 +280,14 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.28" +version = "0.4.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95ed24df0632f708f5f6d8082675bef2596f7084dee3dd55f632290bf35bfe0f" +checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38" dependencies = [ "android-tzdata", "iana-time-zone", "js-sys", "num-traits", - "time 0.1.45", "wasm-bindgen", "windows-targets", ] @@ -311,6 
+310,9 @@ dependencies = [ "openssl", "opentelemetry", "opentelemetry-jaeger", + "opentelemetry-otlp", + "opentelemetry_api", + "opentelemetry_sdk", "rand", "rdkafka", "refinery", @@ -336,10 +338,11 @@ dependencies = [ "futures", "log", "opentelemetry", - "opentelemetry-http", "opentelemetry-jaeger", "opentelemetry-otlp", "opentelemetry-stdout", + "opentelemetry_api", + "opentelemetry_sdk", "tokio", "tracing", "tracing-opentelemetry", @@ -348,9 +351,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.4.2" +version = "4.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a13b88d2c62ff462f88e4a121f17a82c1af05693a2f192b5c38d14de73c19f6" +checksum = "d04704f56c2cde07f43e8e2c154b43f216dc5c92fc98ada720177362f953b956" dependencies = [ "clap_builder", "clap_derive", @@ -358,9 +361,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.4.2" +version = "4.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2bb9faaa7c2ef94b2743a21f5a29e6f0010dff4caa69ac8e9d6cf8b6fa74da08" +checksum = "0e231faeaca65ebd1ea3c737966bf858971cd38c3849107aa3ea7de90a804e45" dependencies = [ "anstream", "anstyle", @@ -377,7 +380,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.31", + "syn 2.0.38", ] [[package]] @@ -468,7 +471,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" dependencies = [ "cfg-if", - "hashbrown 0.14.0", + "hashbrown 0.14.1", "lock_api", "once_cell", "parking_lot_core", @@ -501,18 +504,21 @@ dependencies = [ [[package]] name = "deadpool-runtime" -version = "0.1.2" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eaa37046cc0f6c3cc6090fbdbf73ef0b8ef4cfcc37f6befc0020f63e8cf121e1" +checksum = "63dfa964fe2a66f3fde91fc70b267fe193d822c7e603e2a675a49a7f46ad3f49" dependencies = [ "tokio", ] [[package]] name = "deranged" -version = "0.3.8" 
+version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2696e8a945f658fd14dc3b87242e6b80cd0f36ff04ea560fa39082368847946" +checksum = "0f32d04922c60427da6f9fef14d042d9edddef64cb9d4ce0d64d0685fbeb1fd3" +dependencies = [ + "powerfmt", +] [[package]] name = "digest" @@ -618,25 +624,14 @@ dependencies = [ [[package]] name = "errno" -version = "0.3.3" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "136526188508e25c6fef639d7927dfb3e0e3084488bf202267829cf7fc23dbdd" +checksum = "ac3e13f66a2f95e32a39eaa81f6b95d42878ca0e1db0c7543723dfe12557e860" dependencies = [ - "errno-dragonfly", "libc", "windows-sys", ] -[[package]] -name = "errno-dragonfly" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa68f1b12764fab894d2755d2518754e71b4fd80ecfb822714a1206c2aab39bf" -dependencies = [ - "cc", - "libc", -] - [[package]] name = "fallible-iterator" version = "0.2.0" @@ -735,7 +730,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.31", + "syn 2.0.38", ] [[package]] @@ -833,9 +828,9 @@ checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" [[package]] name = "hashbrown" -version = "0.14.0" +version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a" +checksum = "7dfda62a12f55daeae5015f81b0baea145391cb4520f86c248fc615d72640d12" [[package]] name = "heck" @@ -854,9 +849,9 @@ dependencies = [ [[package]] name = "hermit-abi" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "443144c8cdadd93ebf52ddb4056d257f5b52c04d3c804e657d19eb73fc33668b" +checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7" [[package]] name = "hmac" @@ -988,12 +983,12 @@ dependencies = [ 
[[package]] name = "indexmap" -version = "2.0.0" +version = "2.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d5477fe2230a79769d8dc68e0eabf5437907c0457a5614a9e8dddb67f65eb65d" +checksum = "8adf3ddd720272c6ea8bf59463c04e0f93d0bbf7c5439b691bca2987e0270897" dependencies = [ "equivalent", - "hashbrown 0.14.0", + "hashbrown 0.14.1", ] [[package]] @@ -1014,7 +1009,7 @@ version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b" dependencies = [ - "hermit-abi 0.3.2", + "hermit-abi 0.3.3", "rustix", "windows-sys", ] @@ -1051,9 +1046,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.147" +version = "0.2.149" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3" +checksum = "a08173bc88b7955d1b3145aa561539096c421ac8debde8cbc3612ec635fee29b" [[package]] name = "libz-sys" @@ -1069,9 +1064,9 @@ dependencies = [ [[package]] name = "linux-raw-sys" -version = "0.4.5" +version = "0.4.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57bcfdad1b858c2db7c38303a6d2ad4dfaf5eb53dfeb0910128b2c26d6158503" +checksum = "da2479e8c062e40bf0066ffa0bc823de0a9368974af99c9f6df941d2c231e03f" [[package]] name = "lock_api" @@ -1101,18 +1096,19 @@ checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" [[package]] name = "md-5" -version = "0.10.5" +version = "0.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6365506850d44bff6e2fbcb5176cf63650e48bd45ef2fe2665ae1570e0f4b9ca" +checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" dependencies = [ + "cfg-if", "digest", ] [[package]] name = "memchr" -version = "2.6.3" +version = "2.6.4" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f232d6ef707e1956a43342693d2a31e72989554d58299d7a88738cc95b0d35c" +checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167" [[package]] name = "mime" @@ -1152,9 +1148,9 @@ dependencies = [ [[package]] name = "num-traits" -version = "0.2.16" +version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f30b0abd723be7e2ffca1272140fac1a2f084c77ec3e123c192b66af1ee9e6c2" +checksum = "39e3200413f237f41ab11ad6d161bc7239c84dcb631773ccd7de3dfe4b5c267c" dependencies = [ "autocfg", ] @@ -1165,7 +1161,7 @@ version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" dependencies = [ - "hermit-abi 0.3.2", + "hermit-abi 0.3.3", "libc", ] @@ -1211,7 +1207,7 @@ version = "0.10.57" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bac25ee399abb46215765b1cb35bc0212377e58a061560d8b29b024fd0430e7c" dependencies = [ - "bitflags 2.4.0", + "bitflags 2.4.1", "cfg-if", "foreign-types", "libc", @@ -1228,7 +1224,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.31", + "syn 2.0.38", ] [[package]] @@ -1332,7 +1328,7 @@ dependencies = [ "futures-util", "opentelemetry_api", "opentelemetry_sdk", - "ordered-float 3.9.1", + "ordered-float 3.9.2", "serde", "serde_json", ] @@ -1366,7 +1362,7 @@ dependencies = [ "futures-util", "once_cell", "opentelemetry_api", - "ordered-float 3.9.1", + "ordered-float 3.9.2", "percent-encoding", "rand", "regex", @@ -1378,18 +1374,18 @@ dependencies = [ [[package]] name = "ordered-float" -version = "2.10.0" +version = "2.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7940cf2ca942593318d07fcf2596cdca60a85c9e7fab408a5e21a4f9dcd40d87" +checksum = 
"68f19d67e5a2795c94e73e0bb1cc1a7edeb2e28efd39e2e1c9b7a40c1108b11c" dependencies = [ "num-traits", ] [[package]] name = "ordered-float" -version = "3.9.1" +version = "3.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a54938017eacd63036332b4ae5c8a49fc8c0c1d6d629893057e4f13609edd06" +checksum = "f1e1c390732d15f1d48471625cd92d154e66db2c56645e29a9cd26f4699f72dc" dependencies = [ "num-traits", ] @@ -1465,7 +1461,7 @@ version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90fcb95eef784c2ac79119d1dd819e162b5da872ce6f3c3abe1e8ca1c082f72b" dependencies = [ - "siphasher", + "siphasher 0.3.11", ] [[package]] @@ -1485,7 +1481,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", - "syn 2.0.31", + "syn 2.0.38", ] [[package]] @@ -1512,7 +1508,7 @@ version = "0.6.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49b6c5ef183cd3ab4ba005f1ca64c21e8bd97ce4699cfea9e8d9a2c4958ca520" dependencies = [ - "base64 0.21.3", + "base64 0.21.4", "byteorder", "bytes", "fallible-iterator", @@ -1539,6 +1535,12 @@ dependencies = [ "uuid", ] +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -1557,9 +1559,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.66" +version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9" +checksum = "134c189feb4956b20f6f547d2cf727d4c0fe06722b20a0eec87ed445a97f92da" dependencies = [ "unicode-ident", ] @@ -1686,9 +1688,9 @@ dependencies = [ [[package]] name = "refinery" -version = "0.8.10" +version = "0.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" 
-checksum = "cdb0436d0dd7bd8d4fce1e828751fa79742b08e35f27cfea7546f8a322b5ef24" +checksum = "529664dbccc0a296947615c997a857912d72d1c44be1fafb7bae54ecfa7a8c24" dependencies = [ "refinery-core", "refinery-macros", @@ -1696,9 +1698,9 @@ dependencies = [ [[package]] name = "refinery-core" -version = "0.8.10" +version = "0.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19206547cd047e8f4dfa6b20c30d3ecaf24be05841b6aa0aa926a47a3d0662bb" +checksum = "e895cb870cf06e92318cbbeb701f274d022d5ca87a16fa8244e291cd035ef954" dependencies = [ "async-trait", "cfg-if", @@ -1706,9 +1708,9 @@ dependencies = [ "log", "regex", "serde", - "siphasher", + "siphasher 1.0.0", "thiserror", - "time 0.3.28", + "time", "tokio", "tokio-postgres", "toml", @@ -1718,22 +1720,22 @@ dependencies = [ [[package]] name = "refinery-macros" -version = "0.8.10" +version = "0.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d94d4b9241859ba19eaa5c04c86e782eb3aa0aae2c5868e0cfa90c856e58a174" +checksum = "123e8b80f8010c3ae38330c81e76938fc7adf6cdbfbaad20295bb8c22718b4f1" dependencies = [ "proc-macro2", "quote", "refinery-core", "regex", - "syn 2.0.31", + "syn 2.0.38", ] [[package]] name = "regex" -version = "1.9.5" +version = "1.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "697061221ea1b4a94a624f67d0ae2bfe4e22b8a17b6a192afb11046542cc8c47" +checksum = "380b951a9c5e80ddfd6136919eef32310721aa4aacd4889a8d39124b026ab343" dependencies = [ "aho-corasick", "memchr", @@ -1743,9 +1745,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.3.8" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2f401f4955220693b56f8ec66ee9c78abffd8d1c4f23dc41a23839eb88f0795" +checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f" dependencies = [ "aho-corasick", "memchr", @@ -1754,9 +1756,9 @@ dependencies = [ [[package]] name = "regex-syntax" -version 
= "0.7.5" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da" +checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" [[package]] name = "reqwest" @@ -1764,7 +1766,7 @@ version = "0.11.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "046cd98826c46c2ac8ddecae268eb5c2e58628688a5fc7a2643704a73faba95b" dependencies = [ - "base64 0.21.3", + "base64 0.21.4", "bytes", "encoding_rs", "futures-core", @@ -1819,11 +1821,11 @@ checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" [[package]] name = "rustix" -version = "0.38.11" +version = "0.38.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0c3dde1fc030af041adc40e79c0e7fbcf431dd24870053d187d7c66e4b87453" +checksum = "745ecfa778e66b2b63c88a61cb36e0eea109e803b0b86bf9879fbc77c70e86ed" dependencies = [ - "bitflags 2.4.0", + "bitflags 2.4.1", "errno", "libc", "linux-raw-sys", @@ -1871,22 +1873,22 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" [[package]] name = "serde" -version = "1.0.188" +version = "1.0.189" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf9e0fcba69a370eed61bcf2b728575f726b50b55cba78064753d708ddc7549e" +checksum = "8e422a44e74ad4001bdc8eede9a4570ab52f71190e9c076d14369f38b9200537" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.188" +version = "1.0.189" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4eca7ac642d82aa35b60049a6eccb4be6be75e599bd2e9adb5f875a737654af2" +checksum = "1e48d1f918009ce3145511378cf68d613e3b3d9137d67272562080d68a2b32d5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.31", + "syn 2.0.38", ] [[package]] @@ -1900,9 +1902,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.105" +version = "1.0.107" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "693151e1ac27563d6dbcec9dee9fbd5da8539b20fa14ad3752b2e6d363ace360" +checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65" dependencies = [ "itoa", "ryu", @@ -1952,14 +1954,14 @@ checksum = "91d129178576168c589c9ec973feedf7d3126c01ac2bf08795109aa35b69fb8f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.31", + "syn 2.0.38", ] [[package]] name = "sha2" -version = "0.10.7" +version = "0.10.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "479fb9d862239e610720565ca91403019f2f00410f1864c5aa7479b950a76ed8" +checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8" dependencies = [ "cfg-if", "cpufeatures", @@ -1968,9 +1970,9 @@ dependencies = [ [[package]] name = "sharded-slab" -version = "0.1.4" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" dependencies = [ "lazy_static", ] @@ -2000,6 +2002,12 @@ version = "0.3.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d" +[[package]] +name = "siphasher" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54ac45299ccbd390721be55b412d41931911f654fa99e2cb8bfb57184b2061fe" + [[package]] name = "slab" version = "0.4.9" @@ -2011,9 +2019,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.11.0" +version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62bb4feee49fdd9f707ef802e22365a35de4b7b299de4763d44bfea899442ff9" +checksum = "942b4a808e05215192e39f4ab80813e599068285906cc91aa64f923db842bd5a" [[package]] name = "socket2" @@ -2027,9 +2035,9 @@ dependencies = [ [[package]] name = "socket2" -version = "0.5.3" 
+version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2538b18701741680e0322a2302176d3253a35388e2e62f172f64f4f16605f877" +checksum = "4031e820eb552adee9295814c0ced9e5cf38ddf1e8b7d566d6de8e2538ea989e" dependencies = [ "libc", "windows-sys", @@ -2060,15 +2068,15 @@ checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc" [[package]] name = "sval" -version = "2.6.1" +version = "2.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b031320a434d3e9477ccf9b5756d57d4272937b8d22cb88af80b7633a1b78b1" +checksum = "e55089b73dfa822e1eb6b635f8795215512cca94bfae11aee3a1a06228bc88bb" [[package]] name = "sval_buffer" -version = "2.6.1" +version = "2.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6bf7e9412af26b342f3f2cc5cc4122b0105e9d16eb76046cd14ed10106cf6028" +checksum = "df307823073d63f1fb126895439fead41afc493ea35d636cceedef9f6b32ba81" dependencies = [ "sval", "sval_ref", @@ -2076,18 +2084,18 @@ dependencies = [ [[package]] name = "sval_dynamic" -version = "2.6.1" +version = "2.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0ef628e8a77a46ed3338db8d1b08af77495123cc229453084e47cd716d403cf" +checksum = "e5f8e4c4d6d028d3cbff66c2bb3d98181d031d312b7df4550eea7142d7036f37" dependencies = [ "sval", ] [[package]] name = "sval_fmt" -version = "2.6.1" +version = "2.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7dc09e9364c2045ab5fa38f7b04d077b3359d30c4c2b3ec4bae67a358bd64326" +checksum = "ad53f8eb502b0a3051fea001ae2e3723044699868ebfe06ea81b45545db392c2" dependencies = [ "itoa", "ryu", @@ -2096,9 +2104,9 @@ dependencies = [ [[package]] name = "sval_json" -version = "2.6.1" +version = "2.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ada6f627e38cbb8860283649509d87bc4a5771141daa41c78fd31f2b9485888d" +checksum = 
"f913253c9f6cd27645ba9a0b6788039b5d4338eae0833c64b42ef178168d2862" dependencies = [ "itoa", "ryu", @@ -2107,18 +2115,18 @@ dependencies = [ [[package]] name = "sval_ref" -version = "2.6.1" +version = "2.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "703ca1942a984bd0d9b5a4c0a65ab8b4b794038d080af4eb303c71bc6bf22d7c" +checksum = "66a9661412d06740ebe81512a527b3d9220460eb7685f4399232c0e670108cb7" dependencies = [ "sval", ] [[package]] name = "sval_serde" -version = "2.6.1" +version = "2.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "830926cd0581f7c3e5d51efae4d35c6b6fc4db583842652891ba2f1bed8db046" +checksum = "b8d077e98c1c8dfa466837ae0ec1e03c78138d42ac75662dac05e1bf0aebae20" dependencies = [ "serde", "sval", @@ -2139,9 +2147,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.31" +version = "2.0.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "718fa2415bcb8d8bd775917a1bf12a7931b6dfa890753378538118181e0cb398" +checksum = "e96b79aaa137db8f61e26363a0c9b47d8b4ec75da28b7d1d614c2303e232408b" dependencies = [ "proc-macro2", "quote", @@ -2188,31 +2196,31 @@ dependencies = [ [[package]] name = "termcolor" -version = "1.2.0" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be55cf8942feac5c765c2c993422806843c9a9a45d4d5c407ad6dd2ea95eb9b6" +checksum = "6093bad37da69aab9d123a8091e4be0aa4a03e4d601ec641c327398315f62b64" dependencies = [ "winapi-util", ] [[package]] name = "thiserror" -version = "1.0.48" +version = "1.0.49" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d6d7a740b8a666a7e828dd00da9c0dc290dff53154ea77ac109281de90589b7" +checksum = "1177e8c6d7ede7afde3585fd2513e611227efd6481bd78d2e82ba1ce16557ed4" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.48" +version = "1.0.49" source = "registry+https://github.com/rust-lang/crates.io-index" 
-checksum = "49922ecae66cc8a249b77e68d1d0623c1b2c514f0060c27cdc68bd62a1219d35" +checksum = "10712f02019e9288794769fba95cd6847df9874d49d871d062172f9dd41bc4cc" dependencies = [ "proc-macro2", "quote", - "syn 2.0.31", + "syn 2.0.38", ] [[package]] @@ -2243,29 +2251,19 @@ dependencies = [ "byteorder", "integer-encoding", "log", - "ordered-float 2.10.0", + "ordered-float 2.10.1", "threadpool", ] [[package]] name = "time" -version = "0.1.45" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b797afad3f312d1c66a56d11d0316f916356d11bd158fbc6ca6389ff6bf805a" -dependencies = [ - "libc", - "wasi 0.10.0+wasi-snapshot-preview1", - "winapi", -] - -[[package]] -name = "time" -version = "0.3.28" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17f6bb557fd245c28e6411aa56b6403c689ad95061f50e4be16c274e70a17e48" +checksum = "c4a34ab300f2dee6e562c10a046fc05e358b29f9bf92277f30c3c8d82275f6f5" dependencies = [ "deranged", "itoa", + "powerfmt", "serde", "time-core", "time-macros", @@ -2273,15 +2271,15 @@ dependencies = [ [[package]] name = "time-core" -version = "0.1.1" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7300fbefb4dadc1af235a9cef3737cea692a9d97e1b9cbcd4ebdae6f8868e6fb" +checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" [[package]] name = "time-macros" -version = "0.2.14" +version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a942f44339478ef67935ab2bbaec2fb0322496cf3cbe84b261e06ac3814c572" +checksum = "4ad70d68dba9e1f8aceda7aa6711965dfec1cac869f311a51bd08b3a2ccbce20" dependencies = [ "time-core", ] @@ -2303,9 +2301,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.32.0" +version = "1.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9" +checksum = "4f38200e3ef7995e5ef13baec2f432a6da0aa9ac495b2c0e8f3b7eec2c92d653" dependencies = [ "backtrace", "bytes", @@ -2315,8 +2313,9 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2 0.5.3", + "socket2 0.5.4", "tokio-macros", + "tracing", "windows-sys", ] @@ -2338,7 +2337,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.31", + "syn 2.0.38", ] [[package]] @@ -2361,7 +2360,7 @@ dependencies = [ "postgres-protocol", "postgres-types", "rand", - "socket2 0.5.3", + "socket2 0.5.4", "tokio", "tokio-util", "whoami", @@ -2380,9 +2379,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.8" +version = "0.7.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d" +checksum = "1d68074620f57a0b21594d9735eb2e98ab38b17f80d3fcb189fca266771ca60d" dependencies = [ "bytes", "futures-core", @@ -2394,9 +2393,9 @@ dependencies = [ [[package]] name = "toml" -version = "0.7.6" +version = "0.7.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c17e963a819c331dcacd7ab957d80bc2b9a9c1e71c804826d2f283dd65306542" +checksum = "dd79e69d3b627db300ff956027cc6c3798cef26d22526befdfcd12feeb6d2257" dependencies = [ "serde", "serde_spanned", @@ -2415,11 +2414,11 @@ dependencies = [ [[package]] name = "toml_edit" -version = "0.19.14" +version = "0.19.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8123f27e969974a3dfba720fdb560be359f57b44302d280ba72e76a74480e8a" +checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" dependencies = [ - "indexmap 2.0.0", + "indexmap 2.0.2", "serde", "serde_spanned", "toml_datetime", @@ -2434,7 +2433,7 @@ checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a" 
dependencies = [ "async-trait", "axum", - "base64 0.21.3", + "base64 0.21.4", "bytes", "futures-core", "futures-util", @@ -2488,11 +2487,10 @@ checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" [[package]] name = "tracing" -version = "0.1.37" +version = "0.1.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" +checksum = "ee2ef2af84856a50c1d430afce2fdded0a4ec7eda868db86409b4543df0797f9" dependencies = [ - "cfg-if", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -2500,20 +2498,20 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.26" +version = "0.1.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" +checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.31", + "syn 2.0.38", ] [[package]] name = "tracing-core" -version = "0.1.31" +version = "0.1.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a" +checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" dependencies = [ "once_cell", "valuable", @@ -2568,9 +2566,9 @@ checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" [[package]] name = "typenum" -version = "1.16.0" +version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" +checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" [[package]] name = "unicode-bidi" @@ -2580,9 +2578,9 @@ checksum = "92888ba5573ff080736b3648696b70cafad7d250551175acbaa4e0385b3e1460" [[package]] name = "unicode-ident" -version = "1.0.11" +version = "1.0.12" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "301abaae475aa91687eb82514b328ab47a211a533026cb25fc3e519b86adfc3c" +checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" [[package]] name = "unicode-normalization" @@ -2635,7 +2633,7 @@ checksum = "f7e1ba1f333bd65ce3c9f27de592fcbc256dafe3af2717f56d7c87761fbaccf4" dependencies = [ "proc-macro2", "quote", - "syn 2.0.31", + "syn 2.0.38", ] [[package]] @@ -2694,9 +2692,9 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" [[package]] name = "walkdir" -version = "2.3.3" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "36df944cda56c7d8d8b7496af378e6b16de9284591917d307c9b4d313c44e698" +checksum = "d71d857dc86794ca4c280d616f7da00d2dbfd8cd788846559a6813e6aa4b54ee" dependencies = [ "same-file", "winapi-util", @@ -2717,12 +2715,6 @@ version = "0.9.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" -[[package]] -name = "wasi" -version = "0.10.0+wasi-snapshot-preview1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" - [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" @@ -2750,7 +2742,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.31", + "syn 2.0.38", "wasm-bindgen-shared", ] @@ -2784,7 +2776,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.31", + "syn 2.0.38", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -2833,9 +2825,9 @@ checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" [[package]] name = "winapi-util" -version = "0.1.5" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" +checksum = "f29e6f9198ba0d26b4c9f07dbe6f9ed633e1f3d5b8b414090084349e46a52596" dependencies = [ "winapi", ] @@ -2923,9 +2915,9 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" [[package]] name = "winnow" -version = "0.5.15" +version = "0.5.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c2e3184b9c4e92ad5167ca73039d0c42476302ab603e2fec4487511f38ccefc" +checksum = "a3b801d0e0a6726477cc207f60162da452f3a95adb368399bef20a946e06f65c" dependencies = [ "memchr", ] diff --git a/Cargo.toml b/Cargo.toml index 3f69f52..16987fa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,7 @@ members = [ #async futures="0.3.26" async-trait = "0.1.66" -tokio = { version = "1", features = ["full"] } +tokio = { version = "1", features = ["full", "tracing"] } #db @@ -34,8 +34,10 @@ tracing = "0.1" tracing-subscriber = "0.3" tracing-opentelemetry = "0.21.0" -opentelemetry_sdk = { version = "0.20.0", features = ["rt-tokio-current-thread"] } opentelemetry = { version = "0.20.0", features = ["rt-tokio", "trace"]} +opentelemetry_sdk = { version = "0.20.0", features = ["rt-tokio", "trace"]} +opentelemetry_api = { version = "0.20.0"} +# Collector opentelemetry-jaeger = {version="0.19.0", features=["rt-tokio"]} opentelemetry-stdout = { version = "0.1.0", features = ["trace"] } opentelemetry-otlp = { version = "0.13.0", features = ["http-proto", "reqwest-client"] } diff --git a/How-to.md b/How-to.md index 5b1bc51..bbabef9 100644 --- a/How-to.md +++ b/How-to.md @@ -1,6 +1,7 @@ # How to work with Chronos - [How to run Chronos binary](#run-binary) - [How to run Chronos in a docker container](#run-chronos-docker-image) +- [Environment Variables](#env-vars) ## Pre-requisites For starting the delay queue process, Chronos expects a DB in Postgres and two topics one for input and other for publishing the messages after delay to be already created. 
The names of the topics and DB should be passed as env variables mentioned in [Env vars](#env-vars) @@ -22,31 +23,39 @@ Use `make withenv RECIPE=docker.up` ## ENV vars All the required configurations for Chronos can be passed in environment variables mentioned below -|Env Var|Example Value| Required| + +### Required Vars +|Env Var|Example Value| +|----|----| +|KAFKA_HOST|"localhost" +|KAFKA_PORT|9093 +| KAFKA_CLIENT_ID|"chronos" +| KAFKA_GROUP_ID|"chronos" +| KAFKA_IN_TOPIC|"chronos.in" +| KAFKA_OUT_TOPIC|"chronos.out" +| KAFKA_USERNAME| +| KAFKA_PASSWORD| +| PG_HOST|localhost +| PG_PORT|5432 +| PG_USER|admin +| PG_PASSWORD|admin +| PG_DATABASE|chronos_db +| PG_POOL_SIZE|50 + +### Optional Vars +Set these values to fine-tune Chronos performance as needed; refer to [Chronos](./README.md) +|Env Var| Default Value| |----|----|----| -|KAFKA_BROKERS|"localhost:9093"|True -| KAFKA_CLIENT_ID|"chronos"|True -| KAFKA_GROUP_ID|"chronos"|True -| KAFKA_IN_TOPIC|"chronos.in"|True -| KAFKA_OUT_TOPIC|"chronos.out"|True -| KAFKA_USERNAME||True -| KAFKA_PASSWORD||True -| PG_HOST|localhost|True -| PG_PORT|5432|True -| PG_USER|admin|True -| PG_PASSWORD|admin|True -| PG_DATABASE|chronos_db|True -| PG_POOL_SIZE|50|True -| DELAY_TIME|0|False -| RANDOMNESS_DELAY|100|False -| MONITOR_DB_POLL|5|False -| TIMING_ADVANCE|0|False -| FAIL_DETECT_INTERVAL|500|False +| MONITOR_DB_POLL|5 sec +| PROCESSOR_DB_POLL|5 milli sec +| TIMING_ADVANCE|0 sec +| FAIL_DETECT_INTERVAL|10 sec + ## Chronos Images -Two images are published for each release to `https://github.com/kindredgroup/chronos/pkgs/container/chronos` +Two images are published for each [RELEASE](https://github.com/kindredgroup/chronos/pkgs/container/chronos) - migrations image - chornos image diff --git a/Makefile b/Makefile index e86472f..0e86d69 100644 --- a/Makefile +++ b/Makefile @@ -38,6 +38,10 @@ dev.init: install # $(call pp,creating kafka topic...)
# cargo run --example kafka_create_topic +dev.chronos_ex: + $(call pp,creating kafka topic...) + cargo run --example chronos_ex + ## pg.create: 🥁 Create database pg.create: $(call pp,creating database...) @@ -70,6 +74,7 @@ build: dev.run: $(call pp,run app...) cargo watch -q -c -x 'run --package chronos_bin --bin chronos' + ## run: 🧪 Runs rust app run: $(call pp,run app...) diff --git a/chronos_bin/Cargo.toml b/chronos_bin/Cargo.toml index c9e0b25..a3c2dfc 100644 --- a/chronos_bin/Cargo.toml +++ b/chronos_bin/Cargo.toml @@ -57,8 +57,13 @@ tracing.workspace = true tracing-subscriber.workspace = true tracing-opentelemetry.workspace = true opentelemetry.workspace = true +opentelemetry_sdk.workspace = true +opentelemetry_api.workspace = true opentelemetry-jaeger.workspace = true +# opentelemetry-jaeger.workspace = true +opentelemetry-otlp.workspace = true + [dev-dependencies] serial_test.workspace = true diff --git a/chronos_bin/src/bin/chronos.rs b/chronos_bin/src/bin/chronos.rs index fde8bc4..80b8fd3 100644 --- a/chronos_bin/src/bin/chronos.rs +++ b/chronos_bin/src/bin/chronos.rs @@ -4,6 +4,7 @@ use chronos_bin::kafka::producer::KafkaProducer; use chronos_bin::postgres::config::PgConfig; use chronos_bin::postgres::pg::Pg; use chronos_bin::runner::Runner; +use chronos_bin::telemetry::register_telemetry::TelemetryCollector; use log::debug; use std::sync::Arc; @@ -12,6 +13,9 @@ async fn main() { env_logger::init(); dotenvy::dotenv().ok(); + let tracing_opentelemetry = TelemetryCollector::new(); + tracing_opentelemetry.register_traces(); + let kafka_config = KafkaConfig::from_env(); let pg_config = PgConfig::from_env(); diff --git a/chronos_bin/src/kafka/config.rs b/chronos_bin/src/kafka/config.rs index bc5ee1a..a5200cb 100644 --- a/chronos_bin/src/kafka/config.rs +++ b/chronos_bin/src/kafka/config.rs @@ -4,7 +4,8 @@ use std::collections::HashMap; #[derive(Debug)] pub struct KafkaConfig { - pub brokers: Vec, + pub host: Vec, + pub port: String, pub in_topic: String, pub 
out_topic: String, pub client_id: String, @@ -18,7 +19,8 @@ pub struct KafkaConfig { impl KafkaConfig { pub fn from_env() -> Self { KafkaConfig { - brokers: env_var!("KAFKA_BROKERS", Vec), + host: env_var!("KAFKA_HOST", Vec), + port: env_var!("KAFKA_PORT"), in_topic: env_var!("KAFKA_IN_TOPIC"), out_topic: env_var!("KAFKA_OUT_TOPIC"), client_id: env_var!("KAFKA_CLIENT_ID"), @@ -44,7 +46,13 @@ impl KafkaConfig { let username = self.username.to_owned(); let password = self.password.to_owned(); - let brokers = self.brokers.join(","); + let brokers = &self.host; + let mut brokers = brokers.to_owned(); + for elm in brokers.iter_mut() { + elm.push(':'); + elm.push_str(&self.port); + } + let brokers = brokers.join(","); let mut base_config = HashMap::from([ ("group.id", self.group_id.as_str()), ("bootstrap.servers", brokers.as_str()), @@ -76,7 +84,13 @@ impl KafkaConfig { let username = self.username.to_owned(); let password = self.password.to_owned(); - let brokers = self.brokers.join(","); + let brokers = &self.host; + let mut brokers = brokers.to_owned(); + for elm in brokers.iter_mut() { + elm.push(':'); + elm.push_str(&self.port); + } + let brokers = brokers.join(","); let mut base_config = HashMap::from([ ("message.timeout.ms", "30000"), ("bootstrap.servers", brokers.as_str()), @@ -116,7 +130,8 @@ mod tests { fn get_kafka_env_variables() -> HashMap<&'static str, &'static str> { let env_hashmap = [ - ("KAFKA_BROKERS", "broker1, broker2 "), + ("KAFKA_HOST", "broker1, broker2 "), + ("KAFKA_PORT", "port"), ("KAFKA_IN_TOPIC", "some-topic"), ("KAFKA_OUT_TOPIC", "some-topic"), ("KAFKA_CLIENT_ID", "some-client-id"), @@ -129,7 +144,7 @@ mod tests { fn build_test_kafka_config() -> KafkaConfig { KafkaConfig { - brokers: vec!["broker1".to_string()], + host: vec!["broker1".to_string()], in_topic: "in_topic".to_owned(), out_topic: "out_topic".to_owned(), client_id: "client-id-1".to_string(), @@ -138,6 +153,7 @@ mod tests { password: "password".to_owned(), 
producer_config_overrides: Default::default(), consumer_config_overrides: Default::default(), + port: "3000".to_string(), } } @@ -151,7 +167,7 @@ mod tests { let config = KafkaConfig::from_env(); assert_eq!(config.client_id, "some-client-id"); - assert_eq!(config.brokers.len(), 2); + assert_eq!(config.host.len(), 2); get_kafka_env_variables().iter().for_each(|(k, _)| { unset_env_var(k); @@ -184,7 +200,7 @@ mod tests { #[test] fn test_passing_credentials_to_build_consumer_config() { let config = KafkaConfig { - brokers: vec!["broker1".to_string()], + host: vec!["broker1".to_string()], in_topic: "consumer-topic-1".to_owned(), out_topic: "consumer-topic-2".to_owned(), client_id: "client-id-1".to_string(), @@ -193,6 +209,7 @@ mod tests { password: "password".to_string(), producer_config_overrides: Default::default(), consumer_config_overrides: Default::default(), + port: "3000".to_string(), }; let client_config = config.build_consumer_config(); assert_eq!(client_config.get("auto.offset.reset").unwrap(), "earliest"); @@ -215,7 +232,7 @@ mod tests { #[test] fn test_passing_overrides() { let mut kafka_config = KafkaConfig { - brokers: vec!["broker1".to_string()], + host: vec!["broker1".to_string()], in_topic: "in_topic".to_owned(), out_topic: "out_topic".to_owned(), client_id: "client-id-1".to_string(), @@ -224,6 +241,7 @@ mod tests { password: "password".to_string(), producer_config_overrides: Default::default(), consumer_config_overrides: Default::default(), + port: "3000".to_string(), }; let producer_override = HashMap::from([("message.timeout.ms", "10")]); let consumer_override = HashMap::from([("auto.offset.reset", "latest")]); diff --git a/chronos_bin/src/kafka/consumer.rs b/chronos_bin/src/kafka/consumer.rs index 46b191c..cf6ae8a 100644 --- a/chronos_bin/src/kafka/consumer.rs +++ b/chronos_bin/src/kafka/consumer.rs @@ -7,7 +7,7 @@ use rdkafka::message::BorrowedMessage; use super::config::KafkaConfig; -use tracing::{info_span, instrument, span, trace, warn}; +use 
tracing::{instrument, trace, warn}; // Kafka Consumer Client pub struct KafkaConsumer { @@ -54,8 +54,6 @@ impl KafkaConsumer { } pub(crate) async fn subscribe(&self) { - let consumer_span = info_span!("consumer_subscribe"); - let _ = consumer_span.enter(); match &self.consumer.subscribe(&[&self.topic]) { Ok(_) => { info!("subscribed to topic {}", &self.topic); @@ -66,9 +64,9 @@ impl KafkaConsumer { } }; } - pub(crate) async fn consume_message(&self) -> Result { - let consumer_span = info_span!("consume_message"); - let _ = consumer_span.enter(); + + #[instrument(skip(self))] + pub(crate) async fn kafka_consume_message(&self) -> Result { self.consumer.recv().await.map_err(KafkaAdapterError::ReceiveMessage) } } diff --git a/chronos_bin/src/kafka/producer.rs b/chronos_bin/src/kafka/producer.rs index f9cc623..7df7df9 100644 --- a/chronos_bin/src/kafka/producer.rs +++ b/chronos_bin/src/kafka/producer.rs @@ -7,7 +7,7 @@ use rdkafka::producer::{FutureProducer, FutureRecord}; use super::config::KafkaConfig; -use tracing::{instrument, span, trace, warn, Level}; +use tracing::instrument; // Kafka Producer // #[derive(Clone)] @@ -24,12 +24,10 @@ impl KafkaProducer { Self { producer, topic } } - pub(crate) async fn publish(&self, message: String, headers: Option>, key: String) -> Result { - // Span for kafka publish - let producer_span = span!(Level::INFO, "publish_span"); - let _ = producer_span.enter(); - - let unwrap_header = &headers.unwrap(); + #[instrument(skip_all, fields(topic = %self.topic))] + pub(crate) async fn kafka_publish(&self, message: String, headers: Option>, key: String) -> Result { + // Only because never expecting wrong headers to reach here + let unwrap_header = &headers.unwrap_or_default(); let o_header = into_headers(unwrap_header); // println!("headers {:?}", o_header); diff --git a/chronos_bin/src/kafka/utils.rs b/chronos_bin/src/kafka/utils.rs deleted file mode 100644 index 6200d5e..0000000 --- a/chronos_bin/src/kafka/utils.rs +++ /dev/null @@ 
-1,141 +0,0 @@ -// use std::{collections::HashMap, str::FromStr}; -// -// use rdkafka::{ -// message::{BorrowedMessage, Header, Headers, OwnedHeaders}, -// Message, -// }; -// use serde::de::DeserializeOwned; -// use talos_certifier::{core::MessageVariant, errors::CommonError}; -// -// /// Builds a map of headers for the received Kafka message -// pub fn get_message_headers(message: &BorrowedMessage) -> Option> { -// if let Some(headers) = message.headers() { -// let headers = (0..headers.count()).fold(HashMap::::new(), |mut acc, i| { -// if let (k, Some(v)) = (headers.get(i).key, headers.get(i).value) { -// acc.insert(k.to_owned(), String::from_utf8_lossy(v).into_owned()); -// } -// -// acc -// }); -// -// if headers.is_empty() { -// return None; -// } else { -// return Some(headers); -// } -// } -// -// None -// } -// -// pub fn build_kafka_headers(headers: HashMap) -> OwnedHeaders { -// let owned_headers = OwnedHeaders::new(); -// -// let owned_headers = headers.iter().fold(owned_headers, |acc, x| { -// let header = Header { key: x.0, value: Some(x.1) }; -// acc.insert(header) -// }); -// -// owned_headers -// } -// -// /// Parses the payload message from Kafka into the struct defined by the generic. -// /// -// /// returns a result of either `T` or `ParseError` -// pub fn parse_kafka_payload(message: &[u8]) -> Result { -// serde_json::from_slice::(message).map_err(|err| CommonError::ParseError { -// data: String::from_utf8_lossy(message).into_owned(), -// reason: err.to_string(), -// }) -// } -// -// /// Parses the message type string to enum MessageVariant. 
-// /// -// /// returns a result of either `MessageVariant` enum or `ParseError` -// pub fn parse_message_variant(message_type: &String) -> Result { -// MessageVariant::from_str(message_type).map_err(|e| CommonError::ParseError { -// reason: e.to_string(), -// data: message_type.to_string(), -// }) -// } -// -// #[cfg(test)] -// mod tests { -// -// use std::collections::HashMap; -// -// use rdkafka::message::Headers; -// use serde::Deserialize; -// use talos_certifier::{core::MessageVariant, errors::CommonError}; -// -// use crate::kafka::utils::parse_kafka_payload; -// -// use super::{build_kafka_headers, parse_message_variant}; -// -// #[test] -// fn test_parse_kafka_payload_successfully() { -// #[derive(Deserialize)] -// struct User { -// name: String, -// age: u32, -// } -// -// let json_user_data_as_str = r#" -// { -// "name": "John Doe", -// "age": 43 -// }"#; -// -// let json_user_data_as_u8array = json_user_data_as_str.as_bytes(); -// -// let parse_result = parse_kafka_payload::(json_user_data_as_u8array).unwrap(); -// -// assert_eq!(parse_result.name, "John Doe".to_owned()); -// assert_eq!(parse_result.age, 43); -// } -// -// #[test] -// fn test_parse_kafka_payload_error_parsing() { -// #[derive(Deserialize, Debug)] -// struct User { -// _name: String, -// _age: u32, -// } -// -// let json_user_data_as_str = "Hello World"; -// -// let json_user_data_as_u8array = json_user_data_as_str.as_bytes(); -// -// let parse_error = parse_kafka_payload::(json_user_data_as_u8array).unwrap_err(); -// -// assert!(matches!(parse_error, CommonError::ParseError { .. 
})); -// } -// -// #[test] -// fn test_parse_message_variant_successfully() { -// let parse_candidate_variant = parse_message_variant(&"Candidate".to_string()).unwrap(); -// assert_eq!(parse_candidate_variant, MessageVariant::Candidate); -// -// let parse_decision_variant = parse_message_variant(&"Decision".to_string()).unwrap(); -// assert_eq!(parse_decision_variant, MessageVariant::Decision); -// } -// -// #[test] -// fn test_parse_message_variant_error() { -// let parse_error = parse_message_variant(&"Error type".to_string()).unwrap_err(); -// assert!(matches!(parse_error, CommonError::ParseError { .. })); -// } -// -// #[test] -// fn test_building_kafka_headers_correctly() { -// let mut header_hashmap = HashMap::new(); -// header_hashmap.insert("header1".to_owned(), "value1".to_owned()); -// -// // test correct header is returned from index -// let owned_headers_result = build_kafka_headers(header_hashmap.clone()); -// assert_eq!(owned_headers_result.get(0).key, "header1".to_owned()); -// -// //test the count is correct. 
-// assert_eq!(build_kafka_headers(header_hashmap.clone()).count(), 1); -// } -// } diff --git a/chronos_bin/src/lib.rs b/chronos_bin/src/lib.rs index 47bff38..98bac13 100644 --- a/chronos_bin/src/lib.rs +++ b/chronos_bin/src/lib.rs @@ -11,3 +11,4 @@ pub mod utils; // Infra pub mod kafka; pub mod postgres; +pub mod telemetry; diff --git a/chronos_bin/src/message_processor.rs b/chronos_bin/src/message_processor.rs index 979434b..8cb2d70 100644 --- a/chronos_bin/src/message_processor.rs +++ b/chronos_bin/src/message_processor.rs @@ -6,7 +6,6 @@ use chrono::Utc; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; -use tracing::{event, field, info_span, span, trace, Level}; use uuid::Uuid; pub struct MessageProcessor { @@ -16,19 +15,18 @@ pub struct MessageProcessor { impl MessageProcessor { pub async fn run(&self) { - // log::info!("MessageProcessor ON!"); - event!(tracing::Level::INFO, "Chronos Processor On!"); - //Get UUID for the node that deployed this thread let node_id: String = std::env::var("NODE_ID").unwrap_or_else(|_| uuid::Uuid::new_v4().to_string()); + log::info!("MessageProcessor ON @ node_id: {}", node_id); + let mut delay_controller = DelayController::new(100); loop { tokio::time::sleep(Duration::from_millis(ChronosConfig::from_env().processor_db_poll)).await; let deadline = Utc::now() - Duration::from_secs(ChronosConfig::from_env().time_advance); - let param = GetReady { + let params = GetReady { readied_at: deadline, readied_by: Uuid::parse_str(&node_id).unwrap(), deadline, @@ -37,109 +35,136 @@ impl MessageProcessor { }; //retry loop - loop { - // thread::sleep(Duration::from_millis(100)); - let max_retry_count = 3; - let mut retry_count = 0; - - let node_id_option: Option = node_id.clone().into(); - // let mut row_id: Option = None; - let monitor_span = info_span!("processor_picked", node_id = field::Empty, errors = field::Empty); - let _ = monitor_span.enter(); - match &self.data_store.ready_to_fire(¶m).await { - 
Ok(publish_rows) => { - let rdy_to_pblsh_count = publish_rows.len(); - if rdy_to_pblsh_count > 0 { - monitor_span.record("node_id", &node_id); - trace!("ready_to_publish_count {}", rdy_to_pblsh_count); - let mut ids: Vec = Vec::with_capacity(rdy_to_pblsh_count); - let mut publish_futures = Vec::with_capacity(rdy_to_pblsh_count); - for row in publish_rows { - let updated_row = TableRow { - id: row.get("id"), - deadline: row.get("deadline"), - readied_at: row.get("readied_at"), - readied_by: row.get("readied_by"), - message_headers: row.get("message_headers"), - message_key: row.get("message_key"), - message_value: row.get("message_value"), - }; - - let mut headers: HashMap = match serde_json::from_str(&updated_row.message_headers.to_string()) { - Ok(t) => t, - Err(_e) => { - println!("error occurred while parsing"); - HashMap::new() - } - }; - //TODO: handle empty headers - - let readied_by = updated_row.readied_by.to_string(); - - headers.insert("readied_by".to_string(), readied_by); - - publish_futures.push(self.producer.publish( - updated_row.message_value.to_string(), - Some(headers), - updated_row.message_key.to_string(), - // updated_row.id.to_string(), - )) - } - let publish_kafka = info_span!("publish_kafka", node_id = &node_id, errors = field::Empty, published = field::Empty); - let _ = publish_kafka.enter(); - let results = futures::future::join_all(publish_futures).await; - for result in results { - match result { - Ok(m) => { - publish_kafka.record("published", "success"); - ids.push(m); - } - Err(e) => { - publish_kafka.record("published", "failure"); - publish_kafka.record("error", &e.to_string()); - - log::error!("Error: delayed message publish failed {:?}", e); - break; - // failure detection needs to pick - } + let _ = &self.processor_message_ready(¶ms).await; + + delay_controller.sleep().await; + } + } + + #[tracing::instrument(skip_all, fields(node_id, chronos_id, is_published, error))] + async fn processor_message_ready(&self, params: &GetReady) 
{ + loop { + let max_retry_count = 3; + let mut retry_count = 0; + + match &self.data_store.ready_to_fire_db(params).await { + Ok(publish_rows) => { + let rdy_to_pblsh_count = publish_rows.len(); + if rdy_to_pblsh_count > 0 { + let mut ids: Vec = Vec::with_capacity(rdy_to_pblsh_count); + let mut publish_futures = Vec::with_capacity(rdy_to_pblsh_count); + for row in publish_rows { + let updated_row = TableRow { + id: row.get("id"), + deadline: row.get("deadline"), + readied_at: row.get("readied_at"), + readied_by: row.get("readied_by"), + message_headers: row.get("message_headers"), + message_key: row.get("message_key"), + message_value: row.get("message_value"), + }; + let mut headers: HashMap = match serde_json::from_str(&updated_row.message_headers.to_string()) { + Ok(t) => t, + Err(_e) => { + println!("error occurred while parsing"); + HashMap::new() } - } + }; + //TODO: handle empty headers + + let readied_by = updated_row.readied_by.to_string(); + tracing::Span::current().record("node_id", &readied_by); + headers.insert("readied_by".to_string(), readied_by); - if !ids.is_empty() { - if let Err(outcome_error) = &self.data_store.delete_fired(&ids).await { - println!("Error: error occurred in message processor delete_fired {}", outcome_error); - //add retry logic here + tracing::Span::current().record("chronos_id", updated_row.id.to_string()); + + publish_futures.push(self.producer.kafka_publish( + updated_row.message_value.to_string(), + Some(headers), + updated_row.message_key.to_string(), + // updated_row.id.to_string(), + )) + } + let results = futures::future::join_all(publish_futures).await; + for result in results { + match result { + Ok(m) => { + tracing::Span::current().record("is_published", "true"); + ids.push(m); + } + Err(e) => { + tracing::Span::current().record("is_published", "false"); + tracing::Span::current().record("error", &e.to_string()); + + log::error!("Error: delayed message publish failed {:?}", e); + break; + // failure detection needs 
to pick } - // println!("delete ids {:?} and break", ids); - break; } - log::debug!("number of rows published successfully and deleted from DB {}", ids.len()); - } else { - log::debug!("no rows ready to fire for dealine {}", deadline); + } + + if !ids.is_empty() { + if let Err(outcome_error) = &self.data_store.delete_fired_db(&ids).await { + println!("Error: error occurred in message processor delete_fired {}", outcome_error); + //add retry logic here + } + // println!("delete ids {:?} and break", ids); break; } + log::debug!("number of rows published successfully and deleted from DB {}", ids.len()); + } else { + log::debug!("no rows ready to fire for dealine "); + break; } - Err(e) => { - if e.contains("could not serialize access due to concurrent update") && retry_count < max_retry_count { - //retry goes here - eprintln!("retrying"); - retry_count += 1; - if retry_count == max_retry_count { - log::error!( - "Error: max retry count {} reached by node {} for row ", - max_retry_count, - node_id_option.unwrap(), - // row_id.unwrap() - ); - break; - } + } + Err(e) => { + if e.contains("could not serialize access due to concurrent update") && retry_count < max_retry_count { + //retry goes here + eprintln!("retrying"); + retry_count += 1; + if retry_count == max_retry_count { + log::error!( + "Error: max retry count {} reached by node {} for row ", + max_retry_count, + "node_id_option.unwrap()", + // row_id.unwrap() + ); + break; } - log::error!("Error: error occurred in message processor while publishing {}", e); + // &self.process_db_rows(¶ms).await; + } + log::error!("Error: error occurred in message processor while publishing {}", e); + break; + } + } + } + + // let node_id_option: Option = node_id.clone().into(); + // let mut row_id: Option = None; + } + + #[tracing::instrument(skip_all, fields(chronos_ids_deleted))] + async fn clean_db(&self, ids: Vec) { + //rety in case delete fails + let max_retries = 3; + let mut retry_count = 0; + loop { + if retry_count < 
max_retries { + match &self.data_store.delete_fired_db(&ids).await { + Ok(_) => { + tracing::Span::current().record("chronos_ids_deleted", ids.join(",")); break; } + Err(e) => { + println!("Error: error occurred in message processor delete_fired {}", e); + retry_count += 1; + continue; + } } + } else { + log::error!("Error: max retry count {} reached by node {} for row ", max_retries, "node_id_option.unwrap()",); + break; } - delay_controller.sleep().await; } } } diff --git a/chronos_bin/src/message_receiver.rs b/chronos_bin/src/message_receiver.rs index 728b53a..cec86b7 100644 --- a/chronos_bin/src/message_receiver.rs +++ b/chronos_bin/src/message_receiver.rs @@ -1,17 +1,17 @@ use chrono::{DateTime, Utc}; -use log::warn; +use log::{debug, error, info, warn}; use serde_json::json; +use tracing::instrument; use crate::kafka::consumer::KafkaConsumer; use crate::kafka::producer::KafkaProducer; use crate::postgres::pg::{Pg, TableInsertRow}; use crate::utils::util::{get_message_key, get_payload_utf8, headers_check, required_headers, CHRONOS_ID, DEADLINE}; -use rdkafka::message::Message; +use rdkafka::message::{BorrowedMessage, Message}; +use std::collections::HashMap; use std::str::FromStr; use std::sync::Arc; use std::time::Instant; -use tracing::{event, field, info_span, trace_span}; -use tracing_subscriber::fmt::FormatFields; pub struct MessageReceiver { pub(crate) consumer: Arc, @@ -20,82 +20,111 @@ pub struct MessageReceiver { } impl MessageReceiver { - // pub fn new(consumer: Arc, producer: Arc, data_store: Arc) -> Self { - // Self { - // consumer, - // producer, - // data_store, - // } - // } + #[instrument(skip_all, fields(chronos_id))] + pub async fn receiver_publish_to_kafka(&self, new_message: &BorrowedMessage<'_>, headers: HashMap) { + let string_payload = String::from_utf8_lossy(get_payload_utf8(new_message)).to_string(); + let message_key = get_message_key(new_message); + tracing::Span::current().record("chronos_id", &message_key); + let outcome = 
&self.producer.kafka_publish(string_payload, Some(headers), message_key.to_string()).await; + match outcome { + Ok(_) => { + debug!("Published message to Kafka {}", &message_key); + } + Err(e) => { + error!("Failed to publish message to Kafka: {:?}", e); + // TODO check if needs to retry publishing + } + } + } - // #[tracing::instrument] - pub async fn run(&self) { - event!(tracing::Level::INFO, "Chronos Receiver On!"); - let _ = &self.consumer.subscribe().await; - // for _n in 0..100 { - let mut total_count = 0; - let mut direct_sent_count = 0; - let mut db_insert_count = 0; - loop { - if let Ok(message) = &self.consumer.consume_message().await { - let receiver_span = info_span!( - "message_received", - errors = field::Empty, - message_key = get_message_key(message), - flow = field::Empty - ); - let _ = receiver_span.enter(); - total_count += 1; - if headers_check(message.headers().unwrap()) { - let new_message = &message; - let headers = required_headers(new_message).expect("parsing headers failed"); - let message_deadline: DateTime = DateTime::::from_str(&headers[DEADLINE]).expect("String date parsing failed"); + #[instrument(skip_all, fields(chronos_id))] + pub async fn receiver_insert_to_db(&self, new_message: &BorrowedMessage<'_>, headers: HashMap, deadline: DateTime) { + let result_value = &serde_json::from_slice(get_payload_utf8(new_message)); + let payload = match result_value { + Ok(payload) => payload, + Err(e) => { + error!("de-ser failed for payload: {:?}", e); + return; + } + }; + + let message_key = get_message_key(new_message); + tracing::Span::current().record("chronos_id", &message_key); - if message_deadline <= Utc::now() { - receiver_span.record("flow", "deadline passed, publish message!"); - direct_sent_count += 1; - let string_payload = String::from_utf8_lossy(get_payload_utf8(new_message)).to_string(); - let message_key = get_message_key(new_message); - let _outcome = &self - .producer - .publish(string_payload, Some(headers), message_key) - 
.await - .expect("Publish failed for received message"); - } else { - receiver_span.record("flow", "deadline not passed, insert to db!"); - db_insert_count += 1; - let chronos_message_id = &headers[CHRONOS_ID]; + let params = TableInsertRow { + id: &headers[CHRONOS_ID], + deadline, + message_headers: &json!(&headers), + message_key: message_key.as_str(), + message_value: payload, + }; + let _insert_time = Instant::now(); - let payload = get_payload_utf8(new_message); + //retry + let total_retry_count = 3; + let mut retry_count = 0; + loop { + match self.data_store.insert_to_delay_db(¶ms).await { + Ok(_) => { + break; + } + Err(e) => { + error!("insert_to_delay failed: {:?} retrying again", e); + retry_count += 1; + if retry_count == total_retry_count { + error!("max retry count {} exceeded aborting insert_to_db for {}", total_retry_count, message_key); + break; + } + } + } + } + } - let message_key = get_message_key(new_message); + #[tracing::instrument(name = "receiver_handle_message", skip_all, fields(chronos_id))] + pub async fn handle_message(&self, message: &BorrowedMessage<'_>) { + if headers_check(message.headers().unwrap()) { + let new_message = &message; - let params = TableInsertRow { - id: chronos_message_id, - deadline: message_deadline, - message_headers: &json!(&headers), - message_key: message_key.as_str(), - message_value: &serde_json::from_slice(payload).expect("de-ser failed for payload"), - }; - let _insert_time = Instant::now(); - self.data_store.insert_to_delay(¶ms).await.expect("insert to db failed"); - // println!("insert took: {:?}", insert_time.elapsed()) + if let Some(headers) = required_headers(new_message) { + tracing::Span::current().record("chronos_id", &headers[CHRONOS_ID]); + let message_deadline: DateTime = match DateTime::::from_str(&headers[DEADLINE]) { + Ok(d) => d, + Err(e) => { + error!("failed to parse deadline: {}", e); + return; } + }; + + if message_deadline <= Utc::now() { + debug!("message deadline is in the past, sending 
directly to out_topic"); + // direct_sent_count += 1; + self.receiver_publish_to_kafka(new_message, headers).await } else { - warn!("message with improper headers on inbox.topic "); - receiver_span.record("flow", "improper headers, ignore!"); - //TODO: ignore + debug!("message deadline is in the future, sending to kafka"); + // db_insert_count += 1; + + self.receiver_insert_to_db(new_message, headers, message_deadline).await + // println!("insert took: {:?}", insert_time.elapsed()) } - // println!("{direct_sent_count} messages sent directly and {db_insert_count} added to db from total of {total_count} "); + } else { + warn!("message with improper headers on inbox.topic "); } + } else { + warn!("message with improper headers on inbox.topic "); + } + // println!("{direct_sent_count} messages sent directly and {db_insert_count} added to db from total of {total_count} "); + } - // println!("commit received message {:?}", new_message); - // if let Ok(m) = &kafka_consumer.client{ - // m.commit_message(&message, CommitMode::Async).expect("commit message failed "); - // }else{ - // println!("Error Occurred"); - // } + pub async fn run(&self) { + info!("MessageReceiver ON!"); + let _ = &self.consumer.subscribe().await; + // let mut total_count = 0; + // let mut direct_sent_count = 0; + // let mut db_insert_count = 0; + loop { + if let Ok(message) = &self.consumer.kafka_consume_message().await { + self.handle_message(message).await; + } } } - // } } diff --git a/chronos_bin/src/monitor.rs b/chronos_bin/src/monitor.rs index d5488bc..474451e 100644 --- a/chronos_bin/src/monitor.rs +++ b/chronos_bin/src/monitor.rs @@ -3,7 +3,6 @@ use crate::utils::config::ChronosConfig; use chrono::Utc; use std::sync::Arc; use std::time::Duration; -use tracing::{error_span, event, field, info_span, instrument, span, trace, Level}; #[derive(Debug)] pub struct FailureDetector { @@ -14,35 +13,36 @@ pub struct FailureDetector { impl FailureDetector { // #[instrument] pub async fn run(&self) { - // 
println!("Monitoring On!"); - trace!("Monitoring On!"); + log::info!("Monitoring On!"); loop { // TODO multiple rows are fetched, what to track in the monitor? - let monitor_span = info_span!("failure_detector", records_len = field::Empty, exception = field::Empty); - let _ = monitor_span.enter(); - let _ = tokio::time::sleep(Duration::from_secs(ChronosConfig::from_env().monitor_db_poll)).await; // sleep for 10sec + let _ = tokio::time::sleep(Duration::from_secs(ChronosConfig::from_env().monitor_db_poll)).await; - trace!("failed_to_fire On!"); - match &self - .data_store - .failed_to_fire(&(Utc::now() - Duration::from_secs(ChronosConfig::from_env().fail_detect_interval))) - .await - { - Ok(fetched_rows) => { - if !fetched_rows.is_empty() { - if let Err(e) = &self.data_store.reset_to_init(fetched_rows).await { - monitor_span.record("exception", e); - println!("error in monitor reset_to_init {}", e); - } - monitor_span.record("records_len", fetched_rows.len()); - } else { - monitor_span.record("records_len", "empty"); + let _ = &self.monitor_failed().await; + } + } + #[tracing::instrument(skip_all, fields(message_key, error, monitoring_len))] + async fn monitor_failed(&self) { + match &self + .data_store + .failed_to_fire_db(&(Utc::now() - Duration::from_secs(ChronosConfig::from_env().fail_detect_interval))) + .await + { + Ok(fetched_rows) => { + if !fetched_rows.is_empty() { + if let Err(e) = &self.data_store.reset_to_init_db(fetched_rows).await { + tracing::Span::current().record("error", e); + println!("error in monitor reset_to_init {}", e); } + tracing::Span::current().record("monitoring_len", fetched_rows.len()); + // TODO Need to monitor the node that redied but never fired + } else { + tracing::Span::current().record("monitoring_len", "empty"); } - Err(e) => { - println!("error in monitor {}", e); - } + } + Err(e) => { + println!("error in monitor {}", e); } } } diff --git a/chronos_bin/src/postgres/pg.rs b/chronos_bin/src/postgres/pg.rs index 
a5e546b..69bdf1b 100644 --- a/chronos_bin/src/postgres/pg.rs +++ b/chronos_bin/src/postgres/pg.rs @@ -10,12 +10,11 @@ use uuid::Uuid; use crate::postgres::config::PgConfig; use crate::postgres::errors::PgError; -use tracing::{event, field, info_span, Span}; +use tracing::event; #[derive(Clone, Debug)] pub struct Pg { pub pool: Pool, - // pub pg_span: Span, } #[derive(Debug)] @@ -78,9 +77,6 @@ impl PgAccess { impl Pg { pub async fn new(pg_config: PgConfig) -> Result { - // let pg_span = info_span!("pg_instance", error = field::Empty); - // let _ = pg_span.enter(); - let mut config = Config::new(); config.dbname = Some(pg_config.database); config.user = Some(pg_config.user); @@ -99,10 +95,7 @@ impl Pg { let mut tmp_list: Vec = Vec::new(); for _ in 1..=pg_config.pool_size { let client = match pool.get().await { - Ok(client) => { - event!(tracing::Level::INFO, "pg client created"); - client - } + Ok(client) => client, Err(e) => { error!("error::: Cannot get client from the pool while setting transaction isolation level {:?}", &e); event!( @@ -146,19 +139,14 @@ impl Pg { event!(tracing::Level::ERROR,error=%e, "pg client creation error"); Err(PgError::GetClientFromPool(e)) } - Ok(client) => { - event!(tracing::Level::INFO, "pg client created"); - Ok(client) - } + Ok(client) => Ok(client), } } } impl Pg { - pub(crate) async fn insert_to_delay(&self, params: &TableInsertRow<'_>) -> Result { - let delay_query_span = info_span!("insert_to_delay", params = field::Empty, query = field::Empty); - let _ = delay_query_span.enter(); - + #[tracing::instrument(skip_all)] + pub(crate) async fn insert_to_delay_db(&self, params: &TableInsertRow<'_>) -> Result { let pg_client = self.get_client().await?; let mut pg_access = PgAccess { client: pg_client }; let pg_txn: PgTxn = pg_access.get_txn().await; @@ -166,9 +154,6 @@ impl Pg { let insert_query = "INSERT INTO hanger (id, deadline, message_headers, message_key, message_value) VALUES ($1, $2 ,$3, $4, $5 )"; - 
delay_query_span.record("query", insert_query); - // delay_query_span.record("params", ¶ms); - let query_execute_instant = Instant::now(); let stmt = pg_txn.txn.prepare(insert_query).await.unwrap(); let outcome = pg_txn @@ -200,12 +185,8 @@ impl Pg { Ok(outcome.unwrap()) } - pub(crate) async fn delete_fired(&self, ids: &Vec) -> Result { - // let delete_query_span = info_span!("delete_fired", params = field::Empty, query = field::Empty); - - // let _ = delete_query_span.enter(); - - // let query_execute_instant = Instant::now(); + #[tracing::instrument(skip_all)] + pub(crate) async fn delete_fired_db(&self, ids: &Vec) -> Result { let pg_client = self.get_client().await.expect("Failed to get client from pool"); let mut pg_access = PgAccess { client: pg_client }; let pg_txn: PgTxn = pg_access.get_txn().await; @@ -218,9 +199,6 @@ impl Pg { } query = query.strip_suffix(',').unwrap().to_string(); query += ")"; - // println!("query {}", query); - // delete_query_span.record("query", query.as_str()); - // delete_query_span.record("params", &values_as_slice); let stmt = pg_txn.txn.prepare(query.as_str()).await.unwrap(); let response = pg_txn.txn.execute(&stmt, &values_as_slice).await; @@ -251,10 +229,8 @@ impl Pg { } } - pub(crate) async fn ready_to_fire(&self, param: &GetReady) -> Result, String> { - // let ready_to_fire_query_span = info_span!("ready_to_fire", params = field::Empty, query = field::Empty); - // let _ = ready_to_fire_query_span.enter(); - + #[tracing::instrument(skip_all)] + pub(crate) async fn ready_to_fire_db(&self, param: &GetReady) -> Result, String> { //TODO handle get client error gracefully let pg_client = self.get_client().await.expect("Unable to get client"); let mut pg_access = PgAccess { client: pg_client }; @@ -312,18 +288,14 @@ impl Pg { } } - pub(crate) async fn failed_to_fire(&self, delay_time: &DateTime) -> Result, PgError> { - let failed_to_fire_query_span = info_span!("failed_to_fire", params = field::Empty, query = field::Empty); - let _ 
= failed_to_fire_query_span.enter(); - + #[tracing::instrument(skip_all)] + pub(crate) async fn failed_to_fire_db(&self, delay_time: &DateTime) -> Result, PgError> { let query_execute_instant = Instant::now(); let pg_client = self.get_client().await?; let get_query = "SELECT * from hanger where readied_at > $1 ORDER BY deadline DESC"; let stmt = pg_client.prepare(get_query).await?; - failed_to_fire_query_span.record("query", get_query); - let response = pg_client.query(&stmt, &[&delay_time]).await.expect("get delayed messages query failed"); let time_elapsed = query_execute_instant.elapsed(); if time_elapsed > Duration::from_millis(100) { @@ -332,10 +304,8 @@ impl Pg { Ok(response) } - pub(crate) async fn reset_to_init(&self, to_init_list: &Vec) -> Result, String> { - let reset_to_init_query_span = info_span!("reset_to_init", params = field::Empty, query = field::Empty); - let _ = reset_to_init_query_span.enter(); - + #[tracing::instrument(skip_all)] + pub(crate) async fn reset_to_init_db(&self, to_init_list: &Vec) -> Result, String> { let query_execute_instant = Instant::now(); let mut id_list = Vec::::new(); for row in to_init_list { @@ -360,7 +330,6 @@ impl Pg { query = query.strip_suffix(',').unwrap().to_string(); query += ")"; - reset_to_init_query_span.record("query", query.as_str()); // println!("reset query {}", query); let stmt = pg_txn.txn.prepare(query.as_str()).await.expect("Unable to prepare query"); diff --git a/chronos_bin/src/runner.rs b/chronos_bin/src/runner.rs index c9d4cc2..0e610b0 100644 --- a/chronos_bin/src/runner.rs +++ b/chronos_bin/src/runner.rs @@ -15,11 +15,7 @@ pub struct Runner { } impl Runner { - // #[instrument(skip(self))] pub async fn run(&self) { - let runner_span = info_span!("runner"); - let _ = runner_span.enter(); - let monitor_ds = Arc::clone(&self.data_store); let process_ds = Arc::clone(&self.data_store); diff --git a/chronos_bin/src/telemetry/jaegar_backend.rs b/chronos_bin/src/telemetry/jaegar_backend.rs new file mode 
100644 index 0000000..887f9fa --- /dev/null +++ b/chronos_bin/src/telemetry/jaegar_backend.rs @@ -0,0 +1,12 @@ +use opentelemetry_api::trace::TraceError; +use opentelemetry_sdk::trace::Tracer; + +pub fn instrument_jaegar_pipleline() -> Result { + let service_name = std::env::var("OTEL_SERVICE_NAME"); + if service_name.is_err() { + std::env::set_var("OTEL_SERVICE_NAME", "chronos"); + } + opentelemetry_jaeger::new_agent_pipeline() + .with_service_name(format!("{:?}", service_name)) + .install_simple() +} diff --git a/chronos_bin/src/telemetry/mod.rs b/chronos_bin/src/telemetry/mod.rs new file mode 100644 index 0000000..bde341c --- /dev/null +++ b/chronos_bin/src/telemetry/mod.rs @@ -0,0 +1,3 @@ +mod jaegar_backend; +mod otlp_collector; +pub mod register_telemetry; diff --git a/chronos_bin/src/telemetry/otlp_collector.rs b/chronos_bin/src/telemetry/otlp_collector.rs new file mode 100644 index 0000000..f83630c --- /dev/null +++ b/chronos_bin/src/telemetry/otlp_collector.rs @@ -0,0 +1,38 @@ +use opentelemetry::trace::TraceError; +use opentelemetry::{ + global, + sdk::{propagation::TraceContextPropagator, trace as sdktrace}, +}; +use opentelemetry_otlp::{Protocol, WithExportConfig}; + +pub struct OtlpCollector {} + +impl Default for OtlpCollector { + fn default() -> Self { + Self::new() + } +} + +impl OtlpCollector { + pub fn new() -> Self { + OtlpCollector {} + } + + pub fn http_collector_connect(&self, protocol: Protocol) -> Result { + if let Ok(trace_exporter) = std::env::var("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT") { + global::set_text_map_propagator(TraceContextPropagator::new()); + opentelemetry_otlp::new_pipeline() + .tracing() + .with_exporter(opentelemetry_otlp::new_exporter().http().with_protocol(protocol).with_endpoint(trace_exporter)) + .install_batch(opentelemetry::runtime::Tokio) + } else { + log::error!("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT not set"); + + // trace error + Err(TraceError::Other(Box::new(std::io::Error::new( + std::io::ErrorKind::Other, + 
"OTEL_EXPORTER_OTLP_TRACES_ENDPOINT not set", + )))) + } + } +} diff --git a/chronos_bin/src/telemetry/register_telemetry.rs b/chronos_bin/src/telemetry/register_telemetry.rs new file mode 100644 index 0000000..edf31b2 --- /dev/null +++ b/chronos_bin/src/telemetry/register_telemetry.rs @@ -0,0 +1,55 @@ +use opentelemetry_otlp::Protocol; +use tracing_subscriber::{prelude::__tracing_subscriber_SubscriberExt, util::SubscriberInitExt}; + +use super::{jaegar_backend::instrument_jaegar_pipleline, otlp_collector::OtlpCollector}; + +pub enum TelemetryCollectorType { + Jaegar, + Otlp, +} + +pub struct TelemetryCollector { + pub collector_type: TelemetryCollectorType, +} + +impl Default for TelemetryCollector { + fn default() -> Self { + TelemetryCollector { + collector_type: TelemetryCollectorType::Otlp, + } + } +} + +impl TelemetryCollector { + pub fn new() -> Self { + TelemetryCollector::default() + } + + pub fn register_traces(self) { + let tracer = match &self.collector_type { + TelemetryCollectorType::Jaegar => instrument_jaegar_pipleline(), + TelemetryCollectorType::Otlp => { + let otlp_collector = OtlpCollector::new(); + otlp_collector.http_collector_connect(Protocol::HttpBinary) + } + }; + + match tracer { + Ok(tracer) => { + //creating a layer for Otel + let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer); + + //subscribing to tracing with opentelemetry + match tracing_subscriber::registry().with(otel_layer).try_init() { + Ok(_) => {} + Err(e) => { + println!(" {}", e); + } + } + } + Err(e) => { + log::error!("error while initializing tracing {}", e); + } + } + } +} diff --git a/chronos_bin/src/utils/util.rs b/chronos_bin/src/utils/util.rs index 7049794..1f21f0c 100644 --- a/chronos_bin/src/utils/util.rs +++ b/chronos_bin/src/utils/util.rs @@ -2,14 +2,20 @@ use rdkafka::message::{BorrowedHeaders, BorrowedMessage, Header, Headers, OwnedH use rdkafka::Message; use std::collections::HashMap; -pub static CHRONOS_ID: &str = "chronosId"; +pub static 
CHRONOS_ID: &str = "chronosMessageId"; pub static DEADLINE: &str = "chronosDeadline"; //TODO check correctness for two headers in this method pub fn required_headers(message: &BorrowedMessage) -> Option> { if let Some(headers) = message.headers() { let reqd_headers = headers.iter().fold(HashMap::::new(), |mut acc, header| { - let key: String = header.key.parse().unwrap(); + let key: String = match header.key.parse() { + Ok(key) => key, + Err(e) => { + log::error!("Error parsing header key: {}", e); + return acc; + } + }; let value: String = String::from_utf8_lossy(header.value.expect("utf8 parsing for header value failed")).into_owned(); acc.insert(key, value); @@ -49,3 +55,18 @@ pub fn get_message_key(message: &BorrowedMessage) -> String { let key = String::from_utf8_lossy(message.key().expect("No key found for message")).to_string(); key } + +pub fn get_chronos_id(headers: &BorrowedHeaders) -> String { + let value = headers + .iter() + .find(|h| { + let header_keys = [CHRONOS_ID]; + header_keys.contains(&h.key) && h.value.is_some() + }) + .expect("No chronosId found for message") + .value + .expect("No chronosId found for message"); + + String::from_utf8_lossy(value).into_owned() + // return value; +} diff --git a/docker-compose.yml b/docker-compose.yml index c08ad4e..90d84cc 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -101,7 +101,8 @@ services: # ENVIRONMENT: "dev" # SERVICE_NAME: "chronos-delay-scheduler" # BUILD_VERSION: "0.0.0" - # KAFKA_BROKERS: "kafka:9092" + # KAFKA_HOST: kafka + # KAFKA_PORT: 9092 # KAFKA_CLIENT_ID: "chronos" # KAFKA_GROUP_ID: "chronos" # KAFKA_IN_TOPIC: "chronos.in" @@ -126,8 +127,6 @@ services: # - zookeeper # - kafka - - # ******************** # Telemetry Components # ******************** @@ -141,6 +140,7 @@ services: - "14250" container_name: Jaeger environment: + # COLLECTOR_OTLP_ENABLED is false in case running Jaeger as Backend - COLLECTOR_OTLP_ENABLED=true networks: - chronos @@ -166,59 +166,6 @@ services: - # 
jaeger-all-in-one: - # image: jaegertracing/all-in-one:latest - # container_name: jaeger - # command: - # - "--memory.max-traces" - # - "10000" - # - "--query.base-path" - # - "/jaeger/ui" - # # - "--prometheus.server-url" - # # - "http://${PROMETHEUS_ADDR}" - # deploy: - # resources: - # limits: - # memory: 300M - # restart: unless-stopped - # ports: - # - "16686" # Jaeger UI - # - "4317" # OTLP gRPC default port - # environment: - # - COLLECTOR_OTLP_ENABLED=true - # # - METRICS_STORAGE_TYPE=prometheus - # # logging: *logging - # networks: - # - chronos - - # # OpenTelemetry Collector - # otelcol: - # image: otel/opentelemetry-collector-contrib:0.84.0 - # container_name: otel-col - # deploy: - # resources: - # limits: - # memory: 125M - # restart: unless-stopped - # # command: [ "--config=/etc/otelcol-config.yml", "--config=/etc/otelcol-observability.yml", "--config=/etc/otelcol-config-extras.yml" ] - # command: [ "--config=/etc/otelcol-config.yml" ] - # volumes: - # - ./infra/otelcol-config.yml:/etc/otelcol-config.yml - # # - ./src/otelcollector/otelcol-observability.yml:/etc/otelcol-observability.yml - # # - ./src/otelcollector/otelcol-config-extras.yml:/etc/otelcol-config-extras.yml - # ports: - # - "4317" # OTLP over gRPC receiver - # - "4318:4318" # OTLP over HTTP receiver - # - "9464" # Prometheus exporter - # - "8888" # metrics endpoint - # depends_on: - # - jaeger-all-in-one - # # logging: *logging - # networks: - # - chronos - - - networks: chronos: name: chronos diff --git a/examples/chronos_ex/Cargo.toml b/examples/chronos_ex/Cargo.toml index 94cf4b2..3f8d40a 100644 --- a/examples/chronos_ex/Cargo.toml +++ b/examples/chronos_ex/Cargo.toml @@ -8,7 +8,7 @@ env_logger = "0.9.0" dotenv = "0.15.0" log = "0.4.14" -tokio = { version = "1.12.0", features = ["full"] } +tokio.workspace = true futures.workspace = true chronos_bin={path="../../chronos_bin"} @@ -18,13 +18,14 @@ tracing.workspace = true tracing-subscriber.workspace = true 
tracing-opentelemetry.workspace = true opentelemetry.workspace = true -opentelemetry-jaeger.workspace = true +opentelemetry_api.workspace = true +opentelemetry_sdk.workspace = true opentelemetry-otlp.workspace = true -opentelemetry-http.workspace = true +opentelemetry-jaeger.workspace = true # opentelemetry-stdout.workspace = true # opentelemetry = "0.20" opentelemetry-stdout = { version = "0.1.0", features = ["trace"] } # tracing = "0.1" # tracing-opentelemetry = "0.21" -# tracing-subscriber = "0.3" \ No newline at end of file +# tracing-subscriber = "0.3" diff --git a/examples/chronos_ex/examples/chronos_ex.rs b/examples/chronos_ex/examples/chronos_ex.rs index c0d74c7..d1567b8 100644 --- a/examples/chronos_ex/examples/chronos_ex.rs +++ b/examples/chronos_ex/examples/chronos_ex.rs @@ -23,44 +23,49 @@ use opentelemetry_otlp::WithExportConfig; use std::time::Duration; use tracing_subscriber::layer::SubscriberExt; -use tracing_subscriber::Registry; fn init_tracer() -> Result { - global::set_text_map_propagator(TraceContextPropagator::new()); - let os_resource = OsResourceDetector.detect(Duration::from_secs(0)); - let process_resource = ProcessResourceDetector.detect(Duration::from_secs(0)); - let sdk_resource = SdkProvidedResourceDetector.detect(Duration::from_secs(0)); - let env_resource = EnvResourceDetector::new().detect(Duration::from_secs(0)); - let telemetry_resource = TelemetryResourceDetector.detect(Duration::from_secs(0)); - opentelemetry_otlp::new_pipeline() - .tracing() - .with_exporter(opentelemetry_otlp::new_exporter().http().with_endpoint(format!( - "{}{}", - std::env::var("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT").unwrap_or_else(|_| "http://localhost:4318".to_string()), - "/v1/traces" - ))) - .with_trace_config( - sdktrace::config().with_resource( - os_resource - .merge(&process_resource) - .merge(&sdk_resource) - .merge(&env_resource) - .merge(&telemetry_resource), - ), - ) - .install_batch(opentelemetry::runtime::Tokio) + let service_name = 
std::env::var("OTEL_SERVICE_NAME"); + let trace_exporter = std::env::var("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"); + if service_name.is_err() { + std::env::set_var("OTEL_SERVICE_NAME", "chronos"); + } + if trace_exporter.is_ok() { + global::set_text_map_propagator(TraceContextPropagator::new()); + let os_resource = OsResourceDetector.detect(Duration::from_secs(0)); + let process_resource = ProcessResourceDetector.detect(Duration::from_secs(0)); + let sdk_resource = SdkProvidedResourceDetector.detect(Duration::from_secs(0)); + let env_resource = EnvResourceDetector::new().detect(Duration::from_secs(0)); + let telemetry_resource = TelemetryResourceDetector.detect(Duration::from_secs(0)); + opentelemetry_otlp::new_pipeline() + .tracing() + .with_exporter(opentelemetry_otlp::new_exporter().http().with_endpoint(format!("{:?}", service_name))) + .with_trace_config( + sdktrace::config().with_resource( + os_resource + .merge(&process_resource) + .merge(&sdk_resource) + .merge(&env_resource) + .merge(&telemetry_resource), + ), + ) + .install_batch(opentelemetry::runtime::Tokio) + } else { + log::error!("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT not set"); + + // trace error + Err(TraceError::Other(Box::new(std::io::Error::new( + std::io::ErrorKind::Other, + "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT not set", + )))) + } } #[tokio::main] async fn main() { - // env_logger::init(); + env_logger::init(); dotenv::dotenv().ok(); - // let tracer = opentelemetry_jaeger::new_agent_pipeline() - // .with_service_name("chronos_tel") - // .install_simple() - // .expect("failed to install tracing"); - let tracer = init_tracer().expect("failed to install tracing"); //creating a layer for Otel @@ -75,6 +80,9 @@ async fn main() { println!("error while initializing tracing {}", e); } } + // Initialise telemetry pipeline + // init_pipeline(); + // let tracer = tracer("chronos_tracer"); let kafka_config = KafkaConfig::from_env(); let pg_config = PgConfig::from_env(); @@ -92,6 +100,4 @@ async fn main() { 
debug!("debug logs starting chronos"); r.run().await; - // let runner = Runner {}; - // runner.run(); } diff --git a/examples/chronos_ex/examples/telemetry_async_simple.rs b/examples/chronos_ex/examples/telemetry_async_simple.rs new file mode 100644 index 0000000..2013ade --- /dev/null +++ b/examples/chronos_ex/examples/telemetry_async_simple.rs @@ -0,0 +1,75 @@ +use tracing::info_span; +use tracing_subscriber::prelude::*; + +struct B {} + +impl B { + pub async fn run(&self) { + let span_sub_runer = info_span!("B"); + let _guard = span_sub_runer.enter(); + println!("B"); + } +} + +struct A {} + +impl A { + pub fn new() -> Self { + Self {} + } + pub async fn run(&self) { + let span_runer = info_span!("A"); + let _guard = span_runer.enter(); + println!("A"); + let sub_runner = B {}; + sub_runner.run().await; + } +} + +struct Runner {} + +impl Runner { + pub async fn run(&self) { + let span_runer = info_span!("Runner"); + let _guard = span_runer.enter(); + println!("Runner"); + // A::new().run().await; + let handler = tokio::task::spawn(async { + println!("this is spawning"); + A::new().run().await; + }); + // let handler_one = tokio::task::spawn(async { + // let runner = Runner {}; + // runner.run().await; + // }); + + futures::future::join_all([handler]).await; + } +} + +#[tokio::main] +async fn main() { + // env_logger::init(); + dotenv::dotenv().ok(); + + let tracer = opentelemetry_jaeger::new_agent_pipeline() + .with_service_name("test_async_tel") + .install_simple() + .expect("failed to install tracing"); + + //creating a layer for Otel + let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer); + + // let subscriber = Registry::default().with(otel_layer); + + //subscribing to tracing with opentelemetry + match tracing_subscriber::registry().with(otel_layer).try_init() { + Ok(_) => {} + Err(e) => { + println!("error while initializing tracing {}", e); + } + } + + let runner = Runner {}; + runner.run().await; +} diff --git 
a/examples/chronos_ex/examples/telemetry_simple.rs b/examples/chronos_ex/examples/telemetry_simple.rs index af752c8..25faded 100644 --- a/examples/chronos_ex/examples/telemetry_simple.rs +++ b/examples/chronos_ex/examples/telemetry_simple.rs @@ -1,32 +1,52 @@ -use tracing::info_span; +use opentelemetry::trace::TracerProvider as _; +use opentelemetry_otlp::ExportConfig; +use opentelemetry_sdk::{runtime::Tokio, trace::TracerProvider}; +use tracing::{info_span, instrument}; use tracing_subscriber::prelude::*; +use tokio::time::Duration; + struct SubRunner {} impl SubRunner { pub fn run(&self) { - let span_sub_runer = info_span!("sub_hello_world"); + let span_sub_runer = info_span!("sub_hello_world 2"); let _guard = span_sub_runer.enter(); println!("Hello, under world!"); - let sub_runner = SubRunnerOne {}; - sub_runner.run(); + let sub_runner_one = SubRunnerOne {}; + sub_runner_one.run(); + } + + #[instrument(name = "run_db2", skip(self))] + pub async fn run_db(&self) { + // let span_sub_runer = info_span!("run_db 2"); + // let _guard = span_sub_runer.enter(); + let sub_runner_one = SubRunnerOne {}; + sub_runner_one.db().await; } } struct SubRunnerOne {} impl SubRunnerOne { pub fn run(&self) { - let span_sub_runer = info_span!("subone_hello_world"); + let span_sub_runer = info_span!("subone_hello_world 3"); let _guard = span_sub_runer.enter(); println!("Hello, sub one under world!"); } + + #[instrument(name = "db3", skip(self))] + pub async fn db(&self) { + // let span_sub_runer = info_span!("db 3"); + // let _guard = span_sub_runer.enter(); + tokio::time::sleep(Duration::from_secs(3)).await; + } } struct Runner {} impl Runner { - pub async fn run(&self) { - let span_runer = info_span!("hello_world Runner"); + pub fn run(&self) { + let span_runer = info_span!("Run Runner 1"); let _guard = span_runer.enter(); println!("Hello, world!"); let sub_runner = SubRunner {}; @@ -35,6 +55,17 @@ impl Runner { // let sub_runner_one = SubRunnerOne {}; // sub_runner_one.run(); } + + 
#[instrument(name = "db runner1", skip(self))] + pub async fn run_sub_db(&self) { + // let span_runer = info_span!("db Runner 1"); + // let _guard = span_runer.enter(); + let sub_runner = SubRunner {}; + sub_runner.run_db().await; + + // let sub_runner_one = SubRunnerOne {}; + // sub_runner_one.run(); + } } #[tokio::main] @@ -42,28 +73,65 @@ async fn main() { // env_logger::init(); dotenv::dotenv().ok(); - let tracer = opentelemetry_jaeger::new_agent_pipeline() - .with_service_name("test_tel") - .install_simple() - .expect("failed to install tracing"); + // let no_op = NoopTracerProvider::new(); + + // let tracer = opentelemetry_jaeger::new_agent_pipeline() + // .with_service_name("test_tel") + // .install_simple() + // .expect("failed to install tracing"); + + // let otel_exporter = opentelemetry_otlp::SpanExporter::Tonic { timeout: (), metadata: (), trace_exporter: () } + let otel_exporter = opentelemetry_otlp::SpanExporter::Http { + timeout: Duration::from_secs(0), + headers: None, + collector_endpoint: "http://localhost:4318/v1/traces".parse().unwrap(), + trace_exporter: None, + }; + + // 1. builder pattern for tracer provider + let provider = TracerProvider::builder().with_batch_exporter(otel_exporter, Tokio).build(); + + let _tracer = provider.tracer("Simple_telemetry_test"); + + // //creating a layer for Otel + // let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer); - //creating a layer for Otel - let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer); + // 2. 
pipeline pattern for tracer provider - best way to stich tracing with opentelemetry + // can mention all exporter configs here + // batching and tracing configs curried + // let otel_new = opentelemetry_otlp::new_pipeline().tracing().install_batch(Tokio).unwrap(); + + // let otel_layer = tracing_opentelemetry::layer().with_tracer(otel_new); // let subscriber = Registry::default().with(otel_layer); //subscribing to tracing with opentelemetry - match tracing_subscriber::registry().with(otel_layer).try_init() { - Ok(_) => {} - Err(e) => { - println!("error while initializing tracing {}", e); - } + // match tracing_subscriber::registry().with(otel_layer).try_init() { + // Ok(_) => {} + // Err(e) => { + // println!("error while initializing tracing {}", e); + // } + // } + + #[instrument] + fn worker() { + // let span = info_span!("worker"); + // let _guard = span.enter(); + println!("Hello, world!"); } let handler = tokio::task::spawn(async { println!("this is spawning"); - let runner = Runner {}; - runner.run().await; + // let runner = Runner {}; + // runner.run(); + // runner.run_sub_db().await; + let mut count = 0; + loop { + count += 1; + if count / 2 == 0 { + tokio::time::sleep(Duration::from_secs(1)).await; + } + } }); // let handler_one = tokio::task::spawn(async { // let runner = Runner {}; @@ -74,4 +142,5 @@ async fn main() { // let runner = Runner {}; // runner.run(); + tokio::time::sleep(Duration::from_secs(30)).await; } diff --git a/otel-collector-config.yaml b/otel-collector-config.yaml new file mode 100644 index 0000000..81b15c7 --- /dev/null +++ b/otel-collector-config.yaml @@ -0,0 +1,36 @@ +receivers: + otlp: + protocols: + http: + grpc: + +exporters: + logging: + loglevel: debug + + jaeger: + endpoint: jaeger-all-in-one:14250 + tls: + insecure: true + +processors: + batch: + +extensions: + health_check: + pprof: + endpoint: :1888 + zpages: + endpoint: :55679 + +service: + extensions: [pprof, zpages, health_check] + pipelines: + traces: + receivers: 
[otlp] + processors: [batch] + exporters: [logging, jaeger] + metrics: + receivers: [otlp] + processors: [batch] + exporters: [logging] \ No newline at end of file