From 8b389326e9187f019de84b2a3744a3c5f46a450e Mon Sep 17 00:00:00 2001 From: Vandana Varakantham Date: Mon, 26 Jun 2023 20:04:27 +0000 Subject: [PATCH] feat(callhome): add protobuf event message Signed-off-by: Vandana Varakantham --- mbus-api/Cargo.toml | 9 +++++ mbus-api/build.rs | 7 ++++ mbus-api/protobuf/v1/event.proto | 65 ++++++++++++++++++++++++++++++++ mbus-api/src/common/errors.rs | 3 ++ mbus-api/src/event_traits.rs | 52 +++++++++++++++++++++++++ mbus-api/src/lib.rs | 11 +++++- mbus-api/src/mbus_nats.rs | 37 +++++++++++------- mbus-api/src/message.rs | 62 ------------------------------ 8 files changed, 168 insertions(+), 78 deletions(-) create mode 100644 mbus-api/build.rs create mode 100644 mbus-api/protobuf/v1/event.proto create mode 100644 mbus-api/src/event_traits.rs delete mode 100644 mbus-api/src/message.rs diff --git a/mbus-api/Cargo.toml b/mbus-api/Cargo.toml index 36b75de..85a63d5 100644 --- a/mbus-api/Cargo.toml +++ b/mbus-api/Cargo.toml @@ -3,6 +3,10 @@ name = "mbus-api" version = "0.1.0" edition = "2021" + +[build-dependencies] +tonic-build = "0.8.4" + [dependencies] async-nats = "0.29.0" bytes = "1.4.0" @@ -14,3 +18,8 @@ uuid = { version = "1.3.0", features = ["v4"] } async-trait = "0.1.64" futures = "0.3.26" snafu = "0.7.4" +tonic = "0.8.3" +prost = "0.11.6" +prost-wkt-types = "0.4.0" +chrono = "0.4.23" +once_cell = "1.17.0" diff --git a/mbus-api/build.rs b/mbus-api/build.rs new file mode 100644 index 0000000..be1d0ab --- /dev/null +++ b/mbus-api/build.rs @@ -0,0 +1,7 @@ +fn main() { + tonic_build::configure() + .type_attribute(".", "#[derive(serde::Deserialize, serde::Serialize)]") + .extern_path(".google.protobuf.Timestamp", "::prost_wkt_types::Timestamp") + .compile(&["protobuf/v1/event.proto"], &["protobuf/"]) + .unwrap_or_else(|e| panic!("event v1 protobuf compilation failed: {e}")); +} diff --git a/mbus-api/protobuf/v1/event.proto b/mbus-api/protobuf/v1/event.proto new file mode 100644 index 0000000..e49799f --- /dev/null +++ b/mbus-api/protobuf/v1/event.proto @@ -0,0 +1,65 @@ +syntax = "proto3"; + +import "google/protobuf/wrappers.proto"; + +import "google/protobuf/timestamp.proto"; + +package v1.event; + +// Event Message +message EventMessage { + // Event category + EventCategory category = 1; + // Event action + EventAction action = 2; + // Target id for the category against which action is performed + string target = 3; + // Event meta data + EventMeta metadata = 4; +} + +// Event Category +enum EventCategory { + UnknownCategory = 0; + Pool = 1; + Volume = 2; +} + +// Event Action +enum EventAction { + UnknownAction = 0; + Create = 1; + Delete = 2; +} + +// Event meta data +message EventMeta { + // Something that uniquely identifies events. UUIDv4. GUID. + string id = 1; + EventSource source = 2; + // Event timestamp + google.protobuf.Timestamp timestamp = 3; + // Version of the event message + Version version = 4; +} + +// Event source +message EventSource { + // Io-engine or core-agent + Component component = 1; + // Node name + string node = 2; +} + +// Source Component +enum Component { + UnknownComponent = 0; + CoreAgent = 1; + IoEngine = 2; +} + +// Event message version +enum Version { + UnknownVersion = 0; + V1 = 1; +} diff --git a/mbus-api/src/common/errors.rs b/mbus-api/src/common/errors.rs index b33a7c5..c6322b0 100644 --- a/mbus-api/src/common/errors.rs +++ b/mbus-api/src/common/errors.rs @@ -31,6 +31,9 @@ pub enum Error { error ))] StreamError { stream: String, error: String }, + /// Invalid event message id. + #[snafu(display("Error while generating subject: {}", error_msg))] + InvalidMessageId { error_msg: String }, /// Failed to serialise value. #[snafu(display("Failed to serialise value. Error {}", source))] SerdeSerializeError { source: serde_json::Error }, diff --git a/mbus-api/src/event_traits.rs b/mbus-api/src/event_traits.rs new file mode 100644 index 0000000..e7949a3 --- /dev/null +++ b/mbus-api/src/event_traits.rs @@ -0,0 +1,52 @@ +use crate::event::{Component, EventMessage, EventMeta, EventSource, Version}; +use chrono::Utc; +use once_cell::sync::OnceCell; +use std::str::FromStr; + +/// Once cell static variable to store the component field of the event source. +static COMPONENT: OnceCell = OnceCell::new(); + +/// Initialize the event source component with the service name. +pub fn initilize_source_component(comp: &str) { + COMPONENT.get_or_init(|| Component::from_str(comp).unwrap_or_default()); +} + +impl EventMeta { + /// New event metadata with default values. + pub fn new() -> Self { + Self { + id: uuid::Uuid::new_v4().to_string(), + source: Some(EventSource::new("".to_string())), + timestamp: Some(Utc::now().into()), + version: Version::V1.into(), + } + } +} + +impl EventSource { + /// New event source with default values. + pub fn new(node: String) -> Self { + let component = COMPONENT.get().cloned().unwrap_or_default().into(); + Self { component, node } + } +} + +impl EventMessage { + /// Generate the event trace with event message. + pub fn generate(&self) { + let event_data = serde_json::to_string(&self).unwrap_or_default(); + tracing::event!(target: "target-mbus", tracing::Level::INFO, event_data); + } +} + +// Get Component from service name. +impl FromStr for Component { + type Err = String; + fn from_str(c: &str) -> Result { + match c { + "agent-core" => Ok(Self::CoreAgent), + "io-engine" => Ok(Self::IoEngine), + _ => Err(format!("Received event from unknown component {c}")), + } + } +} diff --git a/mbus-api/src/lib.rs b/mbus-api/src/lib.rs index adcd5a3..e11f9cf 100644 --- a/mbus-api/src/lib.rs +++ b/mbus-api/src/lib.rs @@ -1,11 +1,11 @@ +use crate::event::EventMessage; use async_trait::async_trait; use mbus_nats::{BusResult, BusSubscription}; -use message::EventMessage; use serde::{de::DeserializeOwned, Serialize}; mod common; +pub mod event_traits; pub mod mbus_nats; -pub mod message; #[async_trait] pub trait Bus { @@ -16,3 +16,10 @@ pub trait Bus { async fn subscribe(&mut self) -> BusResult>; } + +/// Event module for the autogenerated event code. +pub mod event { + #![allow(clippy::derive_partial_eq_without_eq)] + #![allow(clippy::large_enum_variant)] + tonic::include_proto!("v1.event"); +} diff --git a/mbus-api/src/mbus_nats.rs b/mbus-api/src/mbus_nats.rs index 9d41735..943f8fa 100644 --- a/mbus-api/src/mbus_nats.rs +++ b/mbus-api/src/mbus_nats.rs @@ -4,7 +4,7 @@ use crate::{ errors::Error, retry::{backoff_with_options, BackoffOptions}, }, - message::EventMessage, + event::EventMessage, Bus, }; use async_nats::{ @@ -29,8 +29,10 @@ use std::{io::ErrorKind, marker::PhantomData, time::Duration}; pub type BusResult = Result; /// Initialise the Nats Message Bus. -pub async fn message_bus_init(server: &str) -> impl crate::Bus { - NatsMessageBus::new(server).await +pub async fn message_bus_init(server: &str, msg_replicas: Option) -> impl crate::Bus { + let bus = NatsMessageBus::new(server).await; + let _ = bus.get_or_create_stream(None, msg_replicas).await; + bus } /// Nats implementation of the Bus. @@ -100,7 +102,7 @@ impl NatsMessageBus { /// Ensures that the stream is created on jetstream. pub async fn verify_stream_exists(&mut self) -> BusResult<()> { let options = BackoffOptions::new().with_max_retries(0); - if let Err(err) = self.get_or_create_stream(Some(options)).await { + if let Err(err) = self.get_or_create_stream(Some(options), None).await { tracing::warn!(%err, "Error while getting/creating stream '{}'", STREAM_NAME @@ -164,6 +166,7 @@ impl NatsMessageBus { pub async fn get_or_create_stream( &self, retry_options: Option, + msg_replicas: Option, ) -> BusResult { tracing::debug!("Getting/creating stream '{}'", STREAM_NAME); let options = retry_options.unwrap_or(BackoffOptions::new()); @@ -177,7 +180,7 @@ impl NatsMessageBus { storage: async_nats::jetstream::stream::StorageType::Memory, /* The type of storage * backend, `File` * (default) */ - num_replicas: NUM_STREAM_REPLICAS, + num_replicas: msg_replicas.unwrap_or(NUM_STREAM_REPLICAS), ..async_nats::jetstream::stream::Config::default() }; @@ -212,13 +215,19 @@ impl NatsMessageBus { } /// Returns subject for the message. Should be unique for each message. - fn subject(msg: &EventMessage) -> String { - format!("events.{}.{}", msg.category.to_string(), msg.metadata.id) // If category is volume - // and id is - // 'id', then the subject - // for the - // message is - // 'events.volume.id' + fn subject(msg: &EventMessage) -> BusResult { + let event_id = match &msg.metadata { + Some(event_meta) => &event_meta.id, + None => { + return Err(Error::InvalidMessageId { + error_msg: "the message id must not be empty".to_string(), + }) + } + }; + // If category is volume and id is 'id', then the subject for the message is + // 'events.volume.id' + let subject = format!("events.{}.{}", msg.category, event_id); + Ok(subject) } /// The payload for mbus publish from the message. @@ -237,7 +246,7 @@ impl Bus for NatsMessageBus { let mut tries = 0; let mut log_error = true; - let subject = NatsMessageBus::subject(message); + let subject = NatsMessageBus::subject(message)?; let payload = NatsMessageBus::payload(message)?; loop { @@ -283,7 +292,7 @@ impl Bus for NatsMessageBus { &mut self, ) -> BusResult> { tracing::trace!("Subscribing to Nats message bus"); - let stream = self.get_or_create_stream(None).await?; + let stream = self.get_or_create_stream(None, None).await?; let messages = self.create_consumer_and_get_messages(stream).await?; tracing::trace!("Subscribed to Nats message bus successfully"); diff --git a/mbus-api/src/message.rs b/mbus-api/src/message.rs deleted file mode 100644 index 4f4c842..0000000 --- a/mbus-api/src/message.rs +++ /dev/null @@ -1,62 +0,0 @@ -use serde::{Deserialize, Serialize}; - -/// General structure of message bus event. -#[derive(Serialize, Deserialize, Debug)] -pub struct EventMessage { - /// Event Category. - pub category: Category, - /// Event Action. - pub action: Action, - /// Target id for the category against which action is performed. - pub target: String, - /// Event meta data. - pub metadata: EventMeta, -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct EventMeta { - /// Something that uniquely identifies events. - /// UUIDv4. - /// GUID. - pub id: String, - pub source: EventSource, - /// Event timestamp. - pub event_timestamp: String, - /// Version of the event message. - pub version: String, -} - -/// Event source. -#[derive(Serialize, Deserialize, Debug)] -pub struct EventSource { - /// Io-engine or core-agent. - pub component: String, - /// Node name - pub node: String, -} - -/// Event action. -#[derive(Serialize, Deserialize, Debug)] -pub enum Action { - CreateEvent, - DeleteEvent, - Unknown, -} - -/// Event category. -#[derive(Serialize, Deserialize, Debug)] -pub enum Category { - Pool, - Volume, - Unknown, -} - -impl ToString for Category { - fn to_string(&self) -> String { - match self { - Category::Pool => "pool".to_string(), - Category::Volume => "volume".to_string(), - Category::Unknown => "".to_string(), - } - } -}