Skip to content

Commit

Permalink
feat(callhome): event publisher library
Browse files Browse the repository at this point in the history
Signed-off-by: Vandana Varakantham <[email protected]>
  • Loading branch information
Vandana Varakantham committed Jun 30, 2023
1 parent ae78761 commit 5873698
Show file tree
Hide file tree
Showing 8 changed files with 144 additions and 1 deletion.
11 changes: 11 additions & 0 deletions event-publisher/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[package]
name = "event-publisher"
version = "0.1.0"
edition = "2021"

[dependencies]
tracing-subscriber = { version = "0.3.16", features = [ "env-filter" ] }
tokio = { version = "1.25.0"}
tracing = "0.1.37"
serde_json = "1.0.83"
mbus-api = { path = "../mbus-api" }
2 changes: 2 additions & 0 deletions event-publisher/src/common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/// Constants module for constants.
pub(crate) mod constants;
3 changes: 3 additions & 0 deletions event-publisher/src/common/constants.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
/// Maximum number of message the channel can have at any instant. Once the channel is full sender
/// cannot send a message to channel untill a message is received by receiver.
pub const MAX_BUFFER_MSGS: usize = 500;
22 changes: 22 additions & 0 deletions event-publisher/src/event_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use crate::{
common::constants::MAX_BUFFER_MSGS, event_layer::EventLayer, publisher::MbusPublisher,
};
use mbus_api::{event::EventMessage, event_traits::initilize_source_component};

/// Event handle.
pub struct EventHandle {}

impl EventHandle {
/// Initilize the buffer, starts the event publisher and return the layer for handling the event
/// traces.
pub fn init(mbus_url: String, service_name: String) -> EventLayer {
let (send, recv) = tokio::sync::mpsc::channel::<EventMessage>(MAX_BUFFER_MSGS);
initilize_source_component(service_name);
let layer = EventLayer::new(send);
tokio::spawn(async move {
MbusPublisher::run(&mbus_url, recv).await;
// TODO handle the closed channel situation.
});
layer
}
}
66 changes: 66 additions & 0 deletions event-publisher/src/event_layer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use mbus_api::event::EventMessage;
use std::fmt::Debug;
use tokio::sync::mpsc::{error::TrySendError, Sender};
use tracing::field::{Field, Visit};
use tracing_subscriber::{layer::Context, Layer};

/// Custom layer for recording the events for mbus.
pub struct EventLayer {
sender: Sender<EventMessage>,
}

impl EventLayer {
// Creates the new EventLayer. The sender handle for the buffer is required to store the events
// in buffer as the events are recorded.
pub fn new(sender: Sender<EventMessage>) -> Self {
Self { sender }
}
}

// Notifies the EventLayer that an event has occurred.
impl<S> Layer<S> for EventLayer
where
S: tracing::Subscriber,
{
// Records an event, gets the EventMessage from the event and sends it to the buffer.
fn on_event(&self, event: &tracing::Event<'_>, _ctx: Context<'_, S>) {
let mut visitor = EventVisitor::default();

event.record(&mut visitor);

let output = visitor.0;

if let Err(err) = self.sender.try_send(output) {
match err {
TrySendError::Full(_) => {
tracing::warn!("Buffer is full");
}
TrySendError::Closed(_) => {
tracing::warn!(%err, "Error sending message to buffer");
// TODO handle the closed channel situation.
}
}
}
}
}

// Custom visitor to visit all the fields on on the events being recorded.
#[derive(Default)]
struct EventVisitor(EventMessage);

impl Visit for EventVisitor {
// Visit a string value. Deserializes the string to EventMessage.
fn record_str(&mut self, _field: &Field, value: &str) {
match serde_json::from_str::<EventMessage>(value) {
Ok(value) => {
self.0 = value;
}
Err(err) => {
tracing::warn!("Error while getting event message: {:?}", err);
}
};
}

// Required method.
fn record_debug(&mut self, _field: &Field, _value: &dyn Debug) {}
}
4 changes: 4 additions & 0 deletions event-publisher/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
pub mod common;
pub mod event_handler;
pub mod event_layer;
pub mod publisher;
35 changes: 35 additions & 0 deletions event-publisher/src/publisher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use mbus_api::{event::EventMessage, mbus_nats::message_bus_init, Bus};

/// Message bus publisher.
#[derive(Clone)]
pub(crate) struct MbusPublisher {}

impl MbusPublisher {
/// Initializes mbus and runs the publisher.
pub async fn run(mbus_url: &str, recv: tokio::sync::mpsc::Receiver<EventMessage>) {
let mbus = message_bus_init(mbus_url).await;
Self::publish_events(mbus, recv).await;
}

/// Receives even messages from buffer through receiver handler of the buffer and publishes the
/// messages to the mbus.
async fn publish_events(
mut mbus: impl Bus,
mut recv: tokio::sync::mpsc::Receiver<EventMessage>,
) {
loop {
match recv.recv().await {
Some(event_msg) => {
if let Err(err) = mbus.publish(&event_msg).await {
tracing::warn!("Error publishing event message to mbus: {:?}", err);
// TODO retry the event publish when there is publish error if the buffer if
// not half full.
}
}
// Channel has been closed and there are no remaining messages in the channel's
// buffer.
None => break,
}
}
}
}
2 changes: 1 addition & 1 deletion mbus-api/src/mbus_nats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub(crate) struct NatsMessageBus {

impl NatsMessageBus {
/// Connect to nats server.
pub async fn connect(server: &str) -> Client {
async fn connect(server: &str) -> Client {
tracing::debug!("Connecting to the nats server {}...", server);
// We retry in a loop until successful. Once connected the nats library will handle
// reconnections for us.
Expand Down

0 comments on commit 5873698

Please sign in to comment.