Skip to content

Commit

Permalink
[SocketCAN] Handle buffer full and try again later
Browse files Browse the repository at this point in the history
  • Loading branch information
pd0wm committed Mar 28, 2024
1 parent f79018f commit 06c7c68
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 17 deletions.
6 changes: 3 additions & 3 deletions src/can/async_can.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ fn process<T: CanAdapter>(
rx_sender: broadcast::Sender<Frame>,
mut tx_receiver: mpsc::Receiver<(Frame, oneshot::Sender<()>)>,
) {
let mut buffer: Vec<Frame> = Vec::new();
let mut buffer: VecDeque<Frame> = VecDeque::new();
let mut callbacks: HashMap<BusIdentifier, VecDeque<FrameCallback>> = HashMap::new();

while shutdown_receiver.try_recv().is_err() {
Expand Down Expand Up @@ -72,10 +72,10 @@ fn process<T: CanAdapter>(
debug! {"TX {:?}", frame};
}

buffer.push(frame);
buffer.push_back(frame);
}
if !buffer.is_empty() {
adapter.send(&buffer).unwrap();
adapter.send(&mut buffer).unwrap();
}
std::thread::sleep(std::time::Duration::from_millis(1));
}
Expand Down
5 changes: 3 additions & 2 deletions src/can/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub mod adapter;
pub mod async_can;

use std::fmt;
use std::collections::VecDeque;

pub use adapter::get_adapter;
pub use async_can::AsyncCanAdapter;
Expand Down Expand Up @@ -118,8 +119,8 @@ impl fmt::Debug for Frame {

/// Trait for a Blocking CAN Adapter
pub trait CanAdapter {
fn send(&mut self, frames: &[Frame]) -> Result<(), crate::error::Error>;
fn recv(&mut self) -> Result<Vec<Frame>, crate::error::Error>;
fn send(&mut self, frames: &mut VecDeque<crate::can::Frame>) -> crate::Result<()>;
fn recv(&mut self) -> crate::Result<Vec<Frame>>;
}

#[cfg(test)]
Expand Down
10 changes: 6 additions & 4 deletions src/panda/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ mod error;
mod usb_protocol;

pub use error::Error;
use std::vec;
use std::collections::VecDeque;

use crate::can::AsyncCanAdapter;
use crate::can::CanAdapter;
use crate::can::Frame;
use crate::panda::constants::{Endpoint, HwType, SafetyModel};
use crate::Result;
use tracing::{info, warn};
Expand Down Expand Up @@ -168,12 +169,13 @@ impl Panda {

impl CanAdapter for Panda {
/// Sends a buffer of CAN messages to the panda.
fn send(&mut self, frames: &[crate::can::Frame]) -> Result<()> {
fn send(&mut self, frames: &mut VecDeque<Frame>) -> Result<()> {
if frames.is_empty() {
return Ok(());
}

let buf = usb_protocol::pack_can_buffer(frames)?;
let frames: Vec<Frame> = frames.drain(..).collect();
let buf = usb_protocol::pack_can_buffer(&frames)?;

for chunk in buf {
self.handle
Expand All @@ -183,7 +185,7 @@ impl CanAdapter for Panda {
}

/// Reads the current buffer of available CAN messages from the panda. This function will return an empty vector if no messages are available. In case of a recoverable error (e.g. unpacking error), the buffer will be cleared and an empty vector will be returned.
fn recv(&mut self) -> Result<Vec<crate::can::Frame>> {
fn recv(&mut self) -> Result<Vec<Frame>> {
let mut buf: [u8; MAX_BULK_SIZE] = [0; MAX_BULK_SIZE];

let recv: usize = self
Expand Down
22 changes: 14 additions & 8 deletions src/socketcan/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
//! This module provides a [`CanAdapter`] implementation for the [`socketcan`] crate.
use crate::can::AsyncCanAdapter;
use crate::can::CanAdapter;
use crate::error::Error;
use crate::Result;

use socketcan::socket::Socket;
use socketcan::SocketOptions;
use tracing::info;
use std::collections::VecDeque;

mod frame;

Expand All @@ -22,15 +23,15 @@ impl SocketCan {
Self { socket }
}

pub fn new_async_from_name(name: &str) -> Result<AsyncCanAdapter, Error> {
pub fn new_async_from_name(name: &str) -> Result<AsyncCanAdapter> {
if let Ok(socket) = socketcan::CanFdSocket::open(name) {
SocketCan::new_async(socket)
} else {
Err(crate::error::Error::NotFound)
}
}

pub fn new_async(socket: socketcan::CanFdSocket) -> Result<AsyncCanAdapter, Error> {
pub fn new_async(socket: socketcan::CanFdSocket) -> Result<AsyncCanAdapter> {
let socket = SocketCan::new(socket);

info!("Connected to SocketCan");
Expand All @@ -39,16 +40,21 @@ impl SocketCan {
}

impl CanAdapter for SocketCan {
fn send(&mut self, frames: &[crate::can::Frame]) -> Result<(), Error> {
for frame in frames {
let frame: socketcan::frame::CanAnyFrame = frame.clone().into();
self.socket.write_frame(&frame).unwrap();
fn send(&mut self, frames: &mut VecDeque<crate::can::Frame>) -> Result<()> {
while let Some(frame) = frames.pop_front() {
let to_send: socketcan::frame::CanAnyFrame = frame.clone().into();

if let Err(e) = self.socket.write_frame(&to_send) {
tracing::warn!("Failed to send frame {:?}", e);
frames.push_front(frame);
break;
}
}

Ok(())
}

fn recv(&mut self) -> Result<Vec<crate::can::Frame>, Error> {
fn recv(&mut self) -> Result<Vec<crate::can::Frame>> {
let mut frames = vec![];
while let Ok((frame, meta)) = self.socket.read_frame_with_meta() {
let mut frame: crate::can::Frame = frame.into();
Expand Down

0 comments on commit 06c7c68

Please sign in to comment.