Simplify corpus management code #38

Merged 5 commits on Aug 29, 2024
3 changes: 2 additions & 1 deletion pyproject.toml
@@ -25,8 +25,9 @@ maintainers = [
dynamic = ["readme", "version"]
dependencies = [
"FuzzManager>=0.6.0",
"boto",
"boto3",
"fasteners",
"google-cloud-storage",
"psutil",
]
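The storage dependencies change accordingly: the legacy boto package is dropped in favor of boto3, and google-cloud-storage is added, matching the move from the S3-only S3Manager to the generic CloudStorageProvider used below. For context, a minimal sketch of the two client APIs this presumably abstracts over (the bucket and object names here are hypothetical examples):

# Sketch of the two storage clients CloudStorageProvider presumably wraps;
# bucket and object names are hypothetical examples.
import boto3
from google.cloud import storage

# S3 via boto3
boto3.client("s3").upload_file(
    "corpus/id_000000", "fuzzing-bucket", "project/queues/id_000000"
)

# GCS via google-cloud-storage
gcs_bucket = storage.Client().bucket("fuzzing-bucket")
gcs_bucket.blob("project/queues/id_000000").upload_from_filename("corpus/id_000000")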

122 changes: 81 additions & 41 deletions src/guided_fuzzing_daemon/afl.py
@@ -4,12 +4,12 @@
from __future__ import annotations

import os
import sys
from argparse import Namespace
from logging import getLogger
from pathlib import Path
from random import choice
from shutil import copy, rmtree
from subprocess import STDOUT, Popen, TimeoutExpired
from subprocess import DEVNULL, STDOUT, Popen, TimeoutExpired
from tempfile import mkdtemp
from time import sleep, time

@@ -18,8 +18,8 @@
from FTB.Running.AutoRunner import AutoRunner
from FTB.Signatures.CrashInfo import CrashInfo

from .s3 import S3Manager
from .stats import (
STATS_UPLOAD_PERIOD,
GeneratedField,
MaxTimeField,
MeanField,
@@ -29,10 +29,17 @@
SumMinMaxField,
ValueCounterField,
)
from .storage import (
QUEUE_UPLOAD_PERIOD,
CloudStorageProvider,
Corpus,
CorpusRefreshContext,
CorpusSyncer,
)
from .utils import LogTee, create_envs, warn_local

LOG = getLogger("gfd.afl")
POWER_SCHEDS = ("explore", "coe", "lin", "quad", "exploit", "rare")
QUEUE_UPLOAD_PERIOD = 7200
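Inferring from the call sites later in this diff (not from a documented API), the new storage primitives compose roughly as follows: a Corpus wraps a local directory, a CorpusSyncer ties it to a CloudStorageProvider under a project prefix, and upload_queue() skips any file names passed in the exclude set:

# Rough composition, inferred from usage in this diff:
corpus = Corpus(opts.corpus_out / "0" / "queue")
syncer = CorpusSyncer(storage, corpus, opts.project)
original = {item.name for item in opts.corpus_in.iterdir()}
syncer.upload_queue(original)  # push only files created after startup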


class AFLStats(StatAggregator):
@@ -86,7 +93,7 @@ def convert_num(num: str) -> float | int:
try:
(field_name, field_val) = line.split(":", 1)
except ValueError:
print(f"error parsing status line: {line!r}", file=sys.stderr)
LOG.error("error parsing status line: %r", line)
continue
field_name = field_name.strip()
field_val = field_val.strip()
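The print-to-logging conversions in this hunk (and throughout the file) deliberately use the logging module's lazy %-style arguments rather than f-strings; a short illustration of the difference:

# %-style arguments are interpolated only if the record is actually emitted,
# and handlers still receive the raw values:
LOG.error("error parsing status line: %r", line)   # lazy, preferred
LOG.error(f"error parsing status line: {line!r}")  # eager, formats even if filtered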
@@ -101,9 +108,8 @@ def convert_num(num: str) -> float | int:
self.fields[field_name].update(convert_num(field_val))
except ValueError as exc:
# ignore errors
print(
f"error reading {field_name} from {stats_path}: {exc}",
file=sys.stderr,
LOG.error(
"error reading %s from %s: %s", field_name, stats_path, exc
)
continue

Expand All @@ -118,7 +124,7 @@ def convert_num(num: str) -> float | int:


def afl_main(
opts: Namespace, collector: Collector | None, s3m: S3Manager | None
opts: Namespace, collector: Collector | None, storage: CloudStorageProvider
) -> int:
assert opts.aflbindir.is_dir()
assert opts.rargs, "--afl expects at least one positional arg (target binary)"
@@ -129,7 +135,55 @@ def afl_main(

binary = Path(opts.rargs[0]).resolve()
assert binary.is_file()

if opts.corpus_refresh:
# Run afl-cmin
afl_cmin = Path(opts.aflbindir) / "afl-cmin"
if not afl_cmin.exists():
LOG.error("Unable to locate afl-cmin binary.")
return 2

with CorpusRefreshContext(opts, storage) as merger:

env = os.environ.copy()
env["LD_LIBRARY_PATH"] = f"{binary.parent / 'gtest'}:{binary.parent}"

afl_cmdline = [
str(afl_cmin),
"-e",
"-i",
str(merger.queues_dir),
"-o",
str(merger.updated_tests_dir),
"-t",
str(opts.afl_timeout),
"-m",
"none",
str(binary),
]

LOG.info("Running afl-cmin")
# pylint: disable=consider-using-with
proc: Popen[str] | None = Popen(
afl_cmdline, stdout=None if opts.debug else DEVNULL, text=True, env=env
)
last_stats_report = 0.0
assert proc is not None
while proc.poll() is None:
# Calculate stats
if opts.stats and last_stats_report < time() - STATS_UPLOAD_PERIOD:
merger.refresh_stats.write_file(opts.stats, [])
last_stats_report = time()
sleep(0.1)
assert not proc.wait()

assert merger.exit_code is not None
return merger.exit_code
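Judging by its use here, CorpusRefreshContext appears to download the remote queues on entry and upload the minimized result on exit, recording the outcome in merger.exit_code. The -i/-o pair, -t (exec timeout in ms), -m none (no memory limit), and -e (consider edge coverage only) are afl-cmin's standard options. The surrounding loop is a generic poll-and-report pattern: avoid blocking in wait() so stats can be written at most once per STATS_UPLOAD_PERIOD. In isolation it looks like this (a sketch, with report standing in for merger.refresh_stats.write_file(...)):

# Poll a child process while doing periodic side work; `report` is a
# placeholder for the stats write above.
from subprocess import Popen
from time import sleep, time

def poll_with_report(proc: Popen, period: float, report) -> int:
    last = 0.0
    while proc.poll() is None:
        if last < time() - period:
            report()
            last = time()
        sleep(0.1)
    return proc.wait()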

opts.corpus_out.mkdir(parents=True, exist_ok=True)
corpus_syncer = CorpusSyncer(
storage, Corpus(opts.corpus_out / "0" / "queue"), opts.project
)

if opts.max_runtime == 0.0:
opts.max_runtime = float("inf")
@@ -160,6 +214,9 @@
procs: list[Popen[str] | None] = [None] * opts.instances
log_tee = LogTee(opts.afl_hide_logs, opts.instances)

# Record the original corpus so we can exclude it from uploads later
original_corpus = {item.name for item in opts.corpus_in.iterdir()}
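Snapshotting the input corpus names up front means the later upload_queue(original_corpus) calls push only files AFL generates during this run; seeds that were already present are presumably filtered by name, roughly like this (corpus_dir is a stand-in for the synced queue directory):

# Hypothetical sketch of the exclusion upload_queue() presumably applies:
new_files = [p for p in corpus_dir.iterdir() if p.name not in original_corpus]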

afl_fuzz = opts.aflbindir / "afl-fuzz"
tmp_base = Path(mkdtemp(prefix="gfd-"))
try:
@@ -185,7 +242,7 @@
# check and restart subprocesses
for idx, proc in enumerate(procs):
if proc and proc.poll() is not None:
print(f"afl-fuzz returned early: {proc.wait()}", file=sys.stderr)
LOG.warning("afl-fuzz returned early: %d", proc.wait())
procs[idx] = proc = None
stats.fields["instances"] -= 1 # type: ignore

@@ -269,18 +326,16 @@
crash_info = CrashInfo.fromRawCrashData(
[], [], cfgs[crashing_instance]
)
print(
LOG.warning(
"Warning: Failed to reproduce the given crash, submitting "
"without crash information.",
file=sys.stderr,
)

(sigfile, metadata) = collector.search(crash_info)

if sigfile is not None:
print(
f"Crash matches signature {sigfile}, not submitting...",
file=sys.stderr,
LOG.warning(
"Crash matches signature %s, not submitting...", sigfile
)
else:
collector.generate(
@@ -297,30 +352,22 @@
"afl-crash": crash_path.name,
},
)
print(
'Successfully submitted crash: "'
f"{result['shortSignature']}\" as {result['id']}",
file=sys.stderr,
LOG.info(
'Successfully submitted crash: "%s" as %s',
result["shortSignature"],
result["id"],
)
crash_path.rename(
crash_path.parent / f"{crash_path.name}.processed"
)

# Only upload new corpus files every 2 hours or after corpus reduction
if (
opts.s3_queue_upload
and last_queue_upload < time() - QUEUE_UPLOAD_PERIOD
):
assert s3m is not None
s3m.upload_afl_queue_dir(
opts.corpus_out / "0",
opts.corpus_out / "0" / "cmdline",
include_sync=True,
)
if opts.queue_upload and last_queue_upload < time() - QUEUE_UPLOAD_PERIOD:
corpus_syncer.upload_queue(original_corpus)
last_queue_upload = time()

# Calculate stats
if opts.stats and last_stats_report < time() - 30:
if opts.stats and last_stats_report < time() - STATS_UPLOAD_PERIOD:
stats.update_and_write(
opts.stats,
[path.parent for path in opts.corpus_out.glob("*/fuzzer_stats")],
@@ -342,26 +389,19 @@
if proc and proc.poll() is not None:
procs[idx] = None
if any(procs):
print(f"need to kill {sum(1 for proc in procs if proc)}", file=sys.stderr)
LOG.info("need to kill %d", sum(1 for proc in procs if proc))
for proc in procs:
if proc:
proc.kill()
try:
proc.wait(timeout=1)
except TimeoutExpired:
print(
f"Process {proc.pid} did not exit after SIGKILL",
file=sys.stderr,
)
LOG.warning("Process %d did not exit after SIGKILL", proc.pid)
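The shutdown path bounds each reap with wait(timeout=1), so a process that lingers after SIGKILL only produces a warning instead of hanging the daemon. The same pattern in isolation:

# Kill a child and reap it with a bounded wait; SIGKILL cannot be trapped,
# but a process stuck in uninterruptible I/O may still fail to exit promptly.
from logging import getLogger
from subprocess import Popen, TimeoutExpired

LOG = getLogger("gfd.afl")

def force_stop(proc: Popen, grace: float = 1.0) -> None:
    proc.kill()
    try:
        proc.wait(timeout=grace)
    except TimeoutExpired:
        LOG.warning("Process %d did not exit after SIGKILL", proc.pid)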

log_tee.close()

if s3m and opts.s3_queue_upload:
s3m.upload_afl_queue_dir(
opts.corpus_out / "0",
opts.corpus_out / "0" / "cmdline",
include_sync=True,
)
if opts.queue_upload:
corpus_syncer.upload_queue(original_corpus)

# final stats
if opts.stats: