Skip to content

Commit

Permalink
feat(eventing): eventing integration
Browse files Browse the repository at this point in the history
Signed-off-by: Vandana Varakantham <[email protected]>
  • Loading branch information
Vandana Varakantham committed Aug 23, 2023
1 parent 0ddaae3 commit 4ba075f
Show file tree
Hide file tree
Showing 8 changed files with 573 additions and 30 deletions.
519 changes: 509 additions & 10 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion io-engine-tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ pub fn mayastor_test_init_ex(log_format: LogFormat) {
}
});

logger::init_ex("info,io_engine=DEBUG", log_format);
logger::init_ex("info,io_engine=DEBUG", log_format, None);

io_engine::CPS_INIT!();
}
Expand Down
4 changes: 3 additions & 1 deletion io-engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ tracing = "0.1.26"
tracing-core = "0.1.19"
tracing-futures = "0.2.5"
tracing-log = "0.1.2"
tracing-subscriber = "0.2.20"
tracing-subscriber = "0.3.16"
udev = "0.6.2"
url = "2.2.2"
gettid = "0.1.2"
Expand All @@ -111,6 +111,8 @@ mayastor-api = { path = "../rpc/mayastor-api" }
spdk-rs = { path = "../spdk-rs" }
sysfs = { path = "../sysfs" }
version-info = { path = "../utils/io-engine-dependencies/version-info" }
events-api = { path = "../utils/io-engine-dependencies/events-api" }
event-publisher = { path = "../utils/io-engine-dependencies/event-publisher" }

[dependencies.serde]
features = ["derive"]
Expand Down
4 changes: 2 additions & 2 deletions io-engine/src/bin/io-engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,9 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
// automatically. trace maps to debug at FFI level. If RUST_LOG is
// passed, we will use it regardless.
if !args.log_components.is_empty() {
logger::init_ex("TRACE", log_format);
logger::init_ex("TRACE", log_format, args.events_url.clone());
} else {
logger::init_ex("INFO", log_format);
logger::init_ex("INFO", log_format, args.events_url.clone());
}

info!("{}", fmt_package_info!());
Expand Down
6 changes: 6 additions & 0 deletions io-engine/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,9 @@ pub const NVME_CONTROLLER_MODEL_ID: &str = "Mayastor NVMe controller";

/// NVMe NQN prefix.
pub const NVME_NQN_PREFIX: &str = "nqn.2019-05.io.openebs";

/// Target to filter eventing traces.
pub const EVENTING_TARGET: &str = "mbus-events-target";

/// Service/ source component generating events for eventing.
pub const SERVICE_NAME: &str = "io-engine";
4 changes: 4 additions & 0 deletions io-engine/src/core/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,9 @@ pub struct MayastorCliArgs {
/// termination.
#[structopt(long, hidden = true)]
pub skip_sig_handler: bool,
/// Events message-bus endpoint url.
#[structopt(long)]
pub events_url: Option<url::Url>,
}

/// Mayastor features.
Expand Down Expand Up @@ -247,6 +250,7 @@ impl Default for MayastorCliArgs {
reactor_freeze_detection: false,
reactor_freeze_timeout: None,
skip_sig_handler: false,
events_url: None,
}
}
}
Expand Down
62 changes: 47 additions & 15 deletions io-engine/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,24 @@ use ansi_term::{Colour, Style};
use once_cell::sync::OnceCell;
use std::{ffi::CStr, fmt::Write, os::raw::c_char, path::Path, str::FromStr};

use crate::{
constants::{EVENTING_TARGET, SERVICE_NAME},
core::spawn,
};
use event_publisher::event_handler::EventHandle;
use tracing_core::{event::Event, Level, Metadata};
use tracing_log::{LogTracer, NormalizeEvent};
use tracing_subscriber::{
filter::{filter_fn, Targets},
fmt::{
format::{FmtSpan, FormatEvent, FormatFields},
format::{FmtSpan, FormatEvent, FormatFields, Writer},
FmtContext,
FormattedFields,
},
layer::{Layer, SubscriberExt},
registry::LookupSpan,
EnvFilter,
Registry,
};

/// Returns hostname.
Expand Down Expand Up @@ -101,7 +109,7 @@ impl<'a> FormatLevel<'a> {
}
}

fn fmt_line(&self, f: &mut dyn Write, line: &str) -> std::fmt::Result {
fn fmt_line(&self, mut f: Writer<'_>, line: &str) -> std::fmt::Result {
if self.ansi {
write!(
f,
Expand Down Expand Up @@ -304,7 +312,7 @@ where
fn format_event(
&self,
ctx: &FmtContext<'_, S, N>,
w: &mut dyn Write,
w: Writer<'_>,
evt: &Event<'_>,
) -> std::fmt::Result {
match self.style {
Expand All @@ -327,7 +335,7 @@ impl LogFormat {
fn default_style<S, N>(
&self,
context: &FmtContext<'_, S, N>,
writer: &mut dyn Write,
mut writer: Writer<'_>,
event: &Event<'_>,
) -> std::fmt::Result
where
Expand All @@ -353,7 +361,7 @@ impl LogFormat {
Location::new(meta)
)?;

context.format_fields(writer, event)?;
context.format_fields(writer.by_ref(), event)?;

writeln!(writer)
}
Expand All @@ -362,7 +370,7 @@ impl LogFormat {
fn compact_style<S, N>(
&self,
context: &FmtContext<'_, S, N>,
writer: &mut dyn Write,
mut writer: Writer<'_>,
event: &Event<'_>,
) -> std::fmt::Result
where
Expand Down Expand Up @@ -396,9 +404,9 @@ impl LogFormat {
write!(buf, "{}: ", &ctx[1 ..])?;
}

context.format_fields(&mut buf, event)?;
fmt.fmt_line(writer.by_ref(), &buf)?;

fmt.fmt_line(writer, &buf)?;
context.format_fields(writer.by_ref(), event)?;

writeln!(writer)
}
Expand All @@ -420,26 +428,50 @@ impl LogFormat {
///
/// We might want to suppress certain messages, as some of them are redundant,
/// in particular, the NOTICE messages as such, they are mapped to debug.
pub fn init_ex(level: &str, format: LogFormat) {
pub fn init_ex(level: &str, format: LogFormat, events_url: Option<url::Url>) {
// Set up a "logger" that simply translates any "log" messages it receives
// to trace events. This is for our custom spdk log messages, but also
// for any other third party crates still using the logging facade.

LogTracer::init().expect("failed to initialise LogTracer");

// Create a default subscriber.
let builder = tracing_subscriber::fmt::Subscriber::builder()
let builder = tracing_subscriber::fmt::layer()
.with_span_events(FmtSpan::FULL)
.event_format(format);
.event_format(format)
.with_filter(filter_fn(|metadata| {
// Exclude spans or events that have the target
// "mbus-events-target".
metadata.target() != EVENTING_TARGET
}));

let filter = match EnvFilter::try_from_default_env() {
Ok(filter) => filter,
Err(_) => tracing_subscriber::EnvFilter::new(level),
};

let subscriber = match EnvFilter::try_from_default_env() {
Ok(filter) => builder.with_env_filter(filter).finish(),
Err(_) => builder.with_env_filter(level).finish(),
// Get the optional eventing layer.
let events_layer = match events_url {
Some(url) => {
let events_filter =
Targets::new().with_target(EVENTING_TARGET, Level::INFO);
Some(
EventHandle::init(url.to_string(), SERVICE_NAME, Some(spawn))
.with_filter(events_filter),
)
}
None => None,
};

let subscriber = Registry::default()
.with(filter)
.with(Some(builder))
.with(events_layer);

tracing::subscriber::set_global_default(subscriber)
.expect("failed to set default subscriber");
}

pub fn init(level: &str) {
init_ex(level, Default::default())
init_ex(level, Default::default(), None)
}

0 comments on commit 4ba075f

Please sign in to comment.