Skip to content

Commit

Permalink
...
Browse files Browse the repository at this point in the history
  • Loading branch information
fkglr committed Feb 23, 2024
1 parent a564d3e commit 0ad3b88
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 135 deletions.
72 changes: 7 additions & 65 deletions src/gallia/command/base.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# SPDX-FileCopyrightText: AISEC Pentesting Team
#
# SPDX-License-Identifier: Apache-2.0

import argparse
import asyncio
import fcntl
Expand Down Expand Up @@ -96,7 +95,6 @@ class BaseCommand(ABC):
SHORT_HELP: str | None = None
#: The string which is shown at the bottom of --help.
EPILOG: str | None = None

#: Enable a artifacts_dir. Setting this property to
#: True enables the creation of a logfile.
HAS_ARTIFACTS_DIR: bool = False
Expand All @@ -115,9 +113,7 @@ def __init__(self, parser: ArgumentParser, config: Config = Config()) -> None:
self.run_meta = RunMeta(
command=sys.argv,
command_meta=CommandMeta(
command=self.COMMAND,
group=self.GROUP,
subgroup=self.SUBGROUP,
command=self.COMMAND, group=self.GROUP, subgroup=self.SUBGROUP
),
start_time=datetime.now(tz).isoformat(),
exit_code=0,
Expand All @@ -134,28 +130,21 @@ def run(self, args: Namespace) -> int:
...

def run_hook(
self,
variant: HookVariant,
args: Namespace,
exit_code: int | None = None,
self, variant: HookVariant, args: Namespace, exit_code: int | None = None
) -> None:
script = args.pre_hook if variant == HookVariant.PRE else args.post_hook
if script is None or script == "":
return

hook_id = f"{variant.value}-hook"

argv = sys.argv[:]
argv[0] = Path(argv[0]).name
env = {
"GALLIA_ARTIFACTS_DIR": str(self.artifacts_dir),
"GALLIA_HOOK": variant.value,
"GALLIA_INVOCATION": " ".join(argv),
} | os.environ

if variant == HookVariant.POST:
env["GALLIA_META"] = self.run_meta.json()

if self.COMMAND is not None:
env["GALLIA_COMMAND"] = self.COMMAND
if self.GROUP is not None:
Expand All @@ -164,23 +153,16 @@ def run_hook(
env["GALLIA_GROUP"] = self.SUBGROUP
if exit_code is not None:
env["GALLIA_EXIT_CODE"] = str(exit_code)

try:
p = run(
script,
env=env,
text=True,
capture_output=True,
shell=True,
check=True,
script, env=env, text=True, capture_output=True, shell=True, check=True
)
stdout = p.stdout
stderr = p.stderr
except CalledProcessError as e:
logger.warning(f"{variant.value}-hook failed (exit code: {p.returncode})")
stdout = e.stdout
stderr = e.stderr

if stdout:
logger.info(p.stdout.strip(), extra={"tags": [hook_id, "stdout"]})
if stderr:
Expand Down Expand Up @@ -238,7 +220,6 @@ def configure_class_parser(self) -> None:
type=Path,
help="Path to sqlite3 database",
)

if self.HAS_ARTIFACTS_DIR:
mutex_group = group.add_mutually_exclusive_group()
mutex_group.add_argument(
Expand Down Expand Up @@ -266,7 +247,6 @@ async def _db_insert_run_meta(self, args: Namespace) -> None:
if args.db is not None:
self.db_handler = DBHandler(args.db)
await self.db_handler.connect()

await self.db_handler.insert_run_meta(
script=sys.argv[0].split()[-1],
arguments=sys.argv[1:],
Expand All @@ -287,7 +267,6 @@ async def _db_finish_run_meta(self) -> None:
)
except Exception as e:
logger.warning(f"Could not write the run meta to the database: {e!r}")

try:
await self.db_handler.disconnect()
except Exception as e:
Expand All @@ -301,25 +280,19 @@ def _dump_environment(self, path: Path) -> None:
def _add_latest_link(self, path: Path) -> None:
dirs = list(path.glob("run-*"))
dirs.sort(key=lambda x: x.name)

latest_dir = dirs[-1].relative_to(path)

symlink = path.joinpath("LATEST")
symlink.unlink(missing_ok=True)
symlink.symlink_to(latest_dir)

def prepare_artifactsdir(
self,
base_dir: Path | None = None,
force_path: Path | None = None,
self, base_dir: Path | None = None, force_path: Path | None = None
) -> Path:
if force_path is not None:
if force_path.is_dir():
return force_path

force_path.mkdir(parents=True)
return force_path

if base_dir is not None:
_command_dir = ""
if self.GROUP is not None:
Expand All @@ -328,28 +301,21 @@ def prepare_artifactsdir(
_command_dir += f"_{self.SUBGROUP}"
if self.COMMAND is not None:
_command_dir += f"_{self.COMMAND}"

# When self.GROUP is None, then
# _command_dir starts with "_"; remove it.
if _command_dir.startswith("_"):
_command_dir = _command_dir.removeprefix("_")

# If self.GROUP, self.SUBGROUP, and
# self.COMMAND are None, then fallback to self.id.
if _command_dir == "":
_command_dir = self.id

command_dir = base_dir.joinpath(_command_dir)

_run_dir = f"run-{datetime.now().strftime('%Y%m%d-%H%M%S.%f')}"
artifacts_dir = command_dir.joinpath(_run_dir).absolute()
artifacts_dir.mkdir(parents=True)

self._dump_environment(artifacts_dir.joinpath(FileNames.ENV.value))
self._add_latest_link(command_dir)

return artifacts_dir.absolute()

raise ValueError("base_dir or force_path must be different from None")

def _aquire_flock(self, path: Path) -> None:
Expand Down Expand Up @@ -377,11 +343,9 @@ def entry_point(self, args: Namespace) -> int:
except OSError as e:
logger.critical(f"Unable to lock {p}: {e}")
return exitcode.OSFILE

if self.HAS_ARTIFACTS_DIR:
self.artifacts_dir = self.prepare_artifactsdir(
args.artifacts_base,
args.artifacts_dir,
args.artifacts_base, args.artifacts_dir
)
self.log_file_handlers.append(
add_zst_log_handler(
Expand All @@ -390,12 +354,9 @@ def entry_point(self, args: Namespace) -> int:
file_log_level=get_file_log_level(args),
)
)

if args.hooks:
self.run_hook(HookVariant.PRE, args)

asyncio.run(self._db_insert_run_meta(args))

exit_code = 0
try:
exit_code = self.run(args)
Expand Down Expand Up @@ -423,21 +384,16 @@ def entry_point(self, args: Namespace) -> int:
finally:
self.run_meta.exit_code = exit_code
self.run_meta.end_time = datetime.now(tz).isoformat()

asyncio.run(self._db_finish_run_meta())

if self.HAS_ARTIFACTS_DIR:
self.artifacts_dir.joinpath(FileNames.META.value).write_text(
self.run_meta.json() + "\n"
)
logger.notice(f"Stored artifacts at {self.artifacts_dir}")

if args.hooks:
self.run_hook(HookVariant.POST, args, exit_code)

if self._lock_file_fd is not None:
self._release_flock()

return exit_code


Expand All @@ -464,7 +420,6 @@ def run(self, args: Namespace) -> int:
self.main(args)
finally:
self.teardown(args)

return exitcode.OK


Expand Down Expand Up @@ -516,10 +471,7 @@ class Scanner(AsyncScript, ABC):

GROUP = "scan"
HAS_ARTIFACTS_DIR = True
CATCHED_EXCEPTIONS: list[type[Exception]] = [
ConnectionError,
UDSException,
]
CATCHED_EXCEPTIONS: list[type[Exception]] = [ConnectionError, UDSException]

def __init__(self, parser: ArgumentParser, config: Config = Config()) -> None:
super().__init__(parser, config)
Expand All @@ -534,7 +486,6 @@ async def main(self, args: Namespace) -> None:
async def setup(self, args: Namespace) -> None:
if args.target is None:
self.parser.error("--target is required")

if args.power_supply is not None:
self.power_supply = await PowerSupply.connect(args.power_supply)
if args.power_cycle is True:
Expand All @@ -543,7 +494,6 @@ async def setup(self, args: Namespace) -> None:
)
elif args.power_cycle is True:
self.parser.error("--power-cycle needs --power-supply")

# Start dumpcap as the first subprocess; otherwise network
# traffic might be missing.
if args.dumpcap:
Expand All @@ -554,26 +504,22 @@ async def setup(self, args: Namespace) -> None:
logger.error("Dumpcap could not be started!")
else:
await self.dumpcap.sync()

self.transport = await load_transport(args.target).connect(args.target)

async def teardown(self, args: Namespace) -> None:
await self.transport.close()

if self.dumpcap:
await self.dumpcap.stop()

def configure_class_parser(self) -> None:
super().configure_class_parser()

group = self.parser.add_argument_group("scanner related arguments")
group.add_argument(
"--dumpcap",
action=argparse.BooleanOptionalAction,
default=self.config.get_value("gallia.scanner.dumpcap", default=True),
help="Enable/Disable creating a pcap file",
)

group = self.parser.add_argument_group("transport mode related arguments")
group.add_argument(
"--target",
Expand All @@ -582,7 +528,6 @@ def configure_class_parser(self) -> None:
type=TargetURI,
help="URI that describes the target",
)

group = self.parser.add_argument_group("power supply related arguments")
group.add_argument(
"--power-supply",
Expand All @@ -595,10 +540,7 @@ def configure_class_parser(self) -> None:
"--power-cycle",
action=argparse.BooleanOptionalAction,
default=self.config.get_value("gallia.scanner.power_cycle", False),
help=(
"use the configured power supply to power-cycle the ECU when needed "
"(e.g. before starting the scan, or to recover bad state during scanning)"
),
help="use the configured power supply to power-cycle the ECU when needed (e.g. before starting the scan, or to recover bad state during scanning)",
)
group.add_argument(
"--power-cycle-sleep",
Expand Down
Loading

0 comments on commit 0ad3b88

Please sign in to comment.