diff --git a/loki/args.py b/loki/args.py index d2de9507..7808d52b 100644 --- a/loki/args.py +++ b/loki/args.py @@ -1,15 +1,16 @@ # This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. -from argparse import ArgumentParser +from argparse import ArgumentParser, Namespace from pathlib import Path +from typing import List, Optional from loki import Loki __author__ = "Tyson Smith" -def parse_args(argv=None): +def parse_args(argv: Optional[List[str]] = None) -> Namespace: parser = ArgumentParser(description="Loki fuzzing library") parser.add_argument( "input", type=Path, help="Output will be generated based on this file" @@ -24,7 +25,7 @@ def parse_args(argv=None): parser.add_argument( "-b", "--byte-order", - choices=Loki.BYTE_ORDERS, + choices=sorted(Loki.BYTE_ORDERS), default=None, help="Byte order to use when mutating multiple bytes at once. " "Use '>' for big-endian or '<' for little-endian (default: random)", diff --git a/loki/loki.py b/loki/loki.py index 537f3de1..c5c475b6 100644 --- a/loki/loki.py +++ b/loki/loki.py @@ -4,6 +4,8 @@ """ Loki fuzzing library """ + +from argparse import Namespace from logging import ERROR, INFO, basicConfig, getLogger from os import SEEK_END from pathlib import Path @@ -12,6 +14,7 @@ from struct import pack, unpack from tempfile import SpooledTemporaryFile, mkdtemp from time import perf_counter, strftime +from typing import IO, Optional __author__ = "Tyson Smith" @@ -19,16 +22,37 @@ class Loki: - BYTE_ORDERS = ("<", ">", "@", "!", "=") + BYTE_ORDERS = {"<", ">", "@", "!", "="} __slots__ = ("aggr", "byte_order") - def __init__(self, aggression=0.0, byte_order=None): + def __init__( + self, aggression: Optional[float] = 0.0, byte_order: Optional[str] = None + ) -> None: + """ + Arguments: + aggression: Amount of fuzzing to perform. 0 for 0% up to 1.0 for 100%. + byte_order: Used to indicate big or little endian when mutating multiple + bytes at once. + """ + assert aggression is not None + assert aggression >= 0 + assert aggression <= 1 + assert byte_order is None or byte_order in self.BYTE_ORDERS self.aggr = min(max(aggression, 0.0), 1.0) self.byte_order = byte_order @staticmethod - def _fuzz_data(in_data, byte_order): + def _fuzz_data(in_data: bytes, byte_order: str) -> bytes: + """Fuzz data. + + Args: + in_data: Data to fuzz. + byte_order: Byte order to use. + + Returns: + Fuzzed data. + """ data_size = len(in_data) if data_size == 1: pack_unit = "B" @@ -40,7 +64,7 @@ def _fuzz_data(in_data, byte_order): pack_unit = f"{byte_order}I" mask = 0xFFFFFFFF else: - raise AssertionError(f"Unsupported data size: {data_size}") + raise RuntimeError(f"Unsupported data size: {data_size}") fuzz_op = randint(0, 4) if fuzz_op == 0: # boundary @@ -61,7 +85,15 @@ def _fuzz_data(in_data, byte_order): return pack(pack_unit, out_data & mask) - def _fuzz(self, tgt_fp): + def _fuzz(self, tgt_fp: IO[bytes]) -> None: + """Fuzz file data. + + Args: + tgt_fp: Open file object. + + Returns: + None + """ tgt_fp.seek(0, SEEK_END) length = tgt_fp.tell() if length < 1: @@ -74,10 +106,11 @@ def _fuzz(self, tgt_fp): "%d of a possible %d mutations will be performed", mutations, max_mutations ) if self.byte_order is not None: - assert self.byte_order in ("<", ">", "@", "!", "=") + assert self.byte_order in self.BYTE_ORDERS byte_order = self.byte_order else: LOG.debug("using random byte order") + byte_order = "<" if getrandbits(1) else ">" for count, idx in enumerate(sample(range(length), k=mutations)): # every few iterations randomly attempt multi-byte mutations if count % 10 == 0 and getrandbits(1): @@ -92,15 +125,21 @@ def _fuzz(self, tgt_fp): else: # target single byte size = 1 - if self.byte_order is None: - byte_order = "<" if getrandbits(1) else ">" # perform mutation tgt_fp.seek(idx) out_data = self._fuzz_data(tgt_fp.read(size), byte_order) tgt_fp.seek(idx) tgt_fp.write(out_data) - def fuzz_data(self, data): + def fuzz_data(self, data: bytes) -> bytes: + """Create a fuzzed copy of the provided data. + + Args: + data: Data to be fuzzed. + + Returns: + Fuzzed data. + """ assert isinstance(data, bytes) # open a temp file in memory for fuzzing with SpooledTemporaryFile(max_size=0x800000, mode="r+b") as tmp_fp: @@ -109,25 +148,38 @@ def fuzz_data(self, data): tmp_fp.seek(0) return tmp_fp.read() - def fuzz_file(self, in_file, count, out_dir, ext=None): + def fuzz_file( + self, src: Path, count: int, dst: Path, ext: Optional[str] = None + ) -> bool: + """Create fuzzed copies of the provided file. + + Args: + src: Template file containing data to be fuzzed. + count: Number of fuzzed copies to create. + dst: Directory to store output. + ext: File extension to use. + + Returns: + True if successful otherwise False. + """ try: - if in_file.stat().st_size < 1: + if src.stat().st_size < 1: LOG.error("Input must be at least 1 byte long") return False except FileNotFoundError: - LOG.error("'%s' does not exist!", in_file) + LOG.error("'%s' does not exist!", src) return False if ext is None: - ext = in_file.suffix + ext = src.suffix for i in range(count): - out_file = out_dir / f"{i:06d}_fuzzed{ext}" - copy(in_file, out_file) + out_file = dst / f"{i:06d}_fuzzed{ext}" + copy(src, out_file) with out_file.open("r+b") as out_fp: self._fuzz(out_fp) return True @classmethod - def main(cls, args): + def main(cls, args: Namespace) -> int: basicConfig(format="", level=INFO if not args.quiet else ERROR) LOG.info("Starting Loki @ %s", strftime("%Y-%m-%d %H:%M:%S")) LOG.info("Target template is '%s'", args.input.resolve()) diff --git a/loki/test_loki.py b/loki/test_loki.py index 9e957a7b..7fb2e1c0 100644 --- a/loki/test_loki.py +++ b/loki/test_loki.py @@ -128,7 +128,7 @@ def test_loki_fuzz_02(mocker): fake_randint.side_effect = (4, 1) Loki._fuzz_data(b"1234", ">") # invalid data size - with raises(AssertionError, match=r"Unsupported data size:"): + with raises(RuntimeError, match=r"Unsupported data size:"): Loki._fuzz_data(b"", ">")