Skip to content

Commit

Permalink
feat(callhome): add protobuff event message
Browse files Browse the repository at this point in the history
Signed-off-by: Vandana Varakantham <[email protected]>
  • Loading branch information
Vandana Varakantham committed Jul 14, 2023
1 parent ae78761 commit 4b25f55
Show file tree
Hide file tree
Showing 11 changed files with 194 additions and 79 deletions.
11 changes: 10 additions & 1 deletion mbus-api/Cargo.toml → events-api/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
[package]
name = "mbus-api"
name = "events-api"
version = "0.1.0"
edition = "2021"


[build-dependencies]
tonic-build = "0.8.4"

[dependencies]
async-nats = "0.29.0"
bytes = "1.4.0"
Expand All @@ -14,3 +18,8 @@ uuid = { version = "1.3.0", features = ["v4"] }
async-trait = "0.1.64"
futures = "0.3.26"
snafu = "0.7.4"
tonic = "0.8.3"
prost = "0.11.6"
prost-wkt-types = "0.4.0"
chrono = "0.4.23"
once_cell = "1.17.0"
7 changes: 7 additions & 0 deletions events-api/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
fn main() {
tonic_build::configure()
.type_attribute(".", "#[derive(serde::Deserialize, serde::Serialize)]")
.extern_path(".google.protobuf.Timestamp", "::prost_wkt_types::Timestamp")
.compile(&["protobuf/v1/event.proto"], &["protobuf/"])
.unwrap_or_else(|e| panic!("event v1 protobuf compilation failed: {e}"));
}
65 changes: 65 additions & 0 deletions events-api/protobuf/v1/event.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
syntax = "proto3";

import "google/protobuf/wrappers.proto";

import "google/protobuf/timestamp.proto";

package v1.event;

// Event Message
message EventMessage {
// Event category
EventCategory category = 1;
// Event action
EventAction action = 2;
// Target id for the category against which action is performed
string target = 3;
// Event meta data
EventMeta metadata = 4;
}

// Event Category
enum EventCategory {
UnknownCategory = 0;
Pool = 1;
Volume = 2;
}

// Event Action
enum EventAction {
UnknownAction = 0;
Create = 1;
Delete = 2;
}

// Event meta data
message EventMeta {
// Something that uniquely identifies events. UUIDv4. GUID.
string id = 1;
EventSource source = 2;
// Event timestamp
google.protobuf.Timestamp timestamp = 3;
// Version of the event message
Version version = 4;
}

// Event source
message EventSource {
// Io-engine or core-agent
Component component = 1;
// Node name
string node = 2;
}

// Source Component
enum Component {
UnknownComponent = 0;
CoreAgent = 1;
IoEngine = 2;
}

// Event message version
enum Version {
UnknownVersion = 0;
V1 = 1;
}
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ pub enum Error {
error
))]
StreamError { stream: String, error: String },
/// Invalid event message id.
#[snafu(display("Error while generating subject: {}", error_msg))]
InvalidMessageId { error_msg: String },
/// Failed to serialise value.
#[snafu(display("Failed to serialise value. Error {}", source))]
SerdeSerializeError { source: serde_json::Error },
Expand Down
File renamed without changes.
77 changes: 77 additions & 0 deletions events-api/src/event_traits.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use crate::event::{Component, EventMessage, EventMeta, EventSource, Version};
use chrono::Utc;
use once_cell::sync::OnceCell;
use std::str::FromStr;

/// Once cell static variable to store the component field of the event source.
static COMPONENT: OnceCell<Component> = OnceCell::new();

/// Initialize the event source component with the service name.
pub fn initilize_source_component(comp: &str) {
COMPONENT.get_or_init(|| Component::from_str(comp).unwrap_or_default());
}

impl EventMeta {
/// New event metadata with default values.
pub fn new() -> Self {
Self {
id: uuid::Uuid::new_v4().to_string(),
source: Some(EventSource::new("".to_string())),
timestamp: Some(Utc::now().into()),
version: Version::V1.into(),
}
}
}

impl EventSource {
/// New event source with default values.
pub fn new(node: String) -> Self {
let component = COMPONENT.get().cloned().unwrap_or_default().into();
Self { component, node }
}
}

impl EventMessage {
/// Generate the event trace with event message.
pub fn generate(&self) {
let event_data = serde_json::to_string(&self).unwrap_or_default();
tracing::event!(target: "target-mbus", tracing::Level::INFO, event_data);
}
}

// Get Component from service name.
impl FromStr for Component {
type Err = String;
fn from_str(c: &str) -> Result<Self, Self::Err> {
match c {
"agent-core" => Ok(Self::CoreAgent),
"io-engine" => Ok(Self::IoEngine),
_ => Err(format!("Received event from unknown component {c}")),
}
}
}

#[cfg(test)]
mod test {
use crate::{event::*, event_traits::initilize_source_component};

#[test]
fn component_initialization_with_unknown_input() {
initilize_source_component("component");
let event_meta = EventMeta::new();
assert_eq!(
event_meta.source.unwrap().component,
Component::UnknownComponent as i32
)
}

#[test]
fn metadata_for_new_event() {
initilize_source_component("component");
let event_meta = EventMeta::new();
assert!(!event_meta.id.is_empty());
assert!(!event_meta.timestamp.unwrap().to_string().is_empty());
assert_eq!(event_meta.version, Version::V1 as i32);
assert_ne!(event_meta.source, None);
}
}
11 changes: 9 additions & 2 deletions mbus-api/src/lib.rs → events-api/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use crate::event::EventMessage;
use async_trait::async_trait;
use mbus_nats::{BusResult, BusSubscription};
use message::EventMessage;
use serde::{de::DeserializeOwned, Serialize};

mod common;
pub mod event_traits;
pub mod mbus_nats;
pub mod message;

#[async_trait]
pub trait Bus {
Expand All @@ -16,3 +16,10 @@ pub trait Bus {
async fn subscribe<T: Serialize + DeserializeOwned>(&mut self)
-> BusResult<BusSubscription<T>>;
}

/// Event module for the autogenerated event code.
pub mod event {
#![allow(clippy::derive_partial_eq_without_eq)]
#![allow(clippy::large_enum_variant)]
tonic::include_proto!("v1.event");
}
37 changes: 23 additions & 14 deletions mbus-api/src/mbus_nats.rs → events-api/src/mbus_nats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
errors::Error,
retry::{backoff_with_options, BackoffOptions},
},
message::EventMessage,
event::EventMessage,
Bus,
};
use async_nats::{
Expand All @@ -29,8 +29,10 @@ use std::{io::ErrorKind, marker::PhantomData, time::Duration};
pub type BusResult<T> = Result<T, Error>;

/// Initialise the Nats Message Bus.
pub async fn message_bus_init(server: &str) -> impl crate::Bus {
NatsMessageBus::new(server).await
pub async fn message_bus_init(server: &str, msg_replicas: Option<usize>) -> impl crate::Bus {
let bus = NatsMessageBus::new(server).await;
let _ = bus.get_or_create_stream(None, msg_replicas).await;
bus
}

/// Nats implementation of the Bus.
Expand Down Expand Up @@ -100,7 +102,7 @@ impl NatsMessageBus {
/// Ensures that the stream is created on jetstream.
pub async fn verify_stream_exists(&mut self) -> BusResult<()> {
let options = BackoffOptions::new().with_max_retries(0);
if let Err(err) = self.get_or_create_stream(Some(options)).await {
if let Err(err) = self.get_or_create_stream(Some(options), None).await {
tracing::warn!(%err,
"Error while getting/creating stream '{}'",
STREAM_NAME
Expand Down Expand Up @@ -164,6 +166,7 @@ impl NatsMessageBus {
pub async fn get_or_create_stream(
&self,
retry_options: Option<BackoffOptions>,
msg_replicas: Option<usize>,
) -> BusResult<Stream> {
tracing::debug!("Getting/creating stream '{}'", STREAM_NAME);
let options = retry_options.unwrap_or(BackoffOptions::new());
Expand All @@ -177,7 +180,7 @@ impl NatsMessageBus {
storage: async_nats::jetstream::stream::StorageType::Memory, /* The type of storage
* backend, `File`
* (default) */
num_replicas: NUM_STREAM_REPLICAS,
num_replicas: msg_replicas.unwrap_or(NUM_STREAM_REPLICAS),
..async_nats::jetstream::stream::Config::default()
};

Expand Down Expand Up @@ -212,13 +215,19 @@ impl NatsMessageBus {
}

/// Returns subject for the message. Should be unique for each message.
fn subject(msg: &EventMessage) -> String {
format!("events.{}.{}", msg.category.to_string(), msg.metadata.id) // If category is volume
// and id is
// 'id', then the subject
// for the
// message is
// 'events.volume.id'
fn subject(msg: &EventMessage) -> BusResult<String> {
let event_id = match &msg.metadata {
Some(event_meta) => &event_meta.id,
None => {
return Err(Error::InvalidMessageId {
error_msg: "the message id must not be empty".to_string(),
})
}
};
// If category is volume and id is 'id', then the subject for the message is
// 'events.volume.id'
let subject = format!("events.{}.{}", msg.category, event_id);
Ok(subject)
}

/// The payload for mbus publish from the message.
Expand All @@ -237,7 +246,7 @@ impl Bus for NatsMessageBus {
let mut tries = 0;
let mut log_error = true;

let subject = NatsMessageBus::subject(message);
let subject = NatsMessageBus::subject(message)?;
let payload = NatsMessageBus::payload(message)?;

loop {
Expand Down Expand Up @@ -283,7 +292,7 @@ impl Bus for NatsMessageBus {
&mut self,
) -> BusResult<BusSubscription<T>> {
tracing::trace!("Subscribing to Nats message bus");
let stream = self.get_or_create_stream(None).await?;
let stream = self.get_or_create_stream(None, None).await?;

let messages = self.create_consumer_and_get_messages(stream).await?;
tracing::trace!("Subscribed to Nats message bus successfully");
Expand Down
62 changes: 0 additions & 62 deletions mbus-api/src/message.rs

This file was deleted.

0 comments on commit 4b25f55

Please sign in to comment.