diff --git a/examples/isotp.rs b/examples/isotp.rs index fbe8ec4..be3c95f 100644 --- a/examples/isotp.rs +++ b/examples/isotp.rs @@ -1,5 +1,6 @@ use automotive::can::Identifier; use automotive::isotp::{IsoTPAdapter, IsoTPConfig}; +use futures_util::stream::StreamExt; use tracing_subscriber; #[tokio::main] @@ -10,20 +11,19 @@ async fn main() { let config = IsoTPConfig::new(0, Identifier::Standard(0x7a1)); let isotp = IsoTPAdapter::new(&adapter, config); - let response = isotp.recv(); + let mut stream = isotp.recv(); + isotp.send(&[0x3e, 0x00]).await.unwrap(); - let response = response.await.unwrap(); + let response = stream.next().await.unwrap().unwrap(); println!("RX: {}", hex::encode(response)); - let response = isotp.recv(); isotp.send(&[0x22, 0xf1, 0x81]).await.unwrap(); - let response = response.await.unwrap(); + let response = stream.next().await.unwrap().unwrap(); println!("RX: {}", hex::encode(response)); let mut long_request: [u8; 32] = [0; 32]; long_request[0] = 0x10; - let response = isotp.recv(); isotp.send(&long_request).await.unwrap(); - let response = response.await.unwrap(); + let response = stream.next().await.unwrap().unwrap(); println!("RX: {}", hex::encode(response)); } diff --git a/src/isotp/mod.rs b/src/isotp/mod.rs index b745269..8ac066b 100644 --- a/src/isotp/mod.rs +++ b/src/isotp/mod.rs @@ -1,14 +1,15 @@ //! ISO Transport Protocol (ISO-TP) implementation, implements ISO 15765-2 //! ## Example: //! ```rust +//! use futures_util::stream::StreamExt; //! async fn isotp_example() { //! let adapter = automotive::adapter::get_adapter().unwrap(); //! let config = automotive::isotp::IsoTPConfig::new(0, automotive::can::Identifier::Standard(0x7a1)); //! let isotp = automotive::isotp::IsoTPAdapter::new(&adapter, config); //! -//! let response = isotp.recv(); // Create receiver before sending request +//! let mut stream = isotp.recv(); // Create receiver before sending request //! isotp.send(&[0x3e, 0x00]).await.unwrap(); -//! let response = response.await.unwrap(); +//! let response = stream.next().await.unwrap().unwrap(); //! } //! ``` @@ -180,38 +181,26 @@ impl<'a> IsoTPAdapter<'a> { Ok(()) } - async fn recv_single_frame( - &self, - frame: Frame, - buf: &mut Vec, - len: &mut usize, - ) -> Result<(), Error> { - *len = (frame.data[0] & 0xF) as usize; - if *len == 0 { + async fn recv_single_frame(&self, frame: Frame) -> Result, Error> { + let len = (frame.data[0] & 0xF) as usize; + if len == 0 { // unimplemented!("CAN FD escape sequence for single frame not supported"); return Err(Error::IsoTPError( crate::isotp::error::Error::MalformedFrame, )); } - debug!("RX SF, length: {} data {}", *len, hex::encode(&frame.data)); + debug!("RX SF, length: {} data {}", len, hex::encode(&frame.data)); - buf.extend(&frame.data[1..*len + 1]); - - Ok(()) + Ok(frame.data[1..len + 1].to_vec()) } - async fn recv_first_frame( - &self, - frame: Frame, - buf: &mut Vec, - len: &mut usize, - ) -> Result<(), Error> { + async fn recv_first_frame(&self, frame: Frame, buf: &mut Vec) -> Result { let b0 = frame.data[0] as u16; let b1 = frame.data[1] as u16; - *len = ((b0 << 8 | b1) & 0xFFF) as usize; + let len = ((b0 << 8 | b1) & 0xFFF) as usize; - debug!("RX FF, length: {}, data {}", *len, hex::encode(&frame.data)); + debug!("RX FF, length: {}, data {}", len, hex::encode(&frame.data)); buf.extend(&frame.data[2..]); @@ -224,18 +213,18 @@ impl<'a> IsoTPAdapter<'a> { let frame = Frame::new(self.config.bus, self.config.tx_id, &flow_control); self.adapter.send(&frame).await; - Ok(()) + Ok(len) } async fn recv_consecutive_frame( &self, frame: Frame, buf: &mut Vec, - len: &mut usize, - idx: &mut u8, - ) -> Result<(), Error> { + len: usize, + idx: u8, + ) -> Result { let msg_idx = frame.data[0] & 0xF; - let remaining_len = *len - buf.len(); + let remaining_len = len - buf.len(); let end_idx = std::cmp::min(remaining_len + 1, frame.data.len()); buf.extend(&frame.data[1..end_idx]); @@ -246,13 +235,12 @@ impl<'a> IsoTPAdapter<'a> { hex::encode(&buf) ); - if msg_idx != *idx { + if msg_idx != idx { return Err(Error::IsoTPError(crate::isotp::error::Error::OutOfOrder)); } - *idx = if *idx == 0xF { 0 } else { *idx + 1 }; - - Ok(()) + let new_idx = if idx == 0xF { 0 } else { idx + 1 }; + Ok(new_idx) } /// Helper function to receive a single ISO-TP packet from the provided CAN stream. @@ -261,47 +249,47 @@ impl<'a> IsoTPAdapter<'a> { stream: &mut std::pin::Pin<&mut Timeout>>, ) -> Result, Error> { let mut buf = Vec::new(); - let mut len: usize = 0; + let mut len: Option = None; let mut idx: u8 = 1; while let Some(frame) = stream.next().await { let frame = frame?; match (frame.data[0] & FRAME_TYPE_MASK).into() { - FrameType::Single => self.recv_single_frame(frame, &mut buf, &mut len).await?, - FrameType::First => self.recv_first_frame(frame, &mut buf, &mut len).await?, + FrameType::Single => { + return Ok(self.recv_single_frame(frame).await?); + } + FrameType::First => { + // If we already received a first frame, something went wrong + if len.is_some() { + return Err(Error::IsoTPError(crate::isotp::error::Error::OutOfOrder)); + } + len = Some(self.recv_first_frame(frame, &mut buf).await?); + } FrameType::Consecutive => { - self.recv_consecutive_frame(frame, &mut buf, &mut len, &mut idx) - .await? + if let Some(len) = len { + idx = self + .recv_consecutive_frame(frame, &mut buf, len, idx) + .await?; + if buf.len() >= len { + return Ok(buf); + } + } else { + return Err(Error::IsoTPError(crate::isotp::error::Error::OutOfOrder)); + } } + FrameType::FlowControl => {} // Ignore flow control frames, these are from a simultaneous transmission _ => { return Err(Error::IsoTPError( crate::isotp::error::Error::UnknownFrameType, )); } }; - - debug!("{} {}", len, buf.len()); - - if buf.len() >= len { - break; - } } - Ok(buf) - } - - /// Asynchronously receive an ISO-TP packet. Returns [`Error::Timeout`] if the timeout is exceeded between individual ISO-TP frames. Note the total time to receive a packet may be longer than the timeout. - pub async fn recv(&self) -> Result, Error> { - let stream = self - .adapter - .recv_filter(|frame| frame.id == self.config.rx_id && !frame.loopback) - .timeout(self.config.timeout); - tokio::pin!(stream); - - self.recv_from_stream(&mut stream).await + unreachable!(); } /// Stream of ISO-TP packets. Can be used if multiple responses are expected from a single request. Returns [`Error::Timeout`] if the timeout is exceeded between individual ISO-TP frames. Note the total time to receive a packet may be longer than the timeout. - pub fn stream(&self) -> impl Stream, Error>> + '_ { + pub fn recv(&self) -> impl Stream, Error>> + '_ { let stream = self .adapter .recv_filter(|frame| frame.id == self.config.rx_id && !frame.loopback) diff --git a/src/uds/mod.rs b/src/uds/mod.rs index 159a8f4..ac4b7fd 100644 --- a/src/uds/mod.rs +++ b/src/uds/mod.rs @@ -51,7 +51,7 @@ impl<'a> UDSClient<'a> { request.extend(data); } - let mut stream = self.adapter.stream(); + let mut stream = self.adapter.recv(); self.adapter.send(&request).await?;