Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow nested parsers on MessageProducer level (v4) #2087

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions application/apps/indexer/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion application/apps/indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ thiserror = "1.0"
lazy_static = "1.4"
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1"
dlt-core = "0.14"
# dlt-core = "0.14"
# TODO https://github.com/esrlabs/dlt-core/pull/24
dlt-core = { git = "https://github.com/kruss/dlt-core.git", branch = "dlt_network_traces" }
crossbeam-channel = "0.5"
futures = "0.3"
tokio-util = "0.7"
Expand Down
21 changes: 18 additions & 3 deletions application/apps/indexer/indexer_cli/src/interactive.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use crate::{duration_report, Instant};
use futures::{pin_mut, stream::StreamExt};
use parsers::{dlt::DltParser, MessageStreamItem, ParseYield};
use parsers::{
dlt::DltParser, nested_parser::ParseRestResolver, LogMessage, MessageStreamItem,
ParseLogMsgError, ParseYield,
};
use processor::grabber::LineRange;
use rustyline::{error::ReadlineError, DefaultEditor};
use session::session::Session;
Expand Down Expand Up @@ -46,6 +49,8 @@ pub(crate) async fn handle_interactive_session(input: Option<PathBuf>) {
let udp_source = UdpSource::new(RECEIVER, vec![]).await.unwrap();
let dlt_parser = DltParser::new(None, None, None, false);
let mut dlt_msg_producer = MessageProducer::new(dlt_parser, udp_source, None);
//TODO AAZ: Make sure we need to provide the resolver in indexer CLI.
let mut parse_reslover = ParseRestResolver::new();
let msg_stream = dlt_msg_producer.as_stream();
pin_mut!(msg_stream);
loop {
Expand All @@ -56,10 +61,12 @@ pub(crate) async fn handle_interactive_session(input: Option<PathBuf>) {
}
item = msg_stream.next() => {
match item {
Some((_, MessageStreamItem::Item(ParseYield::Message(msg)))) => {
Some((_, MessageStreamItem::Item(ParseYield::Message(item)))) => {
let msg = parse_log_msg_lossy(item, &mut parse_reslover);
println!("msg: {msg}");
}
Some((_, MessageStreamItem::Item(ParseYield::MessageAndAttachment((msg, attachment))))) => {
Some((_, MessageStreamItem::Item(ParseYield::MessageAndAttachment((item, attachment))))) => {
let msg = parse_log_msg_lossy(item, &mut parse_reslover);
println!("msg: {msg}, attachment: {attachment:?}");
}
Some((_, MessageStreamItem::Item(ParseYield::Attachment(attachment)))) => {
Expand Down Expand Up @@ -194,3 +201,11 @@ async fn collect_user_input(tx: mpsc::UnboundedSender<Command>) -> JoinHandle<()
println!("done with readline loop");
})
}

/// Parse log messages without registering errors and calling [`ParseLogMsgError::parse_lossy()`] on errors
pub fn parse_log_msg_lossy<T: LogMessage>(item: T, err_resolver: &mut ParseRestResolver) -> String {
match item.try_resolve(Some(err_resolver)) {
Ok(item) => item.to_string(),
Err(err) => err.parse_lossy(),
}
}
4 changes: 3 additions & 1 deletion application/apps/indexer/parsers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ rand.workspace = true
# someip-messages = { path = "../../../../../someip"}
someip-messages = { git = "https://github.com/esrlabs/someip" }
# someip-payload = { path = "../../../../../someip-payload" }
someip-payload = { git = "https://github.com/esrlabs/someip-payload" }
# someip-payload = { git = "https://github.com/esrlabs/someip-payload" }
# TODO
someip-payload = { git = "https://github.com/kruss/someip-payload.git", branch = "robustness" }

[dev-dependencies]
stringreader = "0.1.1"
136 changes: 116 additions & 20 deletions application/apps/indexer/parsers/src/dlt/fmt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,15 @@ use log::trace;
use serde::ser::{Serialize, SerializeStruct, Serializer};

use std::{
fmt::{self, Formatter},
fmt::{self, Display, Formatter, Write},
str,
};

use crate::{
nested_parser::ParseRestResolver, GeneralParseLogError, LogMessage, ParseLogSeverity,
ResolveParseHint,
};

const DLT_COLUMN_SENTINAL: char = '\u{0004}';
const DLT_ARGUMENT_SENTINAL: char = '\u{0005}';
const DLT_NEWLINE_SENTINAL_SLICE: &[u8] = &[0x6];
Expand Down Expand Up @@ -281,6 +286,17 @@ impl<'a> Serialize for FormattableMessage<'a> {
None => state.serialize_field("payload", "[Unknown CtrlCommand]")?,
}
}
PayloadContent::NetworkTrace(slices) => {
state.serialize_field("app-id", &ext_header_app_id)?;
state.serialize_field("context-id", &ext_header_context_id)?;
state.serialize_field("message-type", &ext_header_msg_type)?;
let arg_string = slices
.iter()
.map(|slice| format!("{:02X?}", slice))
.collect::<Vec<String>>()
.join("|");
state.serialize_field("payload", &arg_string)?;
}
}
state.end()
}
Expand Down Expand Up @@ -386,12 +402,25 @@ impl<'a> FormattableMessage<'a> {
payload_string,
))
}
PayloadContent::NetworkTrace(slices) => {
let payload_string = slices
.iter()
.map(|slice| format!("{:02X?}", slice))
.collect::<Vec<String>>()
.join("|");
Ok(PrintableMessage::new(
ext_h_app_id,
eh_ctx_id,
ext_h_msg_type,
payload_string,
))
}
}
}

fn write_app_id_context_id_and_message_type(
&self,
f: &mut fmt::Formatter,
f: &mut impl std::fmt::Write,
) -> Result<(), fmt::Error> {
match self.message.extended_header.as_ref() {
Some(ext) => {
Expand Down Expand Up @@ -419,7 +448,7 @@ impl<'a> FormattableMessage<'a> {
&self,
id: u32,
data: &[u8],
f: &mut fmt::Formatter,
f: &mut impl std::fmt::Write,
) -> fmt::Result {
trace!("format_nonverbose_data");
let mut fibex_info_added = false;
Expand Down Expand Up @@ -511,7 +540,16 @@ impl<'a> FormattableMessage<'a> {
}
}

impl<'a> fmt::Display for FormattableMessage<'a> {
impl LogMessage for FormattableMessage<'_> {
type ParseError = GeneralParseLogError;

fn to_writer<W: std::io::Write>(&self, writer: &mut W) -> Result<usize, std::io::Error> {
let bytes = self.message.as_bytes();
let len = bytes.len();
writer.write_all(&bytes)?;
Ok(len)
}

/// will format dlt Message with those fields:
/// ********* storage-header ********
/// date-time
Expand All @@ -528,43 +566,101 @@ impl<'a> fmt::Display for FormattableMessage<'a> {
/// context-id
///
/// payload
fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> {
fn try_resolve(
&self,
resolver: Option<&mut ParseRestResolver>,
) -> Result<impl Display, Self::ParseError> {
let mut msg = String::new();
// Taken from Documentation: string formatting is considered an infallible operation.
// Thus we can ignore `fmt::Error` errors.
// Link from Clippy: 'https://rust-lang.github.io/rust-clippy/master/index.html#/format_push_string'
// TODO: Consider another way of concatenating the string after prototyping.
if let Some(h) = &self.message.storage_header {
let tz = self.options.map(|o| o.tz);
match tz {
Some(Some(tz)) => {
write_tz_string(f, &h.timestamp, &tz)?;
write!(f, "{DLT_COLUMN_SENTINAL}{}", h.ecu_id)?;
let _ = write_tz_string(&mut msg, &h.timestamp, &tz);
let _ = write!(msg, "{DLT_COLUMN_SENTINAL}{}", h.ecu_id);
}
_ => {
let _ = write!(msg, "{}", DltStorageHeader(h));
}
_ => write!(f, "{}", DltStorageHeader(h))?,
};
}
let header = DltStandardHeader(&self.message.header);
write!(f, "{DLT_COLUMN_SENTINAL}",)?;
write!(f, "{header}")?;
write!(f, "{DLT_COLUMN_SENTINAL}",)?;
write!(msg, "{DLT_COLUMN_SENTINAL}",).unwrap();
write!(msg, "{header}").unwrap();
write!(msg, "{DLT_COLUMN_SENTINAL}",).unwrap();

match &self.message.payload {
PayloadContent::Verbose(arguments) => {
self.write_app_id_context_id_and_message_type(f)?;
arguments
.iter()
.try_for_each(|arg| write!(f, "{}{}", DLT_ARGUMENT_SENTINAL, DltArgument(arg)))
let _ = self.write_app_id_context_id_and_message_type(&mut msg);
arguments.iter().for_each(|arg| {
let _ = write!(msg, "{}{}", DLT_ARGUMENT_SENTINAL, DltArgument(arg));
});
}
PayloadContent::NonVerbose(id, data) => {
let _ = self.format_nonverbose_data(*id, data, &mut msg);
}
PayloadContent::NonVerbose(id, data) => self.format_nonverbose_data(*id, data, f),
PayloadContent::ControlMsg(ctrl_id, _data) => {
self.write_app_id_context_id_and_message_type(f)?;
let _ = self.write_app_id_context_id_and_message_type(&mut msg);
match service_id_lookup(ctrl_id.value()) {
Some((name, _desc)) => write!(f, "[{name}]"),
None => write!(f, "[Unknown CtrlCommand]"),
Some((name, _desc)) => {
let _ = write!(msg, "[{name}]");
}
None => {
let _ = write!(msg, "[Unknown CtrlCommand]");
}
}
}
PayloadContent::NetworkTrace(slices) => {
let _ = self.write_app_id_context_id_and_message_type(&mut msg);
let is_someip = self
.message
.extended_header
.as_ref()
.is_some_and(|ext_header| {
matches!(
ext_header.message_type,
MessageType::NetworkTrace(NetworkTraceType::Ipc)
| MessageType::NetworkTrace(NetworkTraceType::Someip)
)
});

if is_someip {
if let Some(resolver) = resolver {
if let Some(slice) = slices.get(1) {
match resolver.try_resolve(slice, ResolveParseHint::SomeIP) {
Some(Ok(resolved)) => {
let _ = write!(msg, "{resolved}");
return Ok(msg);
}
Some(Err(_)) | None => {
//TODO: Ignore nested Error while prototyping
}
}
}
}
}

slices.iter().for_each(|slice| {
let _ = write!(msg, "{}{:02X?}", DLT_ARGUMENT_SENTINAL, slice);
});

return Err(GeneralParseLogError::new(
msg,
"Error while resolving Network trace payload".into(),
ParseLogSeverity::Error,
));
}
}

Ok(msg)
}
}

fn write_tz_string(
f: &mut Formatter,
f: &mut impl std::fmt::Write,
time_stamp: &DltTimeStamp,
tz: &Tz,
) -> Result<(), fmt::Error> {
Expand Down
29 changes: 19 additions & 10 deletions application/apps/indexer/parsers/src/dlt/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,10 @@ use dlt_core::{
parse::{dlt_consume_msg, dlt_message},
};
use serde::Serialize;
use std::{io::Write, ops::Range};
use std::{convert::Infallible, fmt::Display, io::Write, ops::Range};

use self::{attachment::FtScanner, fmt::FormatOptions};

impl LogMessage for FormattableMessage<'_> {
fn to_writer<W: Write>(&self, writer: &mut W) -> Result<usize, std::io::Error> {
let bytes = self.message.as_bytes();
let len = bytes.len();
writer.write_all(&bytes)?;
Ok(len)
}
}

#[derive(Debug, Serialize)]
pub struct RawMessage {
pub content: Vec<u8>,
Expand All @@ -48,20 +39,38 @@ impl std::fmt::Display for RawMessage {
}

impl LogMessage for RangeMessage {
type ParseError = Infallible;

/// A RangeMessage only has range information and cannot serialize to bytes
fn to_writer<W: Write>(&self, writer: &mut W) -> Result<usize, std::io::Error> {
writer.write_u64::<BigEndian>(self.range.start as u64)?;
writer.write_u64::<BigEndian>(self.range.end as u64)?;
Ok(8 + 8)
}

fn try_resolve(
&self,
_resolver: Option<&mut crate::nested_parser::ParseRestResolver>,
) -> Result<impl Display, Self::ParseError> {
Ok(self)
}
}

impl LogMessage for RawMessage {
type ParseError = Infallible;

fn to_writer<W: Write>(&self, writer: &mut W) -> Result<usize, std::io::Error> {
let len = self.content.len();
writer.write_all(&self.content)?;
Ok(len)
}

fn try_resolve(
&self,
_resolver: Option<&mut crate::nested_parser::ParseRestResolver>,
) -> Result<impl Display, Self::ParseError> {
Ok(self)
}
}

#[derive(Default)]
Expand Down
Loading