diff --git a/event-publisher/src/event_handler.rs b/event-publisher/src/event_handler.rs index c3bdd4c..76a147d 100644 --- a/event-publisher/src/event_handler.rs +++ b/event-publisher/src/event_handler.rs @@ -9,7 +9,20 @@ pub struct EventHandle {} impl EventHandle { /// Initilize the buffer, starts the event publisher and return the layer for handling the event /// traces. - pub fn init(mbus_url: String, service_name: &str, spawn_option: Option) -> EventLayer + pub fn init(mbus_url: String, service_name: &str) -> EventLayer { + let (send, recv) = tokio::sync::mpsc::channel::(MAX_BUFFER_MSGS); + initilize_source_component(service_name); + let layer = EventLayer::new(send); + tokio::spawn(async move { + MbusPublisher::run(&mbus_url, recv).await; + // TODO handle the closed channel situation. + }); + layer + } + + /// Initilize the buffer, starts the event publisher on the spawner specified in the args and + /// return the layer for handling the event traces. + pub fn init_ext(mbus_url: String, service_name: &str, spawn_fn: T) -> EventLayer where T: Fn(std::pin::Pin + Send>>), { @@ -20,15 +33,10 @@ impl EventHandle { MbusPublisher::run(&mbus_url, recv).await; // TODO handle the closed channel situation. }); - match spawn_option { - Some(spawn_fn) => { - // Spawn the publisher on the spawner specified in the args. - spawn_fn(publisher_future); - } - None => { - tokio::spawn(publisher_future); - } - } + + // Spawn the publisher on the spawner specified in the args. + spawn_fn(publisher_future); + layer } } diff --git a/events-api/Cargo.toml b/events-api/Cargo.toml index df59393..5650a76 100644 --- a/events-api/Cargo.toml +++ b/events-api/Cargo.toml @@ -8,7 +8,7 @@ edition = "2021" tonic-build = "0.8.4" [dependencies] -async-nats = "0.29.0" +async-nats = "0.31.0" bytes = "1.4.0" serde = "1.0.183" serde_json = "1.0.104" diff --git a/events-api/src/common/errors.rs b/events-api/src/common/errors.rs index c6322b0..d918784 100644 --- a/events-api/src/common/errors.rs +++ b/events-api/src/common/errors.rs @@ -9,13 +9,13 @@ pub enum Error { #[snafu(display( "Jetstream Publish Error. Retried '{}' times. Error: {}. Message : {}", retries, - error, + source, payload ))] PublishError { retries: u32, payload: String, - error: String, + source: async_nats::jetstream::context::PublishError, }, /// Failed to get consumer messages. #[snafu(display( @@ -28,9 +28,12 @@ pub enum Error { #[snafu(display( "Jetstream Error while getting/creating stream '{}': {}", stream, - error + source ))] - StreamError { stream: String, error: String }, + StreamError { + stream: String, + source: async_nats::jetstream::context::CreateStreamError, + }, /// Invalid event message id. #[snafu(display("Error while generating subject: {}", error_msg))] InvalidMessageId { error_msg: String }, diff --git a/events-api/src/common/retry.rs b/events-api/src/common/retry.rs index 2c8b389..5f6801c 100644 --- a/events-api/src/common/retry.rs +++ b/events-api/src/common/retry.rs @@ -60,7 +60,7 @@ impl BackoffOptions { } /// Specify a max number of retries before giving up. - pub fn with_max_retries(mut self, max_retries: u32) -> Self { + pub fn _with_max_retries(mut self, max_retries: u32) -> Self { self.max_retries = max_retries; self } diff --git a/events-api/src/mbus_nats.rs b/events-api/src/mbus_nats.rs index 16b198c..bb56cca 100644 --- a/events-api/src/mbus_nats.rs +++ b/events-api/src/mbus_nats.rs @@ -11,9 +11,10 @@ use async_nats::{ jetstream::{ self, consumer::{ - push::{Config, Messages}, + push::{Config, Messages, MessagesErrorKind}, DeliverPolicy, }, + context::PublishErrorKind, stream::Stream, Context, }, @@ -23,7 +24,7 @@ use async_trait::async_trait; use bytes::Bytes; use futures::StreamExt; use serde::{de::DeserializeOwned, Serialize}; -use std::{io::ErrorKind, marker::PhantomData, time::Duration}; +use std::{marker::PhantomData, time::Duration}; /// Result wrapper for Jetstream requests. pub type BusResult = Result; @@ -99,18 +100,6 @@ impl NatsMessageBus { } } - /// Ensures that the stream is created on jetstream. - pub async fn verify_stream_exists(&mut self) -> BusResult<()> { - let options = BackoffOptions::new().with_max_retries(0); - if let Err(err) = self.get_or_create_stream(Some(options), None).await { - tracing::warn!(%err, - "Error while getting/creating stream '{}'", - STREAM_NAME - ); - } - Ok(()) - } - /// Creates consumer and returns an iterator for the messages on the stream. async fn create_consumer_and_get_messages(&mut self, stream: Stream) -> BusResult { tracing::debug!("Getting/creating consumer for stats '{}'", CONSUMER_NAME); @@ -140,16 +129,19 @@ impl NatsMessageBus { ); return Ok(messages); } - Err(error) => error, + Err(error) => Error::ConsumerError { + consumer: CONSUMER_NAME.to_string(), + error: error.to_string(), + }, + }, + Err(error) => Error::ConsumerError { + consumer: CONSUMER_NAME.to_string(), + error: error.to_string(), }, - Err(error) => error, }; if tries == options.max_retries { - return Err(Error::ConsumerError { - consumer: CONSUMER_NAME.to_string(), - error: err.to_string(), - }); + return Err(err); } if log_error { tracing::warn!(%err, @@ -200,7 +192,7 @@ impl NatsMessageBus { if tries == options.max_retries { return Err(Error::StreamError { stream: STREAM_NAME.to_string(), - error: err.to_string(), + source: err, }); } if log_error { @@ -265,7 +257,6 @@ impl Bus for NatsMessageBus { tracing::warn!(%err, "Error publishing message to jetstream. Retrying..." ); - let _stream = self.verify_stream_exists().await; // Check and create a stream if necessary. Useful when the stream is deleted. log_error = false; } @@ -273,15 +264,22 @@ impl Bus for NatsMessageBus { return Err(Error::PublishError { retries: options.max_retries, payload: format!("{message:?}"), - error: err.to_string(), + source: err, }); } - if let Some(error) = err.downcast_ref::() { - if error.kind() == ErrorKind::TimedOut { + + match err.kind() { + PublishErrorKind::TimedOut => { tries += 1; continue; } + PublishErrorKind::StreamNotFound => { + let _ = self.get_or_create_stream(None, None).await?; + continue; + } + _ => (), } + backoff_with_options(&mut tries, &options).await; } } @@ -320,7 +318,12 @@ impl BusSubscription { let message = match message { Ok(message) => message, Err(error) => { - tracing::warn!(%error, "Error accessing jetstream message"); + match error.kind() { + MessagesErrorKind::ConsumerDeleted => (), // TODO create consumer again + MessagesErrorKind::MissingHeartbeat => (), /* TODO check if stream */ + // and consumer exists, + _ => tracing::warn!(%error, "Error accessing jetstream message"), + }; continue; } }; diff --git a/h2 b/h2 index f894725..929f268 160000 --- a/h2 +++ b/h2 @@ -1 +1 @@ -Subproject commit f89472550eef0593c6d61b05953503e9dde2ab99 +Subproject commit 929f2688911b5816039325c3dae2c2378155c917