From 0ad3b885619317691bcebcf41985d34e8ae0cee1 Mon Sep 17 00:00:00 2001 From: fabian Date: Fri, 26 Jan 2024 21:21:49 +0100 Subject: [PATCH] ... --- src/gallia/command/base.py | 72 +++--------------------- src/gallia/command/uds.py | 26 +-------- src/gallia/commands/primitive/uds/dtc.py | 60 +++++--------------- 3 files changed, 23 insertions(+), 135 deletions(-) diff --git a/src/gallia/command/base.py b/src/gallia/command/base.py index 5ac019be..0c642597 100644 --- a/src/gallia/command/base.py +++ b/src/gallia/command/base.py @@ -1,7 +1,6 @@ # SPDX-FileCopyrightText: AISEC Pentesting Team # # SPDX-License-Identifier: Apache-2.0 - import argparse import asyncio import fcntl @@ -96,7 +95,6 @@ class BaseCommand(ABC): SHORT_HELP: str | None = None #: The string which is shown at the bottom of --help. EPILOG: str | None = None - #: Enable a artifacts_dir. Setting this property to #: True enables the creation of a logfile. HAS_ARTIFACTS_DIR: bool = False @@ -115,9 +113,7 @@ def __init__(self, parser: ArgumentParser, config: Config = Config()) -> None: self.run_meta = RunMeta( command=sys.argv, command_meta=CommandMeta( - command=self.COMMAND, - group=self.GROUP, - subgroup=self.SUBGROUP, + command=self.COMMAND, group=self.GROUP, subgroup=self.SUBGROUP ), start_time=datetime.now(tz).isoformat(), exit_code=0, @@ -134,17 +130,12 @@ def run(self, args: Namespace) -> int: ... def run_hook( - self, - variant: HookVariant, - args: Namespace, - exit_code: int | None = None, + self, variant: HookVariant, args: Namespace, exit_code: int | None = None ) -> None: script = args.pre_hook if variant == HookVariant.PRE else args.post_hook if script is None or script == "": return - hook_id = f"{variant.value}-hook" - argv = sys.argv[:] argv[0] = Path(argv[0]).name env = { @@ -152,10 +143,8 @@ def run_hook( "GALLIA_HOOK": variant.value, "GALLIA_INVOCATION": " ".join(argv), } | os.environ - if variant == HookVariant.POST: env["GALLIA_META"] = self.run_meta.json() - if self.COMMAND is not None: env["GALLIA_COMMAND"] = self.COMMAND if self.GROUP is not None: @@ -164,15 +153,9 @@ def run_hook( env["GALLIA_GROUP"] = self.SUBGROUP if exit_code is not None: env["GALLIA_EXIT_CODE"] = str(exit_code) - try: p = run( - script, - env=env, - text=True, - capture_output=True, - shell=True, - check=True, + script, env=env, text=True, capture_output=True, shell=True, check=True ) stdout = p.stdout stderr = p.stderr @@ -180,7 +163,6 @@ def run_hook( logger.warning(f"{variant.value}-hook failed (exit code: {p.returncode})") stdout = e.stdout stderr = e.stderr - if stdout: logger.info(p.stdout.strip(), extra={"tags": [hook_id, "stdout"]}) if stderr: @@ -238,7 +220,6 @@ def configure_class_parser(self) -> None: type=Path, help="Path to sqlite3 database", ) - if self.HAS_ARTIFACTS_DIR: mutex_group = group.add_mutually_exclusive_group() mutex_group.add_argument( @@ -266,7 +247,6 @@ async def _db_insert_run_meta(self, args: Namespace) -> None: if args.db is not None: self.db_handler = DBHandler(args.db) await self.db_handler.connect() - await self.db_handler.insert_run_meta( script=sys.argv[0].split()[-1], arguments=sys.argv[1:], @@ -287,7 +267,6 @@ async def _db_finish_run_meta(self) -> None: ) except Exception as e: logger.warning(f"Could not write the run meta to the database: {e!r}") - try: await self.db_handler.disconnect() except Exception as e: @@ -301,25 +280,19 @@ def _dump_environment(self, path: Path) -> None: def _add_latest_link(self, path: Path) -> None: dirs = list(path.glob("run-*")) dirs.sort(key=lambda x: x.name) - latest_dir = dirs[-1].relative_to(path) - symlink = path.joinpath("LATEST") symlink.unlink(missing_ok=True) symlink.symlink_to(latest_dir) def prepare_artifactsdir( - self, - base_dir: Path | None = None, - force_path: Path | None = None, + self, base_dir: Path | None = None, force_path: Path | None = None ) -> Path: if force_path is not None: if force_path.is_dir(): return force_path - force_path.mkdir(parents=True) return force_path - if base_dir is not None: _command_dir = "" if self.GROUP is not None: @@ -328,28 +301,21 @@ def prepare_artifactsdir( _command_dir += f"_{self.SUBGROUP}" if self.COMMAND is not None: _command_dir += f"_{self.COMMAND}" - # When self.GROUP is None, then # _command_dir starts with "_"; remove it. if _command_dir.startswith("_"): _command_dir = _command_dir.removeprefix("_") - # If self.GROUP, self.SUBGROUP, and # self.COMMAND are None, then fallback to self.id. if _command_dir == "": _command_dir = self.id - command_dir = base_dir.joinpath(_command_dir) - _run_dir = f"run-{datetime.now().strftime('%Y%m%d-%H%M%S.%f')}" artifacts_dir = command_dir.joinpath(_run_dir).absolute() artifacts_dir.mkdir(parents=True) - self._dump_environment(artifacts_dir.joinpath(FileNames.ENV.value)) self._add_latest_link(command_dir) - return artifacts_dir.absolute() - raise ValueError("base_dir or force_path must be different from None") def _aquire_flock(self, path: Path) -> None: @@ -377,11 +343,9 @@ def entry_point(self, args: Namespace) -> int: except OSError as e: logger.critical(f"Unable to lock {p}: {e}") return exitcode.OSFILE - if self.HAS_ARTIFACTS_DIR: self.artifacts_dir = self.prepare_artifactsdir( - args.artifacts_base, - args.artifacts_dir, + args.artifacts_base, args.artifacts_dir ) self.log_file_handlers.append( add_zst_log_handler( @@ -390,12 +354,9 @@ def entry_point(self, args: Namespace) -> int: file_log_level=get_file_log_level(args), ) ) - if args.hooks: self.run_hook(HookVariant.PRE, args) - asyncio.run(self._db_insert_run_meta(args)) - exit_code = 0 try: exit_code = self.run(args) @@ -423,21 +384,16 @@ def entry_point(self, args: Namespace) -> int: finally: self.run_meta.exit_code = exit_code self.run_meta.end_time = datetime.now(tz).isoformat() - asyncio.run(self._db_finish_run_meta()) - if self.HAS_ARTIFACTS_DIR: self.artifacts_dir.joinpath(FileNames.META.value).write_text( self.run_meta.json() + "\n" ) logger.notice(f"Stored artifacts at {self.artifacts_dir}") - if args.hooks: self.run_hook(HookVariant.POST, args, exit_code) - if self._lock_file_fd is not None: self._release_flock() - return exit_code @@ -464,7 +420,6 @@ def run(self, args: Namespace) -> int: self.main(args) finally: self.teardown(args) - return exitcode.OK @@ -516,10 +471,7 @@ class Scanner(AsyncScript, ABC): GROUP = "scan" HAS_ARTIFACTS_DIR = True - CATCHED_EXCEPTIONS: list[type[Exception]] = [ - ConnectionError, - UDSException, - ] + CATCHED_EXCEPTIONS: list[type[Exception]] = [ConnectionError, UDSException] def __init__(self, parser: ArgumentParser, config: Config = Config()) -> None: super().__init__(parser, config) @@ -534,7 +486,6 @@ async def main(self, args: Namespace) -> None: async def setup(self, args: Namespace) -> None: if args.target is None: self.parser.error("--target is required") - if args.power_supply is not None: self.power_supply = await PowerSupply.connect(args.power_supply) if args.power_cycle is True: @@ -543,7 +494,6 @@ async def setup(self, args: Namespace) -> None: ) elif args.power_cycle is True: self.parser.error("--power-cycle needs --power-supply") - # Start dumpcap as the first subprocess; otherwise network # traffic might be missing. if args.dumpcap: @@ -554,18 +504,15 @@ async def setup(self, args: Namespace) -> None: logger.error("Dumpcap could not be started!") else: await self.dumpcap.sync() - self.transport = await load_transport(args.target).connect(args.target) async def teardown(self, args: Namespace) -> None: await self.transport.close() - if self.dumpcap: await self.dumpcap.stop() def configure_class_parser(self) -> None: super().configure_class_parser() - group = self.parser.add_argument_group("scanner related arguments") group.add_argument( "--dumpcap", @@ -573,7 +520,6 @@ def configure_class_parser(self) -> None: default=self.config.get_value("gallia.scanner.dumpcap", default=True), help="Enable/Disable creating a pcap file", ) - group = self.parser.add_argument_group("transport mode related arguments") group.add_argument( "--target", @@ -582,7 +528,6 @@ def configure_class_parser(self) -> None: type=TargetURI, help="URI that describes the target", ) - group = self.parser.add_argument_group("power supply related arguments") group.add_argument( "--power-supply", @@ -595,10 +540,7 @@ def configure_class_parser(self) -> None: "--power-cycle", action=argparse.BooleanOptionalAction, default=self.config.get_value("gallia.scanner.power_cycle", False), - help=( - "use the configured power supply to power-cycle the ECU when needed " - "(e.g. before starting the scan, or to recover bad state during scanning)" - ), + help="use the configured power supply to power-cycle the ECU when needed (e.g. before starting the scan, or to recover bad state during scanning)", ) group.add_argument( "--power-cycle-sleep", diff --git a/src/gallia/command/uds.py b/src/gallia/command/uds.py index dfe9d4e1..c3a0815f 100644 --- a/src/gallia/command/uds.py +++ b/src/gallia/command/uds.py @@ -1,7 +1,6 @@ # SPDX-FileCopyrightText: AISEC Pentesting Team # # SPDX-License-Identifier: Apache-2.0 - import json from argparse import ArgumentParser, BooleanOptionalAction, Namespace @@ -36,13 +35,11 @@ def __init__(self, parser: ArgumentParser, config: Config = Config()) -> None: def configure_class_parser(self) -> None: super().configure_class_parser() - group = self.parser.add_argument_group("UDS scanner related arguments") - choices = ["default"] + [x.OEM for x in load_ecu_plugins()] group.add_argument( "--ecu-reset", - const=0x01, + const=1, nargs="?", default=self.config.get_value("gallia.protocols.uds.ecu_reset"), help="Trigger an initial ecu_reset via UDS; reset level is optional", @@ -107,7 +104,6 @@ def implicit_logging(self) -> bool: @implicit_logging.setter def implicit_logging(self, value: bool) -> None: self._implicit_logging = value - if self.db_handler is not None: self._apply_implicit_logging_setting() @@ -116,16 +112,13 @@ def _apply_implicit_logging_setting(self) -> None: async def setup(self, args: Namespace) -> None: await super().setup(args) - self.ecu = load_ecu(args.oem)( self.transport, timeout=args.timeout, max_retry=args.max_retries, power_supply=self.power_supply, ) - self.ecu.db_handler = self.db_handler - if self.db_handler is not None: try: # No idea, but str(args.target) fails with a strange traceback. @@ -134,33 +127,27 @@ async def setup(self, args: Namespace) -> None: self._apply_implicit_logging_setting() except Exception as e: logger.warning(f"Could not write the scan run to the database: {e:!r}") - if args.ecu_reset is not None: resp: UDSResponse = await self.ecu.ecu_reset(args.ecu_reset) if isinstance(resp, NegativeResponse): logger.warning(f"ECUReset failed: {resp}") logger.warning("Switching to default session") - raise_for_error(await self.ecu.set_session(0x01)) + raise_for_error(await self.ecu.set_session(1)) resp = await self.ecu.ecu_reset(args.ecu_reset) if isinstance(resp, NegativeResponse): logger.warning(f"ECUReset in session 0x01 failed: {resp}") - # Handles connecting to the target and waits # until it is ready. if args.ping: await self.ecu.wait_for_ecu() - await self.ecu.connect() - if args.tester_present: await self.ecu.start_cyclic_tester_present(args.tester_present_interval) - if args.properties is True: path = self.artifacts_dir.joinpath(FileNames.PROPERTIES_PRE.value) async with aiofiles.open(path, "w") as file: await file.write(json.dumps(await self.ecu.properties(True), indent=4)) await file.write("\n") - if self.db_handler is not None: try: await self.db_handler.insert_scan_run_properties_pre(await self.ecu.properties()) @@ -169,28 +156,23 @@ async def setup(self, args: Namespace) -> None: logger.warning(f"Could not write the properties_pre to the database: {e!r}") async def teardown(self, args: Namespace) -> None: - if args.properties is True and not self.ecu.transport.is_closed: + if args.properties is True and (not self.ecu.transport.is_closed): path = self.artifacts_dir.joinpath(FileNames.PROPERTIES_POST.value) async with aiofiles.open(path, "w") as file: await file.write(json.dumps(await self.ecu.properties(True), indent=4)) await file.write("\n") - path_pre = self.artifacts_dir.joinpath(FileNames.PROPERTIES_PRE.value) async with aiofiles.open(path_pre, "r") as file: prop_pre = json.loads(await file.read()) - if args.compare_properties and await self.ecu.properties(False) != prop_pre: logger.warning("ecu properties differ, please investigate!") - if self.db_handler is not None: try: await self.db_handler.complete_scan_run(await self.ecu.properties(False)) except Exception as e: logger.warning(f"Could not write the scan run to the database: {e!r}") - if args.tester_present: await self.ecu.stop_cyclic_tester_present() - # This must be the last one. await super().teardown(args) @@ -200,7 +182,6 @@ class UDSDiscoveryScanner(Scanner): def configure_class_parser(self) -> None: super().configure_class_parser() - self.parser.add_argument( "--timeout", type=float, @@ -210,7 +191,6 @@ def configure_class_parser(self) -> None: async def setup(self, args: Namespace) -> None: await super().setup(args) - if self.db_handler is not None: try: await self.db_handler.insert_discovery_run(args.target.url.scheme) diff --git a/src/gallia/commands/primitive/uds/dtc.py b/src/gallia/commands/primitive/uds/dtc.py index a510f5a9..fbf85245 100644 --- a/src/gallia/commands/primitive/uds/dtc.py +++ b/src/gallia/commands/primitive/uds/dtc.py @@ -1,7 +1,6 @@ # SPDX-FileCopyrightText: AISEC Pentesting Team # # SPDX-License-Identifier: Apache-2.0 - import sys from argparse import Namespace from functools import partial @@ -31,7 +30,6 @@ class DTCPrimitive(UDSScanner): def configure_parser(self) -> None: self.parser.set_defaults(properties=False) - self.parser.add_argument( "--session", default=DiagnosticSessionControlSubFuncs.defaultSession.value, @@ -45,9 +43,8 @@ def configure_parser(self) -> None: read_parser.add_argument( "--mask", type=partial(int, base=16), - default=0xFF, - help="The bitmask which is sent to the ECU in order to select the relevant DTCs according to their " - "error state. By default, all error codes are returned (c.f. ISO 14229-1,D.2).", + default=255, + help="The bitmask which is sent to the ECU in order to select the relevant DTCs according to their error state. By default, all error codes are returned (c.f. ISO 14229-1,D.2).", ) read_parser.add_argument( "--show-legend", @@ -70,13 +67,12 @@ def configure_parser(self) -> None: clear_parser.add_argument( "--group-of-dtc", type=int, - default=0xFFFFFF, - help="Only clear a particular DTC or the DTCs belonging to the given group. " - "By default, all error codes are cleared.", + default=16777215, + help="Only clear a particular DTC or the DTCs belonging to the given group. By default, all error codes are cleared.", ) control_parser = sub_parser.add_parser( "control", - help="Stop or resume the setting of DTCs using the " "ControlDTCSetting service", + help="Stop or resume the setting of DTCs using the ControlDTCSetting service", ) control_group = control_parser.add_mutually_exclusive_group(required=True) control_group.add_argument( @@ -93,19 +89,15 @@ def configure_parser(self) -> None: async def fetch_error_codes(self, mask: int, split: bool = True) -> dict[int, int]: ecu_response = await self.ecu.read_dtc_information_report_dtc_by_status_mask(mask) dtcs = {} - if isinstance(ecu_response, NegativeResponse): if ecu_response.response_code == UDSErrorCodes.responseTooLong: logger.error( - f"There are too many codes for (sub)mask {mask}. Consider setting --mask " - f"with a parameter that excludes one or more of the corresponding bits." + f"There are too many codes for (sub)mask {mask}. Consider setting --mask with a parameter that excludes one or more of the corresponding bits." ) if split: logger.warning("Trying to fetch the error codes iteratively.") - for i in range(8): sub_mask = mask & 2**i - if sub_mask > 0: logger.info(f"Trying to fetch with mask {g_repr(sub_mask)}") dtcs.update(await self.fetch_error_codes(sub_mask, False)) @@ -114,42 +106,35 @@ async def fetch_error_codes(self, mask: int, split: bool = True) -> dict[int, in sys.exit(1) else: dtcs = ecu_response.dtc_and_status_record - return dtcs async def read(self, args: Namespace) -> None: dtcs = await self.fetch_error_codes(args.mask) - failed_dtcs: list[list[str]] = [] uncompleted_dtcs: list[list[str]] = [] - for dtc, error_state in dtcs.items(): raw_output = f"{dtc:06X} {error_state:02X}" - bit_string = bin(error_state + 0x100)[ + bit_string = bin(error_state + 256)[ 3: ] # Transform error_state into a bit-string with leading zeros table_output = [f"{dtc:06X}", f"{error_state:02X}"] + [ "X" if b == "1" else "" for b in bit_string ] - # if any kind of test failure - if error_state & 0xAF: + if error_state & 175: logger.warning(raw_output) failed_dtcs.append(table_output) # if not failed but also not completed yet (i.e. not yet in this cycle or since last clear) - elif error_state & 0x50: + elif error_state & 80: logger.result(raw_output) uncompleted_dtcs.append(table_output) - if args.show_legend: logger.result("") self.show_bit_legend() - if args.show_failed: logger.result("") logger.result("Failed codes:") self.show_summary(failed_dtcs) - if args.show_uncompleted: logger.result("") logger.result("Uncompleted codes:") @@ -166,44 +151,26 @@ def show_bit_legend(self) -> None: "6 = testNotCompletedThisOperationCycle: not completed in current cycle", "7 = warningIndicatorRequested: existing warning indicators (e.g. lamp, display)", ] - - for line in ( - tabulate([[d] for d in bit_descriptions], headers=["bit descriptions"]) + for line in tabulate( + [[d] for d in bit_descriptions], headers=["bit descriptions"] ).splitlines(): logger.result(line) def show_summary(self, dtcs: list[list[str]]) -> None: dtcs.sort() - - header = [ - "DTC", - "error state", - "0", - "1", - "2", - "3", - "4", - "5", - "6", - "7", - ] - + header = ["DTC", "error state", "0", "1", "2", "3", "4", "5", "6", "7"] for line in tabulate(dtcs, headers=header, tablefmt="fancy_grid").splitlines(): logger.result(line) async def clear(self, args: Namespace) -> None: group_of_dtc: int = args.group_of_dtc - min_group_of_dtc = 0 - max_group_of_dtc = 0xFFFFFF - + max_group_of_dtc = 16777215 if not min_group_of_dtc <= group_of_dtc <= max_group_of_dtc: logger.error( f"The parameter group_of_dtc must be in the range {g_repr(min_group_of_dtc)}-{g_repr(max_group_of_dtc)}" ) - resp = await self.ecu.clear_diagnostic_information(group_of_dtc) - if isinstance(resp, NegativeResponse): logger.error(resp) else: @@ -217,7 +184,6 @@ async def control(self, args: Namespace) -> None: async def main(self, args: Namespace) -> None: await self.ecu.set_session(args.session) - if args.cmd == "clear": await self.clear(args) elif args.cmd == "control":