Skip to content

Commit

Permalink
Update TestCase.load() to load in place
Browse files Browse the repository at this point in the history
- Remove TestCase.purge_optional() since it now automatic.
- TestCase.load() now only loads a single test case.
- Break up test case loading in to smaller more testable parts.
  • Loading branch information
tysmith committed Nov 24, 2023
1 parent 3e00f32 commit 3b7737e
Show file tree
Hide file tree
Showing 28 changed files with 1,275 additions and 1,417 deletions.
6 changes: 2 additions & 4 deletions grizzly/adapter/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ class AdapterError(Exception):


class Adapter(metaclass=ABCMeta):
"""An Adapter is an interface between Grizzly and a fuzzer. A subclass must
"""An Adapter is the interface between Grizzly and a fuzzer. A subclass must
be created in order to add support for additional fuzzers. The Adapter is
responsible for handling input/output data and executing the fuzzer.
It is expected that any processes launched or file created on file system
in the adapter will also be cleaned up in the adapter.
by the adapter will also be cleaned up by the adapter.
NOTE: Some methods must not be overloaded doing so will prevent Grizzly from
operating correctly.
Expand All @@ -34,8 +34,6 @@ class Adapter(metaclass=ABCMeta):
remaining to process.
"""

# Only report test cases with served content.
IGNORE_UNSERVED = True
# Maximum iterations between Target relaunches (<1 use default)
RELAUNCH = 0
# Maximum execution time per test (used as minimum timeout). The iteration is
Expand Down
10 changes: 5 additions & 5 deletions grizzly/adapter/no_op_adapter/test_no_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ def test_no_op_01():
"""test a simple Adapter"""
adapter = NoOpAdapter("no-op")
adapter.setup(None, None)
test = TestCase("a", adapter.name)
assert not test.data_size
assert "a" not in test.contents
adapter.generate(test, None)
assert "a" in test.contents
with TestCase("a", adapter.name) as test:
assert not test.data_size
assert "a" not in test.contents
adapter.generate(test, None)
assert "a" in test.contents
10 changes: 5 additions & 5 deletions grizzly/adapter/test_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,24 +54,24 @@ def test_adapter_03(tmp_path):
# empty path
assert not any(SimpleAdapter.scan_path(str(tmp_path)))
# missing path
assert not any(SimpleAdapter.scan_path(str(tmp_path / "none")))
assert not any(SimpleAdapter.scan_path(tmp_path / "none"))
# path to file
file1 = tmp_path / "test1.txt"
file1.touch()
found = tuple(SimpleAdapter.scan_path(str(file1)))
found = tuple(SimpleAdapter.scan_path(file1))
assert str(file1) in found
assert len(found) == 1
# path to directory
assert len(tuple(SimpleAdapter.scan_path(str(tmp_path)))) == 1
assert len(tuple(SimpleAdapter.scan_path(tmp_path))) == 1
# path to directory (w/ ignored)
(tmp_path / ".ignored").touch()
nested = tmp_path / "nested"
nested.mkdir()
file2 = nested / "test2.bin"
file2.touch()
assert len(tuple(SimpleAdapter.scan_path(str(tmp_path)))) == 1
assert len(tuple(SimpleAdapter.scan_path(tmp_path))) == 1
# path to directory (recursive)
found = tuple(SimpleAdapter.scan_path(str(tmp_path), recursive=True))
found = tuple(SimpleAdapter.scan_path(tmp_path, recursive=True))
assert str(file1) in found
assert str(file2) in found
assert len(found) == 2
112 changes: 78 additions & 34 deletions grizzly/common/fuzzmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
import json
from contextlib import contextmanager
from logging import getLogger
from os import unlink
from pathlib import Path
from shutil import rmtree
from tempfile import mkdtemp, mkstemp
from tempfile import NamedTemporaryFile, mkdtemp
from zipfile import ZipFile

from Collector.Collector import Collector
from FTB.ProgramConfiguration import ProgramConfiguration
Expand Down Expand Up @@ -197,6 +197,16 @@ class CrashEntry:

RAW_FIELDS = frozenset({"rawCrashData", "rawStderr", "rawStdout"})

__slots__ = (
"_crash_id",
"_coll",
"_contents",
"_data",
"_storage",
"_sig_filename",
"_url",
)

def __init__(self, crash_id):
"""Initialize CrashEntry.
Expand All @@ -206,13 +216,14 @@ def __init__(self, crash_id):
assert isinstance(crash_id, int)
self._crash_id = crash_id
self._coll = Collector()
self._contents = None
self._data = None
self._storage = None
self._sig_filename = None
self._url = (
f"{self._coll.serverProtocol}://{self._coll.serverHost}:"
f"{self._coll.serverPort}/crashmanager/rest/crashes/{crash_id}/"
)
self._data = None
self._tc_filename = None
self._sig_filename = None

@property
def crash_id(self):
Expand Down Expand Up @@ -257,46 +268,79 @@ def cleanup(self):
Returns:
None
"""
if self._tc_filename is not None:
self._tc_filename.unlink()
if self._storage is not None:
rmtree(self._storage, ignore_errors=True)
if self._sig_filename is not None:
rmtree(self._sig_filename.parent)

def testcase_path(self):
"""Download the testcase data from CrashManager.
@staticmethod
def _subset(tests, subset):
"""Select a subset of tests directories. Subset values are sanitized to
avoid raising.
Arguments:
None
tests list(Path): Directories on disk where testcases exists.
subset list(int): Indices of corresponding directories to select.
Returns:
Path: Path on disk where testcase exists_
list(Path): Directories that have been selected.
"""
if self._tc_filename is not None:
return self._tc_filename
assert isinstance(subset, list)
assert tests
count = len(tests)
# deduplicate and limit requested indices to valid range
keep = {max(count + x, 0) if x < 0 else min(x, count - 1) for x in subset}
LOG.debug("using TestCase(s) with index %r", keep)
# build list of items to preserve
return [tests[i] for i in sorted(keep)]

def testcases(self, subset=None):
"""Download the testcase data from CrashManager.
Arguments:
subset list(int): Indices of corresponding directories to select.
dlurl = self._url + "download/"
response = self._coll.get(dlurl)
Returns:
list(Path): Directories on disk where testcases exists.
"""
if self._contents is None:
assert self._storage is None
response = self._coll.get(f"{self._url}download/")

if "content-disposition" not in response.headers:
raise RuntimeError(
f"Server sent malformed response: {response!r}"
) # pragma: no cover
if "content-disposition" not in response.headers:
raise RuntimeError(
f"Server sent malformed response: {response!r}"
) # pragma: no cover

with NamedTemporaryFile(
dir=grz_tmp("fuzzmanager"),
prefix=f"crash-{self.crash_id}-",
suffix=Path(self.testcase).suffix,
) as archive:
archive.write(response.content)
archive.seek(0)
# self._storage should be removed when self.cleanup() is called
self._storage = Path(
mkdtemp(
prefix=f"crash-{self.crash_id}-", dir=grz_tmp("fuzzmanager")
)
)
with ZipFile(archive) as zip_fp:
zip_fp.extractall(path=self._storage)
# test case directories are named sequentially, a zip with three test
# directories would have:
# - 'foo-2' (oldest)
# - 'foo-1'
# - 'foo-0' (most recent)
# see FuzzManagerReporter for more info
self._contents = sorted(
(x.parent for x in self._storage.rglob("test_info.json")),
reverse=True,
)

handle, filename = mkstemp(
dir=grz_tmp("fuzzmanager"),
prefix=f"crash-{self.crash_id}-",
suffix=Path(self.testcase).suffix,
)
try:
with open(handle, "wb") as output:
output.write(response.content)
result = Path(filename)
filename = None
finally: # pragma: no cover
if filename:
unlink(filename)
self._tc_filename = result
return self._tc_filename
if subset and self._contents:
return self._subset(self._contents, subset)
return self._contents

def create_signature(self, binary):
"""Create a CrashManager signature from this crash.
Expand Down
17 changes: 10 additions & 7 deletions grizzly/common/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,9 @@ def run(
# overwrite instead of replace 'grz_next_test' for consistency
server_map.set_redirect("grz_next_test", "grz_empty", required=True)
server_map.set_dynamic_response("grz_empty", lambda _: b"", required=True)
# clear optional contents from test case
# it will be repopulated with served contests
testcase.clear_optional()
# serve the test case
serve_start = time()
server_status, served = self._server.serve_path(
Expand All @@ -293,13 +296,13 @@ def run(
attempted=testcase.entry_point in served,
timeout=server_status == Served.TIMEOUT,
)
# add all include files that were served to test case
if server_map.include:
existing = set(testcase.contents)
for url, local_file in served.items():
if url in existing:
continue
testcase.add_from_file(local_file, file_name=url, copy=True)
# add all files that were served (includes, etc...) to test
existing = set(testcase.required)
for url, local_file in served.items():
if url in existing:
continue
# use copy here so include files are copied
testcase.add_from_file(local_file, file_name=url, copy=True)
# record use of https in testcase
testcase.https = self._server.scheme == "https"
if result.timeout:
Expand Down
Loading

0 comments on commit 3b7737e

Please sign in to comment.