Skip to content

Commit

Permalink
remove send, replace by stream function
Browse files Browse the repository at this point in the history
  • Loading branch information
pd0wm committed Mar 15, 2024
1 parent 536f90b commit bd2ca64
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 62 deletions.
12 changes: 6 additions & 6 deletions examples/isotp.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use automotive::can::Identifier;
use automotive::isotp::{IsoTPAdapter, IsoTPConfig};
use futures_util::stream::StreamExt;
use tracing_subscriber;

#[tokio::main]
Expand All @@ -10,20 +11,19 @@ async fn main() {
let config = IsoTPConfig::new(0, Identifier::Standard(0x7a1));
let isotp = IsoTPAdapter::new(&adapter, config);

let response = isotp.recv();
let mut stream = isotp.recv();

isotp.send(&[0x3e, 0x00]).await.unwrap();
let response = response.await.unwrap();
let response = stream.next().await.unwrap().unwrap();
println!("RX: {}", hex::encode(response));

let response = isotp.recv();
isotp.send(&[0x22, 0xf1, 0x81]).await.unwrap();
let response = response.await.unwrap();
let response = stream.next().await.unwrap().unwrap();
println!("RX: {}", hex::encode(response));

let mut long_request: [u8; 32] = [0; 32];
long_request[0] = 0x10;
let response = isotp.recv();
isotp.send(&long_request).await.unwrap();
let response = response.await.unwrap();
let response = stream.next().await.unwrap().unwrap();
println!("RX: {}", hex::encode(response));
}
98 changes: 43 additions & 55 deletions src/isotp/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
//! ISO Transport Protocol (ISO-TP) implementation, implements ISO 15765-2
//! ## Example:
//! ```rust
//! use futures_util::stream::StreamExt;
//! async fn isotp_example() {
//! let adapter = automotive::adapter::get_adapter().unwrap();
//! let config = automotive::isotp::IsoTPConfig::new(0, automotive::can::Identifier::Standard(0x7a1));
//! let isotp = automotive::isotp::IsoTPAdapter::new(&adapter, config);
//!
//! let response = isotp.recv(); // Create receiver before sending request
//! let mut stream = isotp.recv(); // Create receiver before sending request
//! isotp.send(&[0x3e, 0x00]).await.unwrap();
//! let response = response.await.unwrap();
//! let response = stream.next().await.unwrap().unwrap();
//! }
//! ```

Expand Down Expand Up @@ -180,38 +181,26 @@ impl<'a> IsoTPAdapter<'a> {

Ok(())
}
async fn recv_single_frame(
&self,
frame: Frame,
buf: &mut Vec<u8>,
len: &mut usize,
) -> Result<(), Error> {
*len = (frame.data[0] & 0xF) as usize;
if *len == 0 {
async fn recv_single_frame(&self, frame: Frame) -> Result<Vec<u8>, Error> {
let len = (frame.data[0] & 0xF) as usize;
if len == 0 {
// unimplemented!("CAN FD escape sequence for single frame not supported");
return Err(Error::IsoTPError(
crate::isotp::error::Error::MalformedFrame,
));
}

debug!("RX SF, length: {} data {}", *len, hex::encode(&frame.data));
debug!("RX SF, length: {} data {}", len, hex::encode(&frame.data));

buf.extend(&frame.data[1..*len + 1]);

Ok(())
Ok(frame.data[1..len + 1].to_vec())
}

async fn recv_first_frame(
&self,
frame: Frame,
buf: &mut Vec<u8>,
len: &mut usize,
) -> Result<(), Error> {
async fn recv_first_frame(&self, frame: Frame, buf: &mut Vec<u8>) -> Result<usize, Error> {
let b0 = frame.data[0] as u16;
let b1 = frame.data[1] as u16;
*len = ((b0 << 8 | b1) & 0xFFF) as usize;
let len = ((b0 << 8 | b1) & 0xFFF) as usize;

debug!("RX FF, length: {}, data {}", *len, hex::encode(&frame.data));
debug!("RX FF, length: {}, data {}", len, hex::encode(&frame.data));

buf.extend(&frame.data[2..]);

Expand All @@ -224,18 +213,18 @@ impl<'a> IsoTPAdapter<'a> {
let frame = Frame::new(self.config.bus, self.config.tx_id, &flow_control);
self.adapter.send(&frame).await;

Ok(())
Ok(len)
}

async fn recv_consecutive_frame(
&self,
frame: Frame,
buf: &mut Vec<u8>,
len: &mut usize,
idx: &mut u8,
) -> Result<(), Error> {
len: usize,
idx: u8,
) -> Result<u8, Error> {
let msg_idx = frame.data[0] & 0xF;
let remaining_len = *len - buf.len();
let remaining_len = len - buf.len();
let end_idx = std::cmp::min(remaining_len + 1, frame.data.len());

buf.extend(&frame.data[1..end_idx]);
Expand All @@ -246,13 +235,12 @@ impl<'a> IsoTPAdapter<'a> {
hex::encode(&buf)
);

if msg_idx != *idx {
if msg_idx != idx {
return Err(Error::IsoTPError(crate::isotp::error::Error::OutOfOrder));
}

*idx = if *idx == 0xF { 0 } else { *idx + 1 };

Ok(())
let new_idx = if idx == 0xF { 0 } else { idx + 1 };
Ok(new_idx)
}

/// Helper function to receive a single ISO-TP packet from the provided CAN stream.
Expand All @@ -261,47 +249,47 @@ impl<'a> IsoTPAdapter<'a> {
stream: &mut std::pin::Pin<&mut Timeout<impl Stream<Item = Frame>>>,
) -> Result<Vec<u8>, Error> {
let mut buf = Vec::new();
let mut len: usize = 0;
let mut len: Option<usize> = None;
let mut idx: u8 = 1;

while let Some(frame) = stream.next().await {
let frame = frame?;
match (frame.data[0] & FRAME_TYPE_MASK).into() {
FrameType::Single => self.recv_single_frame(frame, &mut buf, &mut len).await?,
FrameType::First => self.recv_first_frame(frame, &mut buf, &mut len).await?,
FrameType::Single => {
return Ok(self.recv_single_frame(frame).await?);
}
FrameType::First => {
// If we already received a first frame, something went wrong
if len.is_some() {
return Err(Error::IsoTPError(crate::isotp::error::Error::OutOfOrder));
}
len = Some(self.recv_first_frame(frame, &mut buf).await?);
}
FrameType::Consecutive => {
self.recv_consecutive_frame(frame, &mut buf, &mut len, &mut idx)
.await?
if let Some(len) = len {
idx = self
.recv_consecutive_frame(frame, &mut buf, len, idx)
.await?;
if buf.len() >= len {
return Ok(buf);
}
} else {
return Err(Error::IsoTPError(crate::isotp::error::Error::OutOfOrder));
}
}
FrameType::FlowControl => {} // Ignore flow control frames, these are from a simultaneous transmission
_ => {
return Err(Error::IsoTPError(
crate::isotp::error::Error::UnknownFrameType,
));
}
};

debug!("{} {}", len, buf.len());

if buf.len() >= len {
break;
}
}
Ok(buf)
}

/// Asynchronously receive an ISO-TP packet. Returns [`Error::Timeout`] if the timeout is exceeded between individual ISO-TP frames. Note the total time to receive a packet may be longer than the timeout.
pub async fn recv(&self) -> Result<Vec<u8>, Error> {
let stream = self
.adapter
.recv_filter(|frame| frame.id == self.config.rx_id && !frame.loopback)
.timeout(self.config.timeout);
tokio::pin!(stream);

self.recv_from_stream(&mut stream).await
unreachable!();
}

/// Stream of ISO-TP packets. Can be used if multiple responses are expected from a single request. Returns [`Error::Timeout`] if the timeout is exceeded between individual ISO-TP frames. Note the total time to receive a packet may be longer than the timeout.
pub fn stream(&self) -> impl Stream<Item = Result<Vec<u8>, Error>> + '_ {
pub fn recv(&self) -> impl Stream<Item = Result<Vec<u8>, Error>> + '_ {
let stream = self
.adapter
.recv_filter(|frame| frame.id == self.config.rx_id && !frame.loopback)
Expand Down
2 changes: 1 addition & 1 deletion src/uds/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl<'a> UDSClient<'a> {
request.extend(data);
}

let mut stream = self.adapter.stream();
let mut stream = self.adapter.recv();

self.adapter.send(&request).await?;

Expand Down

0 comments on commit bd2ca64

Please sign in to comment.