From 2e6136d92e4d33d6e8341443f18e2e54ff3b4815 Mon Sep 17 00:00:00 2001 From: Vandana Varakantham Date: Tue, 27 Jun 2023 16:55:30 +0000 Subject: [PATCH] feat(callhome): add publisher library Signed-off-by: Vandana Varakantham --- {mbus-api => event-mbus-api}/Cargo.toml | 2 +- {mbus-api => event-mbus-api}/src/common.rs | 0 .../src/common/constants.rs | 0 .../src/common/errors.rs | 0 .../src/common/retry.rs | 0 {mbus-api => event-mbus-api}/src/lib.rs | 0 {mbus-api => event-mbus-api}/src/mbus_nats.rs | 2 +- {mbus-api => event-mbus-api}/src/message.rs | 0 event-publisher/Cargo.toml | 11 ++++ event-publisher/src/common.rs | 2 + event-publisher/src/common/constants.rs | 3 + event-publisher/src/event_handler.rs | 22 +++++++ event-publisher/src/event_layer.rs | 66 +++++++++++++++++++ event-publisher/src/lib.rs | 4 ++ event-publisher/src/publisher.rs | 35 ++++++++++ 15 files changed, 145 insertions(+), 2 deletions(-) rename {mbus-api => event-mbus-api}/Cargo.toml (92%) rename {mbus-api => event-mbus-api}/src/common.rs (100%) rename {mbus-api => event-mbus-api}/src/common/constants.rs (100%) rename {mbus-api => event-mbus-api}/src/common/errors.rs (100%) rename {mbus-api => event-mbus-api}/src/common/retry.rs (100%) rename {mbus-api => event-mbus-api}/src/lib.rs (100%) rename {mbus-api => event-mbus-api}/src/mbus_nats.rs (99%) rename {mbus-api => event-mbus-api}/src/message.rs (100%) create mode 100644 event-publisher/Cargo.toml create mode 100644 event-publisher/src/common.rs create mode 100644 event-publisher/src/common/constants.rs create mode 100644 event-publisher/src/event_handler.rs create mode 100644 event-publisher/src/event_layer.rs create mode 100644 event-publisher/src/lib.rs create mode 100644 event-publisher/src/publisher.rs diff --git a/mbus-api/Cargo.toml b/event-mbus-api/Cargo.toml similarity index 92% rename from mbus-api/Cargo.toml rename to event-mbus-api/Cargo.toml index 36b75de..2edaf82 100644 --- a/mbus-api/Cargo.toml +++ b/event-mbus-api/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "mbus-api" +name = "event-mbus-api" version = "0.1.0" edition = "2021" diff --git a/mbus-api/src/common.rs b/event-mbus-api/src/common.rs similarity index 100% rename from mbus-api/src/common.rs rename to event-mbus-api/src/common.rs diff --git a/mbus-api/src/common/constants.rs b/event-mbus-api/src/common/constants.rs similarity index 100% rename from mbus-api/src/common/constants.rs rename to event-mbus-api/src/common/constants.rs diff --git a/mbus-api/src/common/errors.rs b/event-mbus-api/src/common/errors.rs similarity index 100% rename from mbus-api/src/common/errors.rs rename to event-mbus-api/src/common/errors.rs diff --git a/mbus-api/src/common/retry.rs b/event-mbus-api/src/common/retry.rs similarity index 100% rename from mbus-api/src/common/retry.rs rename to event-mbus-api/src/common/retry.rs diff --git a/mbus-api/src/lib.rs b/event-mbus-api/src/lib.rs similarity index 100% rename from mbus-api/src/lib.rs rename to event-mbus-api/src/lib.rs diff --git a/mbus-api/src/mbus_nats.rs b/event-mbus-api/src/mbus_nats.rs similarity index 99% rename from mbus-api/src/mbus_nats.rs rename to event-mbus-api/src/mbus_nats.rs index 9d41735..e25141a 100644 --- a/mbus-api/src/mbus_nats.rs +++ b/event-mbus-api/src/mbus_nats.rs @@ -42,7 +42,7 @@ pub(crate) struct NatsMessageBus { impl NatsMessageBus { /// Connect to nats server. - pub async fn connect(server: &str) -> Client { + async fn connect(server: &str) -> Client { tracing::debug!("Connecting to the nats server {}...", server); // We retry in a loop until successful. Once connected the nats library will handle // reconnections for us. diff --git a/mbus-api/src/message.rs b/event-mbus-api/src/message.rs similarity index 100% rename from mbus-api/src/message.rs rename to event-mbus-api/src/message.rs diff --git a/event-publisher/Cargo.toml b/event-publisher/Cargo.toml new file mode 100644 index 0000000..b949b1c --- /dev/null +++ b/event-publisher/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "event-publisher" +version = "0.1.0" +edition = "2021" + +[dependencies] +tracing-subscriber = { version = "0.3.16", features = [ "env-filter" ] } +tokio = { version = "1.25.0"} +tracing = "0.1.37" +serde_json = "1.0.83" +event-mbus-api = { path = "../event-mbus-api" } \ No newline at end of file diff --git a/event-publisher/src/common.rs b/event-publisher/src/common.rs new file mode 100644 index 0000000..996f5fe --- /dev/null +++ b/event-publisher/src/common.rs @@ -0,0 +1,2 @@ +/// Constants module for constants. +pub(crate) mod constants; diff --git a/event-publisher/src/common/constants.rs b/event-publisher/src/common/constants.rs new file mode 100644 index 0000000..e0d9f40 --- /dev/null +++ b/event-publisher/src/common/constants.rs @@ -0,0 +1,3 @@ +/// Maximum number of message the channel can have at any instant. Once the channel is full sender +/// cannot send a message to channel untill a message is received by receiver. +pub const MAX_BUFFER_MSGS: usize = 500; diff --git a/event-publisher/src/event_handler.rs b/event-publisher/src/event_handler.rs new file mode 100644 index 0000000..d324bf7 --- /dev/null +++ b/event-publisher/src/event_handler.rs @@ -0,0 +1,22 @@ +use crate::{ + common::constants::MAX_BUFFER_MSGS, event_layer::EventLayer, publisher::MbusPublisher, +}; +use mbus_api::{event::EventMessage, event_traits::initilize_source_component}; + +/// Event handle. +pub struct EventHandle {} + +impl EventHandle { + /// Initilize the buffer, starts the event publisher and return the layer for handling the event + /// traces. + pub fn init(mbus_url: String, service_name: &str) -> EventLayer { + let (send, recv) = tokio::sync::mpsc::channel::(MAX_BUFFER_MSGS); + initilize_source_component(service_name); + let layer = EventLayer::new(send); + tokio::spawn(async move { + MbusPublisher::run(&mbus_url, recv).await; + // TODO handle the closed channel situation. + }); + layer + } +} diff --git a/event-publisher/src/event_layer.rs b/event-publisher/src/event_layer.rs new file mode 100644 index 0000000..35b71a2 --- /dev/null +++ b/event-publisher/src/event_layer.rs @@ -0,0 +1,66 @@ +use mbus_api::event::EventMessage; +use std::fmt::Debug; +use tokio::sync::mpsc::{error::TrySendError, Sender}; +use tracing::field::{Field, Visit}; +use tracing_subscriber::{layer::Context, Layer}; + +/// Custom layer for recording the events for mbus. +pub struct EventLayer { + sender: Sender, +} + +impl EventLayer { + // Creates the new EventLayer. The sender handle for the buffer is required to store the events + // in buffer as the events are recorded. + pub fn new(sender: Sender) -> Self { + Self { sender } + } +} + +// Notifies the EventLayer that an event has occurred. +impl Layer for EventLayer +where + S: tracing::Subscriber, +{ + // Records an event, gets the EventMessage from the event and sends it to the buffer. + fn on_event(&self, event: &tracing::Event<'_>, _ctx: Context<'_, S>) { + let mut visitor = EventVisitor::default(); + + event.record(&mut visitor); + + let output = visitor.0; + + if let Err(err) = self.sender.try_send(output) { + match err { + TrySendError::Full(_) => { + tracing::trace!("Buffer is full"); + } + TrySendError::Closed(_) => { + tracing::warn!(%err, "Error sending message to buffer"); + // TODO handle the closed channel situation. + } + } + } + } +} + +// Custom visitor to visit all the fields on on the events being recorded. +#[derive(Default)] +struct EventVisitor(EventMessage); + +impl Visit for EventVisitor { + // Visit a string value. Deserializes the string to EventMessage. + fn record_str(&mut self, _field: &Field, value: &str) { + match serde_json::from_str::(value) { + Ok(value) => { + self.0 = value; + } + Err(err) => { + tracing::warn!("Error while getting event message: {:?}", err); + } + }; + } + + // Required method. + fn record_debug(&mut self, _field: &Field, _value: &dyn Debug) {} +} diff --git a/event-publisher/src/lib.rs b/event-publisher/src/lib.rs new file mode 100644 index 0000000..d2e3317 --- /dev/null +++ b/event-publisher/src/lib.rs @@ -0,0 +1,4 @@ +pub mod common; +pub mod event_handler; +pub mod event_layer; +pub mod publisher; diff --git a/event-publisher/src/publisher.rs b/event-publisher/src/publisher.rs new file mode 100644 index 0000000..dc1c168 --- /dev/null +++ b/event-publisher/src/publisher.rs @@ -0,0 +1,35 @@ +use mbus_api::{event::EventMessage, mbus_nats::message_bus_init, Bus}; + +/// Message bus publisher. +#[derive(Clone)] +pub(crate) struct MbusPublisher {} + +impl MbusPublisher { + /// Initializes mbus and runs the publisher. + pub async fn run(mbus_url: &str, recv: tokio::sync::mpsc::Receiver) { + let mbus = message_bus_init(mbus_url, None).await; + Self::publish_events(mbus, recv).await; + } + + /// Receives even messages from buffer through receiver handler of the buffer and publishes the + /// messages to the mbus. + async fn publish_events( + mut mbus: impl Bus, + mut recv: tokio::sync::mpsc::Receiver, + ) { + loop { + match recv.recv().await { + Some(event_msg) => { + if let Err(err) = mbus.publish(&event_msg).await { + tracing::debug!("Error publishing event message to mbus: {:?}", err); + // TODO retry the event publish when there is publish error if the buffer if + // not half full. + } + } + // Channel has been closed and there are no remaining messages in the channel's + // buffer. + None => break, + } + } + } +}