Skip to content

Commit

Permalink
feat(callhome): add publisher library
Browse files Browse the repository at this point in the history
Signed-off-by: Vandana Varakantham <[email protected]>
  • Loading branch information
Vandana Varakantham committed Jul 13, 2023
1 parent ae78761 commit 2e6136d
Show file tree
Hide file tree
Showing 15 changed files with 145 additions and 2 deletions.
2 changes: 1 addition & 1 deletion mbus-api/Cargo.toml → event-mbus-api/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "mbus-api"
name = "event-mbus-api"
version = "0.1.0"
edition = "2021"

Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub(crate) struct NatsMessageBus {

impl NatsMessageBus {
/// Connect to nats server.
pub async fn connect(server: &str) -> Client {
async fn connect(server: &str) -> Client {
tracing::debug!("Connecting to the nats server {}...", server);
// We retry in a loop until successful. Once connected the nats library will handle
// reconnections for us.
Expand Down
File renamed without changes.
11 changes: 11 additions & 0 deletions event-publisher/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[package]
name = "event-publisher"
version = "0.1.0"
edition = "2021"

[dependencies]
tracing-subscriber = { version = "0.3.16", features = [ "env-filter" ] }
tokio = { version = "1.25.0"}
tracing = "0.1.37"
serde_json = "1.0.83"
event-mbus-api = { path = "../event-mbus-api" }
2 changes: 2 additions & 0 deletions event-publisher/src/common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/// Constants module for constants.
pub(crate) mod constants;
3 changes: 3 additions & 0 deletions event-publisher/src/common/constants.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
/// Maximum number of message the channel can have at any instant. Once the channel is full sender
/// cannot send a message to channel untill a message is received by receiver.
pub const MAX_BUFFER_MSGS: usize = 500;
22 changes: 22 additions & 0 deletions event-publisher/src/event_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use crate::{
common::constants::MAX_BUFFER_MSGS, event_layer::EventLayer, publisher::MbusPublisher,
};
use mbus_api::{event::EventMessage, event_traits::initilize_source_component};

/// Event handle.
pub struct EventHandle {}

impl EventHandle {
/// Initilize the buffer, starts the event publisher and return the layer for handling the event
/// traces.
pub fn init(mbus_url: String, service_name: &str) -> EventLayer {
let (send, recv) = tokio::sync::mpsc::channel::<EventMessage>(MAX_BUFFER_MSGS);
initilize_source_component(service_name);
let layer = EventLayer::new(send);
tokio::spawn(async move {
MbusPublisher::run(&mbus_url, recv).await;
// TODO handle the closed channel situation.
});
layer
}
}
66 changes: 66 additions & 0 deletions event-publisher/src/event_layer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use mbus_api::event::EventMessage;
use std::fmt::Debug;
use tokio::sync::mpsc::{error::TrySendError, Sender};
use tracing::field::{Field, Visit};
use tracing_subscriber::{layer::Context, Layer};

/// Custom layer for recording the events for mbus.
pub struct EventLayer {
sender: Sender<EventMessage>,
}

impl EventLayer {
// Creates the new EventLayer. The sender handle for the buffer is required to store the events
// in buffer as the events are recorded.
pub fn new(sender: Sender<EventMessage>) -> Self {
Self { sender }
}
}

// Notifies the EventLayer that an event has occurred.
impl<S> Layer<S> for EventLayer
where
S: tracing::Subscriber,
{
// Records an event, gets the EventMessage from the event and sends it to the buffer.
fn on_event(&self, event: &tracing::Event<'_>, _ctx: Context<'_, S>) {
let mut visitor = EventVisitor::default();

event.record(&mut visitor);

let output = visitor.0;

if let Err(err) = self.sender.try_send(output) {
match err {
TrySendError::Full(_) => {
tracing::trace!("Buffer is full");
}
TrySendError::Closed(_) => {
tracing::warn!(%err, "Error sending message to buffer");
// TODO handle the closed channel situation.
}
}
}
}
}

// Custom visitor to visit all the fields on on the events being recorded.
#[derive(Default)]
struct EventVisitor(EventMessage);

impl Visit for EventVisitor {
// Visit a string value. Deserializes the string to EventMessage.
fn record_str(&mut self, _field: &Field, value: &str) {
match serde_json::from_str::<EventMessage>(value) {
Ok(value) => {
self.0 = value;
}
Err(err) => {
tracing::warn!("Error while getting event message: {:?}", err);
}
};
}

// Required method.
fn record_debug(&mut self, _field: &Field, _value: &dyn Debug) {}
}
4 changes: 4 additions & 0 deletions event-publisher/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
pub mod common;
pub mod event_handler;
pub mod event_layer;
pub mod publisher;
35 changes: 35 additions & 0 deletions event-publisher/src/publisher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use mbus_api::{event::EventMessage, mbus_nats::message_bus_init, Bus};

/// Message bus publisher.
#[derive(Clone)]
pub(crate) struct MbusPublisher {}

impl MbusPublisher {
/// Initializes mbus and runs the publisher.
pub async fn run(mbus_url: &str, recv: tokio::sync::mpsc::Receiver<EventMessage>) {
let mbus = message_bus_init(mbus_url, None).await;
Self::publish_events(mbus, recv).await;
}

/// Receives even messages from buffer through receiver handler of the buffer and publishes the
/// messages to the mbus.
async fn publish_events(
mut mbus: impl Bus,
mut recv: tokio::sync::mpsc::Receiver<EventMessage>,
) {
loop {
match recv.recv().await {
Some(event_msg) => {
if let Err(err) = mbus.publish(&event_msg).await {
tracing::debug!("Error publishing event message to mbus: {:?}", err);
// TODO retry the event publish when there is publish error if the buffer if
// not half full.
}
}
// Channel has been closed and there are no remaining messages in the channel's
// buffer.
None => break,
}
}
}
}

0 comments on commit 2e6136d

Please sign in to comment.