Skip to content

Commit

Permalink
Add UDS tests using Scapy (#41)
Browse files Browse the repository at this point in the history
* add uds tests

* add test for negative response code
  • Loading branch information
pd0wm authored Mar 18, 2024
1 parent dbcbd43 commit 6b47053
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 9 deletions.
5 changes: 0 additions & 5 deletions scripts/vecu_isotp.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
#!/usr/bin/env python3
import argparse
import threading
import time

from scapy.all import *
from scapy.ansmachine import AnsweringMachine


def forwarding1(pkt):
Expand Down Expand Up @@ -38,8 +35,6 @@ def forwarding2(pkt):
'bs': args.bs,
}

print(config)

with ISOTPSocket(args.iface, tx_id=args.tx, rx_id=args.rx, **config) as sock1:
with ISOTPSocket(args.iface, tx_id=args.tx, rx_id=args.rx, listen_only=True, **config) as sock2:
sock1.send(b'\xAA') # Signal to test that ECU is ready
Expand Down
32 changes: 32 additions & 0 deletions scripts/vecu_uds.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#!/usr/bin/env python3
import argparse
import threading

from scapy.all import *

if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--iface", type=str, default="vcan0")
parser.add_argument("--rx", type=int, default=0x7a1)
parser.add_argument("--tx", type=int, default=0x7a9)
parser.add_argument("--timeout", type=int, default=10)
parser.add_argument("--kernel-isotp", type=bool, default=False)

args = parser.parse_args()

conf.contribs['ISOTP'] = {'use-can-isotp-kernel-module': args.kernel_isotp}
load_contrib('isotp')
load_contrib('automotive.uds')
load_contrib('automotive.ecu')

with ISOTPSocket(args.iface, tx_id=args.tx, rx_id=args.rx, basecls=UDS) as sock:
sock.send(b'\xAA') # Signal to test that ECU is ready

resp = [
EcuResponse([EcuState(session=range(0,255))], responses=UDS() / UDS_TPPR()),
EcuResponse([EcuState(session=range(0,255))], responses=UDS() / UDS_RDBIPR(dataIdentifier=0x1234) / Raw(b"deadbeef")),
EcuResponse([EcuState(session=range(0,255))], responses=UDS() / UDS_NR(negativeResponseCode=0x33, requestServiceId=0x10)),
]
ecu = EcuAnsweringMachine(supported_responses=resp, main_socket=sock, basecls=UDS, verbose=False)
sim = threading.Thread(target=ecu, kwargs={'count': 4, 'timeout': args.timeout})
sim.start()
7 changes: 3 additions & 4 deletions tests/isotp_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use automotive::async_can::AsyncCanAdapter;
use automotive::can::Identifier;
use automotive::isotp::{IsoTPAdapter, IsoTPConfig};
use std::process::{Child, Command};
use std::{default, vec};
use tokio_stream::StreamExt;

static VECU_STARTUP_TIMEOUT_MS: u64 = 1000;
Expand Down Expand Up @@ -94,7 +93,7 @@ async fn isotp_test_flow_control() {
async fn isotp_test_padding() {
let config = VECUConfig {
padding: Some(0xCC),
..default::Default::default()
..Default::default()
};
isotp_test_echo(5, config).await;
isotp_test_echo(64, config).await;
Expand All @@ -107,7 +106,7 @@ async fn isotp_test_stmin() {
let stmin = std::time::Duration::from_millis(50);
let config = VECUConfig {
stmin: stmin.as_millis() as u32,
..default::Default::default()
..Default::default()
};

let start = std::time::Instant::now();
Expand All @@ -121,7 +120,7 @@ async fn isotp_test_stmin() {
async fn isotp_test_bs() {
let config = VECUConfig {
bs: 4,
..default::Default::default()
..Default::default()
};

// TODO: can we ensure that we actually wait for the
Expand Down
50 changes: 50 additions & 0 deletions tests/uds_tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#![allow(dead_code, unused_imports)]
use automotive::async_can::AsyncCanAdapter;
use automotive::isotp::IsoTPAdapter;
use automotive::uds::error::Error as UDSError;
use automotive::uds::error::NegativeResponseCode;
use automotive::uds::UDSClient;
use std::process::{Child, Command};
use tokio_stream::StreamExt;

static VECU_STARTUP_TIMEOUT_MS: u64 = 1000;

struct ChildGuard(Child);
impl Drop for ChildGuard {
fn drop(&mut self) {
self.0.kill().unwrap()
}
}

async fn vecu_spawn(adapter: &AsyncCanAdapter) -> ChildGuard {
let stream = adapter
.recv()
.timeout(std::time::Duration::from_millis(VECU_STARTUP_TIMEOUT_MS));
tokio::pin!(stream);

let vecu = ChildGuard(Command::new("scripts/vecu_uds.py").spawn().unwrap());
stream.next().await.unwrap().expect("vecu did not start");

vecu
}

#[cfg(feature = "test_vcan")]
#[tokio::test]
#[serial_test::serial]
async fn uds_test_sids() {
let adapter = automotive::socketcan::SocketCan::new_async_from_name("vcan0").unwrap();
let _vecu = vecu_spawn(&adapter).await;

let isotp = IsoTPAdapter::from_id(&adapter, 0x7a1);
let uds = UDSClient::new(&isotp);

uds.tester_present().await.unwrap();

let data = uds.read_data_by_identifier(0x1234).await.unwrap();
assert_eq!(data, b"deadbeef".to_vec());

let resp = uds.diagnostic_session_control(0x2).await;
let security_access_denied =
UDSError::NegativeResponse(NegativeResponseCode::SecurityAccessDenied);
assert_eq!(resp, Err(security_access_denied.into()));
}

0 comments on commit 6b47053

Please sign in to comment.