Skip to content

Commit

Permalink
[loki] Add type hints
Browse files Browse the repository at this point in the history
- Add docs
- Minor nits
  • Loading branch information
tysmith committed Mar 25, 2024
1 parent ed957d1 commit 301510d
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 20 deletions.
7 changes: 4 additions & 3 deletions loki/args.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from argparse import ArgumentParser
from argparse import ArgumentParser, Namespace
from pathlib import Path
from typing import List, Optional

from loki import Loki

__author__ = "Tyson Smith"


def parse_args(argv=None):
def parse_args(argv: Optional[List[str]] = None) -> Namespace:
parser = ArgumentParser(description="Loki fuzzing library")
parser.add_argument(
"input", type=Path, help="Output will be generated based on this file"
Expand All @@ -24,7 +25,7 @@ def parse_args(argv=None):
parser.add_argument(
"-b",
"--byte-order",
choices=Loki.BYTE_ORDERS,
choices=sorted(Loki.BYTE_ORDERS),
default=None,
help="Byte order to use when mutating multiple bytes at once. "
"Use '>' for big-endian or '<' for little-endian (default: random)",
Expand Down
84 changes: 68 additions & 16 deletions loki/loki.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
"""
Loki fuzzing library
"""

from argparse import Namespace
from logging import ERROR, INFO, basicConfig, getLogger
from os import SEEK_END
from pathlib import Path
Expand All @@ -12,23 +14,45 @@
from struct import pack, unpack
from tempfile import SpooledTemporaryFile, mkdtemp
from time import perf_counter, strftime
from typing import IO, Optional

__author__ = "Tyson Smith"

LOG = getLogger(__name__)


class Loki:
BYTE_ORDERS = ("<", ">", "@", "!", "=")
BYTE_ORDERS = {"<", ">", "@", "!", "="}

__slots__ = ("aggr", "byte_order")

def __init__(self, aggression=0.0, byte_order=None):
def __init__(
self, aggression: Optional[float] = 0.0, byte_order: Optional[str] = None
) -> None:
"""
Arguments:
aggression: Amount of fuzzing to perform. 0 for 0% up to 1.0 for 100%.
byte_order: Used to indicate big or little endian when mutating multiple
bytes at once.
"""
assert aggression is not None
assert aggression >= 0
assert aggression <= 1
assert byte_order is None or byte_order in self.BYTE_ORDERS
self.aggr = min(max(aggression, 0.0), 1.0)
self.byte_order = byte_order

@staticmethod
def _fuzz_data(in_data, byte_order):
def _fuzz_data(in_data: bytes, byte_order: str) -> bytes:
"""Fuzz data.
Args:
in_data: Data to fuzz.
byte_order: Byte order to use.
Returns:
Fuzzed data.
"""
data_size = len(in_data)
if data_size == 1:
pack_unit = "B"
Expand All @@ -40,7 +64,7 @@ def _fuzz_data(in_data, byte_order):
pack_unit = f"{byte_order}I"
mask = 0xFFFFFFFF
else:
raise AssertionError(f"Unsupported data size: {data_size}")
raise RuntimeError(f"Unsupported data size: {data_size}")

fuzz_op = randint(0, 4)
if fuzz_op == 0: # boundary
Expand All @@ -61,7 +85,15 @@ def _fuzz_data(in_data, byte_order):

return pack(pack_unit, out_data & mask)

def _fuzz(self, tgt_fp):
def _fuzz(self, tgt_fp: IO[bytes]) -> None:
"""Fuzz file data.
Args:
tgt_fp: Open file object.
Returns:
None
"""
tgt_fp.seek(0, SEEK_END)
length = tgt_fp.tell()
if length < 1:
Expand All @@ -74,10 +106,11 @@ def _fuzz(self, tgt_fp):
"%d of a possible %d mutations will be performed", mutations, max_mutations
)
if self.byte_order is not None:
assert self.byte_order in ("<", ">", "@", "!", "=")
assert self.byte_order in self.BYTE_ORDERS
byte_order = self.byte_order
else:
LOG.debug("using random byte order")
byte_order = "<" if getrandbits(1) else ">"
for count, idx in enumerate(sample(range(length), k=mutations)):
# every few iterations randomly attempt multi-byte mutations
if count % 10 == 0 and getrandbits(1):
Expand All @@ -92,15 +125,21 @@ def _fuzz(self, tgt_fp):
else:
# target single byte
size = 1
if self.byte_order is None:
byte_order = "<" if getrandbits(1) else ">"
# perform mutation
tgt_fp.seek(idx)
out_data = self._fuzz_data(tgt_fp.read(size), byte_order)
tgt_fp.seek(idx)
tgt_fp.write(out_data)

def fuzz_data(self, data):
def fuzz_data(self, data: bytes) -> bytes:
"""Create a fuzzed copy of the provided data.
Args:
data: Data to be fuzzed.
Returns:
Fuzzed data.
"""
assert isinstance(data, bytes)
# open a temp file in memory for fuzzing
with SpooledTemporaryFile(max_size=0x800000, mode="r+b") as tmp_fp:
Expand All @@ -109,25 +148,38 @@ def fuzz_data(self, data):
tmp_fp.seek(0)
return tmp_fp.read()

def fuzz_file(self, in_file, count, out_dir, ext=None):
def fuzz_file(
self, src: Path, count: int, dst: Path, ext: Optional[str] = None
) -> bool:
"""Create fuzzed copies of the provided file.
Args:
src: Template file containing data to be fuzzed.
count: Number of fuzzed copies to create.
dst: Directory to store output.
ext: File extension to use.
Returns:
True if successful otherwise False.
"""
try:
if in_file.stat().st_size < 1:
if src.stat().st_size < 1:
LOG.error("Input must be at least 1 byte long")
return False
except FileNotFoundError:
LOG.error("'%s' does not exist!", in_file)
LOG.error("'%s' does not exist!", src)
return False
if ext is None:
ext = in_file.suffix
ext = src.suffix
for i in range(count):
out_file = out_dir / f"{i:06d}_fuzzed{ext}"
copy(in_file, out_file)
out_file = dst / f"{i:06d}_fuzzed{ext}"
copy(src, out_file)
with out_file.open("r+b") as out_fp:
self._fuzz(out_fp)
return True

@classmethod
def main(cls, args):
def main(cls, args: Namespace) -> int:
basicConfig(format="", level=INFO if not args.quiet else ERROR)
LOG.info("Starting Loki @ %s", strftime("%Y-%m-%d %H:%M:%S"))
LOG.info("Target template is '%s'", args.input.resolve())
Expand Down
2 changes: 1 addition & 1 deletion loki/test_loki.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def test_loki_fuzz_02(mocker):
fake_randint.side_effect = (4, 1)
Loki._fuzz_data(b"1234", ">")
# invalid data size
with raises(AssertionError, match=r"Unsupported data size:"):
with raises(RuntimeError, match=r"Unsupported data size:"):
Loki._fuzz_data(b"", ">")


Expand Down

0 comments on commit 301510d

Please sign in to comment.