Skip to content

Commit

Permalink
Generalize rescoring rule model
Browse files Browse the repository at this point in the history
  • Loading branch information
TuanAnh17N committed Nov 12, 2024
1 parent 687738b commit 4b277d4
Show file tree
Hide file tree
Showing 9 changed files with 171 additions and 72 deletions.
9 changes: 5 additions & 4 deletions app.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import osinfo
import paths
import rescore.artefacts
import rescore.model
import service_extensions
import special_component
import sprint
Expand Down Expand Up @@ -150,8 +151,8 @@ def add_app_context_vars(
).get_component_with_tests

rescoring_feature = features.get_feature(features.FeatureRescoring)
cve_rescoring_rule_set_lookup = rescoring_feature.find_rule_set_by_name
default_rule_set_callback = rescoring_feature.default_rule_set
rescoring_rule_set_lookup = rescoring_feature.find_rule_set
default_rule_set_for_type_callback = rescoring_feature.default_rule_set_for_type

issue_repo_callback = features.get_feature(features.FeatureIssues).get_issue_repo

Expand Down Expand Up @@ -190,8 +191,8 @@ def add_app_context_vars(
app[consts.APP_CFG_FACTORY] = cfg_factory
app[consts.APP_COMPONENT_DESCRIPTOR_LOOKUP] = component_descriptor_lookup
app[consts.APP_COMPONENT_WITH_TESTS_CALLBACK] = component_with_tests_callback
app[consts.APP_CVE_RESCORING_RULE_SET_LOOKUP] = cve_rescoring_rule_set_lookup
app[consts.APP_DEFAULT_RULE_SET_CALLBACK] = default_rule_set_callback
app[consts.APP_RESCORING_RULE_SET_LOOKUP] = rescoring_rule_set_lookup
app[consts.APP_DEFAULT_RULE_SET_FOR_TYPE_CALLBACK] = default_rule_set_for_type_callback
app[consts.APP_DELIVERY_CFG] = parsed_arguments.delivery_cfg
app[consts.APP_EOL_CLIENT] = eol.EolClient()
app[consts.APP_GITHUB_API_LOOKUP] = github_api_lookup
Expand Down
2 changes: 1 addition & 1 deletion cli/_bdba.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ def rescore(
categorisation = categorisation_label.value

rescoring_rules = tuple(
rm.rescoring_rules_from_dicts(
rm.cve_rescoring_rules_from_dicts(
ci.util.parse_yaml_file(rescoring_rules)
)
)
Expand Down
42 changes: 26 additions & 16 deletions config.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ class IssueReplicatorConfig:
number_included_closed_issues: int
artefact_types: tuple[str]
node_filter: collections.abc.Callable[[cnudie.iter.Node], bool]
cve_rescoring_rules: tuple[rescore.model.RescoringRule]
cve_rescoring_rules: tuple[rescore.model.CveRescoringRule]
finding_type_issue_replication_cfgs: tuple[FindingTypeIssueReplicationCfgBase]
milestone_cfg: gcmi.MilestoneConfiguration

Expand Down Expand Up @@ -512,28 +512,34 @@ def deserialise_bdba_config(
configs=matching_configs,
)

cve_rescoring_ruleset = deserialise_config_property(
rescoring_rulesets_raw = deserialise_config_property(
config=bdba_config,
property_key='rescoring',
default_config=default_config,
absent_ok=True,
)

if cve_rescoring_ruleset:
# only one ruleset for now, will be updated with cm06-related "typed" rulesets
rescoring_rule_set_raw = cve_rescoring_ruleset['rescoringRuleSets'][0]
cve_rescoring_rule_sets = tuple(
rescore.model.CveRescoringRuleSet(
name=rule_set_raw['name'],
description=rule_set_raw.get('description'),
type=rule_set_raw['type'],
rules=list(
rescore.model.cve_rescoring_rules_from_dicts(rule_set_raw['rules'])
)
)
for rule_set_raw in rescoring_rulesets_raw['rescoringRuleSets']
)

cve_rescoring_ruleset = dacite.from_dict(
data_class=rescore.model.CveRescoringRuleSet,
data=dict(
**rescoring_rule_set_raw,
rules=list(
rescore.model.rescoring_rules_from_dicts(rescoring_rule_set_raw['rule_set'])
),
),
if cve_rescoring_rule_sets:
cve_rescoring_rule_set = rescore.model.find_rule_set_for_type(
rule_sets=cve_rescoring_rule_sets,
rule_set_type=rescore.model.RuleSetType.CVE,
)
else:
cve_rescoring_rule_set = None

if cve_rescoring_ruleset:
if cve_rescoring_rule_set:
auto_assess_max_severity_raw = deserialise_config_property(
config=bdba_config,
property_key='auto_assess_max_severity',
Expand Down Expand Up @@ -587,7 +593,7 @@ def deserialise_bdba_config(
processing_mode=processing_mode,
artefact_types=artefact_types,
node_filter=node_filter,
cve_rescoring_ruleset=cve_rescoring_ruleset,
cve_rescoring_ruleset=cve_rescoring_rule_set,
auto_assess_max_severity=auto_assess_max_severity,
license_cfg=license_cfg,
delete_inactive_products_after_seconds=delete_inactive_products_after_seconds,
Expand Down Expand Up @@ -814,7 +820,11 @@ def deserialise_issue_replicator_config(
default_config=default_config,
default_value=[],
)
cve_rescoring_rules = tuple(rescore.model.rescoring_rules_from_dicts(cve_rescoring_rules_raw))
cve_rescoring_rules = tuple(
rescore.model.cve_rescoring_rules_from_dicts(
cve_rescoring_rules_raw
)
)

finding_type_issue_replication_cfgs_raw = deserialise_config_property(
config=issue_replicator_config,
Expand Down
4 changes: 2 additions & 2 deletions consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
APP_CFG_FACTORY = 'cfg_factory'
APP_COMPONENT_DESCRIPTOR_LOOKUP = 'component_descriptor_lookup'
APP_COMPONENT_WITH_TESTS_CALLBACK = 'component_with_tests_callback'
APP_CVE_RESCORING_RULE_SET_LOOKUP = 'cve_rescoring_rule_set_lookup'
APP_DEFAULT_RULE_SET_CALLBACK = 'default_rule_set_callback'
APP_RESCORING_RULE_SET_LOOKUP = 'rescoring_rule_set_lookup'
APP_DEFAULT_RULE_SET_FOR_TYPE_CALLBACK = 'default_rule_set_for_type_callback'
APP_DELIVERY_CFG = 'delivery_cfg'
APP_EOL_CLIENT = 'eol_client'
APP_GITHUB_API_LOOKUP = 'github_api_lookup'
Expand Down
49 changes: 28 additions & 21 deletions features/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,23 +424,32 @@ def serialize(self) -> dict[str, any]:
@dataclasses.dataclass(frozen=True)
class FeatureRescoring(FeatureBase):
name: str = 'rescoring'
rescoring_rule_sets: list[rm.CveRescoringRuleSet] = dataclasses.field(default_factory=list)
default_rescoring_rule_set_name: str = None
rescoring_rule_sets: list[rm.RuleSet] = dataclasses.field(default_factory=list)
cve_categorisation_label_url: str = None
cve_severity_url: str = None

def find_rule_set_by_name(
def find_rule_set(
self,
name: str,
) -> rm.CveRescoringRuleSet | None:
rule_set_type: str,
) -> rm.RuleSet | None:
for rs in self.rescoring_rule_sets:
rs: rm.CveRescoringRuleSet
if rs.name == name:
rs: rm.RuleSet
if (
rs.name == name
and rs.type == rule_set_type
):
return rs
return None

def default_rule_set(self) -> rm.CveRescoringRuleSet | None:
return self.find_rule_set_by_name(self.default_rescoring_rule_set_name)
def default_rule_set_for_type(
self,
rule_set_type,
) -> rm.RuleSet | None:
return rm.find_rule_set_for_type(
rule_sets=self.rescoring_rule_sets,
rule_set_type=rule_set_type,
)


@dataclasses.dataclass(frozen=True)
Expand Down Expand Up @@ -625,23 +634,21 @@ def deserialise_current_version_source(


def deserialise_rescoring(rescoring_raw: dict) -> FeatureRescoring:
rescoring_rule_sets = [
dacite.from_dict(
data_class=rm.CveRescoringRuleSet,
data=dict(
**rescoring_rule_set_raw,
rules=list(
rm.rescoring_rules_from_dicts(rescoring_rule_set_raw['rule_set'])
),
),
cve_rescoring_rule_sets = tuple(
rm.CveRescoringRuleSet(
name=rule_set_raw['name'],
description=rule_set_raw.get('description'),
type=rule_set_raw['type'],
rules=list(
rm.cve_rescoring_rules_from_dicts(rule_set_raw['rules'])
)
)
for rescoring_rule_set_raw in rescoring_raw['rescoringRuleSets']
]
for rule_set_raw in rescoring_raw['rescoringRuleSets']
)

return FeatureRescoring(
state=FeatureStates.AVAILABLE,
rescoring_rule_sets=rescoring_rule_sets,
default_rescoring_rule_set_name=rescoring_raw['defaultRuleSetName'],
rescoring_rule_sets=cve_rescoring_rule_sets,
cve_categorisation_label_url=rescoring_raw.get('cveCategorisationLabelUrl'),
cve_severity_url=rescoring_raw.get('cveSeverityUrl'),
)
Expand Down
20 changes: 14 additions & 6 deletions rescore/artefacts.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,16 @@ class RescoringProposal:

def _find_cve_rescoring_rule_set(
default_cve_rescoring_rule_set: rm.CveRescoringRuleSet,
cve_rescoring_rule_set_lookup: CveRescoringRuleSetLookup,
rescoring_rule_set_lookup: CveRescoringRuleSetLookup,
cve_rescoring_rule_set_name: str | None,
) -> rm.CveRescoringRuleSet | None:
if not cve_rescoring_rule_set_name:
return default_cve_rescoring_rule_set

return cve_rescoring_rule_set_lookup(cve_rescoring_rule_set_name)
return rescoring_rule_set_lookup(
name=cve_rescoring_rule_set_name,
rule_set_type=rm.RuleSetType.CVE,
)


async def _find_artefact_node(
Expand Down Expand Up @@ -245,7 +248,7 @@ async def _find_rescorings(


def _rescore_vulnerabilitiy(
rescoring_rules: collections.abc.Iterable[rm.RescoringRule] | None,
rescoring_rules: collections.abc.Iterable[rm.CveRescoringRule] | None,
categorisation: dso.cvss.CveCategorisation | None,
cvss: dso.cvss.CVSSV3 | dict,
severity: dso.cvss.CVESeverity,
Expand Down Expand Up @@ -374,7 +377,7 @@ def _package_versions_and_filesystem_paths(
def _iter_rescoring_proposals(
artefact_metadata: collections.abc.Iterable[dso.model.ArtefactMetadata],
rescorings: collections.abc.Iterable[dso.model.ArtefactMetadata],
rescoring_rules: collections.abc.Iterable[rm.RescoringRule] | None,
rescoring_rules: collections.abc.Iterable[rm.CveRescoringRule] | None,
categorisation: dso.cvss.CveCategorisation | None,
max_processing_days: gcm.MaxProcessingTimesDays | None=None,
sprints: list[yp.Sprint]=[],
Expand Down Expand Up @@ -833,9 +836,14 @@ async def get(self):

cve_rescoring_rule_set_name = util.param(params, 'cveRescoringRuleSetName')

default_rule_set_for_type_callback = self.request.app[
consts.APP_DEFAULT_RULE_SET_FOR_TYPE_CALLBACK
]
default_cve_rescoring_rule_set = default_rule_set_for_type_callback(rm.RuleSetType.CVE)

cve_rescoring_rule_set = _find_cve_rescoring_rule_set(
default_cve_rescoring_rule_set=self.request.app[consts.APP_DEFAULT_RULE_SET_CALLBACK](),
cve_rescoring_rule_set_lookup=self.request.app[consts.APP_CVE_RESCORING_RULE_SET_LOOKUP],
default_cve_rescoring_rule_set=default_cve_rescoring_rule_set,
rescoring_rule_set_lookup=self.request.app[consts.APP_RESCORING_RULE_SET_LOOKUP],
cve_rescoring_rule_set_name=cve_rescoring_rule_set_name,
)

Expand Down
57 changes: 38 additions & 19 deletions rescore/model.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import collections
import collections.abc
import dataclasses
import enum
import typing
Expand All @@ -15,8 +16,18 @@ class Rescore(enum.Enum):
NO_CHANGE = 'no-change'


@dataclasses.dataclass
class RescoringRule:
class RuleSetType(enum.StrEnum):
CVE = 'cve'


@dataclasses.dataclass(frozen=True)
class Rule:
name: str
rescore: Rescore


@dataclasses.dataclass(frozen=True)
class CveRescoringRule(Rule):
'''
a CVE rescoring rule intended to be used when re-scoring a CVE (see `CVSSV3` type) for an
artefact that has a `CveCategorisation`.
Expand All @@ -34,8 +45,6 @@ class RescoringRule:
'''
category_value: str
cve_values: list[str]
rescore: Rescore
name: typing.Optional[str] = None

@property
def category_attr(self):
Expand Down Expand Up @@ -123,24 +132,34 @@ def matches_categorisation(self, categorisation: dso.cvss.CveCategorisation) ->


@dataclasses.dataclass(frozen=True)
class CveRescoringRuleSet:
'''
Represents configuration for a single rescoring rule set.
A single rule set is used to perform a CVSS rescoring according to a well-defined contract.
One of the rule sets can be defined as default, used as fallback for rescoring if no rule set is
specified.
See rescoring documentation for more details:
TODO: publish rescoring-docs (removed SAP-internal reference prior to open-sourcing)
'''
class RuleSet[T: Rule]:
name: str
description: str
rules: list[RescoringRule]
type: RuleSetType
rules: list[T]
description: str | None = None


class CveRescoringRuleSet(RuleSet[CveRescoringRule]):
pass


def find_rule_set_for_type(
rule_sets: tuple[RuleSet],
rule_set_type: str,
absent_ok: bool=True,
) -> RuleSet | None:
for rule_set in rule_sets:
if rule_set.type == rule_set_type:
return rule_set
if not absent_ok:
raise ValueError(f'No rule set found for {rule_set_type=}')


def rescoring_rules_from_dicts(rules: list[dict]) -> typing.Generator[RescoringRule, None, None]:
def cve_rescoring_rules_from_dicts(
rules: list[dict]
) -> typing.Generator[CveRescoringRule, None, None]:
'''
deserialises rescoring rules. Each dict is expected to have the following form:
deserialises cve_rescoring rules. Each dict is expected to have the following form:
category_value: <CveCategorisation-attr>:<value>
name: <str> (optional)
Expand All @@ -158,7 +177,7 @@ def rescoring_rules_from_dicts(rules: list[dict]) -> typing.Generator[RescoringR
rescore = subrule['rescore']

yield dacite.from_dict(
data_class=RescoringRule,
data_class=CveRescoringRule,
data={
'category_value': category_value,
'cve_values': cve_values,
Expand Down
6 changes: 3 additions & 3 deletions rescore/utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,10 @@ def rescorings_for_finding_by_specificity(


def matching_rescore_rules(
rescoring_rules: typing.Iterable[rescore.model.RescoringRule],
rescoring_rules: typing.Iterable[rescore.model.CveRescoringRule],
categorisation: dso.cvss.CveCategorisation,
cvss: dso.cvss.CVSSV3 | dict,
) -> typing.Generator[rescore.model.RescoringRule, None, None]:
) -> typing.Generator[rescore.model.CveRescoringRule, None, None]:
for rescoring_rule in rescoring_rules:
if not rescoring_rule.matches_categorisation(categorisation):
continue
Expand All @@ -156,7 +156,7 @@ def matching_rescore_rules(


def rescore_severity(
rescoring_rules: typing.Iterable[rescore.model.RescoringRule],
rescoring_rules: typing.Iterable[rescore.model.CveRescoringRule],
severity: dso.cvss.CVESeverity,
minimum_severity: int=dso.cvss.CVESeverity.NONE,
) -> dso.cvss.CVESeverity:
Expand Down
Loading

0 comments on commit 4b277d4

Please sign in to comment.