Skip to content

Commit

Permalink
Add type hints to reporter
Browse files Browse the repository at this point in the history
  • Loading branch information
tysmith committed May 8, 2024
1 parent 3e1a06c commit c05f69c
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 113 deletions.
137 changes: 73 additions & 64 deletions grizzly/common/report.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from collections import namedtuple
from hashlib import sha1
from logging import getLogger
from os import SEEK_END
Expand All @@ -12,6 +11,7 @@
from shutil import copyfileobj, move, rmtree
from tempfile import mkstemp
from time import strftime
from typing import List, NamedTuple, Optional, cast

# import FuzzManager utilities
from Collector.Collector import Collector
Expand All @@ -28,8 +28,12 @@

LOG = getLogger(__name__)


# NOTE: order matters, aux -> stderr -> stdout
LogMap = namedtuple("LogMap", "aux stderr stdout")
class LogMap(NamedTuple):
aux: Optional[Path]
stderr: Optional[Path]
stdout: Optional[Path]


class Report:
Expand All @@ -50,13 +54,17 @@ class Report:
"stack",
)

def __init__(self, log_path, target_binary, is_hang=False, size_limit=MAX_LOG_SIZE):
assert isinstance(log_path, Path)
assert isinstance(target_binary, Path)
self._crash_info = None
def __init__(
self,
log_path: Path,
target_binary: Path,
is_hang: bool = False,
size_limit: int = MAX_LOG_SIZE,
) -> None:
self._crash_info: Optional[CrashInfo] = None
self._logs = self._select_logs(log_path)
assert self._logs is not None
self._short_signature = None
self._short_signature: Optional[str] = None
self._signature = None
self._target_binary = target_binary
self.is_hang = is_hang
Expand All @@ -77,27 +85,27 @@ def __init__(self, log_path, target_binary, is_hang=False, size_limit=MAX_LOG_SI
# was detected to attempt to help local bucketing
stack.height_limit = self.HANG_STACK_HEIGHT if is_hang else 0
self.prefix = f"{stack.minor[:8]}_{strftime('%Y-%m-%d_%H-%M-%S')}"
self.stack = stack
self.stack: Optional[Stack] = stack
break
else:
self.prefix = f"{self.DEFAULT_MINOR}_{strftime('%Y-%m-%d_%H-%M-%S')}"
self.stack = None

@staticmethod
def calc_hash(signature):
def calc_hash(signature: Optional[CrashSignature]) -> str:
"""Create unique hash from a signature.
Args:
None
signature:
Returns:
str: Hash of the raw signature.
Hash of the raw signature.
"""
if signature is None:
return "NO_SIGNATURE"
return sha1(signature.rawSignature.encode("utf-8")).hexdigest()[:16]

def cleanup(self):
def cleanup(self) -> None:
"""Remove Report data from filesystem.
Args:
Expand All @@ -106,36 +114,36 @@ def cleanup(self):
Returns:
None
"""
if self.path and self.path.is_dir():
if self.path.is_dir():
rmtree(self.path)
self.path = None

@property
def crash_hash(self):
def crash_hash(self) -> str:
"""Create unique hash from a signature.
Args:
None
Returns:
str: Hash of the raw signature of the crash.
Hash of the raw signature of the crash.
"""
if self.is_hang:
# TODO: we cannot create a unique bucket hash for hangs atm
return "hang"
return self.calc_hash(self.crash_signature)

@property
def crash_info(self):
def crash_info(self) -> CrashInfo:
"""Create CrashInfo object from logs.
Args:
None
Returns:
CrashInfo: CrashInfo based on log data.
CrashInfo based on log data.
"""
if self._crash_info is None:
assert self._logs is not None
assert self.path is not None
# create ProgramConfiguration that can be reported to a FM server
if Path(f"{self._target_binary}.fuzzmanagerconf").is_file():
Expand All @@ -152,22 +160,22 @@ def crash_info(self):
)
# read the log files and create a CrashInfo object
self._crash_info = CrashInfo.fromRawCrashData(
self._load_log(self._logs.stdout),
self._load_log(self._logs.stderr),
self._load_log(self._logs.stdout) if self._logs.stdout else None,
self._load_log(self._logs.stderr) if self._logs.stderr else None,
fm_cfg,
auxCrashData=self._load_log(self._logs.aux),
auxCrashData=self._load_log(self._logs.aux) if self._logs.aux else None,
)
return self._crash_info

@property
def crash_signature(self):
def crash_signature(self) -> Optional[CrashSignature]:
"""Create CrashSignature object from CrashInfo.
Args:
None
Returns:
CrashSignature: CrashSignature based on log data.
CrashSignature based on log data.
"""
if self._signature is None:
collector = Collector()
Expand All @@ -187,16 +195,17 @@ def crash_signature(self):
return self._signature

@staticmethod
def crash_signature_max_frames(crash_info, suggested_frames=8):
def crash_signature_max_frames(
crash_info: CrashInfo, suggested_frames: int = 8
) -> int:
"""Determine how many stack frames should be used when creating a signature.
Args:
crash_info (CrashInfo): Data to analyse.
suggested_frames (int): Minimum number of frames to use when creating a
signature.
crash_info: Data to analyse.
suggested_frames: Minimum number of frames to use when creating a signature.
Returns:
int: Number of frames to use when creating a signature.
Number of frames to use when creating a signature.
"""
ignore = 0
for count, entry in enumerate(crash_info.backtrace, start=1):
Expand All @@ -212,14 +221,14 @@ def crash_signature_max_frames(crash_info, suggested_frames=8):
return suggested_frames + ignore

@staticmethod
def _find_ffpuppet_worker(logs):
def _find_ffpuppet_worker(logs: List[Path]) -> Optional[Path]:
"""Search through list of log files for a ffpuppet worker log.
Args:
logs (list(Path)): List of log files to search.
logs: List of log files to search.
Returns:
Path: Log file if a match is found otherwise None.
Log file if a match is found otherwise None.
"""
found = None
for log_file in (x for x in logs if "ffp_worker" in x.name):
Expand All @@ -230,14 +239,14 @@ def _find_ffpuppet_worker(logs):
return found

@staticmethod
def _find_minidump(logs):
def _find_minidump(logs: List[Path]) -> Optional[Path]:
"""Search through list of log files for a minidump log.
Args:
logs (list(Path)): List of log files to search.
logs: List of log files to search.
Returns:
Path: Log file if a match is found otherwise None.
Log file if a match is found otherwise None.
"""
re_dump_req = re_compile(
r"\d+\|0\|.+?\|google_breakpad::ExceptionHandler::WriteMinidump"
Expand All @@ -254,14 +263,14 @@ def _find_minidump(logs):
return None

@staticmethod
def _find_sanitizer(logs):
def _find_sanitizer(logs: List[Path]) -> Optional[Path]:
"""Search through list of log files for a sanitizer (ASan, UBSan, etc...) log.
Args:
logs (list(Path)): List of log files to search.
logs: List of log files to search.
Returns:
Path: Log file if a match is found otherwise None.
Log file if a match is found otherwise None.
"""
# pattern to identify the ASan crash triggered when the parent process goes away
# TODO: this may no longer be required
Expand Down Expand Up @@ -310,86 +319,83 @@ def _find_sanitizer(logs):
return found or fallback

@staticmethod
def _find_valgrind(logs):
def _find_valgrind(logs: List[Path]) -> Optional[Path]:
"""Search through log files for a Valgrind log. Empty files are ignored.
Args:
logs (list(Path)): List of log files to search.
logs: List of log files to search.
Returns:
Path: Log file if a match is found otherwise None.
Log file if a match is found otherwise None.
"""
for log_file in logs:
if "valgrind" in log_file.name and log_file.stat().st_size:
return log_file
return None

@staticmethod
def _load_log(path):
def _load_log(path: Path) -> List[str]:
"""Load and sanitize text from a file for use with CrashInfo.fromRawCrashData().
Args:
path (Path): Log file to load.
path: Log file to load.
Returns:
list(str): Text data sanitized and split into lines or None if path is None.
Text data sanitized and split into lines.
"""
if not path:
return None
return path.read_text("utf-8", errors="replace").replace("\0", "?").splitlines()

@property
def major(self):
"""The inclusive bucketing hash based on the stack trace
data found in logs.
def major(self) -> str:
"""The inclusive bucketing hash based on the stack trace data found in logs.
Args:
None
Returns:
str: major hash string.
Major hash string.
"""
if self.stack and self.stack.major is not None:
return self.stack.major
return self.DEFAULT_MAJOR

@property
def minor(self):
"""The specific bucketing hash based on the stack trace
data found in logs.
def minor(self) -> str:
"""The specific bucketing hash based on the stack trace data found in logs.
Args:
None
Returns:
str: minor hash string.
Minor hash string.
"""
if self.stack and self.stack.minor is not None:
return self.stack.minor
return self.DEFAULT_MINOR

@property
def preferred(self):
def preferred(self) -> Optional[Path]:
"""Log file containing most relevant data.
Args:
None
Returns:
Path: Log file.
Log file.
"""
assert self._logs is not None
return self._logs.aux or self._logs.stderr

@classmethod
def _select_logs(cls, path):
def _select_logs(cls, path: Path) -> Optional[LogMap]:
"""Scan path for file containing stderr, stdout and other (aux)
data and build a LogMap.
Args:
path (Path): Location to scan for log files.
path: Location to scan for log files.
Returns:
LogMap: A LogMap pointing to log files or None if path is empty.
A LogMap pointing to log files or None if path is empty.
"""
files = (x for x in path.iterdir() if x.is_file())
# order by date hopefully the oldest log is the cause of the issue
Expand Down Expand Up @@ -417,14 +423,14 @@ def _select_logs(cls, path):
return result if any(result) else None

@property
def short_signature(self):
def short_signature(self) -> str:
"""Short signature of the report created by FuzzManager.
Args:
None
Returns:
str: Short signature.
Short signature.
"""
if self._short_signature is None:
if self.is_hang:
Expand All @@ -434,16 +440,19 @@ def short_signature(self):
# FM crash signature creation failed
self._short_signature = "Signature creation failed"
else:
self._short_signature = self.crash_info.createShortSignature()
# TODO: FM is missing type hints
self._short_signature = cast(
str, self.crash_info.createShortSignature()
)
return self._short_signature

@staticmethod
def tail(in_file, size_limit):
def tail(in_file: Path, size_limit: int) -> None:
"""Tail the given file. WARNING: This is destructive!
Args:
in_file (Path): File to work with.
size_limit (int): Maximum size of file after tail operation.
in_file: File to work with.
size_limit: Maximum size of file after tail operation.
Returns:
None
Expand Down
Loading

0 comments on commit c05f69c

Please sign in to comment.