diff --git a/src/nemere/inference/formatRefinement.py b/src/nemere/inference/formatRefinement.py index 31ad572..f6ded5f 100644 --- a/src/nemere/inference/formatRefinement.py +++ b/src/nemere/inference/formatRefinement.py @@ -1,19 +1,32 @@ +import csv, numpy, IPython, logging from abc import ABC, abstractmethod -from typing import List +from typing import List, Dict, Tuple, Sequence, Iterable, Set, Union +from os.path import join, exists +from itertools import chain from bitstring import Bits +from kneed import KneeLocator +from tabulate import tabulate from pyitlib import discrete_random_variable as drv -from nemere.inference.segments import MessageSegment +from netzob.Model.Vocabulary.Messages.AbstractMessage import AbstractMessage + +from nemere.inference.segments import MessageSegment, TypedSegment from nemere.inference.segmentHandler import isExtendedCharSeq +from nemere.inference.templates import FieldTypeContext, DistanceCalculator, Template, ClusterAutoconfException, \ + OPTICSsegmentClusterer, DBSCANadjepsClusterer +from nemere.validation.dissectorMatcher import MessageComparator def isPrintableChar(char: int): + """ + :param char: Character value: integer representation of single byte value). + :return: True if it is a printable character, False otherwise. + """ if 0x20 <= char <= 0x7e or char in ['\t', '\n', '\r']: return True return False - def isPrintable(bstring: bytes) -> bool: """ A bit broader definition of printable than python string's isPrintable() @@ -75,12 +88,24 @@ def isOverlapping(segA: MessageSegment, segB: MessageSegment) -> bool: def entropyOfBytes(byteData: bytes, n=3): + """ + :param byteData: byte string to calculate the "stream" entropy for. + :param n: n-gram length in bits. + :return: Entropy of the bit n-grams of a byte string + """ bitData = Bits(bytes=byteData) ngrams = [bitData[offset:offset+n].uint for offset in range(len(bitData)-n+1)] return drv.entropy(ngrams)/n def entropyOfXor(byteDataA: bytes, byteDataB: bytes, n=3): + """ + + :param byteDataA: byte string A + :param byteDataB: byte string B + :param n: n-gram length in bits. + :return: Entropy of the bit n-grams of the XOR of two byte strings + """ bitDataA = Bits(bytes=byteDataA) bitDataB = Bits(bytes=byteDataB) trunc = min(len(bitDataA), len(bitDataB)) @@ -90,6 +115,7 @@ def entropyOfXor(byteDataA: bytes, byteDataB: bytes, n=3): class MessageModifier(ABC): + """Base class for message segmentation refinements.""" _debug = False def __init__(self, segments: List[MessageSegment]): @@ -99,7 +125,6 @@ def __init__(self, segments: List[MessageSegment]): self.segments = segments - class Merger(MessageModifier, ABC): """ Base class to merge segments based on a variable condition. @@ -129,6 +154,7 @@ def merge(self): def condition(self, segl: MessageSegment, segr: MessageSegment) -> bool: """ A generic condition called to determine whether a merging is necessary. + Needs to be implemented in subclasses. :param segl: left segment :param segr: right segment @@ -162,10 +188,72 @@ class MergeConsecutiveChars(Merger): def condition(self, segl: MessageSegment, segr: MessageSegment): """ Check whether both segments consist of printable characters. + + :param segl: left segment + :param segr: right segment + :return: True if merging is required, False otherwise. """ return isPrintable(segl.bytes) and isPrintable(segr.bytes) +class EntropyMerger(Merger): + + # min entropy above which consecutive segments are merged if both have an n-gram entropy higher than this. 
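+    # A minimal sketch (hypothetical byte values) of what staticCondition() below evaluates:
+    # both segments' 3-bit n-gram entropies (entropyOfBytes) must exceed consecutiveEntropiesThresh,
+    # their difference must stay below consecutiveEntropiesDiffThresh, and the entropy of the XOR of
+    # their overlapping bytes (entropyOfXor) must exceed congruenceEntropiesThresh; a near-identical
+    # entropy (difference < 0.01) also permits merging.
+    #
+    #   from nemere.inference.formatRefinement import entropyOfBytes, entropyOfXor
+    #   a, b = bytes.fromhex("9f3ac812"), bytes.fromhex("17d45be3")
+    #   entropyOfBytes(a), entropyOfBytes(b)   # compared against consecutiveEntropiesThresh
+    #   entropyOfXor(a, b)                     # compared against congruenceEntropiesThresh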
+ consecutiveEntropiesThresh = 0.5 + consecutiveEntropiesDiffThresh = 0.2 + # min entropy of the congruence between two segments beginnings above which merging is allowed. + congruenceEntropiesThresh = 0.8 + + # # conservative thresholds: minimal or change, improves as well as reduces quality. + # consecutiveEntropiesThresh = 0.7 + # consecutiveEntropiesDiffThresh = 0.05 + # congruenceEntropiesThresh = 0.95 + + def condition(self, segl: MessageSegment, segr: MessageSegment): + clearToMerge = type(self).staticCondition(segl, segr) + if clearToMerge: + logger = logging.getLogger(__name__) + logger.debug(f"merge random segments: {segl.bytes.hex()} and {segr.bytes.hex()}") + return clearToMerge + + @staticmethod + def staticCondition(segl: MessageSegment, segr: MessageSegment): + eobl = entropyOfBytes(segl.bytes) + eobr = entropyOfBytes(segr.bytes) + diff = abs(eobl - eobr) + seglRandomEnough = eobl > EntropyMerger.consecutiveEntropiesThresh + segrRandomEnough = eobr > EntropyMerger.consecutiveEntropiesThresh + congruenceRandomEnough = entropyOfXor(segl.bytes, segr.bytes) > EntropyMerger.congruenceEntropiesThresh + consecutiveRandomEnough = diff < EntropyMerger.consecutiveEntropiesDiffThresh + clearToMerge = (consecutiveRandomEnough and seglRandomEnough and segrRandomEnough and congruenceRandomEnough) \ + or diff < 0.01 + return clearToMerge + + +class FrequencyRiseMerger(Merger): + """start new segment only when frequency rises""" + + def __init__(self, segments: List[TypedSegment], littleEndian=False): + """Abuses the fieldtype of TypedSegment to hold the occurrence frequency value.""" + super().__init__(segments) + self._littleEndian = littleEndian + # hold (retain) the last frequency value for all following unknown segment + self._holdPeak = dict() + lastFrequency = 0 + for seg in reversed(segments) if self._littleEndian else segments: + secondToLast = lastFrequency + lastFrequency = seg.fieldtype if isinstance(seg.fieldtype, int) else lastFrequency + lastFrequency = lastFrequency if lastFrequency >= secondToLast \ + else secondToLast - (secondToLast - lastFrequency) // 4 # dampen downwards + self._holdPeak[seg] = lastFrequency + + def condition(self, segl: TypedSegment, segr: TypedSegment) -> bool: + if self._littleEndian: + return self._holdPeak[segl] <= self._holdPeak[segr] + else: + return self._holdPeak[segl] >= self._holdPeak[segr] + + class RelocateSplits(MessageModifier, ABC): """ Relocate split locations based on properties of adjacent segments. @@ -262,8 +350,8 @@ class ResplitConsecutiveChars(RelocateSplits): @staticmethod def toTheLeft(segl: MessageSegment) -> int: """ - :param segl: - :return: the count of printable chars at the end of the segment + :param segl: The current segment + :return: The count of printable chars at the end of the segment """ splitpos = segl.length for char in reversed(segl.bytes): @@ -276,9 +364,8 @@ def toTheLeft(segl: MessageSegment) -> int: @staticmethod def toTheRight(segr: MessageSegment) -> int: """ - - :param segr: - :return: the count of printable chars at the beginning of the segment + :param segr: The current segment + :return: The count of printable chars at the beginning of the segment """ splitpos = 0 for char in segr.bytes: @@ -372,9 +459,11 @@ def countPairFrequencies(allMsgsSegs: List[List[MessageSegment]]): @staticmethod def frequencies(): + """ + :return: The frequencies of all pairs of consecutive values in this set of segments. 
+ """ return Resplit2LeastFrequentPair.__pairFrequencies - def split(self): """ Perform the splitting of the segments. @@ -407,7 +496,6 @@ def split(self): mangledSegments.append(segc) return mangledSegments - @staticmethod def lookupLeastFrequent(seg: MessageSegment) -> int: """ @@ -491,6 +579,10 @@ def countCommonValues(segmentedMessages: List[List[MessageSegment]]): return mocoShort def split(self): + """ + Perform the split of segments that contain frequent substrings. + :return: The message segments with the segments replaced by refined segments. + """ newmsg = list() for sid, seg in enumerate(self.segments): # enum necessary to change to in place edit after debug (want to do?) didReplace = False @@ -559,7 +651,7 @@ def merge(self): ... if sgms != sgss: ... print("Mismatch!") - :return: a new set of segments after the input has been merged + :return: A new set of segments after the input has been merged """ minLen = 6 @@ -618,7 +710,6 @@ class SplitFixed(MessageModifier): def split(self, segmentID: int, chunkLength: int): """ - :param segmentID: The index of the segment to split within the sequence of segments composing the message :param chunkLength: The fixed length of the target segments in bytes :return: The message segments with the given segment replaced by multiple segments of the given fixed length. @@ -635,5 +726,1819 @@ def split(self, segmentID: int, chunkLength: int): return self.segments +class RelocateZeros(MessageModifier): + """Improve its raw segments by relocating inferred boundaries near the beginning and end of sequences of nulls.""" + + def __init__(self, segments: List[MessageSegment], comparator: MessageComparator): + super().__init__(segments) + self._parsedMessage = comparator.parsedMessages[comparator.messages[segments[0].message]] + self.counts = None + self.doprint = False + + def split(self): + trueFieldEnds = MessageComparator.fieldEndsFromLength([l for t, l in self._parsedMessage.getFieldSequence()]) + + newmsg = self.segments[0:1] + for segr in self.segments[1:]: + segl = newmsg[-1] + # exactly one zero right before or after boundary: no change + if segl.bytes[-1] == 0 and segr.bytes[0] != 0 or segr.bytes[0] == 0 and segl.bytes[-1] != 0: + if segr.offset in trueFieldEnds: + self.counts["0t"] += 1 + else: + self.counts["0f"] += 1 + newmsg.append(segr) + else: + # prio 1: zero to the right of boundary + if segr.length > 2 and segr.bytes[1] == 0 and segr.bytes[0] != 0 and segr.bytes[2] == 0: + # shift right + newmsg[-1] = MessageSegment(segl.analyzer, segl.offset, segl.length + 1) + newmsg.append(MessageSegment(segr.analyzer, segr.offset + 1, segr.length - 1)) + if newmsg[-1].offset in trueFieldEnds: + self.counts["+1tr"] += 1 + else: + self.counts["+1fr"] += 1 + if segr.offset in trueFieldEnds: + self.counts["+1.0"] += 1 + self.doprint = True + elif segl.length > 1 and segl.bytes[-1] == 0 and segl.bytes[-2] != 0: + # shift left (never happens) + newmsg[-1] = MessageSegment(segl.analyzer, segl.offset, segl.length - 1) + newmsg.append(MessageSegment(segr.analyzer, segr.offset - 1, segr.length + 1)) + if newmsg[-1].offset in trueFieldEnds: + self.counts["-1tr"] += 1 + else: + self.counts["-1fr"] += 1 + if segr.offset in trueFieldEnds: + self.counts["-1.0"] += 1 + self.doprint = True + # prio 2: 2 zeros to the left of boundary + elif segr.length > 1 and segr.bytes[0] == 0 and segr.bytes[1] != 0 and \ + segl.length > 1 and segl.bytes[-1] == 0: + # shift right (never happens) + newmsg[-1] = MessageSegment(segl.analyzer, segl.offset, segl.length + 1) + 
newmsg.append(MessageSegment(segr.analyzer, segr.offset + 1, segr.length - 1)) + if newmsg[-1].offset in trueFieldEnds: + self.counts["+1tl"] += 1 + else: + self.counts["+1fl"] += 1 + if segr.offset in trueFieldEnds: + self.counts["+1.0"] += 1 + self.doprint = True + elif segl.length > 2 and segl.bytes[-2] == 0 and segl.bytes[-1] != 0 and \ + segl.bytes[-3] == 0: + # shift left + newmsg[-1] = MessageSegment(segl.analyzer, segl.offset, segl.length - 1) + newmsg.append(MessageSegment(segr.analyzer, segr.offset - 1, segr.length + 1)) + if newmsg[-1].offset in trueFieldEnds: + self.counts["-1tl"] += 1 + else: + self.counts["-1fl"] += 1 + if segr.offset in trueFieldEnds: + self.counts["-1.0"] += 1 + self.doprint = True + else: + # no zeros at boundary + newmsg.append(segr) + assert len(newmsg) > 1 and newmsg[-2].nextOffset == newmsg[-1].offset or newmsg[-1].offset == 0 + return newmsg + + def initCounts(self, amounts: Dict[str, int] = None): + """ + Counts of different kinds of off-by-one situations for zeros + + :return: + """ + if amounts is None: + self.counts = { + "0t":0, "0f":0, "-1tr":0, "-1fr":0, "-1tl":0, "-1fl":0, "+1tr":0, "+1fr":0, "+1tl":0, "+1fl":0, + "-1.0":0, "+1.0":0 + } # key: boundary offset change + true/false + else: + self.counts = amounts + + + + +class RelocatePCA(object): + + # PCA conditions parameters + minSegLen = 3 # minimum number of cluster members + maxAbsolutePrincipals = 4 # absolute maximum of significant principal components for PCA/sub-clustering + screeMinThresh = 10 # threshold for the minimum of a component to be considered principal + principalCountThresh = .5 # threshold for the maximum number of allowed principal components to start the analysis; + # interpreted as fraction of the component count as dynamic determination. + contributionRelevant = 0.1 # 0.12; threshold for the minimum difference from 0 + # to consider a loading component in the eigenvector as contributing to the variance + # maxLengthDelta = 30 # (max true: 6 in dns-new, min false 9 in dhcp) # replaced by maxLengthDeltaRatio + maxLengthDeltaRatio = 0.5 # threshold for the ratio between sortest and longest segment. 
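+    # minimum relative change of the maximum absolute loading between adjacent byte positions,
+    # required for a drop (condition A) or a rise (condition E2) to count as a field boundary.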
+ + pcDeltaMin = 0.98 + + # e1 parameters + nearZero = 0.030 # 0.003 + notableContrib = 0.75 # 0.66 # or 0.7 (see smb tf04) + # peak may be (near) +/- 1.0 in most cases, but +/- 0.5 includes also ntp tf06 and smb tf04, + # however, it has false positive dhcp tf01 and smb tf00.4 + + # e2 parameters + # also apply to higher nearZero and lower notableContrib if longer (>= 4) sequence of nearZero + # precedes notableContrib + relaxedNearZero = 0.050 # 0.004 + relaxedNZlength = 4 + relaxedNotableContrib = 0.005 # 0.003 + relaxedMaxContrib = 1.0 # 0.66 + + def __init__(self, similarSegments: FieldTypeContext, + eigenValuesAndVectors: Tuple[numpy.ndarray, numpy.ndarray]=None, + screeKnee: float=None, littleEndian: bool=False): + self._similarSegments = similarSegments + self._eigen = eigenValuesAndVectors if eigenValuesAndVectors is not None \ + else numpy.linalg.eigh(similarSegments.cov) + """Tuple of eigenvalues (one-dimensional ndarray) and eigenvectors (two-dimensional ndarray)""" + self._screeKnee = screeKnee if screeKnee is not None \ + else RelocatePCA.screeKneedle(self._eigen) + + self._screeThresh = max(self._screeKnee, RelocatePCA.screeMinThresh, max(self._eigen[0]) / 10) \ + if any(self._eigen[0] < RelocatePCA.screeMinThresh) \ + else RelocatePCA.screeMinThresh + """ + defines a minimum for any principal component, that is + * at least the knee in scree, + * not less than one magnitude smaller then the first PC, + * and larger than the absolute minimum. + """ + self._principalComponents = self._eigen[0] > self._screeThresh + """**significant** principal components""" + self._contribution = self._eigen[1][:, self._principalComponents] # type: numpy.ndarray + """contributions of loadings per **significant** principal component""" + self._littleEndian = littleEndian + + self._testSubclusters = None + + + @property + def similarSegments(self): + return self._similarSegments + + + @property + def eigen(self): + return self._eigen + + + @staticmethod + def screeKneedle(eigenValuesAndVectors: Tuple[numpy.ndarray, numpy.ndarray]) -> float: + """ + Scree is the name of the downward curve of the eigenvalues of the principal components. Typically, it has a + steep drop from the components with the most significant contribution to the variance towards a leveled tail of + low eigenvalues. The transition between the principal components with a significant contribution and the + negligible components is marked by the knee of the curve, according to common analysis methods. To determine + the knee, we facilitate the Kneedle algorithm (`KneeLocator`). + + :param eigenValuesAndVectors: + :return: y-value of the knee, 0 if there is no knee. 
+ """ + scree = list(reversed(sorted(eigenValuesAndVectors[0].tolist()))) + try: + import warnings + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + kl = KneeLocator( + list(range(eigenValuesAndVectors[0].shape[0])), + scree, curve='convex', direction='decreasing') + except ValueError: + return 0.0 + if not isinstance(kl.knee, int): + return 0.0 + + # if kl.knee > 1: + # print(scree) + # import matplotlib.pyplot as plt + # kl.plot_knee_normalized() + # plt.show() + # # IPython.embed() + + return scree[kl.knee] + + + + @staticmethod + def filterForSubclustering(fTypeContext: Sequence[FieldTypeContext]): + interestingClusters = [ + cid for cid, clu in enumerate(fTypeContext) + # if not clu.length < RelocatePCA.minSegLen + # and not sum([isExtendedCharSeq(seg.bytes) for seg in clu.baseSegments]) > .5 * len(clu.baseSegments) + # and not all(clu.stdev == 0) + # # moved to second iteration below to make it dynamically dependent on the scree-knee: + # and not all(numpy.linalg.eigh(clu.cov)[0] <= screeMinThresh) + ] + + return interestingClusters + + + @staticmethod + def filterRelevantClusters(fTypeContext: Sequence[FieldTypeContext]): + """ + Filteres clusters that have the required properties to be subclustered. Ensures that the kneedle algorithm + can be run on each cluster of the resulting subset of clusters. + + :param fTypeContext: + :return: tuple containing: + * list of indices of the given fTypeContext that are relevant. + * dict of eigenvalues and vectors with the fTypeContext indices as keys + * dict of scree-knee values with the fTypeContext indices as keys + """ + interestingClusters = RelocatePCA.filterForSubclustering(fTypeContext) + + # leave only clusters of at least minSegLen (=6) + minClusterSize = RelocatePCA.minSegLen + for cid in interestingClusters.copy(): + # must be at least minClusterSize unique segment values + if len(set([bs.bytes for bs in fTypeContext[cid].baseSegments])) < minClusterSize: + interestingClusters.remove(cid) + + # remove all clusters that have no knees in scree + eigenVnV = dict() + screeKnees = dict() + for cid in interestingClusters.copy(): + eigen = numpy.linalg.eigh(fTypeContext[cid].cov) # type: Tuple[numpy.ndarray, numpy.ndarray] + eigenVnV[cid] = eigen + screeKnees[cid] = RelocatePCA.screeKneedle(eigen) + if all(eigen[0] <= screeKnees[cid]): + interestingClusters.remove(cid) + + # for cid, clu in enumerate(fTypeContext): + # if cid not in interestingClusters: + # eigen = numpy.linalg.eigh(fTypeContext[cid].cov) + # print(eigen[0]) + # IPython.embed() + + # # remove all clusters having a segment length difference of more than maxLengthDelta # TODo removed due to ntp quality in zeropca-016-refinement-79d3ba4-ntp and nemesys-107-refinement-79d3ba4 + # for cid in interestingClusters.copy(): + # bslen = {bs.length for bs in fTypeContext[cid].baseSegments} + # if min(bslen) / max(bslen) < RelocatePCA.maxLengthDeltaRatio: + # interestingClusters.remove(cid) + # print("Removed cluster {} for too high segment length difference ratio of {}".format( + # fTypeContext[cid].fieldtype, + # (max(bslen) - min(bslen))/max(bslen) + # )) + + return interestingClusters, eigenVnV, screeKnees + + + @property + def principalComponents(self): + """**significant** principal components (scores)""" + return self._principalComponents + + + @property + def screeThresh(self): + return self._screeThresh + + + @property + def contribution(self): + """contributions of loadings per **significant** principal component""" + return self._contribution + + + 
@staticmethod + def _preFilter(segments: Sequence[MessageSegment], label: str) -> Tuple[FieldTypeContext, bool]: + """ + Create FieldTypeContext object from segments and + apply to decide about the next recursion in sub-clustering. + + Filter conditions are: + * enoughSegments + * enoughVariance + * notOnlyChars + + :param segments: cluster result as raw list of segments + :param label: cluster label + :return: tuple of a + * FieldTypeContext object for the list of segments and + * the result of the filter, telling whether the cluster can be further analyzed by PCA or sub-clustering: + exclude if false. + """ + # resolve templates into their single segments + resolvedSegments = list() + for seg in segments: + if isinstance(seg, Template): + resolvedSegments.extend(seg.baseSegments) + else: + resolvedSegments.append(seg) + + # Align segments. Calc mean, std, cov, ... + ftContext = FieldTypeContext(resolvedSegments) + ftContext.fieldtype = label + + return ftContext, RelocatePCA.__preFilter(ftContext) + + + @staticmethod + def __preFilter(ftContext: FieldTypeContext) -> bool: + """ + Apply to decide about the next recursion in sub-clustering. + + Filter conditions are: + * enoughSegments + * enoughVariance + * notOnlyChars + + :param ftContext: cluster result as FieldTypeContext object + :return: the result of the filter, telling whether the cluster can be further analyzed by PCA or sub-clustering: + exclude if false. + """ + uniqueSegVals = len(set(bs.bytes for bs in ftContext.baseSegments)) + enoughSegments = uniqueSegVals >= RelocatePCA.minSegLen + enoughVariance = not all(ftContext.stdev == 0) + notOnlyChars = not all([isExtendedCharSeq(seg.bytes) for seg in ftContext.baseSegments]) + + if not enoughSegments: + print("Cluster {} has not enough unique segments ({}).".format(ftContext.fieldtype, uniqueSegVals)) + if not enoughVariance: + print("Cluster {} has no variance.".format(ftContext.fieldtype)) + if not notOnlyChars: + print("Cluster {} has only chars.".format(ftContext.fieldtype)) + + return enoughSegments and enoughVariance and notOnlyChars + + + def _meetsPCAprerequisites(self) -> bool: + """ + Apply at the beginning of a new sub-clustering recusion between PCA or further sub-clustering. + + :return: Result of the test, whether all PCA prerequisites are met. 
+ """ + # number of principal components, including fallback in self._screeThresh if there is no knee + tooManyPCs = sum(self._eigen[0] > self._screeThresh) > \ + min(RelocatePCA.maxAbsolutePrincipals, self._eigen[0].shape[0] * RelocatePCA.principalCountThresh) + # segment length difference is too high + bslen = {bs.length for bs in self.similarSegments.baseSegments} + tooHighLenDiff = min(bslen) / max(bslen) < RelocatePCA.maxLengthDeltaRatio + charSegCount = sum([isExtendedCharSeq(seg.bytes) for seg in self._similarSegments.baseSegments]) + tooManyChars = charSegCount > .5 * len(self._similarSegments.baseSegments) + + if tooManyPCs: + print("Cluster {} needs reclustering: too many principal components ({}/{}).".format( + self._similarSegments.fieldtype, sum(self._eigen[0] > self._screeThresh), self._eigen[0].shape[0])) + if tooHighLenDiff: + print("Cluster {} needs reclustering: length difference too high ({:.2f}).".format( + self._similarSegments.fieldtype, min(bslen) / max(bslen))) + if tooManyChars: + print("Cluster {} needs reclustering: too many char segments ({:d}).".format( + self._similarSegments.fieldtype, charSegCount)) + + return not tooManyPCs and not tooHighLenDiff and not tooManyChars + + + def getSubclusters(self, dc: DistanceCalculator = None, S: float = None, + reportFolder: str = None, trace: str = None) -> List[Union['RelocatePCA', FieldTypeContext]]: + """ + Recursive sub-cluster. + + :param dc: Distance calculator for clustering. + :param S: Kneedle sensitivity parameter for autodetection of DBSCAN clustering parameter. + :param reportFolder: + :param trace: + TODO :param collectEvaluationData: For evaluation: Collect the intermediate (sub-)clusters generated during + the analysis of the segments. + :return: A flat list of subclusters. If no subclustering was necessary or possible, returns itself. + """ + # # # # # # # # # # # # # # # # # # # # # # # # + # terminate recursion and return self for PCA + if self._meetsPCAprerequisites(): + return [self] + # # # # # # # # # # # # # # # # # # # # # # # # + + # no DC available for clustering + if dc is None: + print("No dissimilarities available. 
Ignoring cluster {}.".format(self._similarSegments.fieldtype)) + return [] + + # Sub-Cluster + try: + clusterer = DBSCANadjepsClusterer(dc, segments=self._similarSegments.baseSegments, S=S) + noise, *clusters = clusterer.clusterSimilarSegments(False) + print(self._similarSegments.fieldtype, + clusterer, "- cluster sizes:", [len(s) for s in clusters], "- noise:", len(noise)) + + # # # # # # # # # # # # # # # # # # # # # # # # + # write statistics + if reportFolder is not None and trace is not None: + fn = join(reportFolder, "subcluster-parameters.csv") + writeheader = not exists(fn) + with open(fn, "a") as segfile: + segcsv = csv.writer(segfile) + if writeheader: + segcsv.writerow( + ["trace", "cluster label", "cluster size", + "max segment length", "# principals", + "max principal value", + "# subclusters", "noise", "Kneedle S", "DBSCAN eps", "DBSCAN min_samples" + ] + ) + segcsv.writerow([ + trace, self.similarSegments.fieldtype, len(self.similarSegments.baseSegments), + self.similarSegments.length, sum(self.principalComponents), + self._eigen[0][self._principalComponents].max(), + len(clusters), len(noise), clusterer.S, clusterer.eps, clusterer.min_samples + ]) + + # if there remains only noise, ignore cluster + if len(clusters) == 0: + # TODO test impact + if RelocatePCA.__preFilter(self._similarSegments): + # # # # # # # # # # # # # # # # # # # # # # # # + # terminate recursion and return self for PCA as "last resort" + print("Use super-cluster {}: only noise.".format(self._similarSegments.fieldtype)) + return [self] + else: + print("Ignore cluster {}: only noise.".format(self._similarSegments.fieldtype)) + return [] + + subclusters = list() + for cid, segments in enumerate(clusters): + cLabel = "{}.{}".format(self._similarSegments.fieldtype, cid) + # Generate suitable FieldTypeContext objects from the sub-clusters + ftContext, doRecurse = RelocatePCA._preFilter(segments, cLabel) + # if basic requirements not met, exclude from PCA analysis + if not doRecurse: + # # # # # # # # # # # # # # # # # # # # # # # # + # stop further recursion and ignore this cluster + print("Ignore subcluster {} due to pre-filter.".format(ftContext.fieldtype)) + # performing "common bound" refinement here does not improve the results: + # * is triggered in only few cases + # * there are just a handfull of segments that may benefit + # * there is some probability to produce more FP than TP. 
+ continue + # # # # # # # # # # # # # # # # # # # # # # # # + print("Analyzing sub-cluster", ftContext.fieldtype) + try: + subcluster = RelocatePCA(ftContext, littleEndian=self._littleEndian) + # # # # # # # # # # # # # # # # # # # # # # # # + # descend into recursion + subclusters.extend(subcluster.getSubclusters(dc, S, reportFolder, trace)) + # # # # # # # # # # # # # # # # # # # # # # # # + except numpy.linalg.LinAlgError: + print("Ignore subcluster due to eigenvalues did not converge") + print(repr(ftContext.baseSegments)) + return subclusters + except ClusterAutoconfException as e: + print(e) + return [self] + + + + + def relocateOffsets(self, reportFolder:str = None, trace:str = None, comparator: MessageComparator = None, + conditionA = True, conditionE1 = False, conditionE2 = True, + conditionF = False, conditionG = False): + """ + + :param conditionA: Enable Condition A + :param conditionE1: Enable Condition E1 + :param conditionE2: Enable Condition E2 + :param conditionF: Enable Condition F + :param conditionG: Enable Condition G + :param reportFolder: For debugging + :param trace: For debugging + :param comparator: For debugging + :return: positions to be relocated if the PCA for the current cluster has meaningful result, + otherwise sub-cluster and return the PCA and relocation positions for each sub-cluster. + """ + import csv + + # # # # # # # # # # # # # # # # # # # # # # # # + # Count true boundaries for the segments' relative positions + if comparator: + from collections import Counter + trueOffsets = list() + for bs in self.similarSegments.baseSegments: + fe = comparator.fieldEndsPerMessage(bs.analyzer.message) + offs, nxtOffs = self.similarSegments.paddedPosition(bs) + trueOffsets.extend(o - offs for o in fe if offs <= o <= nxtOffs) + truOffCnt = Counter(trueOffsets) + mostCommonTrueBounds = [offs for offs, cnt in truOffCnt.most_common() + if cnt > 0.5 * len(self.similarSegments.baseSegments)] + + # # # # # # # # # # # # # # # # # # # # # # # # + # "component analysis" + # + # the principal components (i. e. with Eigenvalue > thresh) of the covariance matrix are assumed to peak + # towards the end of varying fields with similar content (e. g. counting numbers). + # The component is near 1 or -1 in the Eigenvector of the respective Eigenvalue. 
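+        # Hypothetical illustration (big-endian case): with per-byte maximum absolute loadings of,
+        # say, [0.05, 0.08, 0.99, 0.005], the contribution falls to near zero between offsets 2 and 3,
+        # so condition A below proposes a new boundary at relative offset 3, the end of the varying field.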
+ + relocate = list() # new boundaries found: relocate the next end to this relative offset + # relocateFromStart = list() # new boundaries found: relocate the previous start to this relative offset + + # continue only if we have some principal components + if not self.principalComponents.any(): + return relocate + + littleEndian = self._littleEndian + if littleEndian: + print("Assuming little endian") + + # # Condition a: Covariance ~0 after non-0 + # at which eigenvector component does any of the principal components have a relevant contribution + contributingLoadingComponents = (abs(self._contribution) > RelocatePCA.contributionRelevant).any(1) + contribution = self._contribution + + if littleEndian: + contributingLoadingComponents = numpy.flip(contributingLoadingComponents) + contribution = numpy.flip(contribution) + + if conditionA: + # lc (loading component) is equivalent to the byte offset in the message + for lc in reversed(range(1, contributingLoadingComponents.shape[0])): + # a "significant" relative drop in covariance + relPCdelta = (abs(contribution[lc - 1]).max() - abs(contribution[lc]).max()) \ + / abs(contribution[lc - 1]).max() + + if not contributingLoadingComponents[lc] and contributingLoadingComponents[lc - 1] \ + and relPCdelta > RelocatePCA.pcDeltaMin: + if littleEndian: + offset = contributingLoadingComponents.shape[0] - lc + else: + offset = lc + relocate.append(offset) + + # # # # # # # # # # # # # # # # # # # # # # # # + # write statistics + if reportFolder is not None and trace is not None: + fn = join(reportFolder, "pca-conditions-a.csv") + writeheader = not exists(fn) + with open(fn, "a") as segfile: + segcsv = csv.writer(segfile) + if writeheader: + segcsv.writerow( + ["trace", "cluster label", "cluster size", + "max segment length", "# principals", + "max principal value", + "is FP", + "first boundary true", + "final boundary true", + "offset", "max contribution before offset", "max contribution at offset"] + ) + # noinspection PyUnboundLocalVariable + segcsv.writerow([ + trace, self.similarSegments.fieldtype, len(self.similarSegments.baseSegments), + self.similarSegments.length, sum(self.principalComponents), + self._eigen[0][self._principalComponents].max(), + repr(offset not in mostCommonTrueBounds) if comparator else "", + repr(0 in mostCommonTrueBounds) if comparator else "", + repr(self.similarSegments.length in mostCommonTrueBounds) if comparator else "", + offset, abs(contribution[lc - 1]).max(), abs(contribution[lc]).max() + ]) + + # # # Condition b: Loadings peak in the middle a segment + # principalLoadingPeak = abs(self._eigen[1][:, self._eigen[0].argmax()]).argmax() + # if principalLoadingPeak < self._eigen[0].shape[0] - 1: + # relocate.append(principalLoadingPeak+1) + + # # # Condition c: + # tailSize = 1 + # if not contributingLoadingComponents[:-tailSize].any(): + # for tailP in range(tailSize, 0, -1): + # if contributingLoadingComponents[-tailP]: + # relocate.append(self._eigen[0].shape[0] - tailSize) + # break + + # # Condition d: + # cancelled + + + # prepare list of eigen vectors sorted by eigen values + eigVsorted = list(reversed(sorted([(val, vec) for val, vec in zip( + self._eigen[0][self.principalComponents], self.contribution.T)], + key=lambda x: x[0]))) + eigVecS = numpy.array([colVec[1] for colVec in eigVsorted]).T + + # TODO caveat: precision of numerical method for cov or eigen does not suffice for near zero resolution + # in all cases. e. g. 
setting nearZero to 0.003 indeterministically results in a false negative for the + # condition. Setting it higher, might introduce more false positives. + + # # Condition e: Loading peak of principal component rising from (near) 0.0 + # + + # apply to multiple PCs to get multiple cuts, see smb tf01 + # leads to only one improvement and one FP in 100s traces. Removed again. + # for rank in range(eigVecS.shape[1]): + # rank = 0 + + rnzCount = 0 + for lc in range(1, eigVecS.shape[0]): + # alternative: just one PC + # pcLoadings = eigVecS[:, rank] + + if littleEndian: + pcLoadings = numpy.flip(eigVecS) + else: + pcLoadings = eigVecS + if all(abs(pcLoadings[lc - 1]) < RelocatePCA.relaxedNearZero): + rnzCount += 1 + else: + rnzCount = 0 + relPCdelta = (abs(pcLoadings[lc]).max() - abs(pcLoadings[lc - 1]).max()) / abs(pcLoadings[lc]).max() + + if conditionE1 \ + and all(abs(pcLoadings[lc - 1]) < RelocatePCA.nearZero) \ + and any(abs(pcLoadings[lc]) > RelocatePCA.notableContrib): + if littleEndian: + offset = eigVecS.shape[0] - lc + else: + offset = lc + # # # # # # # # # # # # # # # # # # # # # # # # + # write statistics + if reportFolder is not None and trace is not None: + fn = join(reportFolder, "pca-conditions-e1.csv") + writeheader = not exists(fn) + with open(fn, "a") as segfile: + segcsv = csv.writer(segfile) + if writeheader: + segcsv.writerow( + ["trace", "cluster label", "cluster size", + "max segment length", "# principals", + "rank", "rank principal value", + "is FP", + "near new bound", + "offset", "principal contribution before offset", "principal contribution at offset"] + ) + segcsv.writerow([ + trace, self.similarSegments.fieldtype, len(self.similarSegments.baseSegments), + self.similarSegments.length, sum(self.principalComponents), + # rank, eigVsorted[rank][0], + "all", "-", + repr(offset not in mostCommonTrueBounds) if comparator else "", + repr(any(nearBound in relocate for nearBound in [offset, offset - 1, offset + 1])), + offset, max(abs(pcLoadings[lc - 1])), max(abs(pcLoadings[lc])) + ]) + relocate.append(offset) + + # has been close to zero for at least the previous **relaxedNZlength** bytes + # and any loading is greater than relaxedNotableContrib + # and all loadings are less than relaxedMaxContrib + # and ... 
+ elif conditionE2 \ + and rnzCount >= RelocatePCA.relaxedNZlength \ + and any(abs(pcLoadings[lc]) > RelocatePCA.relaxedNotableContrib) \ + and all(abs(pcLoadings[lc]) < RelocatePCA.relaxedMaxContrib) \ + and relPCdelta > RelocatePCA.pcDeltaMin: + # # that is away more than 1 position from another new cut (see smb tf03), leads to only one FP + # not any(nearBound in relocate for nearBound in [lc, lc - 1, lc + 1]): + if littleEndian: + offset = eigVecS.shape[0] - lc + else: + offset = lc + # # # # # # # # # # # # # # # # # # # # # # # # + # write statistics + if reportFolder is not None and trace is not None: + fn = join(reportFolder, "pca-conditions-e2.csv") + writeheader = not exists(fn) + with open(fn, "a") as segfile: + segcsv = csv.writer(segfile) + if writeheader: + segcsv.writerow( + ["trace", "cluster label", "cluster size", + "max segment length", "# principals", + "rank", "rnzCount", + "is FP", + "near new bound", + "offset", "principal contribution before offset", "principal contribution at offset"] + ) + segcsv.writerow([ + trace, self.similarSegments.fieldtype, len(self.similarSegments.baseSegments), + self.similarSegments.length, sum(self.principalComponents), + "all", rnzCount, + repr(offset not in mostCommonTrueBounds) if comparator else "", + repr(any(nearBound in relocate for nearBound in [offset, offset - 1, offset + 1])), + lc, max(abs(pcLoadings[lc - 1])), max(abs(pcLoadings[lc])) + ]) + relocate.append(offset) + + # # To search for a case of the "caveat: precision of numerical method" (see above): + # if trace == "dns_ictf2010_deduped-100" and self.similarSegments.fieldtype == "tf03" and 7 not in relocate: + # print("#"*40 + "\nThis is the conditionE1-bug!\n" + "#"*40) + # IPython.embed() + + # # Condition f: inversion of loading of the first principal component if it has a "notable" loading, i. e., + # transition from/to: -0.5 < --|-- > 0.5 + # just concerns ntp tf01 + if conditionF: + # TODO little endian not implemented! + for lc in range(1, eigVecS.shape[0]): + pcLoadings = eigVecS[:, 0] + if pcLoadings[lc - 1] < -RelocatePCA.relaxedNotableContrib \ + and pcLoadings[lc] > RelocatePCA.relaxedNotableContrib \ + or pcLoadings[lc - 1] > RelocatePCA.relaxedNotableContrib \ + and pcLoadings[lc] < -RelocatePCA.relaxedNotableContrib: + + # # # # # # # # # # # # # # # # # # # # # # # # + # write statistics + if reportFolder is not None and trace is not None: + fn = join(reportFolder, "pca-conditions-f.csv") + writeheader = not exists(fn) + with open(fn, "a") as segfile: + segcsv = csv.writer(segfile) + if writeheader: + segcsv.writerow( + ["trace", "cluster label", "cluster size", + "max segment length", "# principals", + "rank principal value", + "is FP", + "near new bound", + "offset", "principal contribution before offset", + "principal contribution at offset"] + ) + segcsv.writerow([ + trace, self.similarSegments.fieldtype, len(self.similarSegments.baseSegments), + self.similarSegments.length, sum(self.principalComponents), + eigVsorted[0][0], + repr(lc not in mostCommonTrueBounds) if comparator else "", + repr(any(nearBound in relocate for nearBound in [lc, lc - 1, lc + 1])), + lc, abs(pcLoadings[lc - 1]), abs(pcLoadings[lc]) + ]) + + relocate.append(lc) + + # # Condition g: + if conditionG and eigVecS.shape[1] > 1: + # TODO little endian not implemented! 
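+            # smallLoadingDelta bounds the spread of the loadings within a single byte position and is,
+            # at the same time, the minimum drop of their mean towards the next position: condition G
+            # fires where the loadings are tightly grouped on both sides of a position but their common
+            # level shifts down by more than this value.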
+ smallLoadingDelta = 0.5 + + for lc in range(1, eigVecS.shape[0]): + pcLoadings = eigVecS + + if max(pcLoadings[lc - 1]) - min(pcLoadings[lc - 1]) < smallLoadingDelta \ + < pcLoadings[lc - 1].mean() - pcLoadings[lc].mean() \ + and max(pcLoadings[lc]) - min(pcLoadings[lc]) < smallLoadingDelta: + + # # # # # # # # # # # # # # # # # # # # # # # # + # write statistics + if reportFolder is not None and trace is not None: + fn = join(reportFolder, "pca-conditions-g.csv") + writeheader = not exists(fn) + with open(fn, "a") as segfile: + segcsv = csv.writer(segfile) + if writeheader: + segcsv.writerow( + ["trace", "cluster label", "cluster size", + "max segment length", "# principals", + "is FP", + "near new bound", + "offset", + "contribution delta before offset", + "contribution delta at offset", + "contribution shift at offset", + ] + ) + segcsv.writerow([ + trace, self.similarSegments.fieldtype, len(self.similarSegments.baseSegments), + self.similarSegments.length, sum(self.principalComponents), + repr(lc not in mostCommonTrueBounds) if comparator else "", + repr(any(nearBound in relocate for nearBound in [lc, lc - 1, lc + 1])), + lc, + max(pcLoadings[lc - 1]) - min(pcLoadings[lc - 1]), + max(pcLoadings[lc]) - min(pcLoadings[lc]), + pcLoadings[lc - 1].mean() - pcLoadings[lc].mean() + ]) + + relocate.append(lc) + + + return relocate + + + def relocateBoundaries(self, comparator: MessageComparator = None, reportFolder:str = None) \ + -> Dict[MessageSegment, List[int]]: + """ + Determine new boundaries for all segments in the RelocatePCA object. + Performed by: + * relocatedBounds + * relocatedCommons + + :param comparator: For evaluation: Encapsulated true field bounds to compare results to. + :param reportFolder: For evaluation: Destination path to write results and statistics to. + :return: Relocated boundaries. + """ + from os.path import splitext, basename + trace = splitext(basename(comparator.specimens.pcapFileName))[0] if comparator else None + + # prepare proposed new bounds + relocate = self.relocateOffsets(reportFolder, trace, comparator) + + # prepare different views on the newly proposed offsets + paddOffs = {bs: self.similarSegments.paddedPosition(bs) for bs in self.similarSegments.baseSegments} + baseOffs = {bs: self.similarSegments.baseOffset(bs) for bs in self.similarSegments.baseSegments} + endOffs = {bs: self.similarSegments.baseOffset(bs) + bs.length + for bs in self.similarSegments.baseSegments} + fromEnd = {bs: self.similarSegments.maxLen - self.similarSegments.baseOffset(bs) - bs.length + for bs in self.similarSegments.baseSegments} + minBase = min(baseOffs.values()) + + # translate padded offsets to "local segment-wise offsets" + segSpecificRel = {bs: sorted({rel - baseOffs[bs] for rel in relocate}) + for bs in self.similarSegments.baseSegments} + + # # # # # # # # # # # # # # # # # # # # # # # # + # generate the new cuts from the proposed bounds + relocatedBounds = dict() + for seg, rel in segSpecificRel.items(): + newBounds = list() + # move vs. add first segment + if len(rel) == 0 or rel[0] > 1: + newBounds.append(0) + # new boundaries + for rend in rel: + newBounds.append(rend) + # move vs. 
add last segment + if len(rel) == 0 or rel[-1] < seg.length - 1: + newBounds.append(seg.length) + relocatedBounds[seg] = newBounds + newPaddingRelative = {bs: [rbound + baseOffs[bs] for rbound in relocatedBounds[bs]] + for bs in self.similarSegments.baseSegments} + # # # # # # # # # # # # # # # # # # # # # # # # + + # # # # # # # # # # # # # # # # # # # # # # # # + # padded range refinement (+ preparation) + # + # padding-relative positions of boundary moves from that position and moves to that position + # based on the starts and ends of the original segment bounds. + moveFrom = dict() + moveTo = dict() + for seg, rel in relocatedBounds.items(): + moveFrom[seg] = list() + moveTo[seg] = list() + if rel[0] > 0: + moveFrom[seg].append(baseOffs[seg]) + moveTo[seg].append(newPaddingRelative[seg][0]) + if rel[-1] < seg.length: + moveFrom[seg].append(endOffs[seg]) + moveTo[seg].append(newPaddingRelative[seg][-1]) + # # # # # # # # # # # # # # # # # # # # # # # # + commonBounds = RelocatePCA.CommonBoundUtil(baseOffs, endOffs, moveFrom, moveTo) + relocatedCommons = commonBounds.frequentBoundReframing(newPaddingRelative, relocate) + # # # # # # # # # # # # # # # # # # # # # # # # + + if comparator and reportFolder: + import tabulate as tabmod + tabmod.PRESERVE_WHITESPACE = True + + # # # # # # # # # # # # # # # # # # # # # # # # + # generate value matrix for visualization + valMtrx = list() + for seg, rel in relocatedBounds.items(): + segVal = [seg.bytes.hex()] + + emptyRelStart = sum(globRel <= baseOffs[seg] for globRel in relocate) + emptyRelStart -= 1 if rel[0] > 0 else 0 + segVal.extend([""]*emptyRelStart) + + if rel[0] >= 0 and (len(relocate) == 0 or min(relocate) - baseOffs[seg] > 0): + prepend = " " * (baseOffs[seg] - minBase) + else: + prepend = "" + + # segment continues + if rel[0] > 0: + segVal.append(prepend[:-2] + " ~" + seg.bytes[:rel[0]].hex()) + prepend = "" + + # determine translated start and end of new boundaries per segment and cut bytes accordingly. + for rstart, rend in zip(rel[:-1], rel[1:]): + if rend < 0: + segVal.append("") + prepend = "" + continue + if rstart < 0: + prepend += " " * -rstart + rstart = 0 + + # values of new segment + segVal.append(prepend + seg.bytes[rstart:rend].hex()) + prepend = "" + + # segment continues + if rel[-1] < seg.length: + segVal.append(seg.bytes[rel[-1]:].hex() + "~ ") + + emptyRelEnd = sum(fromEnd[seg] >= self.similarSegments.maxLen - globRel for globRel in relocate) + segVal.extend([""] * emptyRelEnd) + + valMtrx.append(segVal + [newPaddingRelative[seg]] + [relocatedCommons[seg]]) + + valTable = tabulate(valMtrx, showindex=True, tablefmt="orgtbl").splitlines() + + # # # # # # # # # # # # # # # # # # # # # # # # + # write statistics for padded range cutting + for num, (seg, rel) in enumerate(newPaddingRelative.items()): + from os.path import join, exists, basename, splitext + import csv + + scoFile = "padded-range-cutting.csv" + scoHeader = ["trace", "cluster label", "segment", "base offset", "length", + "common offset (CO)", + "this CO's freq", "max of all CO freq.s", # "CO/com off freq" = "common offset frequency" + "shorter than max len", "accepted CO", + "start/end", "true bound", "relocate", + "moved away", "moved to", "all moved", "off-by-one...", # ... from bound + "in range", + "sole...", # ... 
or more common than neighbor + "relative frequency", + "commonUnchangedOffbyone", # a Common that is unchanged and Offbyone + "uobofreq", + "vals" + ] + scoTrace = splitext(basename(comparator.specimens.pcapFileName))[0] + fn = join(reportFolder, scoFile) + writeheader = not exists(fn) + + # do not filter out unchanged bounds for "off-by-one", see dns tf03 + oboRel = rel.copy() + # TODO the following four lines create more problems than they solve. + # The adverse effect could be reduced by additional conditions adding a lot of complexity. + # if baseOffs[seg] in oboRel: + # oboRel.remove(baseOffs[seg]) + # if endOffs[seg] in oboRel: + # oboRel.remove(endOffs[seg]) + # COPY IN frequentBoundReframing + relocWmargin = RelocatePCA._offbyone(oboRel) + + # resolve adjacent most common starts/ends (use more common bound) + moCoReSt, moCoReEn = commonBounds.filterOutMoreCommonNeighbors(relocWmargin) + + # resolve adjacent most common starts/ends (use more common bound) + # commonUnchangedOffbyone = commonBounds.commonUnchangedOffByOne(seg, relocate) + commonUnchanged = commonBounds.commonUnchanged(seg, relocate) + + unchangedBounds = commonBounds.unchangedBounds(seg, relocate) + + + # Separately calculate frequency of off-by-one positions of unchanged bounds + # (copy of commonBounds.commonUnchangedOffByOne(seg) internals) + uoboFreq = { + ub + 1: + commonBounds.commonStarts[ub + 1] / sum(commonBounds.commonStarts.values()) + for ub in unchangedBounds if ub + 1 in commonBounds.commonStarts } + uoboFreq.update({ + ub - 1: + max(commonBounds.commonEnds[ub - 1] / sum(commonBounds.commonEnds.values()), + uoboFreq[ub - 1] if ub - 1 in uoboFreq else -1) + for ub in unchangedBounds if ub - 1 in commonBounds.commonEnds }) + # uoboFreq = {uobo: max(commonBounds.commonStarts[uobo] / sum(commonBounds.commonStarts.values()), + # commonBounds.commonEnds[uobo] / sum(commonBounds.commonEnds.values())) + # for uobo in unchangedOffbyone + # if (uobo in commonBounds.commonStarts) + # or (uobo in commonBounds.commonEnds)} + + + # True boundaries for the segments' relative positions + fe = [0] + comparator.fieldEndsPerMessage(seg.analyzer.message) + offs, nxtOffs = paddOffs[seg] + trueOffsets = [o - offs for o in fe if offs <= o <= nxtOffs] + + if writeheader: + with open(fn, "a") as segfile: + segcsv = csv.writer(segfile) + segcsv.writerow(scoHeader) + for com, cnt in commonBounds.commonStarts.most_common(): + with open(fn, "a") as segfile: + segcsv = csv.writer(segfile) + segcsv.writerow([ + scoTrace, self.similarSegments.fieldtype, num, baseOffs[seg], seg.length, + com, repr(com in relocatedCommons[seg]), + cnt, commonBounds.commonStarts.most_common(1)[0][1], + "({})".format(self.similarSegments.maxLen - com), + "start", repr(com in trueOffsets), + repr(com in rel), + repr(com in moveFrom[seg]), + repr(com in moveTo[seg]), + repr(com in commonBounds.allAreMoved), + repr(com in relocWmargin), + repr(com > min(rel)), + repr(com in moCoReSt), + commonBounds.commonStarts[com] / sum(commonBounds.commonStarts.values()), + repr(com in commonUnchanged), + uoboFreq[com] if com in uoboFreq else "", + valTable[num], + ]) + for com, cnt in commonBounds.commonEnds.most_common(): + with open(fn, "a") as segfile: + segcsv = csv.writer(segfile) + segcsv.writerow([ + scoTrace, self.similarSegments.fieldtype, num, baseOffs[seg], seg.length, + com, repr(com in relocatedCommons[seg]), + cnt, commonBounds.commonEnds.most_common(1)[0][1], + self.similarSegments.maxLen - com, + "end", repr(com in trueOffsets), + repr(com in rel), + 
repr(com in moveFrom[seg]), + repr(com in moveTo[seg]), + repr(com in commonBounds.allAreMoved), + repr(com in relocWmargin), + repr(com < max(rel)), + repr(com in moCoReEn), + commonBounds.commonEnds[com]/sum(commonBounds.commonEnds.values()), + repr(com in commonUnchanged), + uoboFreq[com] if com in uoboFreq else "", + valTable[num], + ]) + + # # segment boundary change comparison tables + # print() + # print(sc.similarSegments.fieldtype) + # print() + # print(tabulate(valMtrx, showindex=True, headers=["seg", "original"] + # + ["new"] * (len(valMtrx[0]) - 3) + # + ["newBounds", "cutsExt"] + # )) + # print() + # print(commonBounds.commonStarts.most_common()) + # print(commonBounds.commonEnds.most_common()) + tabmod.PRESERVE_WHITESPACE = False + + # collect new bounds + relocatedBoundaries = {seg: list() for seg in self.similarSegments.baseSegments} + for segment, newBounds in relocatedBounds.items(): + relocatedBoundaries[segment].extend([int(rb + segment.offset) for rb in newBounds]) + assert len(relocatedBoundaries[segment]) == len(set(relocatedBoundaries[segment])) + for segment, newCommons in relocatedCommons.items(): + relocatedBoundaries[segment].extend([int(rc + segment.offset) for rc in newCommons + if int(rc + segment.offset) not in relocatedBoundaries[segment]]) + if not len(relocatedBoundaries[segment]) == len(set(relocatedBoundaries[segment])): + IPython.embed() + assert len(relocatedBoundaries[segment]) == len(set(relocatedBoundaries[segment])), \ + repr(relocatedBoundaries[segment]) + "\n" + repr(set(relocatedBoundaries[segment])) + + return relocatedBoundaries + + + + + @staticmethod + def _offbyone(reloc: List[int]): + """ + :param reloc: A list of integer values. + :return: A list of integer values with their direct off by one neighbors. Sorted and deduplicated. + """ + return sorted(set([r - 1 for r in reloc] + [r for r in reloc] + [r + 1 for r in reloc])) + + + + + + class CommonBoundUtil(object): + + commonFreqThreshold = 0.4 + """Threshold for the frequency of a bound has at least to have + to be considered common in CommonBoundUtil#frequentBoundReframing().""" + uoboFreqThresh = 0.4 # 0.8 + """Threshold for the frequency + direct neighbors of unchanged bounds of a segment that are common bounds themselves + have to have at least to be considered in CommonBoundUtil#commonUnchangedOffByOne().""" + + def __init__(self, baseOffs: Dict[MessageSegment, int], endOffs: Dict[MessageSegment, int], + moveFrom: Dict[MessageSegment, List[int]], moveTo: Dict[MessageSegment, List[int]]): + """ + + :param baseOffs: The relative base offset for all segments to analyze. + :param endOffs: The relative ends positions for all segments to analyze. + :param moveFrom: Lists of boundaries that should be replaced by another + for the segments where this is applicable. + :param moveTo: Lists of boundaries that should replace others for the segments where this is applicable. + """ + from collections import Counter + + assert baseOffs.keys() == endOffs.keys() == moveFrom.keys() == moveTo.keys(), \ + "All segments need to have a base offset and end defined." 
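+            # The counters built below tally how often each padding-relative start and end offset
+            # occurs among the cluster's segments; a start or end that is moved in a segment is
+            # counted at its move target instead of its original position.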
+ + self._baseOffs = baseOffs + self._endOffs = endOffs + self._moveFrom = moveFrom + self._moveTo = moveTo + + moveAtStart = {seg: baseOffs[seg] in mofro for seg, mofro in moveFrom.items()} # mofro[0] == baseOffs[seg] + moveAtEnd = {seg: endOffs[seg] in mofro for seg, mofro in moveFrom.items()} # mofro[-1] == endOffs[seg] + commonStarts = Counter(baseOffs.values()) + commonEnds = Counter(endOffs.values()) + + self.allAreMoved = \ + [ + globrel for globrel in commonStarts.keys() + if all(moveAtStart[seg] for seg, sstart in baseOffs.items() if globrel == sstart) + ] + [ + globrel for globrel in commonEnds.keys() + if all(moveAtEnd[seg] for seg, send in endOffs.items() if globrel == send) + ] + + # if all original bounds that constitute a commonStart/End are moved away in all segments of the + # type, remove from common bounds. + self.commonStarts = Counter(base if base not in moveFrom[seg] else moveTo[seg][moveFrom[seg].index(base)] + for seg, base in baseOffs.items()) + self.commonEnds = Counter(end if end not in moveFrom[seg] else moveTo[seg][moveFrom[seg].index(end)] + for seg, end in endOffs.items()) + + + def filterOutMoreCommonNeighbors(self, relocWmargin: List[int]) -> Tuple[List[int], List[int]]: + """ + resolve adjacent most common starts/ends (use more common bound) + + :param relocWmargin: + :return: More common starts and ends, than their neighboring common starts or ends. + """ + moCoReSt = [cS for cS, cnt in self.commonStarts.most_common() + if (cS + 1 not in set(self.commonStarts.keys()).difference(relocWmargin) + or cnt > self.commonStarts[cS + 1]) # cS+1 in relocWmargin or + and (cS - 1 not in set(self.commonStarts.keys()).difference(relocWmargin) + or cnt > self.commonStarts[cS - 1])] # cS-1 in relocWmargin or + moCoReEn = [cS for cS, cnt in self.commonEnds.most_common() + if (cS + 1 not in set(self.commonEnds.keys()).difference(relocWmargin) + or cnt > self.commonEnds[cS + 1]) # cS+1 in relocWmargin or + and (cS - 1 not in set(self.commonEnds.keys()).difference(relocWmargin) + or cnt > self.commonEnds[cS - 1])] # cS-1 in relocWmargin or + # TODO really annoying FP after this change (i. e., the 4 line ends commented out above) + # in ntp_SMIA-20111010_deduped-100 tf02 + # set(common[...].keys()).difference(relocWmargin) solves more common neighbor that is filtered by + # closeness to new bound (dns-new tf01/6 1,2,4,6,...) + + return moCoReSt, moCoReEn + + + def unchangedBounds(self, seg: MessageSegment, reloc: List[int]): + """ + inverse of moveAt* + + :param seg: + :param reloc: here we need the padding-global raw "relocate" without the added 0 and length + :return: + """ + unchangedBounds = [off for off in (self._baseOffs[seg], self._endOffs[seg]) + if off not in reloc] + return unchangedBounds + + + def commonUnchangedOffByOne(self, seg: MessageSegment, reloc: List[int]): + """ + Consider only common bounds that are off-by-one from the original bound + + :param seg: + :param reloc: here we need the padding-global raw "relocate" without the added 0 and length + :return: List of direct neighbors of the unchanged bounds of seg that are common bounds themselves and are + more frequent than uoboFreqThresh. 
+ """ + unchangedBounds = self.unchangedBounds(seg, reloc) + + commonUnchangedOffbyone = [ + ub + 1 for ub in unchangedBounds + if ub + 1 in self.commonStarts + and self.commonStarts[ub + 1] > + RelocatePCA.CommonBoundUtil.uoboFreqThresh * sum(self.commonStarts.values()) + ] + [ + ub - 1 for ub in unchangedBounds + if ub - 1 in self.commonEnds + and self.commonEnds[ub - 1] > + RelocatePCA.CommonBoundUtil.uoboFreqThresh * sum(self.commonEnds.values()) + ] + + # unchangedOffbyone = [ub - 1 for ub in unchangedBounds] + [ub + 1 for ub in unchangedBounds] + # commonUnchangedOffbyone = [uobo for uobo in unchangedOffbyone + # if (uobo in self.commonStarts and + # self.commonStarts[uobo] > + # RelocatePCA.CommonBoundUtil.uoboFreqThresh * sum(self.commonStarts.values())) + # or (uobo in self.commonEnds and + # self.commonEnds[uobo] > + # RelocatePCA.CommonBoundUtil.uoboFreqThresh * sum(self.commonEnds.values()) + # )] + + # if any(cb for cb in (self.commonStarts, self.commonEnds) + # if uobo in cb and + # cb[uobo] > RelocatePCA.CommonBoundUtil.uoboFreqThresh * sum(cb.values()) + # ) + # ] + + return commonUnchangedOffbyone + + + def commonUnchanged(self, seg: MessageSegment, reloc: List[int]): + """ + Consider all common bounds between the start and the outermost (first/last) relocated bound for a segment. + + :param seg: + :param reloc: here we need the padding-global raw "relocate" without the added 0 and length + :return: + """ + commonUnchanged = \ + [cs for cs in self.commonStarts.keys() + if len(reloc) == 0 or cs < min(reloc) > self._baseOffs[seg]] \ + + [ce for ce in self.commonEnds.keys() + if len(reloc) == 0 or self._endOffs[seg] > max(reloc) < ce] + return [cu for cu in commonUnchanged + if (cu + 1 > self._endOffs[seg] + or ((cu+1 not in self.commonStarts or self.commonStarts[cu+1] < self.commonStarts[cu]) + and (cu+1 not in self.commonEnds or self.commonEnds[cu+1] < self.commonEnds[cu]))) + and (cu - 1 < self._baseOffs[seg] + or ((cu-1 not in self.commonStarts or self.commonStarts[cu-1] < self.commonStarts[cu]) + and (cu-1 not in self.commonEnds or self.commonEnds[cu-1] < self.commonEnds[cu]))) + ] + + + def frequentBoundReframing(self, newPaddingRelative: Dict[MessageSegment, List[int]], relocate: List[int]) \ + -> Dict[MessageSegment, Set[int]]: + """ + Frequent raw segment bound reframing: + Refine boundaries within the padded range of all segments in the cluster considering frequent common offsets + and end positions. + + :param newPaddingRelative: new relative boundaries, including start and end of a unchanged segment, if any. + :param relocate: here we need the padding-global raw "relocate" without the added 0 and length + :return: The proposed cut positions per segment based on frequent common raw segment bounds in the cluster. 
+ """ + startsFreqSum = sum(self.commonStarts.values()) + endsFreqSum = sum(self.commonEnds.values()) + startsMoreFreqThanThresh = [com for com, cnt in self.commonStarts.items() + if cnt / startsFreqSum > type(self).commonFreqThreshold] + endsMoreFreqThanThresh = [com for com, cnt in self.commonEnds.items() + if cnt / endsFreqSum > type(self).commonFreqThreshold] + + cutsExt = dict() + # # # # # # # # # # # # # # # # # # # # # # # # + # padded range refinement + for seg, reloc in newPaddingRelative.items(): + # resolve adjacent most common starts/ends (use more common bound) + # commonUnchanged = self.commonUnchanged(seg, relocate) + commonUnchangedOffbyone = self.commonUnchangedOffByOne(seg, relocate) + + # Used to determine positions that are more than off-by-one from new bound, + # naturally includes: is not a move and not a relocation + relocWmargin = RelocatePCA._offbyone(reloc + commonUnchangedOffbyone) + moCoReSt, moCoReEn = self.filterOutMoreCommonNeighbors(relocWmargin) + + cutsExtStart = sorted(common for common in startsMoreFreqThanThresh + # conditions for reframing by common segment starts + if common > min(reloc) and ( + common not in relocWmargin and common in moCoReSt + or common in commonUnchangedOffbyone + ) + ) + cutsExtEnd = sorted(common for common in endsMoreFreqThanThresh + # conditions for reframing by common segment ends + if common < max(reloc) and ( + common not in relocWmargin and common in moCoReEn + or common in commonUnchangedOffbyone + ) + ) + cutsExt[seg] = set(cutsExtStart + cutsExtEnd) + return cutsExt + + + + + + @staticmethod + def segs4bound(segs2bounds: Dict[MessageSegment, List[int]], bound: int) -> List[MessageSegment]: + """ + Helper for iterating bounds in segment : list-of-bounds structure. + + :param segs2bounds: + :param bound: + :return: Yields the segment that is a key in segs2bounds, if it has bound in its value list. + Yields the same segment for each time bound is in its list. + """ + for seg, bounds in segs2bounds.items(): + for b in bounds.copy(): + if b == bound: + yield seg + + + @staticmethod + def removeSuperfluousBounds(newBounds: Dict[AbstractMessage, Dict[MessageSegment, List[int]]]): + """ + Iterate the given new bounds per message and remove superfluous ones: + * bound in segment offsets or nextOffsets + * de-duplicate bounds that are in scope of more than one segment + + **Changes are made at newBounds in place!** + + :param newBounds: Dict of all input messages, with a Dict mapping each segment to a list of its bounds. + :return: reference to the newBounds Dict + """ + from nemere.visualization.simplePrint import markSegmentInMessage + + for message, segsbounds in newBounds.items(): + # Below, by list(chain(...)) create a copy to iterate, so we can delete stuff in the original bound lists. 
+ + # if bound in segment offsets or nextOffsets: + # remove bound (also resolves some off-by-one conflicts, giving precedence to moves) + for bound in list(chain(*segsbounds.values())): + if bound in (segA.offset for segA in segsbounds.keys()) \ + or bound in (segA.nextOffset for segA in segsbounds.keys()) \ + or 0 <= bound >= len(message.data): + for lookedupSeg in RelocatePCA.segs4bound(segsbounds, bound): + segsbounds[lookedupSeg].remove(bound) # calls remove as often as there is bound in the list + # while bound in segsbounds[lookedupSeg]: + + # if bound in scope of more than one segment: resolve + for bound in list(chain(*segsbounds.values())): + lookedupSegs = list(RelocatePCA.segs4bound(segsbounds, bound)) + if len(lookedupSegs) > 1: + segsNotHavingBoundInScope = [seg for seg in lookedupSegs if + seg.offset > bound or bound < seg.nextOffset] + if len(lookedupSegs) - len(segsNotHavingBoundInScope) == 1: + for segOutScope in segsNotHavingBoundInScope: + while bound in segsbounds[segOutScope]: + segsbounds[segOutScope].remove(bound) + elif len(lookedupSegs) - len(segsNotHavingBoundInScope) == 0: + # just leave one arbitrary reference to the bound. + for segOutScope in segsNotHavingBoundInScope[1:]: + while bound in segsbounds[segOutScope]: + segsbounds[segOutScope].remove(bound) + else: + # multiple segments truly have bound in scope + # TODO replace by exception - what can we do before failing? + print("Bound {} is in scope of multiple segments:".format(bound)) + for lookedupSeg in RelocatePCA.segs4bound(segsbounds, bound): + markSegmentInMessage(lookedupSeg) + print("Needs resolving!") + print() + # IPython.embed() + + # if bound in other segment is as close as one position away: resolve + flatSegsboundsCopy = [(seg, bound) for seg, bounds in segsbounds.items() for bound in bounds] + for seg, bound in flatSegsboundsCopy: + for neighbor in [bound - 1, bound + 1]: + if neighbor in (b for segA, bounds in segsbounds.items() if segA != seg for b in bounds): + # retain seg/neighborSeg that has bound in scope, delete other(s) + inScopeSeg = seg.offset <= bound < seg.nextOffset + neiSeg = [segA for segA, bounds in segsbounds.items() if segA != seg and neighbor in bounds] + inScopeNei = [segA.offset <= neighbor < segA.nextOffset for segA in neiSeg] + + if sum([inScopeSeg] + inScopeNei) <= 1: # just one neighbor remains, good + if not inScopeSeg: + while bound in segsbounds[seg]: + segsbounds[seg].remove(bound) + for segN, inScope in zip(neiSeg, inScopeNei): + if not inScope: + while neighbor in segsbounds[segN]: + segsbounds[segN].remove(neighbor) + continue + + # TODO replace by exception + print("There are off-by-one neighbors:") + for lookedupSeg in RelocatePCA.segs4bound(segsbounds, bound): + markSegmentInMessage(lookedupSeg) + print("bound: {} - neighbor: {}".format(bound, neighbor)) + print("Needs resolving!") + print() + IPython.embed() + + + return newBounds + + + @staticmethod + def refineSegmentedMessages(inferredSegmentedMessages: Iterable[Sequence[MessageSegment]], + newBounds: Dict[AbstractMessage, Dict[MessageSegment, List[int]]]): + """ + + :param inferredSegmentedMessages: List of messages split into inferred segments. + :param newBounds: Mapping of messages to the hitherto segments and + the new bounds in each segment's scope, if any. + :return: List of messages split into the refined segments, including unchanged segments to always form valid + segment representations of all messages. 
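+
+        Illustrative example (assumed values): an inferred segment covering offsets 4 to 10 with new bounds
+        [6, 8] in its scope is replaced by segments at 4-6, 6-8, and 8-10 (assuming the preceding segments
+        end at offset 4); segments without new bounds in scope are kept unchanged.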
+ """ + margin = 1 + refinedSegmentedMessages = list() # type: List[List[MessageSegment]] + # iterate sorted message segments + for msgsegs in inferredSegmentedMessages: + + # TODO happens during RelocatePCA.refineSegments(charPass1, refinementDC) for dns sigma 2.4 + if len(msgsegs) < 1: + print("Empty message. Investigate!") + + continue + + msg = msgsegs[0].message + if msg not in newBounds: + refinedSegmentedMessages.append(msgsegs) + continue + + # sort new bounds and ensure they are in within the message (offset >= 0 and <= len) + newMsgBounds = sorted({nb for nb in chain(*newBounds[msg].values()) if 0 <= nb <= len(msg.data)}) + lastBound = 0 + currentBoundID = 0 + refinedSegmentedMessages.append(list()) + for segInf in sorted(msgsegs, key=lambda x: x.offset): + # for condition tracing + ifs = list() + + # finish if lastBound reached message end (is hit in case of the msgsegs[-1] is replaced with + # a segment starting in scope of msgsegs[-2] + if len(refinedSegmentedMessages[-1]) > 0 and \ + lastBound == refinedSegmentedMessages[-1][-1].nextOffset == len(msg.data): + break + + # add skipped bytes to next segment if next_segment.nextOffset < bound + # create segment from last bound to min(next_segment.nextOffset, bound); + if lastBound < segInf.offset: + assert len(newMsgBounds) > 0 # should never happen otherwise + + if currentBoundID < len(newMsgBounds) and newMsgBounds[ + currentBoundID] <= segInf.nextOffset + margin: + nextOffset = newMsgBounds[currentBoundID] + currentBoundID += 1 + else: + nextOffset = segInf.nextOffset + + assert not len(refinedSegmentedMessages[-1]) > 0 or \ + refinedSegmentedMessages[-1][-1].nextOffset == lastBound, \ + "Segment sequence error: add skipped bytes" + ifs.append("skipped") + + refinedSegmentedMessages[-1].append( + MessageSegment(segInf.analyzer, lastBound, nextOffset - lastBound) + ) + lastBound = refinedSegmentedMessages[-1][-1].nextOffset + if nextOffset >= segInf.nextOffset: + continue + + # if no bounds in scope of segment: add old segment and continue + if lastBound == segInf.offset and ( + len(newMsgBounds) == 0 or currentBoundID >= len(newMsgBounds) + or newMsgBounds[currentBoundID] > segInf.nextOffset + margin): + assert not len(refinedSegmentedMessages[-1]) > 0 \ + or refinedSegmentedMessages[-1][-1].nextOffset == segInf.offset, \ + "Segment sequence error: no bounds in scope of segment" + ifs.append("no bounds") + + refinedSegmentedMessages[-1].append(segInf) + lastBound = segInf.nextOffset + continue + # if bound in scope of segment: + # create segment from segment offset or last bound (which is larger) to bound + for bound in [nmb for nmb in newMsgBounds[currentBoundID:] if nmb < segInf.nextOffset]: + newOffset = max(segInf.offset, lastBound) + + assert not len(refinedSegmentedMessages[-1]) > 0 or \ + refinedSegmentedMessages[-1][-1].nextOffset == newOffset, \ + "Segment sequence error: bound in scope of segment" + ifs.append("bounds") + + refinedSegmentedMessages[-1].append( + MessageSegment(segInf.analyzer, newOffset, bound - newOffset) + ) + lastBound = newMsgBounds[currentBoundID] + currentBoundID += 1 + + # no further bounds (at least until segment end) + if segInf.nextOffset - lastBound <= margin and len(msg.data) - segInf.nextOffset > 0: + continue + + # if no further bounds for message or bound > segment next offset+1 and resulting segment longer than 1: + # create segment from last bound to inferred segment's next offset; + if currentBoundID >= len(newMsgBounds) or ( + newMsgBounds[currentBoundID] > segInf.nextOffset + 1): 
+
+                assert not len(refinedSegmentedMessages[-1]) > 0 or \
+                       refinedSegmentedMessages[-1][-1].nextOffset == lastBound, \
+                    "Segment sequence error: if no further bounds"
+                # try:
+                #     print(ifs)
+                #     MessageSegment(segInf.analyzer, lastBound, segInf.nextOffset - lastBound)
+                # except:
+                #     IPython.embed()
+                ifs.append("no further bounds")
+
+                refinedSegmentedMessages[-1].append(
+                    MessageSegment(segInf.analyzer, lastBound, segInf.nextOffset - lastBound)
+                )
+                lastBound = refinedSegmentedMessages[-1][-1].nextOffset
+                # do not advance currentBoundID (in case there is another bound,
+                # we need to consider it in scope of a later segment)
+
+            # bound == next offset+1 and resulting segment longer than 1: create segment from last bound to bound
+            elif newMsgBounds[currentBoundID] == segInf.nextOffset + 1 and newMsgBounds[
+                    currentBoundID] - lastBound > 1:
+
+                assert not len(refinedSegmentedMessages[-1]) > 0 or \
+                       refinedSegmentedMessages[-1][-1].nextOffset == lastBound, \
+                    "Segment sequence error: bound == next offset+1"
+                ifs.append("bound == next offset+1")
+
+                refinedSegmentedMessages[-1].append(
+                    MessageSegment(segInf.analyzer, lastBound, newMsgBounds[currentBoundID] - lastBound)
+                )
+                lastBound = refinedSegmentedMessages[-1][-1].nextOffset
+                currentBoundID += 1
+
+            # final assertion of complete representation of message by the new segments
+            msgbytes = b"".join([seg.bytes for seg in refinedSegmentedMessages[-1]])
+            if not msgbytes == msg.data:
+                print(msg.data.hex())
+                print(msgbytes.hex())
+                print(msgsegs)
+                IPython.embed()
+            assert msgbytes == msg.data, "segment sequence does not match message bytes"
+        return refinedSegmentedMessages
+
+
+    @staticmethod
+    def refineSegments(inferredSegmentedMessages: Iterable[Sequence[MessageSegment]], dc: DistanceCalculator,
+                       initialKneedleSensitivity: float=10.0, subclusterKneedleSensitivity: float=5.0,
+                       comparator: MessageComparator = None, reportFolder: str = None,
+                       collectEvaluationData: Union[List['RelocatePCA'], bool]=False, retClusterer=False,
+                       littleEndian=False, trace: str=None) \
+            -> Iterable[Sequence[MessageSegment]]:
+        """
+        Main method to conduct PCA refinement for a set of segments.
+
+        :param inferredSegmentedMessages: List of messages split into inferred segments.
+        :param dc: Distance calculator representing the segments to be analyzed and refined.
+        :param initialKneedleSensitivity: Sensitivity of the Kneedle parameter autodetection
+            for the initial clustering.
+        :param subclusterKneedleSensitivity: Reduced sensitivity (from 10 to 5) for the subclustering
+            due to large dissimilarities in clusters (TODO more evaluation!).
+        :param comparator: For evaluation: Encapsulated true field bounds to compare results to.
+        :param reportFolder: For evaluation: Destination path to write results and statistics to.
+        :param collectEvaluationData: For evaluation: Collect the intermediate (sub-)clusters generated during
+            the analysis of the segments.
+        :param retClusterer: For evaluation: If a list is provided, any used clusterer instances are appended to it.
+        :param littleEndian: Assume little-endian byte order for the analyzed protocol.
+        :param trace: For evaluation: Name of the analyzed trace, used to label reports.
+
+        :return: List of segments grouped by the message they are from.
+        :raise ClusterAutoconfException: In case no clustering can be performed due to failed parameter autodetection.
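+
+        Typical call, mirroring the refinement wrappers in segmentHandler.py (sketch, variable names illustrative)::
+
+            refinementDC = DelegatingDC(list(chain.from_iterable(segmentsPerMsg)))
+            refinedMessages = RelocatePCA.refineSegments(segmentsPerMsg, refinementDC)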
+        """
+        # include only segments that are not just 0 and longer than 1 byte
+        relevantSegments = [rs for rs in dc.rawSegments if set(rs.values) != {0} and len(rs.values) > 1]
+        try:
+            clusterer = DBSCANadjepsClusterer(dc, relevantSegments, S=initialKneedleSensitivity)
+        except ClusterAutoconfException as e:
+            logger = logging.getLogger(__name__)
+            logger.warning(repr(e) + ". Falling back to OPTICS clusterer.")
+            clusterer = OPTICSsegmentClusterer(dc, relevantSegments)
+        noise, *clusters = clusterer.clusterSimilarSegments(False)
+        print("Initial clustering:",
+              clusterer, "- cluster sizes:", [len(s) for s in clusters], "- noise:", len(noise))
+        if isinstance(retClusterer, List):
+            retClusterer.append(clusterer)
+
+        # if there remains only noise, ignore clustering
+        if len(clusters) == 0:
+            print("No refinement possible: clustering returns only noise.")
+            return inferredSegmentedMessages
+
+        newBounds = dict()  # type: Dict[AbstractMessage, Dict[MessageSegment, List[int]]]
+        for cLabel, segments in enumerate(clusters):
+            # Generate suitable FieldTypeContext objects from the sub-clusters
+            ftContext, suitedForAnalysis = RelocatePCA._preFilter(segments, "tf{:02d}".format(cLabel))
+            # if basic requirements not met, exclude from PCA analysis
+            if not suitedForAnalysis:
+                # # # # # # # # # # # # # # # # # # # # # # # #
+                # stop further recursion and ignore this cluster
+                print("Ignore subcluster {} due to pre-filter.".format(ftContext.fieldtype))
+                # performing "common bound" refinement here does not improve the results:
+                #   * it is triggered in only a few cases
+                #   * there are just a handful of segments that may benefit
+                #   * there is some probability to produce more FP than TP.
+                continue
+                # # # # # # # # # # # # # # # # # # # # # # # #
+
+            print("Analyzing cluster", ftContext.fieldtype)
+            try:
+                cluster = RelocatePCA(ftContext, littleEndian=littleEndian)
+                # # # # # # # # # # # # # # # # # # # # # # # #
+                # start recursion
+                collectedSubclusters = cluster.getSubclusters(dc, subclusterKneedleSensitivity, reportFolder, trace)
+                # # # # # # # # # # # # # # # # # # # # # # # #
+            except numpy.linalg.LinAlgError:
+                print("Ignore cluster because eigenvalues did not converge.")
+                print(repr(ftContext.baseSegments))
+                continue
+
+            if isinstance(collectEvaluationData, list):
+                collectEvaluationData.extend(collectedSubclusters)
+
+            # relocateBoundaries for all collectedSubclusters
+            for sc in collectedSubclusters:
+                if isinstance(sc, RelocatePCA):
+                    clusterBounds = sc.relocateBoundaries(comparator, reportFolder)
+                else:
+                    # => this is never executed!
+                    # performing "common bound" refinement here does not improve the results:
+                    #   * it is triggered in only a few cases
+                    #   * there are just a handful of segments that may benefit
+                    #   * there is some probability to produce more FP than TP.
+                    # No such subclusters are returned by getSubclusters because of the
+                    # "if not suitedForAnalysis" filter above.
+                    # => this is never executed!
+                    baseOffs = {bs: sc.baseOffset(bs) for bs in sc.baseSegments}
+                    endOffs = {bs: sc.baseOffset(bs) + bs.length
+                               for bs in sc.baseSegments}
+                    commonBounds = RelocatePCA.CommonBoundUtil(
+                        baseOffs, endOffs, dict.fromkeys(baseOffs.keys(), []), dict.fromkeys(baseOffs.keys(), []))
+                    allOffs = {bs: [baseOffs[bs], endOffs[bs]] for bs in sc.baseSegments}
+                    relocatedCommons = commonBounds.frequentBoundReframing(allOffs, [])
+                    relocatedBoundaries = {seg: list() for seg in sc.baseSegments}
+                    for segment, newCommons in relocatedCommons.items():
+                        relocatedBoundaries[segment].extend([int(rc + segment.offset) for rc in newCommons
+                                                             if int(rc + segment.offset) not in relocatedBoundaries[segment]])
+                        if not len(relocatedBoundaries[segment]) == len(set(relocatedBoundaries[segment])):
+                            IPython.embed()
+                        assert len(relocatedBoundaries[segment]) == len(set(relocatedBoundaries[segment])), \
+                            repr(relocatedBoundaries[segment]) + "\n" + repr(set(relocatedBoundaries[segment]))
+                    clusterBounds = relocatedBoundaries
+
+                for segment, bounds in clusterBounds.items():
+                    if segment.message not in newBounds:
+                        newBounds[segment.message] = dict()
+                    elif segment in newBounds[segment.message] and newBounds[segment.message][segment] != bounds:
+                        # TODO replace by exception or solution
+                        print("\nSame segment was PCA-refined multiple times. Needs resolving. Segment is:\n",
+                              segment, "Concurrent bounds are:\n",
+                              newBounds[segment.message][segment], "and\n",
+                              bounds)
+                        print()
+                        IPython.embed()
+                    newBounds[segment.message][segment] = bounds
+
+        # remove from newBounds, in place
+        RelocatePCA.removeSuperfluousBounds(newBounds)
+
+        return RelocatePCA.refineSegmentedMessages(inferredSegmentedMessages, newBounds)
+
+
+
+class BlendZeroSlices(MessageModifier):
+    """
+    Generate zero-bounded segments from the bytes of the given message (i.e. `self.segments[0].message.data`).
+    Blend these segments with the given segments (i.e. `self.segments`), where zero bounds take precedence.
+    """
+    def __init__(self, segments: List[MessageSegment]):
+        """
+        :param segments: The segments of one message in offset order
+        """
+        super().__init__(segments)
+        if BlendZeroSlices._debug:
+            self.zeroSegments = list()
+
+
+    def blend(self, ignoreSingleZeros=False, littleEndian=False):
+        """
+        :param ignoreSingleZeros: Ignore single zero bytes (except after char sequences);
+            if False, generate zero slices only from zero sequences of at least two bytes.
+        :param littleEndian: Attach the zeros of short zero sequences to the preceding segment
+            (as expected for little-endian values) instead of the following one.
+        :return: List of segments blended together from the segments of the object as basis and the zero slices,
+            together forming the complete message.
+        """
+        zeroBounds = list()
+        mdata = self.segments[0].message.data  # type: bytes
+
+        # all transitions from 0 to any and vice versa + message start and end
+        for bi, bv in enumerate(mdata[1:], 1):
+            if bv == 0 and mdata[bi-1] != 0 \
+                    or bv != 0 and mdata[bi-1] == 0:
+                zeroBounds.append(bi)
+        zeroBounds = [0] + zeroBounds + [len(mdata)]
+
+        # remove boundaries of short zero sequences to merge them to the previous or next non-zero segment
+        minCharLen = 6  # 6
+        zeroBounds = sorted(set(zeroBounds))
+        zBCopy = zeroBounds.copy()
+        for zi, zb in enumerate(zBCopy[:-1]):  # omit message end bound
+            if mdata[zb] == 0:
+                # next zero bound
+                nzb = zBCopy[zi + 1]
+
+                # TODO should that be done after the char `if`?
+                # if the next bound (nzb) is only one byte ahead and we should ignore single zeros, remove both bounds.
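+                # Illustration of the handling below (assumed example values): for mdata == b"AB\x00CD" with the
+                # defaults, the initial zeroBounds [0, 2, 3, 5] lose the bound 3, so the single zero byte is
+                # attached to the following non-zero slice (b"AB" | b"\x00CD"); if at least minCharLen chars
+                # preceded the zero, or littleEndian is set, the bound 2 would be removed instead, attaching
+                # the zero to the preceding slice.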
+ if ignoreSingleZeros and zb + 1 == nzb: + # if the current bound is not the message start + if zb > 0: + zeroBounds.remove(zb) + # if the next bound is not the message end + if nzb < len(mdata): + zeroBounds.remove(nzb) + continue + + # ... there are only one or two zeros in a row ... + if zb + 2 >= nzb: + # if chars are preceding or its little endian, add zero to previous + if isExtendedCharSeq(mdata[max(0,zb-minCharLen):zb], minLen=minCharLen) or littleEndian: # \ + # or zb > 0 and MessageAnalyzer.nibblesFromBytes(mdata[zb-1:zb])[1] == 0: + # # or the least significant nibble of the preceding byte is zero + if zb in zeroBounds: + zeroBounds.remove(zb) + # otherwise to next + elif nzb < len(mdata): + if nzb in zeroBounds: + zeroBounds.remove(nzb) + + if BlendZeroSlices._debug: + # generate zero-bounded segments from bounds + ms = list() + for segStart, segEnd in zip(zeroBounds[:-1], zeroBounds[1:]): + ms.append(MessageSegment(self.segments[0].analyzer, segStart, segEnd - segStart)) + self.zeroSegments.append(ms) + + + # integrate original inferred bounds with zero segments, zero bounds have precedence + combinedMsg = list() # type: List[MessageSegment] + infMarginOffsets = [infs.nextOffset for infs in self.segments + if infs.nextOffset - 1 not in zeroBounds and infs.nextOffset + 1 not in zeroBounds] + remZeroBounds = [zb for zb in zeroBounds if zb not in infMarginOffsets] + combinedBounds = sorted(infMarginOffsets + remZeroBounds) + if combinedBounds[0] != 0: + combinedBounds = [0] + combinedBounds + if combinedBounds[-1] != len(mdata): + combinedBounds += [len(mdata)] + startEndMap = {(seg.offset, seg.nextOffset) : seg for seg in self.segments} + analyzer = self.segments[0].analyzer + for bS, bE in zip(combinedBounds[:-1], combinedBounds[1:]): + # unchanged + if (bS, bE) in startEndMap: + combinedMsg.append(startEndMap[(bS, bE)]) + else: + nseg = MessageSegment(analyzer, bS, bE-bS) + combinedMsg.append(nseg) + # final assertion of complete representation of message by the new segments + msgbytes = b"".join([seg.bytes for seg in combinedMsg]) + assert msgbytes == mdata, "segment sequence does not match message bytes" + + return combinedMsg + + + + +class CropChars(MessageModifier): + """ + Identify probable char sequences using `inference.fieldTypes.FieldTypeRecognizer.charsInMessage`, which in turn uses + `inference.segmentHandler.isExtendedCharSeq`, to find these. + Generates segments from those char sequences and the given segments of the message (see constructor). + """ + + def split(self): + """ + Split a message into char segments and blend them with the segments given in the object instance's segment + variable. 
+ + :return: List of char and non-char segments for the message + """ + from nemere.inference.fieldTypeLibrary import BinaryProtocolsFieldTypeRecognizer + + ftrecog = BinaryProtocolsFieldTypeRecognizer(self.segments[0].analyzer) + # RecognizedFields or type char using isExtendedCharSeq + charsRecog = [cr.toSegment() for cr in ftrecog.charsInMessage()] + + blendedSegments = self.blend(charsRecog) + + # final assertion of complete representation of message by the new segments + newbytes = b"".join([seg.bytes for seg in blendedSegments]) + msgbytes = b"".join([seg.bytes for seg in self.segments]) + assert msgbytes == newbytes, "segment sequence does not match message bytes:\nshould: " \ + + msgbytes.hex() + "\nis: " + newbytes.hex() + + return blendedSegments + + + def blend(self, mixin: List[MessageSegment]): + """ + :param mixin: list of segments to blend into the segments of the object + :return: List of segments blended together from the segments of the object as basis and the mixin, + together forming the message. + """ + stack = sorted(mixin, key=lambda x: -x.offset) + newSegSeq = list() # type: List[MessageSegment] + # shifting from stack (from mixin) and inserting from self.segments (msg) to newSegSeq + # v---------------, + # newSegSeq <- msg stack/mixin + # |:::| | | | | + # |---| |:::| | | + # |---| |:::| |---| + # |:::| |:::| |---| + # +---+ +---+ +---+ + for seg in self.segments: + while len(stack) > 0 and stack[-1].offset < seg.nextOffset: # stack peek + lastOffset = newSegSeq[-1].nextOffset if len(newSegSeq) > 0 else 0 + if stack[-1].offset > lastOffset: # prepend/fill gap to char + newSegSeq.append(MessageSegment(seg.analyzer, lastOffset, stack[-1].offset - lastOffset)) + newSegSeq.append(stack.pop()) # append char + lastOffset = newSegSeq[-1].nextOffset if len(newSegSeq) > 0 else 0 + if lastOffset == seg.offset: # append unchanged segment + newSegSeq.append(seg) + elif lastOffset < seg.nextOffset: # append gap to next segment + newSegSeq.append(MessageSegment(seg.analyzer, lastOffset, seg.nextOffset - lastOffset)) + # else nothing to do since no bytes/segments left + return newSegSeq + + + @staticmethod + def isOverlapping(segA: MessageSegment, segB: MessageSegment) -> bool: + """ + Determines whether the given segmentS overlap. + + :param segA: The segment to check against. + :param segB: The segment to check against. + :return: Is overlapping or not. + """ + return isOverlapping(segA, segB) + diff --git a/src/nemere/inference/segmentHandler.py b/src/nemere/inference/segmentHandler.py index 93384ce..e6fbe26 100644 --- a/src/nemere/inference/segmentHandler.py +++ b/src/nemere/inference/segmentHandler.py @@ -1,6 +1,8 @@ """ Batch handling of multiple segments. """ +from itertools import chain + import numpy import copy from typing import List, Dict, Tuple, Union, Sequence, TypeVar, Iterable @@ -10,7 +12,7 @@ from nemere.utils.loader import BaseLoader from nemere.inference.segments import MessageSegment, HelperSegment, TypedSegment, AbstractSegment from nemere.inference.analyzers import MessageAnalyzer, Value -from nemere.inference.templates import AbstractClusterer, TypedTemplate +from nemere.inference.templates import AbstractClusterer, TypedTemplate, DelegatingDC, MemmapDC def segmentMeans(segmentsPerMsg: List[List[MessageSegment]]): @@ -126,6 +128,10 @@ def segmentsFromLabels(analyzer, labels) -> Tuple[TypedSegment]: def segmentsFromSymbols(symbols: List[Symbol]): + """ + :param symbols: List of Netzob symbols. 
+ :return: List of messages represented by a list of segments each. + """ msgflds = [(msg,flds) for s in symbols for msg,flds in s.getMessageCells().items()] segmentedMessages = [] for msg,flds in msgflds: @@ -319,9 +325,140 @@ def refinements(segmentsPerMsg: List[List[MessageSegment]], **kwargs) -> List[Li :param segmentsPerMsg: a list of one list of segments per message. :return: refined segments in list per message """ - return nemetylRefinements(segmentsPerMsg) + return zerocharPCAmocoSFrefinements(segmentsPerMsg, **kwargs) + + +def pcaMocoRefinements(segmentsPerMsg: List[List[MessageSegment]], **kwargs) -> List[List[MessageSegment]]: + """ + Refine the segmentation using specific improvements for the feature: + Inflections of gauss-filtered bit-congruence deltas. + + :param segmentsPerMsg: a list of one list of segments per message. + :return: refined segments in list per message + """ + from itertools import chain + from nemere.inference.formatRefinement import RelocatePCA, CropDistinct + + print("Refine segmentation (2xPCA, moco refinements)...") + + if "collectedSubclusters" in kwargs: + kwargs["collectEvaluationData"] = kwargs["collectedSubclusters"] + del kwargs["collectedSubclusters"] + + # charPass1 = charRefinements(segmentsPerMsg) + # refinedSegmentedMessages = RelocatePCA.refineSegments(charPass1, dc) + + # pcaRound = charRefinements(segmentsPerMsg) + pcaRound = MessageAnalyzer.convertAnalyzers(segmentsPerMsg, Value) + for _ in range(2): + refinementDC = DelegatingDC(list(chain.from_iterable(pcaRound))) + pcaRound = RelocatePCA.refineSegments(pcaRound, refinementDC, **kwargs) + + # additionally perform most common values refinement + moco = CropDistinct.countCommonValues(pcaRound) + print([m.hex() for m in moco]) + refinedSM = list() + for msg in pcaRound: + croppedMsg = CropDistinct(msg, moco).split() + refinedSM.append(croppedMsg) + + charPass2 = charRefinements(refinedSM) + + return charPass2 + + +def pcaMocoDoubleCharRefinements(segmentsPerMsg: Sequence[Sequence[MessageSegment]], **kwargs + ) -> List[List[MessageSegment]]: + """ + perform the char refinements after 2 passes of PCA and moco. + TODO Is this better than pcaMocoRefinements + + :param segmentsPerMsg: + :param kwargs: + :return: + """ + import nemere.inference.formatRefinement as refine + + print("Refine segmentation (2xPCA, moco refinements)...") + + if "collectedSubclusters" in kwargs: + kwargs["collectEvaluationData"] = kwargs["collectedSubclusters"] + del kwargs["collectedSubclusters"] + + valueSegsPerMsg = MessageAnalyzer.convertAnalyzers(segmentsPerMsg, Value) + + pcaRound = charRefinements(valueSegsPerMsg) + for _ in range(2): + refinementDC = DelegatingDC(list(chain.from_iterable(pcaRound))) + pcaRound = refine.RelocatePCA.refineSegments(pcaRound, refinementDC, **kwargs) + + # additionally perform most common values refinement + moco = refine.CropDistinct.countCommonValues(pcaRound) + print([m.hex() for m in moco]) + refinedSM = list() + for msg in pcaRound: + croppedMsg = refine.CropDistinct(msg, moco).split() + refinedSM.append(croppedMsg) + return charRefinements(refinedSM) + + +def pcaRefinements(segmentsPerMsg: Sequence[Sequence[MessageSegment]], **kwargs) -> List[List[MessageSegment]]: + """ + Refine the segmentation using specific improvements for the feature: + Inflections of gauss-filtered bit-congruence deltas. + + :param segmentsPerMsg: a list of one list of segments per message. 
+ :param kwargs: is forwarded to RelocatePCA.refineSegments + :return: refined segments in list per message + """ + from itertools import chain + from nemere.inference.formatRefinement import RelocatePCA + + if "collectedSubclusters" in kwargs: + kwargs["collectEvaluationData"] = kwargs["collectedSubclusters"] + del kwargs["collectedSubclusters"] + + print("Refine segmentation (PCA refinements)...") + + valueSegsPerMsg = MessageAnalyzer.convertAnalyzers(segmentsPerMsg, Value) + + # char refinement before and after + charPass1 = charRefinements(valueSegsPerMsg) + refinementDC = DelegatingDC(list(chain.from_iterable(charPass1))) + refinedSM = RelocatePCA.refineSegments(charPass1, refinementDC, **kwargs) + charPass2 = charRefinements(refinedSM) + + return charPass2 + + +def pcaPcaRefinements(segmentsPerMsg: Sequence[Sequence[MessageSegment]], **kwargs) -> List[List[MessageSegment]]: + """ + Refine the segmentation using specific improvements for the feature: + Inflections of gauss-filtered bit-congruence deltas. + + :param segmentsPerMsg: a list of one list of segments per message. + :param kwargs: is forwarded to RelocatePCA.refineSegments + :return: refined segments in list per message + """ + from itertools import chain + from nemere.inference.formatRefinement import RelocatePCA + + if "collectedSubclusters" in kwargs: + kwargs["collectEvaluationData"] = kwargs["collectedSubclusters"] + del kwargs["collectedSubclusters"] + print("Refine segmentation (2xPCA refinements)...") + # char refinement before and after + # pcaRound = charRefinements(segmentsPerMsg) + pcaRound = MessageAnalyzer.convertAnalyzers(segmentsPerMsg, Value) + + for _ in range(2): + refinementDC = DelegatingDC(list(chain.from_iterable(pcaRound))) + pcaRound = RelocatePCA.refineSegments(pcaRound, refinementDC, **kwargs) + refinedSM = charRefinements(pcaRound) + + return refinedSM def baseRefinements(segmentsPerMsg: Sequence[Sequence[MessageSegment]]) -> List[List[MessageSegment]]: @@ -353,6 +490,25 @@ def baseRefinements(segmentsPerMsg: Sequence[Sequence[MessageSegment]]) -> List[ return newstuff +def zeroBaseRefinements(segmentsPerMsg: Sequence[Sequence[MessageSegment]], **kwargs) -> List[List[MessageSegment]]: + import nemere.inference.formatRefinement as refine + + if "collectedSubclusters" in kwargs: + kwargs["collectEvaluationData"] = kwargs["collectedSubclusters"] + del kwargs["collectedSubclusters"] + littleEndian = "littleEndian" in kwargs and kwargs["littleEndian"] == True + + print("Refine segmentation (zero-slices refinements)...") + + valueSegsPerMsg = MessageAnalyzer.convertAnalyzers(segmentsPerMsg, Value) + + combinedRefinedSegments = [refine.BlendZeroSlices(list(msg)).blend(littleEndian=littleEndian) + for msg in valueSegsPerMsg] + return baseRefinements(combinedRefinedSegments) + + +def zeroPCARefinements(segmentsPerMsg: Sequence[Sequence[MessageSegment]]) -> List[List[MessageSegment]]: + return pcaRefinements(zeroBaseRefinements(segmentsPerMsg)) def nemetylRefinements(segmentsPerMsg: Sequence[Sequence[MessageSegment]]) -> List[List[MessageSegment]]: @@ -446,6 +602,151 @@ def originalRefinements(segmentsPerMsg: Sequence[Sequence[MessageSegment]]) -> L return refinedPerMsg +def zerocharPCAmocoRefinements(segmentsPerMsg: Sequence[Sequence[MessageSegment]], **kwargs + ) -> List[List[MessageSegment]]: + import nemere.inference.formatRefinement as refine + + if "collectedSubclusters" in kwargs: + kwargs["collectEvaluationData"] = kwargs["collectedSubclusters"] + del kwargs["collectedSubclusters"] + littleEndian = 
"littleEndian" in kwargs and kwargs["littleEndian"] == True + + print("Refine segmentation (zero-slices, char, 2xPCA, moco refinements)...") + + valueSegsPerMsg = MessageAnalyzer.convertAnalyzers(segmentsPerMsg, Value) + + # .blend(True) to omit single zeros: in most cases (dns, nbns, smb), quality deteriorates. + zeroSlicedMessages = [refine.BlendZeroSlices(list(msg)).blend(False, littleEndian) for msg in valueSegsPerMsg] + pcaRound = [refine.CropChars(segs).split() for segs in zeroSlicedMessages] + for _ in range(2): + refinementDC = MemmapDC(list(chain.from_iterable(pcaRound))) + pcaRound = refine.RelocatePCA.refineSegments(pcaRound, refinementDC, **kwargs) + + # additionally perform most common values refinement + moco = refine.CropDistinct.countCommonValues(pcaRound) + print([m.hex() for m in moco]) + refinedSM = list() + for msg in pcaRound: + croppedMsg = refine.CropDistinct(msg, moco).split() + refinedSM.append(croppedMsg) + + # decreases FMS slightly for dhcp and smb + # refinedSM = charRefinements(refinedSM) + + return refinedSM + + # now needs recalculation of segment distances in dc + + +def pcaMocoSFrefinements(segmentsPerMsg: Sequence[Sequence[MessageSegment]], **kwargs + ) -> List[List[MessageSegment]]: + """ + Refine the segmentation according to the (unpublished) NEMEPCA paper method using specific improvements + for the feature: Inflections of gauss-filtered bit-congruence deltas. + * PCA + * CropDistinct + * SplitFixedv2 + + :param segmentsPerMsg: a list of one list of segments per message. + :param kwargs: For evaluation: + * comparator: Encapsulated true field bounds to compare results to. + * reportFolder: For evaluation: Destination path to write results and statistics to. + * collectedSubclusters: For evaluation: Collect the intermediate (sub-)clusters generated during + the analysis of the segments. + * and others accepted by refine.RelocatePCA.refineSegments() + :return: refined segments in list per message + + :raises ClusterAutoconfException: In case no clustering can be performed due to failed parameter autodetection. + """ + import nemere.inference.formatRefinement as refine + + if "collectedSubclusters" in kwargs: + kwargs["collectEvaluationData"] = kwargs["collectedSubclusters"] + del kwargs["collectedSubclusters"] + + print("Refine segmentation (PCA, CropDistinct, SplitFixedv2 refinements)...") + + pcaRound = MessageAnalyzer.convertAnalyzers(segmentsPerMsg, Value) + for _ in range(1): + refinementDC = MemmapDC(list(chain.from_iterable(pcaRound))) + pcaRound = refine.RelocatePCA.refineSegments(pcaRound, refinementDC, **kwargs) + + # additionally perform most common values refinement + moco = refine.CropDistinct.countCommonValues(pcaRound) + print([m.hex() for m in moco]) + refinedSM = list() + for msg in pcaRound: + croppedMsg = refine.CropDistinct(msg, moco).split() + # and: first segments that are longer than 2 and at least two bytes are less than \x10 + # (that have the first two bytes being non-zero) + if croppedMsg[0].length > 2 and sum(b < 0x10 for b in croppedMsg[0].bytes) >= 2: + splitfixed = refine.SplitFixed(croppedMsg).split(0, 1) + refinedSM.append(splitfixed) + else: + refinedSM.append(croppedMsg) + return refinedSM + +def zerocharPCAmocoSFrefinements(segmentsPerMsg: Sequence[Sequence[MessageSegment]], **kwargs + ) -> List[List[MessageSegment]]: + """ + Refine the segmentation according to the (unpublished) NEMEPCA paper method using specific improvements + for the feature: Inflections of gauss-filtered bit-congruence deltas. 
+ * NEMESYS + * NullBytes + * CropChars + * PCA + * CropDistinct + * SplitFixedv2 + + :param segmentsPerMsg: a list of one list of segments per message. + :param kwargs: For evaluation: + * comparator: Encapsulated true field bounds to compare results to. + * reportFolder: For evaluation: Destination path to write results and statistics to. + * collectedSubclusters: For evaluation: Collect the intermediate (sub-)clusters generated during + the analysis of the segments. + * and others accepted by refine.RelocatePCA.refineSegments() + :return: refined segments in list per message + + :raises ClusterAutoconfException: In case no clustering can be performed due to failed parameter autodetection. + """ + import nemere.inference.formatRefinement as refine + + if "collectedSubclusters" in kwargs: + kwargs["collectEvaluationData"] = kwargs["collectedSubclusters"] + del kwargs["collectedSubclusters"] + littleEndian = "littleEndian" in kwargs and kwargs["littleEndian"] == True + + print("Refine segmentation (NullBytes, CropChars)...") + + valueSegsPerMsg = MessageAnalyzer.convertAnalyzers(segmentsPerMsg, Value) + + # .blend(True) to omit single zeros: in most cases (dns, nbns, smb), quality deteriorates. + zeroSlicedMessages = [refine.BlendZeroSlices(list(msg)).blend(False, littleEndian) for msg in valueSegsPerMsg] + pcaRound = [refine.CropChars(segs).split() for segs in zeroSlicedMessages] + return pcaMocoSFrefinements(pcaRound, **kwargs) + +def entropymergeZeroCharPCAmocoSFrefinements(segmentsPerMsg: Sequence[Sequence[MessageSegment]], **kwargs + ) -> List[List[MessageSegment]]: + import nemere.inference.formatRefinement as refine + + print("Refine segmentation (Merge consecutive random segments)...") + valueSegsPerMsg = MessageAnalyzer.convertAnalyzers(segmentsPerMsg, Value) + entropyMergedMessages = None + newMergedMessages = valueSegsPerMsg + # repeat merging as long as there are segments to merge left that match the conditions + while newMergedMessages != entropyMergedMessages: + entropyMergedMessages = newMergedMessages + newMergedMessages = [refine.EntropyMerger(list(msg)).merge() for msg in entropyMergedMessages] + refinedMessages = zerocharPCAmocoSFrefinements(newMergedMessages, **kwargs) + return refinedMessages + # return [refine.EntropyMerger(list(msg)).merge() for msg in refinedMessages] + + +# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # +# # # # # # # # # # # # # # End: Refinements # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # +# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # + + T = TypeVar('T') def matrixFromTpairs(distances: Iterable[Tuple[T,T,float]], segmentOrder: Sequence[T], identity=0, incomparable=1, simtrx: numpy.ndarray=None) -> numpy.ndarray: @@ -619,6 +920,12 @@ def filterSegments(segments: Iterable[MessageSegment]) -> List[MessageSegment]: return filteredSegments def isExtendedCharSeq(values: bytes, meanCorridor=(50, 115), minLen=6): + """ + :param values: Byte values to test for being a character sequence. + :param meanCorridor: Corridor of allowed mean values of non-null values in the bytes + :param minLen: Minimum length of the byte string to be considered as a character sequence. + :return: The given bytes string is likely a character sequence. 
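+
+    Illustrative example (assumption, not a doctest): a text-like value such as b"hostname" satisfies the
+    criteria visible here, since its length (8) is >= minLen and the mean of its non-null byte values
+    (about 108) lies within the default meanCorridor of (50, 115).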
+ """ vallen = len(values) nonzeros = [v for v in values if v > 0x00] return (vallen >= minLen @@ -655,12 +962,13 @@ def filterChars(segments: Iterable[AbstractSegment], meanCorridor=(50, 115), min def wobbleSegmentInMessage(segment: MessageSegment): """ - At start for now. + "Wobbles" segment byte values against its own start offsets. - For end if would be, e. g.: if segment.nextOffset < len(segment.message.data): segment.nextOffset + 1 + TODO Instead of the start offsets, for the end if would be, e. g.: + if segment.nextOffset < len(segment.message.data): segment.nextOffset + 1 - :param segment: - :return: + :param segment: Segment to wobble + :return: Wobbled segments """ wobbles = [segment] diff --git a/src/nemere/utils/evaluationHelpers.py b/src/nemere/utils/evaluationHelpers.py index 5a8e0d6..2c9f377 100644 --- a/src/nemere/utils/evaluationHelpers.py +++ b/src/nemere/utils/evaluationHelpers.py @@ -42,36 +42,6 @@ } -# # raw nemesys - cft-121 "withoutrefinement" -# sigmapertrace = { -# "dhcp_SMIA2011101X_deduped-100.pcap" : 0.6, -# "dns_ictf2010_deduped-100.pcap" : 0.6, -# "dns_ictf2010-new-deduped-100.pcap" : 0.6, -# "nbns_SMIA20111010-one_deduped-100.pcap" : 1.0, -# "ntp_SMIA-20111010_deduped-100.pcap" : 1.2, -# "smb_SMIA20111010-one_deduped-100.pcap" : 0.6, -# "dhcp_SMIA2011101X_deduped-1000.pcap" : 0.6, -# "dns_ictf2010_deduped-982-1000.pcap" : 0.6, -# "dns_ictf2010-new-deduped-1000.pcap" : 1.0, -# "nbns_SMIA20111010-one_deduped-1000.pcap" : 1.0, -# "ntp_SMIA-20111010_deduped-1000.pcap" : 1.2, -# "smb_SMIA20111010-one_deduped-1000.pcap" : 0.6, -# -# # assumptions derived from first traces -# "dhcp_SMIA2011101X-filtered_maxdiff-100.pcap": 0.6, -# "dns_ictf2010_maxdiff-100.pcap": 0.6, -# "dns_ictf2010-new_maxdiff-100.pcap": 0.6, -# "nbns_SMIA20111010-one_maxdiff-100.pcap": 1.0, -# "ntp_SMIA-20111010_maxdiff-100.pcap": 1.2, -# "smb_SMIA20111010-one-rigid1_maxdiff-100.pcap": 0.6, -# "dhcp_SMIA2011101X-filtered_maxdiff-1000.pcap": 0.6, -# "dns_ictf2010_maxdiff-1000.pcap": 0.6, -# "dns_ictf2010-new_maxdiff-1000.pcap": 0.6, -# "nbns_SMIA20111010-one_maxdiff-1000.pcap": 1.0, -# "ntp_SMIA-20111010_maxdiff-1000.pcap": 1.2, -# "smb_SMIA20111010-one-rigid1_maxdiff-1000.pcap": 0.6, -# } - sigmapertrace = { "dhcp_SMIA2011101X-filtered_maxdiff-1000.pcap": 0.4, "dns_ictf2010-new_maxdiff-1000.pcap": 0.9, diff --git a/src/nemere/utils/reportWriter.py b/src/nemere/utils/reportWriter.py index c15ea11..7360e61 100644 --- a/src/nemere/utils/reportWriter.py +++ b/src/nemere/utils/reportWriter.py @@ -18,6 +18,7 @@ from nemere.inference.templates import Template, TypedTemplate, FieldTypeTemplate from nemere.utils.evaluationHelpers import StartupFilecheck, reportFolder, segIsTyped, unknown from nemere.utils.loader import SpecimenLoader +from nemere.validation.clusterInspector import SegmentClusterCauldron from nemere.validation.dissectorMatcher import FormatMatchScore, MessageComparator, BaseDissectorMatcher from nemere.visualization.simplePrint import FieldtypeComparingPrinter @@ -55,7 +56,7 @@ def getMinMeanMaxFMS(scores: Iterable[float]) -> Tuple[float, float, float]: def countMatches(quality: Iterable[FormatMatchScore]): """ :param quality: List of FormatMatchScores - :return: count of exact matches, off-by-one near matches, off-by-more-than-one matches + :return: Count of exact matches, off-by-one near matches, off-by-more-than-one matches """ exactcount = 0 offbyonecount = 0 @@ -70,6 +71,16 @@ def countMatches(quality: Iterable[FormatMatchScore]): def writeReport(formatmatchmetrics: 
Dict[AbstractMessage, FormatMatchScore], runtime: float, comparator: MessageComparator, inferenceTitle: str, folder="reports", withTitle=False): + """ + Write report of the message segmentation quality measured in FMS to files. + + :param formatmatchmetrics: FMS for each message. + :param runtime: Runtime of the analysis. + :param comparator: Comparator to relate the inference to the ground truth. + :param inferenceTitle: The title to use in the report to refer to the analysis run. + :param folder: Folder to store the report to. + :param withTitle: Flag to use the inferenceTitle in the file names of the report. + """ if not isdir(folder): raise NotADirectoryError("The reports folder {} is not a directory. Reports cannot be written there.".format( @@ -149,6 +160,12 @@ def writeReport(formatmatchmetrics: Dict[AbstractMessage, FormatMatchScore], def writeSegmentedMessages2CSV(segmentsPerMsg: Sequence[Sequence[MessageSegment]], folder="reports"): + """ + Write the given segmentation into a CSV file. + + :param segmentsPerMsg: List of messages as a list of segments + :param folder: Folder to store the report to. + """ import csv fileNameS = 'SegmentedMessages' with open(os.path.join(folder, fileNameS + '.csv'), 'w') as csvfile: @@ -160,6 +177,15 @@ def writeSegmentedMessages2CSV(segmentsPerMsg: Sequence[Sequence[MessageSegment] def writeFieldTypesTikz(comparator: MessageComparator, segmentedMessages: List[Tuple[MessageSegment]], fTypeTemplates: List[FieldTypeTemplate], filechecker: StartupFilecheck): + """ + Visualization of segments from clusters in messages as tikz file. + see also nemere/visualization/simplePrint.py + + :param comparator: Comparator to relate the inference to the ground truth. + :param segmentedMessages: List of messages as a list of segments. + :param fTypeTemplates: Field type templates of the inferred fields. + :param filechecker: Filechecker to determine a suitable folder to write the file to. + """ # select the messages to print by quality: three of around fmsmin, fmsmean, fmsmax each fmslist = [BaseDissectorMatcher(comparator, msg).calcFMS() for msg in segmentedMessages] fmsdict = {fms.score: fms for fms in fmslist} # type: Dict[float, FormatMatchScore] @@ -177,9 +203,26 @@ def writeFieldTypesTikz(comparator: MessageComparator, segmentedMessages: List[T tikzfile.write(tikzcode) +def writeSemanticTypeHypotheses(cauldron: SegmentClusterCauldron, filechecker: StartupFilecheck): + cauldron.regularClusters.shapeStats(filechecker) + semanticHeaders = ["regCluI", "Cluster Label", "semantic type"] + semanticHypotheses = cauldron.regularClusters.semanticTypeHypotheses() + # print(tabulate([(i, cauldron.regularClusters.clusterLabel(i), h) for i, h in + # semanticHypotheses.items()], headers=semanticHeaders)) + reportFile = join(filechecker.reportFullPath, "semanticTypeHypotheses-" + filechecker.pcapstrippedname + ".csv") + print("Write semantic type hypotheses to", reportFile) + with open(reportFile, 'a') as csvfile: + statisticscsv = csv.writer(csvfile) + statisticscsv.writerow(semanticHeaders) + statisticscsv.writerows([( i, cauldron.regularClusters.clusterLabel(i), h ) + for i, h in semanticHypotheses.items()]) + Element = TypeVar('Element', AbstractMessage, AbstractSegment) class Report(ABC): + """ + A base class for writing reports of various inference aspects into files. 
+ """ statsFile = "statistics" def __init__(self, groundtruth, pcap: Union[str, StartupFilecheck], reportPath: str=None): @@ -196,6 +239,7 @@ def __init__(self, groundtruth, pcap: Union[str, StartupFilecheck], reportPath: @abstractmethod def write(self, inference, runtitle: Union[str, Dict]): + """To be implemented by a subclass.""" raise NotImplementedError() class ClusteringReport(Report, ABC): @@ -235,6 +279,10 @@ def _printMessage(self, outfile: str): @staticmethod def inferenceColumns(inferenceParams: Dict[str, str]): + """ + :param inferenceParams: Parameters of the inference to be included in each line of the report. + :return: List of additional columns in the report about the subject inference. + """ infCols = OrderedDict() infCols["tokenrefine"] = inferenceParams["tokenizer"] if inferenceParams["tokenParams"] is not None: infCols["tokenrefine"] += "-" + inferenceParams["tokenParams"] @@ -265,6 +313,12 @@ def addColumn(self, colData: Dict[Hashable, Any], header: str): self._additionalColumns[header] = colData def write(self, clusters: Dict[Hashable, List[Element]], runtitle: Union[str, Dict]): + """ + Write the report with the individual cluster statistics for the given clusters to the file system. + + :param clusters: Clusters to generate the report for. + :param runtitle: Title by which this analysis can be identified. + """ numSegs = 0 prList = [] @@ -371,6 +425,8 @@ def recall(self): def write(self, clusters: Dict[Hashable, List[Element]], runtitle: Union[str, Dict], ignoreUnknown=True): """ + Write the report with the individual cluster statistics for the given clusters to the file system. + Precision and recall for the whole clustering interpreted as number of draws from pairs of messages. For details see: https://nlp.stanford.edu/IR-book/html/htmledition/evaluation-of-clustering-1.html @@ -380,6 +436,9 @@ def write(self, clusters: Dict[Hashable, List[Element]], runtitle: Union[str, Di for (a) all clusters and for (b) clusters that have a size of at least 1/40 of the number of samples/messages. 'total segs' and 'unique segs' are including 'unknown' and 'noise' + + :param clusters: Clusters to generate the report for. + :param runtitle: Title by which this analysis can be identified. """ from collections import Counter from itertools import combinations, chain @@ -499,12 +558,12 @@ def plotMultiSegmentLines(segmentGroups: List[Tuple[str, List[Tuple[str, TypedSe """ This is a not awfully important helper function saving the writing of a few lines of code. - :param segmentGroups: - :param specimens: - :param pagetitle: - :param colorPerLabel: + :param segmentGroups: Groups of clusters of segments that should be plotted. + :param specimens: Specimen object to link the plot to the source trace. + :param pagetitle: Title to include in the plot name. + :param colorPerLabel: Flag to select whether segments should be colored accorrding to their label. :param typeDict: dict of types (str-keys: list of segments) present in the segmentGroups - :param isInteractive: + :param isInteractive: Use a interactive windows or write to file. """ from nemere.visualization.multiPlotter import MultiMessagePlotter @@ -544,6 +603,12 @@ def __init__(self, pcap: Union[str, StartupFilecheck], reportPath: str=None): super().__init__(None, pcap, reportPath) def write(self, clusters: Dict[str, List[Union[MessageSegment, Template]]], runtitle: Union[str, Dict]=None): + """ + Write the report with the cluster statistics for the given clusters to the file system. 
+
+        :param clusters: Clusters to generate the report for.
+        :param runtitle: Title by which this analysis can be identified.
+        """
         self._writeCSV(clusters, runtitle)
 
     def _writeCSV(self, clusters: Dict[str, List[Union[MessageSegment, Template]]], runtitle: Union[str, Dict]=None):
@@ -579,7 +644,7 @@ def __init__(self, comparator: MessageComparator, segments: List[AbstractSegment
                  pcap: Union[str, StartupFilecheck], reportPath: str=None):
         """
-        :param comparator: The comparator providing the ground truth
+        :param comparator: The comparator providing the ground truth
         :param segments: List of segments to write statistics for
         :param pcap: The filename or StartupFilecheck object pointing to the pcap
         :param reportPath: If None, automatically determine a path in the report folder using pcap.reportFullPath
@@ -595,6 +660,12 @@ def __init__(self, comparator: MessageComparator, segments: List[AbstractSegment
                            for rawSeg, typSeg in self.typedMatchTemplates.items()}
 
     def write(self, clusters: Dict[str, Union[MessageSegment, Template]], runtitle: Union[str, Dict]=None):
+        """
+        Write the report with the cluster statistics for the given clusters to the file system.
+
+        :param clusters: Clusters to generate the report for.
+        :param runtitle: Title by which this analysis can be identified.
+        """
         self._writeCSV(clusters, runtitle)
 
     def _writeCSV(self, clusters: Dict[str, Union[MessageSegment, Template]], runtitle: Union[str, Dict]=None):
diff --git a/src/nemere/validation/clusterInspector.py b/src/nemere/validation/clusterInspector.py
index ade0af6..d4f1a47 100644
--- a/src/nemere/validation/clusterInspector.py
+++ b/src/nemere/validation/clusterInspector.py
@@ -1,10 +1,10 @@
 import math
 from collections import Counter, MutableSequence
-from itertools import combinations
+from itertools import combinations, chain
 from typing import List, Iterable, Sequence, Tuple, Union
 
 import scipy.stats
-import numpy, re
+import numpy, pandas, re
 from networkx import Graph
 from networkx.algorithms.components.connected import connected_components
 from tabulate import tabulate
@@ -13,11 +13,16 @@
 from nemere.inference.segments import MessageSegment, TypedSegment, AbstractSegment
 from nemere.inference.templates import DelegatingDC, AbstractClusterer, Template, TypedTemplate, FieldTypeTemplate
 from nemere.utils.loader import SpecimenLoader
-from nemere.utils.baseAlgorithms import tril
+from nemere.utils.evaluationHelpers import StartupFilecheck
+from nemere.utils.baseAlgorithms import ecdf, tril
 from nemere.visualization.distancesPlotter import DistancesPlotter
+from nemere.visualization.multiPlotter import MultiMessagePlotter
 
 
 class ClusterLabel(object):
+    """
+    Helper to generate cluster labels from cluster properties.
+    """
     def __init__(self, clusterNumber: Union[None, str, int] = None):
         self.clusterNumber = None  # type: Union[None, str]
         if clusterNumber is None:
@@ -79,7 +84,7 @@ def __repr__(self):
         return f"tf{self.clusterNumber}"
 
     def toString(self):
-        """More classical string representation than repr"""
+        """More classical string representation than `__repr__()`"""
         if self.isNoise:
             if self.analysisTitle and self.lengthsString and self.clusterSize:
                 return '{} ({} bytes), Noise: {} Seg.s'.format(self.analysisTitle, self.lengthsString, self.clusterSize)
@@ -107,6 +112,9 @@ def toString(self):
 
     @property
     def mostFrequentRatio(self) -> Union[None, float]:
+        """
+        :return: The ratio of the most frequent field type's occurrence count to all typed segments in the
+            cluster. Requires `self.mostFrequentTypes` to be set.
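+
+        Example (assumed values): for mostFrequentTypes == [("uint", 6), ("chars", 2)] the ratio is
+        6 / (6 + 2) = 0.75.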
+ """ if isinstance(self.mostFrequentTypes, Sequence): return self.mostFrequentTypes[0][1] / sum(s for t, s in self.mostFrequentTypes) return None @@ -114,19 +122,19 @@ def mostFrequentRatio(self) -> Union[None, float]: class SegmentClusterCauldron(object): """ - Container class for results of the clustering of segments + Container class for results of the clustering of segments. """ noise: List[AbstractSegment] clusters: List[List[AbstractSegment]] def __init__(self, clusterer: AbstractClusterer, analysisTitle: str): """ - Cluster segments according to the distance of their feature vectors. + Cluster segments according to the dissimilarities of their feature vectors. Keep and label segments classified as noise. Start post processing of clusters (splitting, merging, singular/regular clusters, ...) after the desired preparation of clusters (e.g., by extractSingularFromNoise, appendCharSegments, ...) by calling - **clustersOfUniqueSegments()** + `clustersOfUniqueSegments()` before advanced function are available. :param clusterer: Clusterer object that contains all the segments to be clustered @@ -163,13 +171,13 @@ def extractSingularFromNoise(self): self.clusters.append([self.noise.pop(idx)]) # .baseSegments def appendCharSegments(self, charSegments: List[AbstractSegment]): - """Append the given char segments to the cluster container. Needs """ + """Append the given char segments to the cluster container.""" if len(charSegments) > 0: self.clusters.append(charSegments) @staticmethod def truncateList(values: Iterable, maxLen=5): - """Truncate a list of values if its longer than maxLen by adding ellipsis.""" + """Truncate a list of values if its longer than maxLen by adding ellipsis dots.""" output = [str(v) for v in values] if len(output) > maxLen: return output[:2] + ["..."] + output[-2:] @@ -180,9 +188,9 @@ def truncateList(values: Iterable, maxLen=5): def clustersOfUniqueSegments(self): """ Consolidate cluster contents so that the same value in different segments is only represented once per cluster. - The clusters are also stored in the instance as property self.unisegClusters + The clusters are also stored in the instance as property `self.unisegClusters` - :return: structure of segments2clusteredTypes + :return: structure of `segments2clusteredTypes` """ segmentClusters = [ # identical to the second iteration of segments2clusteredTypes() ( self._clusterLabel(segs), type(self)._segmentsLabels(segs) ) @@ -226,7 +234,7 @@ def __mixed(self): def _regularAndSingularClusters(self): """ Fill lists with clusters that contain at least three distinct values (regular) and less (singular). - see also nemere.inference.segmentHandler.extractEnumClusters() + see also `nemere.inference.segmentHandler.extractEnumClusters()` """ self.regularClusters = SegmentClusters(self.clusterer.distanceCalculator) self.singularClusters = SegmentClusters(self.clusterer.distanceCalculator) @@ -288,6 +296,9 @@ def _clusterLabel(self, cluster: List[AbstractSegment]): return cLabel def exportAsTemplates(self): + """ + :return: List of field type templates generated from the clusters in this cauldron. + """ fTypeTemplates = list() for i in self.regularClusters.clusterIndices: # generate FieldTypeTemplates (padded nans) - Templates as is @@ -322,7 +333,7 @@ class TypedSegmentClusterCauldron(SegmentClusterCauldron): def __init__(self, clusterer: AbstractClusterer, analysisTitle: str): """ - Cluster segments according to the distance of their feature vectors. 
+ Cluster segments according to the dissimilarities of their feature vectors. Keep and label segments classified as noise. :param clusterer: Clusterer object that contains all the segments to be clustered @@ -393,6 +404,11 @@ def _clusterLabel(self, cluster: List[TypedSegment]): @staticmethod def getMostFrequentTypes(cluster: List[TypedSegment]): + """ + Determine the most frequent field types from the given list of typed segments. + :param cluster: List of typed segments + :return: Sorted list of most frequent types as tuples of a type and its count. + """ typeGroups = segments2types(cluster) return sorted(((ftype, len(tsegs)) for ftype, tsegs in typeGroups.items()), key=lambda x: -x[1]) @@ -431,71 +447,158 @@ def label4segment(self, seg: AbstractSegment) -> Union[bool, str]: return False class SegmentClusterContainer(MutableSequence): - """Container for Clusters of unique segments.""" + """Container for clusters of unique segments.""" - _il3q = 99 # parameter: percentile q to remove extreme outliers from distances (for std calc) - _il3t = .4 # parameter: threshold for the ratio of stds to indicate a linear chain - _il4t = 3. # parameter: threshold for the ratio of the increase of the matrix traces of the sorted distance matrix - # in direction of the largest extent of the cluster + _il3q = 99 + """parameter: percentile q to remove extreme outliers from dissimilarities (for std calc)""" + _il3t = .4 + """parameter: threshold for the ratio of stds to indicate a linear chain""" + _il4t = 3. + """ + parameter: threshold for the ratio of the increase of the matrix traces of the sorted dissimilarity matrix + in direction of the largest extent of the cluster""" def __init__(self, dc: DelegatingDC): + """ + Initialize an empty container for segment clusters. + :param dc: Distance calculator that contains entries for all segments that will be contained in any clusters + that will be added to this container. + """ self._clusters = list() # type: List[Tuple[ClusterLabel, List[Tuple[str, TypedSegment]]]] self._distanceCalculator = dc def insert(self, index: int, o: Tuple[ClusterLabel, List[Tuple[str, TypedSegment]]]) -> None: - # TODO check if o[1] is already in cluster #i and do not do anything (raise an Error?) except updating the element labels to [mixed] if they were not identical in o[1] and cluster[i][1] + """ + Insert the entry o before for the cluster with the given index. + + TODO check if o[1] is already in cluster #i and do not do anything (raise an Error?) + except updating the element labels to [mixed] if they were not identical in o[1] and cluster[i][1] + + :param index: Cluster index. + :param o: Cluster tuple structure. + """ self._clusters.insert(index, o) def __getitem__(self, i: int) -> Tuple[ClusterLabel, List[Tuple[str, TypedSegment]]]: + """ + Return the entry for the cluster with the given index. + + :param i: Cluster index. + :return: Cluster tuple structure. + """ return self._clusters.__getitem__(i) def __setitem__(self, i: int, o: Tuple[ClusterLabel, List[Tuple[str, TypedSegment]]]) -> None: - # TODO check if o[1] is already in cluster #i and do not do anything (raise an Error?) except updating the element labels to [mixed] if they were not identical in o[1] and cluster[i][1] + """ + Set/replace the entry for the cluster with the given index with o. + + TODO check if o[1] is already in cluster #i and do not do anything (raise an Error?) + except updating the element labels to [mixed] if they were not identical in o[1] and cluster[i][1] + + :param i: Cluster index. 
+ :param o: Cluster tuple structure. + """ self._clusters.__setitem__(i, o) def __delitem__(self, i: int) -> None: + """ + Remove the cluster with the given index from this container. + :param i: Cluster index. + """ self._clusters.__delitem__(i) def __len__(self) -> int: + """ + :return: Number of clusters in this container. + """ return self._clusters.__len__() def __contains__(self, o: Tuple[ClusterLabel, List[Tuple[str, TypedSegment]]]): - # TODO check if o[1] is already in cluster #i and do not do anything (raise an Error?) except updating the element labels to [mixed] if they were not identical in o[1] and cluster[i][1] + """ + Check if o exists in this container. + + TODO check if o[1] is already in cluster #i and do not do anything (raise an Error?) + except updating the element labels to [mixed] if they were not identical in o[1] and cluster[i][1] + + :param o: Cluster tuple structure. + :return: True if the given cluster o exists in this container. + """ return self._clusters.__contains__(o) def clusterContains(self, i: int, o: Tuple[str, TypedSegment]): - # TODO check if o[1] is already in cluster #i and do not do anything (raise an Error?) except updating the element labels to [mixed] if they were not identical in o[1] and cluster[i][1] + """ + Check if the entry o exists in the cluster given by the index. + + TODO check if o[1] is already in cluster #i and do not do anything (raise an Error?) + except updating the element labels to [mixed] if they were not identical in o[1] and cluster[i][1] + + :param i: Cluster index. + :param o: Cluster elements. + :return: True if o is in the cluster with the given index, False otherwise. + """ for lab, ele in self._clusters[i][1]: if lab == o[0] and ele.values == o[1].values: return True return False def clusterIndexOfSegment(self, segment: TypedSegment): + """ + :param segment: Segment to search for. + :return: Index of the cluster that contains the given segment, + None if the segment is not in any of the clusters of this container. + """ for ci in self.clusterIndices: if segment in self.clusterElements(ci): return ci return None def clusterLabel(self, i: int) -> str: + """ + :param i: Cluster index. + :return: Label of the cluster with the given index. + """ return str(self._clusters[i][0]) def clusterElements(self, i: int) -> List[TypedSegment]: + """ + :param i: Cluster index. + :return: List of segments contained in the cluster with the given index. + """ return [b for a,b in self._clusters[i][1]] @property def clusters(self): + """ + :return: Raw list of clusters in this container. + """ return self._clusters def dcSubMatrix(self, i: int): + """ + :param i: Cluster index. + :return: Dissimilarity matrix for the cluster with the given index. + """ return self._distanceCalculator.distancesSubset(self.clusterElements(i)) def __repr__(self): + """ + :return: Textual representation of this cluster container. + """ return "\n".join(self.clusterLabel(i) for i in self.clusterIndices) # # # # # # # # # # # # # # # # # # # # # # # # # # # def _modClusterLabel(self, subc: List[Tuple[str, Union[AbstractSegment, Template]]], hasGT: bool, clusterNumber: str, analysisTitle: str): + """ + Prepare a new cluster label for the given cluster subc from its properties. + + :param subc: Cluster to create a label for. + :param hasGT: Flag to indicate if ground truth information should be used. Requires TypedSegment in subc. + :param clusterNumber: Cluster index. + :param analysisTitle: Title of the analysis to use in the label. 
+ :return: Label for the cluster with the given index.
+ """
 cLabel = ClusterLabel(clusterNumber)
 cLabel.analysisTitle = analysisTitle
 cLabel.lengthsString = " ".join(SegmentClusterCauldron.truncateList({seg.length for l, seg in subc}))
@@ -512,7 +615,7 @@ def splitOnOccurrence(self):
 """
 Split clusters if they have extremely polarized occurrences
 (e.g., many unique values, very few very high occurring values).
- As pivot use ln(occSum). (Determination of a knee has some false positives
+ As pivot use `ln(occSum)`. (Determination of a knee has some false positives
 and is way too complex for its benefit).
 """
 splitClusters = list()
@@ -551,7 +654,7 @@ def mergeOnDensity(self):
 epsilonDensityThreshold = 0.01
 neighborDensityThreshold = 0.002
- # median values for the 1-nearest neighbor ("minimum" distance).
+ # median values for the 1-nearest neighbor ("minimum" dissimilarity).
 minmedians = [numpy.median([self._distanceCalculator.neighbors(ce, self.clusterElements(i))[1][1]
 for ce in self.clusterElements(i)])
 for i in self.clusterIndices]
@@ -561,7 +664,8 @@ def mergeOnDensity(self):
 cpDists = { (i, j): self._distanceCalculator.distancesSubset(self.clusterElements(i), self.clusterElements(j))
 for i, j in combinations(self.clusterIndices, 2) }
- # in case of empty distances, the median may be requested from an empty list. This is no problem, thus ignore.
+ # In case of empty dissimilarities, the median may be requested from an empty list.
+ # This is no problem, thus ignore.
 with warnings.catch_warnings():
 warnings.simplefilter("ignore", category=RuntimeWarning)
 vals = list()
@@ -575,7 +679,7 @@ def mergeOnDensity(self):
 smallCluster = i if maxdists[i] < maxdists[j] else j
 # extent of the smaller cluster
 smallClusterExtent = maxdists[smallCluster]
- # density as median distances in $\epsilon$-neighborhood with smallClusterExtent as $\epsilon$
+ # density as median dissimilarities in $\epsilon$-neighborhood with smallClusterExtent as $\epsilon$
 dists2linki = numpy.delete(self.dcSubMatrix(i)[coordmin[0]], coordmin[0])
 dists2linkj = numpy.delete(self.dcSubMatrix(j)[coordmin[1]], coordmin[1])
 densityi = numpy.median(dists2linki[dists2linki <= smallClusterExtent / 2])
@@ -583,10 +687,10 @@ def mergeOnDensity(self):
 vals.append((
 (i,j), # 0: indices tuple
- trils[i].mean(), None, minmedians[i], # 1*, 2, 3*: of distances in cluster i
+ trils[i].mean(), None, minmedians[i], # 1*, 2, 3*: of dissimilarities in cluster i
 None, # 4
- trils[j].mean(), None, minmedians[j], # 5*, 6, 7*: of distances in cluster j
- cpDists[(i, j)].min(), # 8*: min of distances between i and j
+ trils[j].mean(), None, minmedians[j], # 5*, 6, 7*: of dissimilarities in cluster j
+ cpDists[(i, j)].min(), # 8*: min of dissimilarities between i and j
 None, # 9
 densityi, # 10*: density within epsilon around link segment in i
 densityj # 11*: density within epsilon around link segment in j
@@ -638,13 +742,14 @@ def mergeOnDensity(self):
 def trilFlat(self, i: int) -> numpy.ndarray:
 """
 :param i: cluster index
- :return: The values of the lower triangle of the distance matrix of the given cluster omitting the diagonal
+ :return: The values of the lower triangle of the dissimilarity matrix of the given cluster omitting the diagonal
 as a list.
 """
 return tril(self.dcSubMatrix(i))
 def occurrences(self, i: int) -> List[int]:
 """
+ Count the occurrences of the distinct values in a cluster.
You may put this list in a Counter: >>> from collections import Counter @@ -694,13 +799,64 @@ def distinctValues(self, i: int): """ return len(self.clusterElements(i)) + def traverseViaNearest(self, i: int): + """ + Path through the cluster hopping from nearest neighbor to nearest neighbor that has not already been visited. + All segments are visited this way. + + >>> from tabulate import tabulate + >>> from nemere.utils.baseAlgorithms import generateTestSegments + >>> from nemere.inference.templates import DelegatingDC + >>> from nemere.validation.clusterInspector import SegmentClusterContainer + >>> segments = generateTestSegments() + >>> dc = DelegatingDC(segments) + Calculated distances for 37 segment pairs in ... seconds. + >>> someclusters = SegmentClusterContainer(dc) + >>> someclusters.append(("Sample Cluster", list(("Some Type", s) for s in segments))) + >>> path = zip(*someclusters.traverseViaNearest(0)) + >>> print(tabulate((path))) + ---------------------------------------------------------------- --------- + MessageSegment 4 bytes at (0, 4): 00000000 | values: (0, 0, 0... 1 + MessageSegment 4 bytes at (0, 4): 01020304 | values: (1, 2, 3... 0.125 + MessageSegment 4 bytes at (0, 4): 03020304 | values: (3, 2, 3... 0.214355 + MessageSegment 3 bytes at (0, 3): 020304 | values: (2, 3, 4) 0.111084 + MessageSegment 3 bytes at (0, 3): 010304 | values: (1, 3, 4) 0.367676 + MessageSegment 2 bytes at (0, 2): 0204 | values: (2, 4) 0.0714111 + MessageSegment 2 bytes at (0, 2): 0203 | values: (2, 3) 0.700684 + MessageSegment 3 bytes at (0, 3): 250545 | values: (37, 5, 69) 0.57666 + ---------------------------------------------------------------- --------- + + :param i: Cluster index + :return: The path as a list of segments and the distances along this path + """ + idxA, idxC, segA, segC = self.remotestSegments(i) + distances = self.dcSubMatrix(i) + numpy.fill_diagonal(distances, numpy.nan) + segmentReference = self.clusterElements(i) + if Counter(segmentReference).most_common(1)[0][1] > 1: + raise ValueError("Duplicate element in cluster.") + df = pandas.DataFrame(distances, index=segmentReference, columns=segmentReference) + + segB = segA + path = [segA] + distsAlongPath = list() + while df.shape[0] > 1: + nearest = df[segB].idxmin(0) + path.append(nearest) + distsAlongPath.append(df.at[segB, nearest]) + df.drop(segB, 0, inplace=True) + df.drop(segB, 1, inplace=True) + assert all(numpy.isnan(numpy.diag(df))) # be sure we removed symmetrical + segB = nearest + return path, numpy.array(distsAlongPath) + def maxDist(self, i: int): """ :param i: Cluster index - :return: The maximum distance between any two segments in cluster i. + :return: The maximum dissimilarity between any two segments in cluster i. """ if self.distinctValues(i) < 2: - # If cluster contains only one template, we define the (maximum) distance to itself to be zero. + # If cluster contains only one template, we define the (maximum) dissimilarity to itself to be zero. # (Less than 1 template/segment should not happen, but we handle it equally and do not fail, # should it happen.) return 0 @@ -710,7 +866,7 @@ def maxDist(self, i: int): def remotestSegments(self, i: int): """ - Determine the segment with the maximum sum of distances to all other segments (A) + Determine the segment with the maximum sum of dissimilarities to all other segments (A) and the segment farthest away from this (C). 
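+
+ Conceptually, the selection can be sketched as follows (an illustrative reading of the
+ description above, not the exact implementation; `submatrix` stands for the cluster's
+ dissimilarity submatrix):
+
+     idxA = submatrix.sum(axis=0).argmax()  # largest sum of dissimilarities to all other segments
+     idxC = submatrix[idxA].argmax()        # segment farthest away from segment A
+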
>>> from pprint import pprint
@@ -767,14 +923,26 @@ def remotestSegments(self, i: int):
 return idxA, idxC, segA, segC
 def elementLengths(self, i: int) -> numpy.ndarray:
+ """
+ :param i: Cluster index.
+ :return: The distinct lengths in bytes of the elements in cluster i.
+ """
 segLens = numpy.array(list({e.length for e in self.clusterElements(i)}))
 return segLens
 def charSegmentCount(self, i: int):
+ """
+ :param i: Cluster index.
+ :return: Number of character segments in cluster i.
+ """
 return len(filterChars(self.clusterElements(i)))
 @staticmethod
 def mostFrequentTypes(cluster: List[Tuple[str, TypedSegment]]):
+ """
+ :param cluster: Cluster as a list of labels and typed segments.
+ :return: Sorted list of the most frequent types extracted from the element labels.
+ """
 segLabelExtractor = re.compile(r"(\w*): (\d*) Seg.s")
 allLabels = {l for l,e in cluster}
 typeStats = [next(segLabelExtractor.finditer(l)).groups() for l in allLabels]
@@ -783,6 +951,11 @@ def mostFrequentTypes(cluster: List[Tuple[str, TypedSegment]]):
 return mostFrequentTypes
 def distancesSortedByLargestExtent(self, i: int):
+ """
+ :param i: Cluster index.
+ :return: Dissimilarities of all elements in the cluster with index i
+ sorted by the dissimilarity from the remotest segment in the cluster.
+ """
 smi = self.dcSubMatrix(i)
 dfari = smi[self.remotestSegments(i)[0]]
 cmi = self.clusterElements(i)
@@ -793,11 +966,11 @@ def traceMeanDiff(self, i: int):
 """
 Difference of the first and the mean of all other diffs of all $k$-traces (sums of the
- $k$th superdiagonals) of the sorted distance matrix for the segments in this cluster.
- It is sorted by the distance from the remotest segment in the cluster.
+ $k$th superdiagonals) of the sorted dissimilarity matrix for the segments in this cluster.
+ It is sorted by the dissimilarity from the remotest segment in the cluster.
- :param i:
- :return:
+ :param i: Cluster index.
+ :return: The difference between the first k-trace diff and the mean of the remaining ones.
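+
+ A rough sketch of the computation this describes (illustrative only, not the actual
+ implementation; `sortedMatrix` stands for the dissimilarity matrix sorted as stated above):
+
+     ktraces = [numpy.diag(sortedMatrix, k).sum() for k in range(1, sortedMatrix.shape[0])]
+     increases = numpy.diff(ktraces)
+     value = increases[0] - numpy.mean(increases[1:])
+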
""" # # @@ -811,16 +984,111 @@ def traceMeanDiff(self, i: int): # # # # # # # # # # # # # # # # # # # # # # # # # # # + def shapeIndicators(self, i: int): + path, distsAlongPath = self.traverseViaNearest(i) + maxD = self.maxDist(i) + distList = self.trilFlat(i) + + # deal with extreme outliers for indiLinear3 + distMask = distList < numpy.percentile(distList, type(self)._il3q) + diapMask = distsAlongPath < numpy.percentile(distsAlongPath, type(self)._il3q) + # standard deviation of all distances without extreme outliers (outside 99-percentile) + distStdwoO = numpy.std(distList[distMask], dtype=numpy.float64) + # standard deviation of distances along path without extreme outliers (outside 99-percentile) + diapStdwoO = numpy.std(distsAlongPath[diapMask], dtype=numpy.float64) + + indiLinear1 = bool(sum(distsAlongPath) < maxD * math.log(len(path))) # indicates probable linear chain + indiLinear3 = bool(diapStdwoO / distStdwoO < type(self)._il3t) # indicates probable linear chain + indiChaotic = bool(sum(distsAlongPath) > maxD * 1.2 * math.log(len(path))) # indicates probable blob/chaotic cluster + + return indiLinear1, indiLinear3, indiChaotic + + def indiLinear1(self, i: int): + return self.shapeIndicators(i)[0] + def indiLinear3(self, i: int): + return self.shapeIndicators(i)[1] + + def indiLinear4(self, i: int): + return self.traceMeanDiff(i) < self._il4t + + def indiChaotic(self, i: int): + return self.shapeIndicators(i)[2] + + def isAddr(self, i: int): + return self.occurrenceLnPercentRank(i) < 35 + # don't care about shape + # and self.shapeIndicators(i)[1:] == (True, False) # don't care about indicator 1 + + def isSequence(self, i: int): + return 70 <= self.occurrenceLnPercentRank(i) <= 92.5 \ + and self.indiLinear4(i) == True and self.indiChaotic(i) == False + + def isIdFlagsInt(self, i: int): + return self.occurrenceLnPercentRank(i) > 85 \ + and all(2 <= self.elementLengths(i)) and all(self.elementLengths(i) <= 4) + + def isTimestamp(self, i: int): + return self.occurrenceLnPercentRank(i) > 95 \ + and self.indiLinear4(i) == False and self.indiChaotic(i) == True \ + and all(3 <= self.elementLengths(i)) and all(self.elementLengths(i) <= 8) + + def isPayload(self, i: int): + # CHARS [ or (BYTES > 8) ] + return self.charSegmentCount(i) / len(self.clusterElements(i)) > .80 # \ + # or all(self.elementLengths(i) > 8) + + def isPad(self, i: int): + valCnt = Counter(chain.from_iterable(e.values for e in self.clusterElements(i))) + mcVal, mcCnt = valCnt.most_common(1)[0] + # most common is null byte and nulls are almost the exclusive content + return mcVal == 0 and mcCnt / sum(valCnt.values()) > .95 @property def clusterIndices(self): + """ + :return: Range of all valid cluster indices in this container. + """ return range(len(self)) + def semanticTypeHypotheses(self): + hypo = dict() + for i in self.clusterIndices: + if self.isPad(i): + hypo[i] = "pad" + elif self.distinctValues(i) <= 3: + hypo[i] = None # Flags/Addresses? see singular cluster rules for flags/id/addr + elif self.isPayload(i): # prio 0 + hypo[i] = "payload" + elif self.isTimestamp(i): # prio 2 (swapped to 1) + hypo[i] = "timestamp" + elif self.isIdFlagsInt(i): # prio 1 (swapped to 2) + hypo[i] = "id/flags/int" + elif self.isSequence(i): # prio 3 + hypo[i] = "addr/seq" # "sequence" TODO try to use distribution of values (equally distributed distances?) 
+ elif self.isAddr(i): # prio 4
+ hypo[i] = "addr/seq" # "addr"
+ else:
+ hypo[i] = None
+ return hypo
+
 class SegmentClusters(SegmentClusterContainer):
+ """
+ Extension of the container for clusters of unique segments with further kinds of cluster properties to be evaluated.
+ This may support future work to identify semantics of field types and thus also contains a number of helper
+ functions to plot and print cluster properties, representations, and hypothesis statistics.
+ """
 def plotDistances(self, i: int, specimens: SpecimenLoader, comparator=None):
+ """
+ Plot the dissimilarities between all segments of the given cluster in this container as Topology Plots.
+
+ :param i: Cluster index.
+ :param specimens: SpecimenLoader object to determine the source of the data
+ for retaining the correct link to the evaluated trace.
+ :param comparator: Compare to ground truth, if available.
+ """
 if len(self.clusterElements(i)) < 2:
 print("Too few elements to plot in", self.clusterLabel(i), "Ignoring it.")
 return
@@ -843,3 +1111,335 @@ def plotDistances(self, i: int, specimens: SpecimenLoader, comparator=None):
 sdp.plotManifoldDistances(self.clusterElements(i), dists, numpy.array(labels))
 sdp.axesFlat[1].set_title(self.clusterLabel(i))
 sdp.writeOrShowFigure()
+
+ def plotDistributions(self, specimens: SpecimenLoader):
+ plotTitle = "Distances Distribution Density per Cluster"
+ mmp = MultiMessagePlotter(specimens, plotTitle, math.ceil(len(self)))
+ subTitles = list()
+ # spn: sub plot number, clu: cluster
+ for spn in range(len(self)):
+ cl = self.clusterLabel(spn)
+ subTitles.append(cl)
+
+ dist = self.trilFlat(spn)
+ cnts, bins, _ = mmp.axes[spn].hist(dist, bins=30)
+ # mmp.axes[spn].legend()
+
+ mu, sigma = scipy.stats.norm.fit(dist)
+ bestfit = scipy.stats.norm.pdf(bins, mu, sigma)
+ yscale = sum(cnts) / sum(bestfit)
+ mmp.axes[spn].plot(bins, bestfit*yscale)
+ mmp.nameEachAx(subTitles)
+ mmp.writeOrShowFigure()
+
+ def plotPincipalComponents(self, specimens: SpecimenLoader, sampleSize=None):
+ """
+ Plot and print up to the first ten eigenvalues of the covariance matrix of the distances per cluster.
+
+ :param specimens: For instantiating the plotter class.
+ :param sampleSize: If not None, enforce comparable results by random subsampling of clusters to
+ sampleSize and ignoring smaller clusters.
+ """
+ plotTitle = "Eigenvalues per Cluster"
+ mmp = MultiMessagePlotter(specimens, plotTitle, math.ceil(len(self)))
+ subTitles = list()
+ vals = list()
+ for i in range(len(self)):
+ cl = self.clusterLabel(i)
+ subTitles.append(cl)
+ if sampleSize is not None:
+ if len(self.clusterElements(i)) < sampleSize:
+ print("Ignoring cluster", cl, "with element count", len(self.clusterElements(i)))
+ continue
+ dists = self._distanceCalculator.distancesSubset(
+ numpy.random.choice(self.clusterElements(i), sampleSize, False))
+ else:
+ dists = self.dcSubMatrix(i)
+ cov = numpy.cov(dists)
+ eigenValuesAndVectors = numpy.linalg.eigh(cov)
+ vals.append((cl, *eigenValuesAndVectors[0][10::-1]))
+ mmp.plotToSubfig(i, eigenValuesAndVectors[0][10::-1])
+ print(tabulate(vals))
+ mmp.nameEachAx(subTitles)
+ mmp.writeOrShowFigure()
+
+ def shapiro(self, i: int):
+ """
+ Result: Probably no test for normal distribution will work. Clusters with few elements tend to become false
+ positives while in any case the densities in the histograms will be truncated at 0 and the cluster "boundary".
+ """ + dists = self.trilFlat(i) + subsDists = numpy.random.choice(dists, 5000, False) if len(dists) > 5000 else dists + # noinspection PyUnresolvedReferences + return scipy.stats.shapiro(subsDists)[1] > 0.05 + + def skewedness(self, i: int): + """(mode - mean)/stdev""" + dists = self.trilFlat(i) + # noinspection PyUnresolvedReferences + mode = scipy.stats.mode(dists).mode[0] + median = numpy.median(dists) + mean = numpy.mean(dists) + stdev = numpy.std(dists) + skn = (mode - median) / stdev + return mode, median, mean, stdev, skn + + def occurrencePercentile(self, i: int, q=95): + return numpy.percentile(numpy.array(self.occurrences(i)), q) + + def eig_eigw(self, i: int, rank=1): + """Deprecated. Direct eigenvalues up to rank.""" + dists = self.dcSubMatrix(i) + return numpy.absolute(scipy.linalg.eigh(dists)[0])[:rank] + + def eig_hasDirection(self, i: int): + """Deprecated. + Has a "direction" in terms of a "significantly large" first eigenvalue relative to the + maximal distance in the cluster.""" + dirFactThresh = 5 + ew = self.eig_eigw(i,2) + # ew0diff1 = ew[0] > 2 * ev[1] # this distinguishes nothing helpful + ew0snf = ew[0] > dirFactThresh * self.maxDist(i) + return ew0snf # and ew0diff1 + + def pc_percentile(self, i: int, q=85): + """q: quantile""" + dists = self.dcSubMatrix(i) + cov = numpy.cov(dists) + eigenValuesAndVectors = numpy.linalg.eigh(cov) + return numpy.percentile(eigenValuesAndVectors[0], q) + + def pc_stats(self, i: int, q=95): + """three-sigma rule of thumb: 68–**95**–99.7 rule""" + eigenValuesAndVectors = numpy.linalg.eigh(numpy.cov(self.dcSubMatrix(i))) + p95 = self.pc_percentile(i, q) + pcS = sum(eigenValuesAndVectors[0] > p95) # larger 1: "has extent" + return ( + self.clusterLabel(i), + sum(eigenValuesAndVectors[0][:-2]) / sum(eigenValuesAndVectors[0][-2:]), + p95, + pcS, + pcS / self.distinctValues(i), # value > 0.1: ratio of significant PCs among all components, + # works as indicator for shape in nbns, not for smb + # interpretation: variance of distances concentrated on first pcSs. + self.distinctValues(i) + ) + + def entropy(self, i: int): + raise NotImplementedError() + + def ecdf(self, i: int): + dist = self.trilFlat(i) # type: numpy.ndarray + return ecdf(list(dist)) + + def plotECDFs(self, specimens: SpecimenLoader): + plotTitle = "Empirical Cumulated Distribution Function per Cluster" + mmp = MultiMessagePlotter(specimens, plotTitle, math.ceil(len(self))) + subTitles = list() + for spn in range(len(self)): + cl = self.clusterLabel(spn) + print(cl) + subTitles.append(cl) + + ecdValues = self.ecdf(spn) + mmp.axes[spn].plot(*ecdValues) + mmp.nameEachAx(subTitles) + mmp.writeOrShowFigure() + + def triangularDistances(self, i: int): + idxA, idxC, segA, segC = self.remotestSegments(i) + distances = self.dcSubMatrix(i) + directAC = distances[idxA,idxC] + + viaB = distances[idxA] + distances[idxC] + assert viaB[idxA] == directAC # this is path A--A--C + assert viaB[idxC] == directAC # this is path A--C--C + viaB[idxC] = numpy.nan + viaB[idxA] = numpy.nan + + detourB = viaB - directAC # how much longer than the direct path A--C are we for each B? + return detourB + # TODO how to compare to a single threshold (as a tuned parameter)? 
+ + def shapeStats(self, filechecker: StartupFilecheck, doPrint=False): + """filechecker.pcapstrippedname""" + from tabulate import tabulate + import csv + + vals = list() + for i in range(len(self)): + if self.distinctValues(i) < 2: + print("Omit cluster with less than two elements", self.clusterLabel(i), "(id/flags?)") + continue + + idxA, idxC, segA, segC = self.remotestSegments(i) + path, distsAlongPath = self.traverseViaNearest(i) + maxD = self.maxDist(i) + distList = self.trilFlat(i) + + # deal with extreme outliers for indiLinear3 + distMask = distList < numpy.percentile(distList, type(self)._il3q) + diapMask = distsAlongPath < numpy.percentile(distsAlongPath, type(self)._il3q) + # standard deviation of all distances without extreme outliers (outside 99-percentile) + distStdwoO = numpy.std(distList[distMask], dtype=numpy.float64) + # standard deviation of distances along path without extreme outliers (outside 99-percentile) + diapStdwoO = numpy.std(distsAlongPath[diapMask], dtype=numpy.float64) + + # indicates probable linear chain + indiLinear1 = bool(sum(distsAlongPath) < maxD * math.log(len(path))) + # (indicates probable linear chain) + indiLinear2 = bool(path.index(segC) / len(path) > .9) + # indicates probable linear chain + indiLinear3 = bool(diapStdwoO / distStdwoO < type(self)._il3t) + # indicates probable blob/chaotic cluster + indiChaotic = bool(sum(distsAlongPath) > maxD * 1.2 * math.log(len(path))) + # everything else: probable non-linear (multi-linear) substructure + assert self.shapeIndicators(i) == (indiLinear1, indiLinear3, indiChaotic) + + # shorter float can cause "RuntimeWarning: overflow encountered in reduce" for large clusters + distStd = numpy.std(distList, dtype=numpy.float64) + diapStd = numpy.std(distsAlongPath, dtype=numpy.float64) + + # collect statistics + vals.append(( + self.clusterLabel(i), + numpy.mean(distList), + distStd, + maxD, + sum(distsAlongPath), + sum(distsAlongPath) - maxD, + path.index(segC), + len(path), + numpy.mean(distsAlongPath), + diapStd, + numpy.percentile(distsAlongPath, 95), + diapStdwoO, + distStdwoO, + self.traceMeanDiff(i), + + indiLinear1, + indiLinear2, + indiLinear3, + self.indiLinear4(i), + indiChaotic, + + math.log(self.occurrenceSum(i)), + self.occurrenceLnPercentRank(i), + self.occurrencePercentile(i), + numpy.std(self.occurrences(i)), + numpy.median(self.occurrences(i)), + self.distinctValues(i), + )) + headers = [ + "cluster", "meanDist", "stdDist", "maxDist", "distsAlongPath", "detour", "segCinPath", "pathLength", + "mean(distsAlongPath)", "std(distsAlongPath)", "percentile(distsAlongPath, 95)", "distStdwoO", "diapStdwoO", + "traceMeanDiff", + "indiLinear1", "indiLinear2", "indiLinear3", "indiLinear4", "indiChaotic", + "lnOccSum", "lnOccPercRank", "95percentile", "occStd", "occMedian", "distinctV" ] + if doPrint: + print(tabulate(vals, headers=headers)) + from os.path import join + reportFile = join(filechecker.reportFullPath, "shapeStats-" + filechecker.pcapstrippedname + ".csv") + print("Write cluster shape statistics to", reportFile) + with open(reportFile, 'a') as csvfile: + statisticscsv = csv.writer(csvfile) + statisticscsv.writerow(headers) + statisticscsv.writerows(vals) + return vals + + def routingPath(self, i: int): + raise NotImplementedError("Find a routing algorithm to find the shortest path from the most distant elements.") + + def linkedClustersStats(self): + """Merge nearby (single-linked) clusters with very similar densities.""" + # median values for the 1-nearest neighbor ("minimum" distance). 
+ minmedians = [numpy.median([self._distanceCalculator.neighbors(ce, self.clusterElements(i))[1][1] + for ce in self.clusterElements(i)]) + for i in self.clusterIndices] + # minmeans = [numpy.mean([self._distanceCalculator.neighbors(ce, self.clusterElements(i))[1][1] + # for ce in self.clusterElements(i)]) + # for i in self.clusterIndices()] + maxdists = [self.maxDist(i) for i in self.clusterIndices] + + trils = [self.trilFlat(i) for i in self.clusterIndices] + cpDists = { (i, j): self._distanceCalculator.distancesSubset(self.clusterElements(i), self.clusterElements(j)) + for i, j in combinations(self.clusterIndices, 2) } + # plt.plot(*ecdf(next(iter(cpDists.values())).flat)) + + vals = list() + for i, j in combinations(self.clusterIndices, 2): + # density in $\epsilon$-neighborhood around nearest points between similar clusters + # $\epsilon$-neighborhood: link segments (closest in two clusters) s_lc_ic_j + # d(s_lc_ic_j, s_k) <= $\epsilon$ for all (s_k in c_i) != s_lc_ic_j + # the nearest points between the clusters + coordmin = numpy.unravel_index(cpDists[(i, j)].argmin(), cpDists[(i, j)].shape) + # index of the smaller cluster + smallCluster = i if maxdists[i] < maxdists[j] else j + # extent of the smaller cluster + smallClusterExtent = maxdists[smallCluster] + # density as median distances in $\epsilon$-neighborhood with smallClusterExtent as $\epsilon$ + dists2linki = numpy.delete(self.dcSubMatrix(i)[coordmin[0]], coordmin[0]) + dists2linkj = numpy.delete(self.dcSubMatrix(j)[coordmin[1]], coordmin[1]) + densityi = numpy.median(dists2linki[dists2linki <= smallClusterExtent / 2]) + densityj = numpy.median(dists2linkj[dists2linkj <= smallClusterExtent / 2]) + + trili = trils[i] + trilj = trils[j] + vals.append(( + self.clusterLabel(i), # 0 + trili.mean(), trili.std(), minmedians[i], # 1, 2, 3 + self.clusterLabel(j), # 4 + trilj.mean(), trilj.std(), minmedians[j], # 5, 6, 7 + cpDists[(i, j)].min(), # 8 + cpDists[(i, j)].max(), # 9 + densityi, # 10 + densityj # 11 + )) + + # print(tabulate([(self.clusterLabel(i), minmedians[i]) for i in self.clusterIndices], + # headers=["cluster", "density"])) + # # density +/- 0.002 (?) + # print(tabulate( + # [(*v, bool(v[8] < v[1] or v[8] < v[5]), numpy.array([v[1], v[5]]).min() * 10, abs(v[3] - v[7])) + # for v in vals])) + # print(tabulate( + # [(*v, bool(v[8] < v[1] or v[8] < v[5]), + # bool(v[8] < numpy.array([v[1] + v[5]]).min() * 10), abs(v[3] - v[7])) + # for v in vals])) + # print(tabulate( + # [(*v, bool(v[8] < v[1] or v[8] < v[5]), + # bool(v[8] < v[1] + v[2] + v[5] + v[6]), bool(abs(v[3] - v[7]) < 0.002)) + # for v in vals])) + + # merge cluster condition!! working on nbns with no FP! + areVeryCloseBy = [bool(v[8] < v[1] or v[8] < v[5]) for v in vals] + linkHasSimilarEpsilonDensity = [bool(abs(v[10] - v[11]) < 0.01) for v in vals] + # closer as the mean between both cluster's "densities" normalized to the extent of the cluster + areSomewhatCloseBy = [bool(v[8] < numpy.mean([v[3] / v[1], v[7] / v[5]])) for v in vals] + haveSimilarDensity = [bool(abs(v[3] - v[7]) < 0.002) for v in vals] + + print(tabulate([ + ( *v, areVeryCloseBy[ij], linkHasSimilarEpsilonDensity[ij], + areSomewhatCloseBy[ij], haveSimilarDensity[ij] ) for ij, v in enumerate(vals)])) + # works with test traces: + # nbns! smb! + # dhcp: no! => + linkHasSimilarEpsilonDensity + # dns! + # ntp! 
+ return vals
+
+ # mergedClusters = SegmentClusters(self._distanceCalculator)
+ # mergedClusters.append(("#2, #4: flags", [element for i in [1, 3] for element in self[i][1]]))
+ # mergedClusters.append(("ids", [element for i in [0, 6, 7, 8, 9] for element in self[i][1]]))
+ # mergedClusters.append(("ipv4s", [element for i in [1, 3, 4, 5] for element in self[i][1]]))
+ # mergedClusters.semanticTypeHypotheses()
+ # return mergedClusters
+
+ # mmp = MultiMessagePlotter(specimens, analysisTitle, 20)
+ # c03 = list(((i, j), d) for (i, j), d in cpDists.items() if i in [0, 3])
+ # for ax, ((i, j), didi) in zip(mmp.axes, c03):
+ # ax.plot(*ecdf(didi.flat))
+ # ax.set_title(
+ # cauldron.regularClusters.clusterLabel(i)[10:30] + " | " + cauldron.regularClusters.clusterLabel(j)[10:30])
+ # mmp.writeOrShowFigure()
+
diff --git a/src/nemere/visualization/multiPlotter.py b/src/nemere/visualization/multiPlotter.py
index f98fa46..6aadd1d 100644
--- a/src/nemere/visualization/multiPlotter.py
+++ b/src/nemere/visualization/multiPlotter.py
@@ -296,6 +296,11 @@ def plotCorrelations(self,
 def plotMultiSegmentLines(self, segmentGroups: List[Tuple[str, List[Tuple[str, TypedSegment]]]],
 colorPerLabel=False):
+ """
+
+ :param segmentGroups: Groups of clusters of segments that should be plotted.
+ :param colorPerLabel: Flag to select whether segments should be colored according to their label.
+ """
 import matplotlib.cm
 # make the leftover axes invisible
diff --git a/src/nemere/visualization/simplePrint.py b/src/nemere/visualization/simplePrint.py
index 1b6600a..1a5e3fb 100644
--- a/src/nemere/visualization/simplePrint.py
+++ b/src/nemere/visualization/simplePrint.py
@@ -1,3 +1,6 @@
+"""
+Helper methods and classes to generate human-readable output from the inference results.
+"""
 from collections import defaultdict
 from itertools import chain
 from time import strftime
@@ -16,6 +19,12 @@ def printMatrix(lines: Iterable[Iterable], headers: Iterable=None):
+ """
+ Print the two-dimensional Iterable `lines` as a matrix table, optionally with the given headers.
+
+ :param lines: Content to print
+ :param headers: Headers for the matrix
+ """
 ml = MatrixList()
 if headers:
 ml.headers = headers
@@ -63,6 +72,11 @@ def alignDescreteValues(listA: list, listB: list) -> Tuple[list, list]:
 def tabuSeqOfSeg(sequence: Sequence[Sequence[MessageSegment]]):
+ """
+ Print the two-dimensional Sequences of MessageSegments in a table with index numbers as headers.
+
+ :param sequence: two-dimensional Sequences of MessageSegments
+ """
 print(tabulate(((sg.bytes.hex() if sg is not None else '' for sg in msg) for msg in sequence),
 headers=range(len(sequence[0])), showindex="always", disable_numparse=True))
@@ -80,6 +94,17 @@ def resolveIdx2Seg(dc: DistanceCalculator, segseq: Sequence[Sequence[int]]):
 def printMarkedBytesInMessage(message: AbstractMessage, markStart, markEnd, subStart=0, subEnd=None):
+ """
+ Print the given message and mark a substring from markStart to markEnd.
+ Optionally print only a substring of the message that must be at least as large as the marking.
+
+ :param message: The message whose hex-coded binary values are to be printed.
+ :param markStart: The start of the substring to be marked.
+ :param markEnd: The end of the substring to be marked.
+ :param subStart: The optional start of the substring to be printed.
+ :param subEnd: The optional end of the substring to be printed.
+ :return: + """ if subEnd is None: subEnd = len(message.data) assert markStart >= subStart @@ -99,6 +124,12 @@ def printMarkedBytesInMessage(message: AbstractMessage, markStart, markEnd, subS def markSegmentInMessage(segment: Union[MessageSegment, Template]): + """ + Print a message with the given segment in the message marked (underlined). + Supports Templates by resolving them to their base segments. + + :param segment: The segment to be marked. The message is implicitly extracted from the object. + """ if isinstance(segment, MessageSegment): printMarkedBytesInMessage(segment.message, segment.offset, segment.nextOffset) else: @@ -137,9 +168,11 @@ def markSegNearMatch(segment: Union[Iterable[MessageSegment], MessageSegment, Te cprinter = ComparingPrinter(comparator, [inf4seg]) cprinter.toConsole([seg.message], (seg.offset, seg.nextOffset), context) + # # # # # Alternatives and usage examples: + # # # a simpler approach - without true fields marked as spaces # markSegmentInMessage(segment) - + # # # get field number of next true field # tsm = trueSegmentedMessages[segment.message] # type: List[MessageSegment] # fsnum, offset = 0, 0 @@ -147,7 +180,7 @@ def markSegNearMatch(segment: Union[Iterable[MessageSegment], MessageSegment, Te # offset += tsm[fsnum].offset # fsnum += 1 # markSegmentInMessage(trueSegmentedMessages[segment.message][fsnum]) - + # # # limit to immediate segment context # posSegMatch = None # first segment that starts at or after the recognized field # for sid, seg in enumerate(trueSegmentedMessages[segment.message]): @@ -313,6 +346,13 @@ def _msgoffs2label(self, msg, po): return "" def toTikz(self, selectMessages: Iterable[AbstractMessage] = None, styles = None): + """ + Generate compilable tikz (LaTeX) code to visualize the given messages. + + :param selectMessages: Messages to generate tikz code for. + :param styles: List of tikz styles to be included in the tikz environment preamble. + :return: The generated tikz/LaTeX code. + """ if styles is None: styles = type(self)._basestyles.copy() else: @@ -352,6 +392,14 @@ def toTikz(self, selectMessages: Iterable[AbstractMessage] = None, styles = None return texcode + "\n" def toTikzFile(self, selectMessages: Iterable[AbstractMessage] = None, styles = None, folder = None): + """ + Generate compilable tikz (LaTeX) code to visualize the given messages and write into a file in the given folder. + + :param selectMessages: Messages to generate tikz code for. + :param styles: List of tikz styles to be included in the tikz environment preamble. + :param folder: The folder to write the file to that contains the generated tikz/LaTeX code. + """ + from os.path import join, isdir, exists if folder is None: from ..utils.evaluationHelpers import reportFolder @@ -501,6 +549,9 @@ def _colorlabels(self, selectMessages: Iterable[AbstractMessage]): class FieldtypeComparingPrinter(ComparingPrinter): + """ + Comprehensive class to encapsulate visualizations of segments from clusters in messages. + """ def __init__(self, comparator: MessageComparator, ftclusters: List[FieldTypeTemplate]): # We populate the inferred segments from ftclusters ourselves right afterwards by _mapMessages2Segments(). @@ -590,6 +641,9 @@ def toTikz(self, selectMessages: Iterable[AbstractMessage] = None, styles = None class FieldClassesPrinter(SegmentPrinter): + """ + Comprehensive class to encapsulate visualizations of segments from clusters in messages. 
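+
+ Minimal usage sketch (hypothetical variable names; `ftclusters` is a List[FieldTypeTemplate] and
+ `messages` an iterable of AbstractMessage; toTikzFile is assumed to be inherited from the printer
+ base classes shown above):
+
+     printer = FieldClassesPrinter(ftclusters)
+     printer.toTikzFile(messages)
+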
+ """ def __init__(self, ftclusters: List[FieldTypeTemplate]): super().__init__(()) self._ftHelper = FieldtypeHelper(ftclusters)