From 06c7c6809631428411370c6eadfde0f8d810922f Mon Sep 17 00:00:00 2001 From: Willem Melching Date: Wed, 27 Mar 2024 12:00:43 +0100 Subject: [PATCH] [SocketCAN] Handle buffer full and try again later --- src/can/async_can.rs | 6 +++--- src/can/mod.rs | 5 +++-- src/panda/mod.rs | 10 ++++++---- src/socketcan/mod.rs | 22 ++++++++++++++-------- 4 files changed, 26 insertions(+), 17 deletions(-) diff --git a/src/can/async_can.rs b/src/can/async_can.rs index fda17f9..8f7b5e8 100644 --- a/src/can/async_can.rs +++ b/src/can/async_can.rs @@ -23,7 +23,7 @@ fn process( rx_sender: broadcast::Sender, mut tx_receiver: mpsc::Receiver<(Frame, oneshot::Sender<()>)>, ) { - let mut buffer: Vec = Vec::new(); + let mut buffer: VecDeque = VecDeque::new(); let mut callbacks: HashMap> = HashMap::new(); while shutdown_receiver.try_recv().is_err() { @@ -72,10 +72,10 @@ fn process( debug! {"TX {:?}", frame}; } - buffer.push(frame); + buffer.push_back(frame); } if !buffer.is_empty() { - adapter.send(&buffer).unwrap(); + adapter.send(&mut buffer).unwrap(); } std::thread::sleep(std::time::Duration::from_millis(1)); } diff --git a/src/can/mod.rs b/src/can/mod.rs index 12a6562..d4c7d84 100644 --- a/src/can/mod.rs +++ b/src/can/mod.rs @@ -4,6 +4,7 @@ pub mod adapter; pub mod async_can; use std::fmt; +use std::collections::VecDeque; pub use adapter::get_adapter; pub use async_can::AsyncCanAdapter; @@ -118,8 +119,8 @@ impl fmt::Debug for Frame { /// Trait for a Blocking CAN Adapter pub trait CanAdapter { - fn send(&mut self, frames: &[Frame]) -> Result<(), crate::error::Error>; - fn recv(&mut self) -> Result, crate::error::Error>; + fn send(&mut self, frames: &mut VecDeque) -> crate::Result<()>; + fn recv(&mut self) -> crate::Result>; } #[cfg(test)] diff --git a/src/panda/mod.rs b/src/panda/mod.rs index 432ba69..cf58e4b 100644 --- a/src/panda/mod.rs +++ b/src/panda/mod.rs @@ -5,10 +5,11 @@ mod error; mod usb_protocol; pub use error::Error; -use std::vec; +use std::collections::VecDeque; use crate::can::AsyncCanAdapter; use crate::can::CanAdapter; +use crate::can::Frame; use crate::panda::constants::{Endpoint, HwType, SafetyModel}; use crate::Result; use tracing::{info, warn}; @@ -168,12 +169,13 @@ impl Panda { impl CanAdapter for Panda { /// Sends a buffer of CAN messages to the panda. - fn send(&mut self, frames: &[crate::can::Frame]) -> Result<()> { + fn send(&mut self, frames: &mut VecDeque) -> Result<()> { if frames.is_empty() { return Ok(()); } - let buf = usb_protocol::pack_can_buffer(frames)?; + let frames: Vec = frames.drain(..).collect(); + let buf = usb_protocol::pack_can_buffer(&frames)?; for chunk in buf { self.handle @@ -183,7 +185,7 @@ impl CanAdapter for Panda { } /// Reads the current buffer of available CAN messages from the panda. This function will return an empty vector if no messages are available. In case of a recoverable error (e.g. unpacking error), the buffer will be cleared and an empty vector will be returned. - fn recv(&mut self) -> Result> { + fn recv(&mut self) -> Result> { let mut buf: [u8; MAX_BULK_SIZE] = [0; MAX_BULK_SIZE]; let recv: usize = self diff --git a/src/socketcan/mod.rs b/src/socketcan/mod.rs index 833fbd7..b2a6d10 100644 --- a/src/socketcan/mod.rs +++ b/src/socketcan/mod.rs @@ -1,11 +1,12 @@ //! This module provides a [`CanAdapter`] implementation for the [`socketcan`] crate. use crate::can::AsyncCanAdapter; use crate::can::CanAdapter; -use crate::error::Error; +use crate::Result; use socketcan::socket::Socket; use socketcan::SocketOptions; use tracing::info; +use std::collections::VecDeque; mod frame; @@ -22,7 +23,7 @@ impl SocketCan { Self { socket } } - pub fn new_async_from_name(name: &str) -> Result { + pub fn new_async_from_name(name: &str) -> Result { if let Ok(socket) = socketcan::CanFdSocket::open(name) { SocketCan::new_async(socket) } else { @@ -30,7 +31,7 @@ impl SocketCan { } } - pub fn new_async(socket: socketcan::CanFdSocket) -> Result { + pub fn new_async(socket: socketcan::CanFdSocket) -> Result { let socket = SocketCan::new(socket); info!("Connected to SocketCan"); @@ -39,16 +40,21 @@ impl SocketCan { } impl CanAdapter for SocketCan { - fn send(&mut self, frames: &[crate::can::Frame]) -> Result<(), Error> { - for frame in frames { - let frame: socketcan::frame::CanAnyFrame = frame.clone().into(); - self.socket.write_frame(&frame).unwrap(); + fn send(&mut self, frames: &mut VecDeque) -> Result<()> { + while let Some(frame) = frames.pop_front() { + let to_send: socketcan::frame::CanAnyFrame = frame.clone().into(); + + if let Err(e) = self.socket.write_frame(&to_send) { + tracing::warn!("Failed to send frame {:?}", e); + frames.push_front(frame); + break; + } } Ok(()) } - fn recv(&mut self) -> Result, Error> { + fn recv(&mut self) -> Result> { let mut frames = vec![]; while let Ok((frame, meta)) = self.socket.read_frame_with_meta() { let mut frame: crate::can::Frame = frame.into();