From c98e6e0402d508699ff0bb56e4a8c555a5e0545b Mon Sep 17 00:00:00 2001 From: Gregor Lichtner Date: Tue, 6 Aug 2024 21:03:27 +0200 Subject: [PATCH 1/2] feat: added conditional filter (for dosage condition ext) --- .../converter/action/drug_administration.py | 46 +- execution_engine/execution_graph/graph.py | 3 + .../omop/criterion/combination/logical.py | 143 +++++- execution_engine/omop/criterion/factory.py | 2 + execution_engine/task/task.py | 53 ++- execution_engine/util/cohort_logic.py | 25 ++ execution_engine/util/interval.py | 44 +- tests/_fixtures/concept.py | 11 + .../combination/test_logical_combination.py | 409 +++++++++++++++++- 9 files changed, 674 insertions(+), 62 deletions(-) diff --git a/execution_engine/converter/action/drug_administration.py b/execution_engine/converter/action/drug_administration.py index 425c3754..3aa0f34d 100644 --- a/execution_engine/converter/action/drug_administration.py +++ b/execution_engine/converter/action/drug_administration.py @@ -18,6 +18,7 @@ from execution_engine.omop.criterion.abstract import Criterion from execution_engine.omop.criterion.combination.logical import ( LogicalCriterionCombination, + NonCommutativeLogicalCriterionCombination, ) from execution_engine.omop.criterion.drug_exposure import DrugExposure from execution_engine.omop.criterion.point_in_time import PointInTimeCriterion @@ -37,6 +38,7 @@ class ExtensionType(TypedDict): code: Concept value: Value + type: str class DrugAdministrationAction(AbstractAction): @@ -223,7 +225,7 @@ def process_dosage_extensions(cls, dosage: FHIRDosage) -> list[ExtensionType]: code = parse_code(condition_type.valueCodeableConcept) value = parse_value(condition_value, "value") - extensions.append({"code": code, "value": value}) + extensions.append({"code": code, "value": value, "type": "conditional"}) else: raise NotImplementedError(f"Unknown dosage extension {extension.url}") @@ -298,28 +300,38 @@ def _to_criterion(self) -> Criterion | LogicalCriterionCombination | None: route=dosage.get("route", None), ) - if dosage.get("extensions", None) is None: + extensions = dosage.get("extensions", None) + if extensions is None or len(extensions) == 0: drug_actions.append(drug_action) else: - comb = LogicalCriterionCombination( - exclude=drug_action.exclude, # need to pull up the exclude flag from the criterion into the combination - category=CohortCategory.INTERVENTION, - operator=LogicalCriterionCombination.Operator("AND"), - ) - drug_action.exclude = False # reset the exclude flag, as it is now part of the combination + if len(extensions) > 1: + raise NotImplementedError( + "Multiple extensions in dosage not supported yet" + ) - comb.add(drug_action) + extension = extensions[0] - for extension in dosage["extensions"]: # type: ignore # (extension is not None as per the if-block) - comb.add( - PointInTimeCriterion( - exclude=False, # extensions are always included (at least for now) - category=CohortCategory.INTERVENTION, - concept=extension["code"], - value=extension["value"], - ) + if extension["type"] != "conditional": + raise NotImplementedError( + f"Extension type {extension['type']} not supported yet" ) + drug_action.exclude = False # reset the exclude flag, as it is now part of the combination + + ext_criterion = PointInTimeCriterion( + exclude=False, # extensions are always included (at least for now) + category=CohortCategory.INTERVENTION, + concept=extension["code"], + value=extension["value"], + ) + + comb = NonCommutativeLogicalCriterionCombination.ConditionalFilter( + exclude=drug_action.exclude, # need to pull up the exclude flag from the criterion into the combination + category=CohortCategory.INTERVENTION, + left=ext_criterion, + right=drug_action, + ) + drug_actions.append(comb) if len(drug_actions) == 1: diff --git a/execution_engine/execution_graph/graph.py b/execution_engine/execution_graph/graph.py index 7c3d5f78..03e40544 100644 --- a/execution_engine/execution_graph/graph.py +++ b/execution_engine/execution_graph/graph.py @@ -8,6 +8,7 @@ from execution_engine.omop.criterion.combination.combination import CriterionCombination from execution_engine.omop.criterion.combination.logical import ( LogicalCriterionCombination, + NonCommutativeLogicalCriterionCombination, ) from execution_engine.omop.criterion.combination.temporal import ( TemporalIndicatorCombination, @@ -381,6 +382,8 @@ def conjunction_from_combination( f"Expected {LogicalCriterionCombination.Operator.AND}" ) return logic.NonSimplifiableAnd + elif isinstance(comb, NonCommutativeLogicalCriterionCombination): + return logic.ConditionalFilter elif comb.operator.operator == LogicalCriterionCombination.Operator.AND: return logic.And elif comb.operator.operator == LogicalCriterionCombination.Operator.OR: diff --git a/execution_engine/omop/criterion/combination/logical.py b/execution_engine/omop/criterion/combination/logical.py index eeff0cd1..8d49e4ae 100644 --- a/execution_engine/omop/criterion/combination/logical.py +++ b/execution_engine/omop/criterion/combination/logical.py @@ -1,4 +1,4 @@ -from typing import Union +from typing import Any, Dict, Iterator, Union from execution_engine.constants import CohortCategory from execution_engine.omop.criterion.abstract import Criterion @@ -141,3 +141,144 @@ def AllOrNone( category=category, criteria=criteria, ) + + +class NonCommutativeLogicalCriterionCombination(LogicalCriterionCombination): + """ + A combination of criteria that is not commutative. + """ + + _left: Union[Criterion, CriterionCombination] + _right: Union[Criterion, CriterionCombination] + + class Operator(CriterionCombination.Operator): + """Operators for criterion combinations.""" + + CONDITIONAL_FILTER = "CONDITIONAL_FILTER" + + def __init__(self, operator: str, threshold: int | None = None): + assert operator in [ + "CONDITIONAL_FILTER", + ], f"Invalid operator {operator}" + + self.operator = operator + + def __init__( + self, + exclude: bool, + operator: "NonCommutativeLogicalCriterionCombination.Operator", + category: CohortCategory, + left: Union[Criterion, CriterionCombination] | None = None, + right: Union[Criterion, CriterionCombination] | None = None, + root_combination: bool = False, + ): + """ + Initialize the criterion combination. + """ + super().__init__(exclude=exclude, operator=operator, category=category) + + self._criteria = [] + if left is not None: + self._left = left + if right is not None: + self._right = right + + self._root = root_combination + + @property + def left(self) -> Union[Criterion, CriterionCombination]: + """ + Get the left criterion. + """ + return self._left + + @property + def right(self) -> Union[Criterion, CriterionCombination]: + """ + Get the right criterion. + """ + return self._right + + def __str__(self) -> str: + """ + Get the string representation of the criterion combination. + """ + return f"{self.operator}({', '.join(str(c) for c in self._criteria)})" + + def __repr__(self) -> str: + """ + Get the string representation of the criterion combination. + """ + return f'{self.__class__.__name__}("{self.operator}", {self._criteria})' + + def __eq__(self, other: object) -> bool: + """ + Check if the criterion combination is equal to another criterion combination. + """ + if not isinstance(other, NonCommutativeLogicalCriterionCombination): + return NotImplemented + return ( + self.operator == other.operator + and self._left == other._left + and self._right == other._right + and self.exclude == other.exclude + and self.category == other.category + ) + + def __iter__(self) -> Iterator[Union[Criterion, "CriterionCombination"]]: + """ + Iterate over the criteria in the combination. + """ + yield self._left + yield self._right + + def dict(self) -> dict: + """ + Get the dictionary representation of the criterion combination. + """ + return { + "operator": self._operator.operator, + "exclude": self.exclude, + "category": self.category.value, + "left": self._left.dict(), + "right": self._right.dict(), + } + + @classmethod + def from_dict( + cls, data: Dict[str, Any] + ) -> "NonCommutativeLogicalCriterionCombination": + """ + Create a criterion combination from a dictionary. + """ + + from execution_engine.omop.criterion.factory import ( + criterion_factory, # needs to be here to avoid circular import + ) + + return cls( + exclude=data["exclude"], + operator=cls.Operator(data["operator"]), + category=CohortCategory(data["category"]), + left=criterion_factory(**data["left"]), + right=criterion_factory(**data["right"]), + ) + + @classmethod + def ConditionalFilter( + cls, + left: Union[Criterion, "CriterionCombination"], + right: Union[Criterion, "CriterionCombination"], + category: CohortCategory, + exclude: bool = False, + ) -> "LogicalCriterionCombination": + """ + Create a CONDITIONAL_FILTER combination of criteria. + """ + return cls( + exclude=exclude, + operator=cls.Operator(cls.Operator.CONDITIONAL_FILTER), + category=category, + left=left, + right=right, + ) diff --git a/execution_engine/omop/criterion/factory.py b/execution_engine/omop/criterion/factory.py index 2eaa2994..779e75cb 100644 --- a/execution_engine/omop/criterion/factory.py +++ b/execution_engine/omop/criterion/factory.py @@ -4,6 +4,7 @@ from execution_engine.omop.criterion.combination.combination import CriterionCombination from execution_engine.omop.criterion.combination.logical import ( LogicalCriterionCombination, + NonCommutativeLogicalCriterionCombination, ) from execution_engine.omop.criterion.combination.temporal import ( TemporalIndicatorCombination, @@ -29,6 +30,7 @@ "ConceptCriterion": ConceptCriterion, "LogicalCriterionCombination": LogicalCriterionCombination, "TemporalCriterionCombination": TemporalIndicatorCombination, + "NonCommutativeLogicalCriterionCombination": NonCommutativeLogicalCriterionCombination, "ConditionOccurrence": ConditionOccurrence, "DrugExposure": DrugExposure, "Measurement": Measurement, diff --git a/execution_engine/task/task.py b/execution_engine/task/task.py index 2d2369d6..af1a30a2 100644 --- a/execution_engine/task/task.py +++ b/execution_engine/task/task.py @@ -160,7 +160,9 @@ def run( ), ): result = self.handle_binary_logical_operator(data) - elif isinstance(self.expr, logic.LeftDependentToggle): + elif isinstance( + self.expr, (logic.LeftDependentToggle, logic.ConditionalFilter) + ): result = self.handle_left_dependent_toggle( left=data[0], right=data[1], @@ -231,6 +233,18 @@ def handle_criterion( logging.debug(f"Processing data - '{criterion.description()}'") data = criterion.process_data(data, base_data, observation_window) + # fill remaining time with NEGATIVE (this is required as otherwise an AND operation on a NO_DATA and + # a non-existing (i.e. not-filled) interval yields NO_DATA, but should actually be NEGATIVE + if base_data is not None: + data_negative = process.complementary_intervals( + data, + reference=base_data, + observation_window=observation_window, + interval_type=IntervalType.NEGATIVE, + ) + + data = process.concat_intervals([data, data_negative]) + return data def handle_unary_logical_operator( @@ -344,9 +358,13 @@ def handle_left_dependent_toggle( observation_window: TimeRange, ) -> PersonIntervals: """ - Handles a left dependent toggle by merging the intervals of the left dependency with the intervals of the - right dependency according to the following rules: + Handles a left dependent toggle or a conditional filter by merging the intervals of the left dependency with the + intervals of the right dependency according to the following rules. The difference between the two is that + the conditional filter yields NEGATIVE if left dependence is NEGATIVE or NO_DATA, while the left dependent + toggle yields NOT_APPLICABLE in these cases. + LeftDependentToggle + ------------------- - If P is NEGATIVE or NO_DATA, the result is NOT_APPLICABLE (NO_DATA: because we cannot decide whether the recommendation is applicable or not). - If P is POSITIVE, the result is: @@ -364,6 +382,24 @@ def handle_left_dependent_toggle( | POSITIVE | NEGATIVE | NEGATIVE | | POSITIVE | NO_DATA | NO_DATA | + ConditionalFilter + ------------------- + - If P is NEGATIVE or NO_DATA, the result is NEGATIVE. + - If P is POSITIVE, the result is: + - POSITIVE if I is POSITIVE + - NEGATIVE if I is NEGATIVE + - NO_DATA if I is NO_DATA + + In tabular form: + + | P | I | Result | + |---|---|--------| + | NEGATIVE | * | NEGATIVE | + | NO_DATA | * | NEGATIVE | + | POSITIVE | POSITIVE | POSITIVE | + | POSITIVE | NEGATIVE | NEGATIVE | + | POSITIVE | NO_DATA | NO_DATA | + :param left: The intervals of the left dependency (the one that determines whether the right dependency is returned). :param right: The intervals of the right dependency (the one that is taken when the left dependency is @@ -373,19 +409,24 @@ def handle_left_dependent_toggle( :return: A DataFrame with the merged intervals. """ assert isinstance( - self.expr, logic.LeftDependentToggle - ), "Dependency is not a LeftDependentToggle expression." + self.expr, (logic.LeftDependentToggle, logic.ConditionalFilter) + ), "Dependency is not a LeftDependentToggle or ConditionalFilter expression." # data[0] is the left dependency (i.e. P) # data[1] is the right dependency (i.e. I) data_p = process.select_type(left, IntervalType.POSITIVE) + if isinstance(self.expr, logic.LeftDependentToggle): + interval_type = IntervalType.NOT_APPLICABLE + elif isinstance(self.expr, logic.ConditionalFilter): + interval_type = IntervalType.NEGATIVE + result_not_p = process.complementary_intervals( data_p, reference=base_data, observation_window=observation_window, - interval_type=IntervalType.NOT_APPLICABLE, + interval_type=interval_type, ) result_p_and_i = process.intersect_intervals([data_p, right]) diff --git a/execution_engine/util/cohort_logic.py b/execution_engine/util/cohort_logic.py index 3a430269..5bc1ab22 100644 --- a/execution_engine/util/cohort_logic.py +++ b/execution_engine/util/cohort_logic.py @@ -717,3 +717,28 @@ def left(self) -> Expr: def right(self) -> Expr: """Returns the right operand""" return self.args[1] + + +class ConditionalFilter(BooleanFunction): + """ + A ConditionalFilter object returns the right operand if the left operand is POSITIVE, + and NEGATIVE otherwise + """ + + def __new__( + cls, left: BaseExpr, right: BaseExpr, **kwargs: Any + ) -> "ConditionalFilter": + """ + Initialize a ConditionalFilter object. + """ + return cast(ConditionalFilter, super().__new__(cls, left, right, **kwargs)) + + @property + def left(self) -> Expr: + """Returns the left operand""" + return self.args[0] + + @property + def right(self) -> Expr: + """Returns the right operand""" + return self.args[1] diff --git a/execution_engine/util/interval.py b/execution_engine/util/interval.py index debe2437..c70f8cba 100644 --- a/execution_engine/util/interval.py +++ b/execution_engine/util/interval.py @@ -72,14 +72,14 @@ class IntervalType(StrEnum): __union_priority_order: list[str] = [POSITIVE, NO_DATA, NOT_APPLICABLE, NEGATIVE] """Union priority order starting with the highest priority.""" - # until Jan 13: + # until Jan 13, 2024: # POSITIVE has higher priority than NO_DATA, as in measurements we return NO_DATA intervals for all intervals # inbetween measurements (and outside), and these are &-ed with the POSITIVE intervals for e.g. conditions. # previously, NO_DATA was higher priority than POSITIVE because in POPULATION_INTERVENTION, when the POPULATION is # POSITIVE but the INTERVENTION is NO_DATA, the result _should_ be NO_DATA (but is currently POSITIVE) - this is now # handled in the LeftDependentToggle handler in task.py - # from Jan 13: + # from Jan 13, 2024: # Now we have a problem in 36a PEEP: When the conditions (e.g. COVID 19) are POSITIVE but the measurement of FiO2 # is NO_DATA, the population is considered POSITIVE, although it should be NO_DATA # conversely in rec35 tidal volume we have two measurements: tidal volume and pplateau. If tidal volume is NO_DATA @@ -455,18 +455,22 @@ def sort_indices(_intervals: list[Atomic]) -> list[int]: a.left, a.lower, b.lower, - Bound.OPEN - if b.left is Bound.CLOSED - else Bound.CLOSED, + ( + Bound.OPEN + if b.left is Bound.CLOSED + else Bound.CLOSED + ), a.type, ) ), b, Atomic( *self._process_atomic( - Bound.OPEN - if b.right is Bound.CLOSED - else Bound.CLOSED, + ( + Bound.OPEN + if b.right is Bound.CLOSED + else Bound.CLOSED + ), b.upper, a.upper, a.right, @@ -489,9 +493,11 @@ def sort_indices(_intervals: list[Atomic]) -> list[int]: a.left, a.lower, b.lower, - Bound.OPEN - if b.left is Bound.CLOSED - else Bound.CLOSED, + ( + Bound.OPEN + if b.left is Bound.CLOSED + else Bound.CLOSED + ), a.type, ) ) @@ -504,9 +510,11 @@ def sort_indices(_intervals: list[Atomic]) -> list[int]: unions.append( Atomic( *self._process_atomic( - Bound.OPEN - if b.right is Bound.CLOSED - else Bound.CLOSED, + ( + Bound.OPEN + if b.right is Bound.CLOSED + else Bound.CLOSED + ), b.upper, a.upper, a.right, @@ -920,9 +928,11 @@ def ffill(self) -> "IntervalWithType": cur_interval.left, cur_interval.lower, next_interval.lower, - Bound.OPEN - if next_interval.left == Bound.CLOSED - else Bound.CLOSED, + ( + Bound.OPEN + if next_interval.left == Bound.CLOSED + else Bound.CLOSED + ), cur_interval.type, ) ) diff --git a/tests/_fixtures/concept.py b/tests/_fixtures/concept.py index ba6f6b30..3660f7c5 100644 --- a/tests/_fixtures/concept.py +++ b/tests/_fixtures/concept.py @@ -250,6 +250,17 @@ def unit_concept(): invalid_reason=None, ) +concept_body_weight = Concept( + concept_id=3025315, + concept_name="Body weight", + domain_id="Measurement", + vocabulary_id="LOINC", + concept_class_id="Clinical Observation", + standard_concept="S", + concept_code="29463-7", + invalid_reason=None, +) + """ The following list of concepts are heparin drugs and all of them directly map to heparin as ingredient (via ancestor, not relationship !) diff --git a/tests/execution_engine/omop/criterion/combination/test_logical_combination.py b/tests/execution_engine/omop/criterion/combination/test_logical_combination.py index 95e1b066..251c3a43 100644 --- a/tests/execution_engine/omop/criterion/combination/test_logical_combination.py +++ b/tests/execution_engine/omop/criterion/combination/test_logical_combination.py @@ -8,24 +8,31 @@ from execution_engine.constants import CohortCategory from execution_engine.omop.criterion.combination.logical import ( LogicalCriterionCombination, + NonCommutativeLogicalCriterionCombination, ) from execution_engine.omop.criterion.condition_occurrence import ConditionOccurrence from execution_engine.omop.criterion.drug_exposure import DrugExposure +from execution_engine.omop.criterion.measurement import Measurement from execution_engine.omop.criterion.procedure_occurrence import ProcedureOccurrence from execution_engine.task.process import get_processing_module from execution_engine.util.types import Dosage, TimeRange from execution_engine.util.value import ValueNumber from tests._fixtures.concept import ( concept_artificial_respiration, + concept_body_weight, concept_covid19, concept_heparin_ingredient, + concept_tidal_volume, + concept_unit_kg, concept_unit_mg, + concept_unit_ml, ) from tests._testdata import concepts from tests.execution_engine.omop.criterion.test_criterion import TestCriterion, date_set from tests.functions import ( create_condition, create_drug_exposure, + create_measurement, create_procedure, create_visit, ) @@ -269,9 +276,10 @@ def run_criteria_test( elif c.func == sympy.Or: operator = LogicalCriterionCombination.Operator.OR elif isinstance(c.func, sympy.core.function.UndefinedFunction): - assert args[0].is_number, "First argument must be a number (threshold)" - threshold = args[0] - args = args[1:] + if c.func.name in ["MinCount", "MaxCount", "ExactCount"]: + assert args[0].is_number, "First argument must be a number (threshold)" + threshold = args[0] + args = args[1:] if c.func.name == "MinCount": operator = LogicalCriterionCombination.Operator.AT_LEAST elif c.func.name == "MaxCount": @@ -280,6 +288,8 @@ def run_criteria_test( operator = LogicalCriterionCombination.Operator.EXACTLY elif c.func.name == "AllOrNone": operator = LogicalCriterionCombination.Operator.ALL_OR_NONE + elif c.func.name == "ConditionalFilter": + operator = None else: raise ValueError(f"Unknown operator {c.func}") else: @@ -287,6 +297,8 @@ def run_criteria_test( c1, c2, c3 = [c.copy() for c in criteria] + symbols = {"c1": c1, "c2": c2, "c3": c3} + for arg in args: if arg.is_Not: if arg.args[0].name == "c1": @@ -298,25 +310,30 @@ def run_criteria_test( else: raise ValueError(f"Unknown criterion {arg.args[0].name}") - comb = LogicalCriterionCombination( - exclude=exclude, - category=CohortCategory.POPULATION, - operator=LogicalCriterionCombination.Operator( - operator, threshold=threshold - ), - ) + if hasattr(c.func, "name") and c.func.name == "ConditionalFilter": + assert len(c.args) == 2 - for symbol in c.atoms(): - if symbol.is_number: - continue - elif symbol.name == "c1": - comb.add(c1) - elif symbol.name == "c2": - comb.add(c2) - elif symbol.name == "c3": - comb.add(c3) - else: - raise ValueError(f"Unknown criterion {symbol.name}") + comb = NonCommutativeLogicalCriterionCombination.ConditionalFilter( + exclude=exclude, + category=CohortCategory.POPULATION, + left=symbols[str(c.args[0])], + right=symbols[str(c.args[1])], + ) + + else: + comb = LogicalCriterionCombination( + exclude=exclude, + category=CohortCategory.POPULATION, + operator=LogicalCriterionCombination.Operator( + operator, threshold=threshold + ), + ) + + for symbol in c.atoms(): + if symbol.is_number: + continue + else: + comb.add(symbols[str(symbol)]) self.insert_criterion_combination( db_session, comb, base_criterion, observation_window @@ -645,3 +662,353 @@ def test_overlapping_combination_on_database( observation_window, persons, ) + + +class TestCriterionCombinationNoData(TestCriterionCombinationDatabase): + """ + Test class for testing criterion combinations on the database. + + The purpose of this class is to test the behavior for a combination like + NOT(AND(c1, c2)), where c1 returns NO_DATA intervals and c2 returns nothing. + """ + + @pytest.fixture + def observation_window(self) -> TimeRange: + return TimeRange( + start="2023-03-01 04:00:00Z", end="2023-03-04 18:00:00Z", name="observation" + ) + + @pytest.fixture + def criteria(self, db_session): + c1 = DrugExposure( + exclude=False, + category=CohortCategory.POPULATION, + ingredient_concept=concept_heparin_ingredient, + dose=Dosage( + dose=ValueNumber(value=100, unit=concept_unit_mg), + frequency=1, + interval="d", + ), + route=None, + ) + + c2 = Measurement( + exclude=False, + category=CohortCategory.POPULATION, + concept=concept_tidal_volume, + value=ValueNumber(value_min=500, unit=concept_unit_ml), + ) + + c3 = Measurement( + exclude=False, + category=CohortCategory.POPULATION, + concept=concept_body_weight, + value=ValueNumber(value_min=70, unit=concept_unit_kg), + ) + + c1.id = 1 + c2.id = 2 + c3.id = 3 + + self.register_criterion(c1, db_session) + self.register_criterion(c2, db_session) + self.register_criterion(c3, db_session) + + return [c1, c2, c3] + + @pytest.fixture + def patient_events(self, db_session, person_visit): + # no patient events added + pass + + @pytest.mark.parametrize( + "combination,expected", + [ + ("c1 & c2", {1: {}, 2: {}, 3: {}}), + ("c1 | c2", {1: {}, 2: {}, 3: {}}), + ( + "~(c1 & c2)", + { + 1: { + "2023-03-01", + "2023-03-02", + "2023-03-03", + "2023-03-04", + }, # admitted on 2023-03-01 + 2: { + "2023-03-02", + "2023-03-03", + "2023-03-04", + }, # admitted on 2023-03-02 + 3: {"2023-03-03", "2023-03-04"}, # admitted on 2023-03-03 + }, + ), + ( + "~(c1 | c2)", # c1 is NEGATIVE, c2 is NO_DATA -> c1 | c2 is still NO_DATA + # -> ~(c1 | c2) = ~c1 & ~c2 = (POSITIVE & NO_DATA) = NO_DATA + # -> interpreted as "NEGATIVE" in the final result (no POSITIVE intervals there) + { + 1: {}, + 2: {}, + 3: {}, + }, + ), + ("~c1 & c2", {1: {}, 2: {}, 3: {}}), + ( + "~c1 | c2", # c1 is NEGATIVE, c2 is NO_DATA -> ~c1 | c2 = POSITIVE | NO_DATA = POSITIVE + { + 1: {"2023-03-01", "2023-03-02", "2023-03-03", "2023-03-04"}, + 2: {"2023-03-02", "2023-03-03", "2023-03-04"}, + 3: {"2023-03-03", "2023-03-04"}, + }, + ), + ("c1 & ~c2", {1: {}, 2: {}, 3: {}}), + ( + "c1 | ~c2", # c1 is NEGATIVE, c2 is NO_DATA -> c1 | ~c2 = NEGATIVE | NO_DATA = NO_DATA -> interpreted as "NEGATIVE" in the final result (no POSITIVE intervals there) + { + 1: {}, + 2: {}, + 3: {}, + }, + ), + ], + ) + def test_combination_on_database( + self, + person_visit, + db_session, + base_criterion, + patient_events, + criteria, + combination, + expected, + observation_window, + ): + persons = [pv[0] for pv in person_visit] + self.run_criteria_test( + combination, + expected, + db_session, + criteria, + base_criterion, + observation_window, + persons, + ) + + +class TestCriterionCombinationConditionalFilter(TestCriterionCombinationDatabase): + """ + Test class for testing criterion combinations on the database. + + The purpose of this class is to test the behavior for a combination like + + NOT(AND(c1, c2)), where c1 returns NO_DATA intervals and c2 returns nothing. + + The result should be a positive interval (because "nothing" is interpreted as NEGATIVE, and ~(NO_DATA & NEGATIVE) + should be positive. This is a regression test related to bug found when processing body-weight-related drug + combinations (in rec17) when no body weight is given. + """ + + @pytest.fixture + def observation_window(self) -> TimeRange: + return TimeRange( + start="2023-03-01 04:00:00Z", end="2023-03-04 18:00:00Z", name="observation" + ) + + @pytest.fixture + def criteria(self, db_session): + c1 = DrugExposure( + exclude=False, + category=CohortCategory.POPULATION, + ingredient_concept=concept_heparin_ingredient, + dose=Dosage( + dose=ValueNumber(value=100, unit=concept_unit_mg), + frequency=1, + interval="d", + ), + route=None, + ) + + c2 = Measurement( + exclude=False, + category=CohortCategory.POPULATION, + concept=concept_body_weight, + value=ValueNumber(value_min=70, unit=concept_unit_kg), + ) + + c3 = ConditionOccurrence( + exclude=False, + category=CohortCategory.POPULATION, + concept=concept_covid19, + ) + + c1.id = 1 + c2.id = 2 + c3.id = 3 + + self.register_criterion(c1, db_session) + self.register_criterion(c2, db_session) + self.register_criterion(c3, db_session) + + return [c1, c2, c3] + + @pytest.fixture + def patient_events(self, db_session, person_visit): + _, visit_occurrence = person_visit[0] + e1 = create_drug_exposure( + vo=visit_occurrence, + drug_concept_id=concept_heparin_ingredient.concept_id, + start_datetime=pendulum.parse( + "2023-03-03 18:00:00+01:00" + ), # -> this results in full day positive (because drug exposure with quantity 50 on this day) + end_datetime=pendulum.parse( + "2023-03-03 18:00:00+01:00" + ), # -> this results in full day positive (because drug exposure with quantity 50 on this day) + quantity=100, + ) + + e2 = create_drug_exposure( + vo=visit_occurrence, + drug_concept_id=concept_heparin_ingredient.concept_id, + start_datetime=pendulum.parse( + "2023-03-04 06:00:00+01:00" + ), # -> this results in full day positive (because drug exposure with quantity 50 on this day) + end_datetime=pendulum.parse( + "2023-03-04 06:00:00+01:00" + ), # -> this results in full day positive (because drug exposure with quantity 50 on this day) + quantity=100, + ) + + e3 = create_measurement( + vo=visit_occurrence, + measurement_concept_id=concept_body_weight.concept_id, + measurement_datetime=pendulum.parse("2023-03-04 05:00:00+01:00"), + value_as_number=70, + unit_concept_id=concept_unit_kg.concept_id, + ) + + db_session.add_all([e1, e2, e3]) + + ##################################### + _, visit_occurrence = person_visit[1] + e1 = create_drug_exposure( + vo=visit_occurrence, + drug_concept_id=concept_heparin_ingredient.concept_id, + start_datetime=pendulum.parse( + "2023-03-03 18:00:00+01:00" + ), # -> this results in full day positive (because drug exposure with quantity 50 on this day) + end_datetime=pendulum.parse( + "2023-03-03 18:00:00+01:00" + ), # -> this results in full day positive (because drug exposure with quantity 50 on this day) + quantity=100, + ) + + e2 = create_drug_exposure( + vo=visit_occurrence, + drug_concept_id=concept_heparin_ingredient.concept_id, + start_datetime=pendulum.parse( + "2023-03-04 06:00:00+01:00" + ), # -> this results in full day positive (because drug exposure with quantity 50 on this day) + end_datetime=pendulum.parse( + "2023-03-04 06:00:00+01:00" + ), # -> this results in full day positive (because drug exposure with quantity 50 on this day) + quantity=100, + ) + + db_session.add_all([e1, e2]) + + ##################################### + _, visit_occurrence = person_visit[2] + e1 = create_drug_exposure( + vo=visit_occurrence, + drug_concept_id=concept_heparin_ingredient.concept_id, + start_datetime=pendulum.parse( + "2023-03-03 18:00:00+01:00" + ), # -> this results in full day positive (because drug exposure with quantity 50 on this day) + end_datetime=pendulum.parse( + "2023-03-03 18:00:00+01:00" + ), # -> this results in full day positive (because drug exposure with quantity 50 on this day) + quantity=100, + ) + + e2 = create_drug_exposure( + vo=visit_occurrence, + drug_concept_id=concept_heparin_ingredient.concept_id, + start_datetime=pendulum.parse( + "2023-03-04 06:00:00+01:00" + ), # -> this results in full day positive (because drug exposure with quantity 50 on this day) + end_datetime=pendulum.parse( + "2023-03-04 06:00:00+01:00" + ), # -> this results in full day positive (because drug exposure with quantity 50 on this day) + quantity=100, + ) + + e3 = create_condition( + vo=visit_occurrence, + condition_concept_id=concept_covid19.concept_id, + condition_start_datetime=pendulum.parse("2023-03-03 18:00:00+01:00"), + condition_end_datetime=pendulum.parse("2023-03-06 18:00:00+01:00"), + ) + + db_session.add_all([e1, e2, e3]) + + db_session.commit() + + @pytest.mark.parametrize( + "combination,expected", + [ + ( + "ConditionalFilter(c1, c2)", + { + 1: {"2023-03-03", "2023-03-04"}, # admitted on 2023-03-01 + 2: {}, # admitted on 2023-03-02 + 3: {}, # admitted on 2023-03-03 + }, + ), + ( + "ConditionalFilter(c2, c1)", + { + 1: {"2023-03-03", "2023-03-04"}, # admitted on 2023-03-01 + 2: {}, # admitted on 2023-03-02 + 3: {}, # admitted on 2023-03-03 + }, + ), + ( + "ConditionalFilter(c1, c3)", + { + 1: {}, # admitted on 2023-03-01 + 2: {}, # admitted on 2023-03-02 + 3: {"2023-03-04"}, # admitted on 2023-03-03 + }, + ), + ( + "ConditionalFilter(c3, c1)", + { + 1: {}, # admitted on 2023-03-01 + 2: {}, # admitted on 2023-03-02 + 3: {"2023-03-04"}, # admitted on 2023-03-03 + }, + ), + ], + ) + def test_combination_on_database( + self, + person_visit, + db_session, + base_criterion, + patient_events, + criteria, + combination, + expected, + observation_window, + ): + persons = [pv[0] for pv in person_visit] + self.run_criteria_test( + combination, + expected, + db_session, + criteria, + base_criterion, + observation_window, + persons, + ) From abb94ddad1ba7e83b47c62356f63cac62f5afe4d Mon Sep 17 00:00:00 2001 From: Gregor Lichtner Date: Tue, 6 Aug 2024 21:11:16 +0200 Subject: [PATCH 2/2] revert: no explicit adding of negative intervals --- execution_engine/task/task.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/execution_engine/task/task.py b/execution_engine/task/task.py index af1a30a2..f40bba9a 100644 --- a/execution_engine/task/task.py +++ b/execution_engine/task/task.py @@ -233,18 +233,6 @@ def handle_criterion( logging.debug(f"Processing data - '{criterion.description()}'") data = criterion.process_data(data, base_data, observation_window) - # fill remaining time with NEGATIVE (this is required as otherwise an AND operation on a NO_DATA and - # a non-existing (i.e. not-filled) interval yields NO_DATA, but should actually be NEGATIVE - if base_data is not None: - data_negative = process.complementary_intervals( - data, - reference=base_data, - observation_window=observation_window, - interval_type=IntervalType.NEGATIVE, - ) - - data = process.concat_intervals([data, data_negative]) - return data def handle_unary_logical_operator(