Skip to content

Commit

Permalink
fix(eventing): add init_ext to use spawner arg and upgrade async_nats
Browse files Browse the repository at this point in the history
Signed-off-by: Vandana Varakantham <[email protected]>
  • Loading branch information
Vandana Varakantham committed Aug 18, 2023
1 parent dbeb499 commit 02deda6
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 43 deletions.
28 changes: 18 additions & 10 deletions event-publisher/src/event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,20 @@ pub struct EventHandle {}
impl EventHandle {
/// Initilize the buffer, starts the event publisher and return the layer for handling the event
/// traces.
pub fn init<T>(mbus_url: String, service_name: &str, spawn_option: Option<T>) -> EventLayer
pub fn init(mbus_url: String, service_name: &str) -> EventLayer {
let (send, recv) = tokio::sync::mpsc::channel::<EventMessage>(MAX_BUFFER_MSGS);
initilize_source_component(service_name);
let layer = EventLayer::new(send);
tokio::spawn(async move {
MbusPublisher::run(&mbus_url, recv).await;
// TODO handle the closed channel situation.
});
layer
}

/// Initilize the buffer, starts the event publisher on the spawner specified in the args and
/// return the layer for handling the event traces.
pub fn init_ext<T>(mbus_url: String, service_name: &str, spawn_fn: T) -> EventLayer
where
T: Fn(std::pin::Pin<Box<dyn core::future::Future<Output = ()> + Send>>),
{
Expand All @@ -20,15 +33,10 @@ impl EventHandle {
MbusPublisher::run(&mbus_url, recv).await;
// TODO handle the closed channel situation.
});
match spawn_option {
Some(spawn_fn) => {
// Spawn the publisher on the spawner specified in the args.
spawn_fn(publisher_future);
}
None => {
tokio::spawn(publisher_future);
}
}

// Spawn the publisher on the spawner specified in the args.
spawn_fn(publisher_future);

layer
}
}
2 changes: 1 addition & 1 deletion events-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ edition = "2021"
tonic-build = "0.8.4"

[dependencies]
async-nats = "0.29.0"
async-nats = "0.31.0"
bytes = "1.4.0"
serde = "1.0.183"
serde_json = "1.0.104"
Expand Down
11 changes: 7 additions & 4 deletions events-api/src/common/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ pub enum Error {
#[snafu(display(
"Jetstream Publish Error. Retried '{}' times. Error: {}. Message : {}",
retries,
error,
source,
payload
))]
PublishError {
retries: u32,
payload: String,
error: String,
source: async_nats::jetstream::context::PublishError,
},
/// Failed to get consumer messages.
#[snafu(display(
Expand All @@ -28,9 +28,12 @@ pub enum Error {
#[snafu(display(
"Jetstream Error while getting/creating stream '{}': {}",
stream,
error
source
))]
StreamError { stream: String, error: String },
StreamError {
stream: String,
source: async_nats::jetstream::context::CreateStreamError,
},
/// Invalid event message id.
#[snafu(display("Error while generating subject: {}", error_msg))]
InvalidMessageId { error_msg: String },
Expand Down
2 changes: 1 addition & 1 deletion events-api/src/common/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl BackoffOptions {
}

/// Specify a max number of retries before giving up.
pub fn with_max_retries(mut self, max_retries: u32) -> Self {
pub fn _with_max_retries(mut self, max_retries: u32) -> Self {
self.max_retries = max_retries;
self
}
Expand Down
55 changes: 29 additions & 26 deletions events-api/src/mbus_nats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ use async_nats::{
jetstream::{
self,
consumer::{
push::{Config, Messages},
push::{Config, Messages, MessagesErrorKind},
DeliverPolicy,
},
context::PublishErrorKind,
stream::Stream,
Context,
},
Expand All @@ -23,7 +24,7 @@ use async_trait::async_trait;
use bytes::Bytes;
use futures::StreamExt;
use serde::{de::DeserializeOwned, Serialize};
use std::{io::ErrorKind, marker::PhantomData, time::Duration};
use std::{marker::PhantomData, time::Duration};

/// Result wrapper for Jetstream requests.
pub type BusResult<T> = Result<T, Error>;
Expand Down Expand Up @@ -99,18 +100,6 @@ impl NatsMessageBus {
}
}

/// Ensures that the stream is created on jetstream.
pub async fn verify_stream_exists(&mut self) -> BusResult<()> {
let options = BackoffOptions::new().with_max_retries(0);
if let Err(err) = self.get_or_create_stream(Some(options), None).await {
tracing::warn!(%err,
"Error while getting/creating stream '{}'",
STREAM_NAME
);
}
Ok(())
}

/// Creates consumer and returns an iterator for the messages on the stream.
async fn create_consumer_and_get_messages(&mut self, stream: Stream) -> BusResult<Messages> {
tracing::debug!("Getting/creating consumer for stats '{}'", CONSUMER_NAME);
Expand Down Expand Up @@ -140,16 +129,19 @@ impl NatsMessageBus {
);
return Ok(messages);
}
Err(error) => error,
Err(error) => Error::ConsumerError {
consumer: CONSUMER_NAME.to_string(),
error: error.to_string(),
},
},
Err(error) => Error::ConsumerError {
consumer: CONSUMER_NAME.to_string(),
error: error.to_string(),
},
Err(error) => error,
};

if tries == options.max_retries {
return Err(Error::ConsumerError {
consumer: CONSUMER_NAME.to_string(),
error: err.to_string(),
});
return Err(err);
}
if log_error {
tracing::warn!(%err,
Expand Down Expand Up @@ -200,7 +192,7 @@ impl NatsMessageBus {
if tries == options.max_retries {
return Err(Error::StreamError {
stream: STREAM_NAME.to_string(),
error: err.to_string(),
source: err,
});
}
if log_error {
Expand Down Expand Up @@ -265,23 +257,29 @@ impl Bus for NatsMessageBus {
tracing::warn!(%err,
"Error publishing message to jetstream. Retrying..."
);
let _stream = self.verify_stream_exists().await; // Check and create a stream if necessary. Useful when the stream is deleted.
log_error = false;
}

if tries == options.max_retries {
return Err(Error::PublishError {
retries: options.max_retries,
payload: format!("{message:?}"),
error: err.to_string(),
source: err,
});
}
if let Some(error) = err.downcast_ref::<std::io::Error>() {
if error.kind() == ErrorKind::TimedOut {

match err.kind() {
PublishErrorKind::TimedOut => {
tries += 1;
continue;
}
PublishErrorKind::StreamNotFound => {
let _ = self.get_or_create_stream(None, None).await?;
continue;
}
_ => (),
}

backoff_with_options(&mut tries, &options).await;
}
}
Expand Down Expand Up @@ -320,7 +318,12 @@ impl<T: Serialize + DeserializeOwned> BusSubscription<T> {
let message = match message {
Ok(message) => message,
Err(error) => {
tracing::warn!(%error, "Error accessing jetstream message");
match error.kind() {
MessagesErrorKind::ConsumerDeleted => (), // TODO create consumer again
MessagesErrorKind::MissingHeartbeat => (), /* TODO check if stream */
// and consumer exists,
_ => tracing::warn!(%error, "Error accessing jetstream message"),
};
continue;
}
};
Expand Down
2 changes: 1 addition & 1 deletion h2
Submodule h2 updated 53 files
+0 −1 .github/FUNDING.yml
+47 −29 .github/workflows/CI.yml
+0 −43 CHANGELOG.md
+1 −2 Cargo.toml
+4 −1 examples/akamai.rs
+9 −58 src/client.rs
+5 −5 src/codec/framed_read.rs
+4 −16 src/error.rs
+8 −2 src/frame/data.rs
+2 −10 src/frame/go_away.rs
+5 −8 src/frame/headers.rs
+1 −11 src/frame/mod.rs
+3 −3 src/frame/settings.rs
+16 −13 src/hpack/decoder.rs
+4 −4 src/hpack/encoder.rs
+13 −13 src/hpack/header.rs
+5 −4 src/hpack/huffman/mod.rs
+4 −4 src/hpack/table.rs
+6 −6 src/hpack/test/fixture.rs
+1 −1 src/hpack/test/fuzz.rs
+43 −8 src/lib.rs
+4 −19 src/proto/connection.rs
+2 −6 src/proto/error.rs
+4 −0 src/proto/go_away.rs
+1 −2 src/proto/mod.rs
+4 −1 src/proto/ping_pong.rs
+8 −43 src/proto/streams/counts.rs
+36 −47 src/proto/streams/flow_control.rs
+0 −5 src/proto/streams/mod.rs
+51 −73 src/proto/streams/prioritize.rs
+68 −131 src/proto/streams/recv.rs
+60 −75 src/proto/streams/send.rs
+42 −44 src/proto/streams/state.rs
+7 −5 src/proto/streams/store.rs
+22 −59 src/proto/streams/stream.rs
+24 −45 src/proto/streams/streams.rs
+10 −71 src/server.rs
+8 −25 src/share.rs
+1 −1 tests/h2-support/src/client_ext.rs
+3 −22 tests/h2-support/src/frames.rs
+2 −2 tests/h2-support/src/mock.rs
+2 −2 tests/h2-support/src/prelude.rs
+2 −2 tests/h2-support/src/util.rs
+1 −1 tests/h2-tests/Cargo.toml
+8 −10 tests/h2-tests/tests/client_request.rs
+0 −197 tests/h2-tests/tests/flow_control.rs
+1 −1 tests/h2-tests/tests/hammer.rs
+3 −3 tests/h2-tests/tests/ping_pong.rs
+1 −1 tests/h2-tests/tests/push_promise.rs
+4 −78 tests/h2-tests/tests/server.rs
+8 −142 tests/h2-tests/tests/stream_states.rs
+6 −6 util/genfixture/src/main.rs
+5 −5 util/genhuff/src/main.rs

0 comments on commit 02deda6

Please sign in to comment.