From 4b277d4623d9fa20381acad3ee9c70f5c945c811 Mon Sep 17 00:00:00 2001 From: TuanAnh17N Date: Fri, 8 Nov 2024 10:36:02 +0100 Subject: [PATCH] Generalize rescoring rule model --- app.py | 9 ++++--- cli/_bdba.py | 2 +- config.py | 42 +++++++++++++++++++------------ consts.py | 4 +-- features/__init__.py | 49 ++++++++++++++++++++---------------- rescore/artefacts.py | 20 ++++++++++----- rescore/model.py | 57 ++++++++++++++++++++++++++++-------------- rescore/utility.py | 6 ++--- test/test_rescoring.py | 54 +++++++++++++++++++++++++++++++++++++++ 9 files changed, 171 insertions(+), 72 deletions(-) create mode 100644 test/test_rescoring.py diff --git a/app.py b/app.py index 2eb18d0..4a7d3f5 100755 --- a/app.py +++ b/app.py @@ -30,6 +30,7 @@ import osinfo import paths import rescore.artefacts +import rescore.model import service_extensions import special_component import sprint @@ -150,8 +151,8 @@ def add_app_context_vars( ).get_component_with_tests rescoring_feature = features.get_feature(features.FeatureRescoring) - cve_rescoring_rule_set_lookup = rescoring_feature.find_rule_set_by_name - default_rule_set_callback = rescoring_feature.default_rule_set + rescoring_rule_set_lookup = rescoring_feature.find_rule_set + default_rule_set_for_type_callback = rescoring_feature.default_rule_set_for_type issue_repo_callback = features.get_feature(features.FeatureIssues).get_issue_repo @@ -190,8 +191,8 @@ def add_app_context_vars( app[consts.APP_CFG_FACTORY] = cfg_factory app[consts.APP_COMPONENT_DESCRIPTOR_LOOKUP] = component_descriptor_lookup app[consts.APP_COMPONENT_WITH_TESTS_CALLBACK] = component_with_tests_callback - app[consts.APP_CVE_RESCORING_RULE_SET_LOOKUP] = cve_rescoring_rule_set_lookup - app[consts.APP_DEFAULT_RULE_SET_CALLBACK] = default_rule_set_callback + app[consts.APP_RESCORING_RULE_SET_LOOKUP] = rescoring_rule_set_lookup + app[consts.APP_DEFAULT_RULE_SET_FOR_TYPE_CALLBACK] = default_rule_set_for_type_callback app[consts.APP_DELIVERY_CFG] = parsed_arguments.delivery_cfg app[consts.APP_EOL_CLIENT] = eol.EolClient() app[consts.APP_GITHUB_API_LOOKUP] = github_api_lookup diff --git a/cli/_bdba.py b/cli/_bdba.py index de486dc..18bf3ad 100644 --- a/cli/_bdba.py +++ b/cli/_bdba.py @@ -165,7 +165,7 @@ def rescore( categorisation = categorisation_label.value rescoring_rules = tuple( - rm.rescoring_rules_from_dicts( + rm.cve_rescoring_rules_from_dicts( ci.util.parse_yaml_file(rescoring_rules) ) ) diff --git a/config.py b/config.py index 026a9ce..d59d0a3 100644 --- a/config.py +++ b/config.py @@ -214,7 +214,7 @@ class IssueReplicatorConfig: number_included_closed_issues: int artefact_types: tuple[str] node_filter: collections.abc.Callable[[cnudie.iter.Node], bool] - cve_rescoring_rules: tuple[rescore.model.RescoringRule] + cve_rescoring_rules: tuple[rescore.model.CveRescoringRule] finding_type_issue_replication_cfgs: tuple[FindingTypeIssueReplicationCfgBase] milestone_cfg: gcmi.MilestoneConfiguration @@ -512,28 +512,34 @@ def deserialise_bdba_config( configs=matching_configs, ) - cve_rescoring_ruleset = deserialise_config_property( + rescoring_rulesets_raw = deserialise_config_property( config=bdba_config, property_key='rescoring', default_config=default_config, absent_ok=True, ) - if cve_rescoring_ruleset: - # only one ruleset for now, will be updated with cm06-related "typed" rulesets - rescoring_rule_set_raw = cve_rescoring_ruleset['rescoringRuleSets'][0] + cve_rescoring_rule_sets = tuple( + rescore.model.CveRescoringRuleSet( + name=rule_set_raw['name'], + description=rule_set_raw.get('description'), + type=rule_set_raw['type'], + rules=list( + rescore.model.cve_rescoring_rules_from_dicts(rule_set_raw['rules']) + ) + ) + for rule_set_raw in rescoring_rulesets_raw['rescoringRuleSets'] + ) - cve_rescoring_ruleset = dacite.from_dict( - data_class=rescore.model.CveRescoringRuleSet, - data=dict( - **rescoring_rule_set_raw, - rules=list( - rescore.model.rescoring_rules_from_dicts(rescoring_rule_set_raw['rule_set']) - ), - ), + if cve_rescoring_rule_sets: + cve_rescoring_rule_set = rescore.model.find_rule_set_for_type( + rule_sets=cve_rescoring_rule_sets, + rule_set_type=rescore.model.RuleSetType.CVE, ) + else: + cve_rescoring_rule_set = None - if cve_rescoring_ruleset: + if cve_rescoring_rule_set: auto_assess_max_severity_raw = deserialise_config_property( config=bdba_config, property_key='auto_assess_max_severity', @@ -587,7 +593,7 @@ def deserialise_bdba_config( processing_mode=processing_mode, artefact_types=artefact_types, node_filter=node_filter, - cve_rescoring_ruleset=cve_rescoring_ruleset, + cve_rescoring_ruleset=cve_rescoring_rule_set, auto_assess_max_severity=auto_assess_max_severity, license_cfg=license_cfg, delete_inactive_products_after_seconds=delete_inactive_products_after_seconds, @@ -814,7 +820,11 @@ def deserialise_issue_replicator_config( default_config=default_config, default_value=[], ) - cve_rescoring_rules = tuple(rescore.model.rescoring_rules_from_dicts(cve_rescoring_rules_raw)) + cve_rescoring_rules = tuple( + rescore.model.cve_rescoring_rules_from_dicts( + cve_rescoring_rules_raw + ) + ) finding_type_issue_replication_cfgs_raw = deserialise_config_property( config=issue_replicator_config, diff --git a/consts.py b/consts.py index 2d9f92e..bc4c9b9 100644 --- a/consts.py +++ b/consts.py @@ -12,8 +12,8 @@ APP_CFG_FACTORY = 'cfg_factory' APP_COMPONENT_DESCRIPTOR_LOOKUP = 'component_descriptor_lookup' APP_COMPONENT_WITH_TESTS_CALLBACK = 'component_with_tests_callback' -APP_CVE_RESCORING_RULE_SET_LOOKUP = 'cve_rescoring_rule_set_lookup' -APP_DEFAULT_RULE_SET_CALLBACK = 'default_rule_set_callback' +APP_RESCORING_RULE_SET_LOOKUP = 'rescoring_rule_set_lookup' +APP_DEFAULT_RULE_SET_FOR_TYPE_CALLBACK = 'default_rule_set_for_type_callback' APP_DELIVERY_CFG = 'delivery_cfg' APP_EOL_CLIENT = 'eol_client' APP_GITHUB_API_LOOKUP = 'github_api_lookup' diff --git a/features/__init__.py b/features/__init__.py index ccb79bf..3beaa63 100644 --- a/features/__init__.py +++ b/features/__init__.py @@ -424,23 +424,32 @@ def serialize(self) -> dict[str, any]: @dataclasses.dataclass(frozen=True) class FeatureRescoring(FeatureBase): name: str = 'rescoring' - rescoring_rule_sets: list[rm.CveRescoringRuleSet] = dataclasses.field(default_factory=list) - default_rescoring_rule_set_name: str = None + rescoring_rule_sets: list[rm.RuleSet] = dataclasses.field(default_factory=list) cve_categorisation_label_url: str = None cve_severity_url: str = None - def find_rule_set_by_name( + def find_rule_set( self, name: str, - ) -> rm.CveRescoringRuleSet | None: + rule_set_type: str, + ) -> rm.RuleSet | None: for rs in self.rescoring_rule_sets: - rs: rm.CveRescoringRuleSet - if rs.name == name: + rs: rm.RuleSet + if ( + rs.name == name + and rs.type == rule_set_type + ): return rs return None - def default_rule_set(self) -> rm.CveRescoringRuleSet | None: - return self.find_rule_set_by_name(self.default_rescoring_rule_set_name) + def default_rule_set_for_type( + self, + rule_set_type, + ) -> rm.RuleSet | None: + return rm.find_rule_set_for_type( + rule_sets=self.rescoring_rule_sets, + rule_set_type=rule_set_type, + ) @dataclasses.dataclass(frozen=True) @@ -625,23 +634,21 @@ def deserialise_current_version_source( def deserialise_rescoring(rescoring_raw: dict) -> FeatureRescoring: - rescoring_rule_sets = [ - dacite.from_dict( - data_class=rm.CveRescoringRuleSet, - data=dict( - **rescoring_rule_set_raw, - rules=list( - rm.rescoring_rules_from_dicts(rescoring_rule_set_raw['rule_set']) - ), - ), + cve_rescoring_rule_sets = tuple( + rm.CveRescoringRuleSet( + name=rule_set_raw['name'], + description=rule_set_raw.get('description'), + type=rule_set_raw['type'], + rules=list( + rm.cve_rescoring_rules_from_dicts(rule_set_raw['rules']) + ) ) - for rescoring_rule_set_raw in rescoring_raw['rescoringRuleSets'] - ] + for rule_set_raw in rescoring_raw['rescoringRuleSets'] + ) return FeatureRescoring( state=FeatureStates.AVAILABLE, - rescoring_rule_sets=rescoring_rule_sets, - default_rescoring_rule_set_name=rescoring_raw['defaultRuleSetName'], + rescoring_rule_sets=cve_rescoring_rule_sets, cve_categorisation_label_url=rescoring_raw.get('cveCategorisationLabelUrl'), cve_severity_url=rescoring_raw.get('cveSeverityUrl'), ) diff --git a/rescore/artefacts.py b/rescore/artefacts.py index a70b629..d0714b0 100644 --- a/rescore/artefacts.py +++ b/rescore/artefacts.py @@ -83,13 +83,16 @@ class RescoringProposal: def _find_cve_rescoring_rule_set( default_cve_rescoring_rule_set: rm.CveRescoringRuleSet, - cve_rescoring_rule_set_lookup: CveRescoringRuleSetLookup, + rescoring_rule_set_lookup: CveRescoringRuleSetLookup, cve_rescoring_rule_set_name: str | None, ) -> rm.CveRescoringRuleSet | None: if not cve_rescoring_rule_set_name: return default_cve_rescoring_rule_set - return cve_rescoring_rule_set_lookup(cve_rescoring_rule_set_name) + return rescoring_rule_set_lookup( + name=cve_rescoring_rule_set_name, + rule_set_type=rm.RuleSetType.CVE, + ) async def _find_artefact_node( @@ -245,7 +248,7 @@ async def _find_rescorings( def _rescore_vulnerabilitiy( - rescoring_rules: collections.abc.Iterable[rm.RescoringRule] | None, + rescoring_rules: collections.abc.Iterable[rm.CveRescoringRule] | None, categorisation: dso.cvss.CveCategorisation | None, cvss: dso.cvss.CVSSV3 | dict, severity: dso.cvss.CVESeverity, @@ -374,7 +377,7 @@ def _package_versions_and_filesystem_paths( def _iter_rescoring_proposals( artefact_metadata: collections.abc.Iterable[dso.model.ArtefactMetadata], rescorings: collections.abc.Iterable[dso.model.ArtefactMetadata], - rescoring_rules: collections.abc.Iterable[rm.RescoringRule] | None, + rescoring_rules: collections.abc.Iterable[rm.CveRescoringRule] | None, categorisation: dso.cvss.CveCategorisation | None, max_processing_days: gcm.MaxProcessingTimesDays | None=None, sprints: list[yp.Sprint]=[], @@ -833,9 +836,14 @@ async def get(self): cve_rescoring_rule_set_name = util.param(params, 'cveRescoringRuleSetName') + default_rule_set_for_type_callback = self.request.app[ + consts.APP_DEFAULT_RULE_SET_FOR_TYPE_CALLBACK + ] + default_cve_rescoring_rule_set = default_rule_set_for_type_callback(rm.RuleSetType.CVE) + cve_rescoring_rule_set = _find_cve_rescoring_rule_set( - default_cve_rescoring_rule_set=self.request.app[consts.APP_DEFAULT_RULE_SET_CALLBACK](), - cve_rescoring_rule_set_lookup=self.request.app[consts.APP_CVE_RESCORING_RULE_SET_LOOKUP], + default_cve_rescoring_rule_set=default_cve_rescoring_rule_set, + rescoring_rule_set_lookup=self.request.app[consts.APP_RESCORING_RULE_SET_LOOKUP], cve_rescoring_rule_set_name=cve_rescoring_rule_set_name, ) diff --git a/rescore/model.py b/rescore/model.py index f55ef99..9efc88b 100644 --- a/rescore/model.py +++ b/rescore/model.py @@ -1,4 +1,5 @@ import collections +import collections.abc import dataclasses import enum import typing @@ -15,8 +16,18 @@ class Rescore(enum.Enum): NO_CHANGE = 'no-change' -@dataclasses.dataclass -class RescoringRule: +class RuleSetType(enum.StrEnum): + CVE = 'cve' + + +@dataclasses.dataclass(frozen=True) +class Rule: + name: str + rescore: Rescore + + +@dataclasses.dataclass(frozen=True) +class CveRescoringRule(Rule): ''' a CVE rescoring rule intended to be used when re-scoring a CVE (see `CVSSV3` type) for an artefact that has a `CveCategorisation`. @@ -34,8 +45,6 @@ class RescoringRule: ''' category_value: str cve_values: list[str] - rescore: Rescore - name: typing.Optional[str] = None @property def category_attr(self): @@ -123,24 +132,34 @@ def matches_categorisation(self, categorisation: dso.cvss.CveCategorisation) -> @dataclasses.dataclass(frozen=True) -class CveRescoringRuleSet: - ''' - Represents configuration for a single rescoring rule set. - A single rule set is used to perform a CVSS rescoring according to a well-defined contract. - One of the rule sets can be defined as default, used as fallback for rescoring if no rule set is - specified. - See rescoring documentation for more details: - - TODO: publish rescoring-docs (removed SAP-internal reference prior to open-sourcing) - ''' +class RuleSet[T: Rule]: name: str - description: str - rules: list[RescoringRule] + type: RuleSetType + rules: list[T] + description: str | None = None + + +class CveRescoringRuleSet(RuleSet[CveRescoringRule]): + pass + + +def find_rule_set_for_type( + rule_sets: tuple[RuleSet], + rule_set_type: str, + absent_ok: bool=True, +) -> RuleSet | None: + for rule_set in rule_sets: + if rule_set.type == rule_set_type: + return rule_set + if not absent_ok: + raise ValueError(f'No rule set found for {rule_set_type=}') -def rescoring_rules_from_dicts(rules: list[dict]) -> typing.Generator[RescoringRule, None, None]: +def cve_rescoring_rules_from_dicts( + rules: list[dict] +) -> typing.Generator[CveRescoringRule, None, None]: ''' - deserialises rescoring rules. Each dict is expected to have the following form: + deserialises cve_rescoring rules. Each dict is expected to have the following form: category_value: : name: (optional) @@ -158,7 +177,7 @@ def rescoring_rules_from_dicts(rules: list[dict]) -> typing.Generator[RescoringR rescore = subrule['rescore'] yield dacite.from_dict( - data_class=RescoringRule, + data_class=CveRescoringRule, data={ 'category_value': category_value, 'cve_values': cve_values, diff --git a/rescore/utility.py b/rescore/utility.py index 60c0094..16b44d8 100644 --- a/rescore/utility.py +++ b/rescore/utility.py @@ -142,10 +142,10 @@ def rescorings_for_finding_by_specificity( def matching_rescore_rules( - rescoring_rules: typing.Iterable[rescore.model.RescoringRule], + rescoring_rules: typing.Iterable[rescore.model.CveRescoringRule], categorisation: dso.cvss.CveCategorisation, cvss: dso.cvss.CVSSV3 | dict, -) -> typing.Generator[rescore.model.RescoringRule, None, None]: +) -> typing.Generator[rescore.model.CveRescoringRule, None, None]: for rescoring_rule in rescoring_rules: if not rescoring_rule.matches_categorisation(categorisation): continue @@ -156,7 +156,7 @@ def matching_rescore_rules( def rescore_severity( - rescoring_rules: typing.Iterable[rescore.model.RescoringRule], + rescoring_rules: typing.Iterable[rescore.model.CveRescoringRule], severity: dso.cvss.CVESeverity, minimum_severity: int=dso.cvss.CVESeverity.NONE, ) -> dso.cvss.CVESeverity: diff --git a/test/test_rescoring.py b/test/test_rescoring.py new file mode 100644 index 0000000..9e3f1ad --- /dev/null +++ b/test/test_rescoring.py @@ -0,0 +1,54 @@ +import pytest + +import rescore.model + + +@pytest.fixture +def cve_rescoring_rules_raw() -> dict: + return { + 'rescoringRuleSets': [ + { + 'name': 'my-cve-rescoring', + 'type': 'cve', + 'rules': [ + { + 'category_value': 'network_exposure:public', + 'name': 'network-exposure-public', + 'rules': [ + { + 'cve_values': ['AV:N'], + 'rescore': 'no-change' + }, + { + 'cve_values': ['AV:A'], + 'rescore': 'reduce' + }, + { + 'cve_values': ['AV:L', 'AV:P'], + 'rescore': 'not-exploitable' + }, + ] + } + ], + } + ] + } + + +def test_deserialise_rescoring_rule_sets( + cve_rescoring_rules_raw: dict, +): + cve_rescoring_rule_sets = tuple( + rescore.model.CveRescoringRuleSet( + name=cve_rule_set_raw['name'], + description=cve_rule_set_raw.get('description'), + type=cve_rule_set_raw['type'], + rules=list( + rescore.model.cve_rescoring_rules_from_dicts(cve_rule_set_raw['rules']) + ) + ) + for cve_rule_set_raw in cve_rescoring_rules_raw['rescoringRuleSets'] + ) + assert isinstance(cve_rescoring_rule_sets, tuple) + assert len(cve_rescoring_rule_sets) == 1 + assert isinstance(cve_rescoring_rule_sets[0], rescore.model.CveRescoringRuleSet)