Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: isotp discovery - try to discover servers with both padding options by default #527

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 68 additions & 12 deletions src/gallia/commands/discover/uds/isotp.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from gallia.services.uds import NegativeResponse, UDSClient, UDSRequest
from gallia.services.uds.core.utils import g_repr
from gallia.transports import ISOTPTransport, RawCANTransport, TargetURI
from gallia.utils import auto_int, can_id_repr, write_target_list
from gallia.utils import auto_int, can_id_repr, write_target_list, string_dict

logger = get_logger("gallia.discover.isotp")

Expand Down Expand Up @@ -44,9 +44,9 @@ def configure_parser(self) -> None:
)
self.parser.add_argument(
"--padding",
type=auto_int,
default=None,
help="set isotp padding",
type=string_dict,
default="auto",
help="set isotp padding (default: auto: try both)",
)
self.parser.add_argument(
"--pdu",
Expand Down Expand Up @@ -151,17 +151,24 @@ def build_isotp_frame(
frame += bytes([padding]) * pad_len

return frame

async def _scan_loop(self, args: Namespace, transport: RawCANTransport) -> list:
"""
Performs a scan loop on a CAN bus to discover UDS endpoints.

async def main(self, args: Namespace) -> None:
transport = await RawCANTransport.connect(args.target)
found = []
This function iterates through a range of CAN IDs specified by `args.start` and
`args.stop`, sending diagnostic requests and waiting for responses to identify
potential UDS servers.

sniff_time: int = args.sniff_time
logger.result(f"Recording idle bus communication for {sniff_time}s")
addr_idle = await transport.get_idle_traffic(sniff_time)
Args:
args: Namespace object containing scan parameters.
transport: RawCANTransport object representing the CAN bus connection.

logger.result(f"Found {len(addr_idle)} CAN Addresses on idle Bus")
transport.set_filter(addr_idle, inv_filter=True)
Returns:
A list of TargetURI objects representing discovered UDS endpoints.
"""

found = []

req = UDSRequest.parse_dynamic(args.pdu)
pdu = self.build_isotp_frame(req, padding=args.padding)
Expand Down Expand Up @@ -236,6 +243,55 @@ async def main(self, args: Namespace) -> None:
)
found.append(target)
break
return found

async def main(self, args: Namespace) -> None:
"""
The main entry point for the UDS ISOTP scanner.

This function performs the following steps:
1. Connects to the CAN bus using the provided target URI.
2. Records idle bus communication for a specified duration to identify
commonly used CAN IDs.
3. Sets a CAN filter to exclude those commonly used IDs from the scan.
4. Performs a scan loop with potentially two iterations:
- One scan with padding disabled (if `args.padding` is "auto").
- Another scan with padding set to 0x00 (if `args.padding` is "auto").
- Otherwise, a single scan is performed using the provided padding value.
5. Writes the discovered UDS endpoints to a file.
6. (Optional) Queries the discovered endpoints for descriptions using the
specified diagnostic information identifier (DID).

Args:
args: Namespace object containing scan parameters.
"""

transport = await RawCANTransport.connect(args.target)
found = []

sniff_time: int = args.sniff_time
logger.result(f"Recording idle bus communication for {sniff_time}s")
addr_idle = await transport.get_idle_traffic(sniff_time)

logger.result(f"Found {len(addr_idle)} CAN Addresses on idle Bus")
transport.set_filter(addr_idle, inv_filter=True)

if args.padding == "auto":
# Perform scan with padding off
args.padding = None
logger.trace("Scanning with padding off...")
found = await self._scan_loop(args, transport)

# Perform scan with padding 0x00
args.padding = 0x00
logger.trace("Scanning with padding 0x00...")
found += await self._scan_loop(args, transport)

# TODO: remove duplicity

else:
# Use the provided padding value
found = await self._scan_loop(args, transport)

logger.result(f"finished; found {len(found)} UDS endpoints")
ecus_file = self.artifacts_dir.joinpath("ECUs.txt")
Expand Down
15 changes: 15 additions & 0 deletions src/gallia/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,21 @@
def auto_int(arg: str) -> int:
return int(arg, 0)

def string_dict(arg: str) -> str:
"""Dictionary of specific string arguments

:param arg: Received argument
:type arg: str
:return: Returned argument
:rtype: str
"""
arg_lower = arg.lower()
if arg_lower == "auto":
return arg
elif arg_lower == "none":
return None
else:
auto_int(arg)

def strtobool(val: str) -> bool:
val = val.lower()
Expand Down