Skip to content

Commit

Permalink
Merge pull request #187 from CODEX-CELIDA/feat/no-data-inversion
Browse files Browse the repository at this point in the history
Feat/no data inversion
  • Loading branch information
glichtner authored Aug 7, 2024
2 parents 911070a + abb94dd commit 9a28c1b
Show file tree
Hide file tree
Showing 9 changed files with 662 additions and 62 deletions.
46 changes: 29 additions & 17 deletions execution_engine/converter/action/drug_administration.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from execution_engine.omop.criterion.abstract import Criterion
from execution_engine.omop.criterion.combination.logical import (
LogicalCriterionCombination,
NonCommutativeLogicalCriterionCombination,
)
from execution_engine.omop.criterion.drug_exposure import DrugExposure
from execution_engine.omop.criterion.point_in_time import PointInTimeCriterion
Expand All @@ -37,6 +38,7 @@ class ExtensionType(TypedDict):

code: Concept
value: Value
type: str


class DrugAdministrationAction(AbstractAction):
Expand Down Expand Up @@ -223,7 +225,7 @@ def process_dosage_extensions(cls, dosage: FHIRDosage) -> list[ExtensionType]:
code = parse_code(condition_type.valueCodeableConcept)
value = parse_value(condition_value, "value")

extensions.append({"code": code, "value": value})
extensions.append({"code": code, "value": value, "type": "conditional"})
else:
raise NotImplementedError(f"Unknown dosage extension {extension.url}")

Expand Down Expand Up @@ -298,28 +300,38 @@ def _to_criterion(self) -> Criterion | LogicalCriterionCombination | None:
route=dosage.get("route", None),
)

if dosage.get("extensions", None) is None:
extensions = dosage.get("extensions", None)
if extensions is None or len(extensions) == 0:
drug_actions.append(drug_action)
else:
comb = LogicalCriterionCombination(
exclude=drug_action.exclude, # need to pull up the exclude flag from the criterion into the combination
category=CohortCategory.INTERVENTION,
operator=LogicalCriterionCombination.Operator("AND"),
)
drug_action.exclude = False # reset the exclude flag, as it is now part of the combination
if len(extensions) > 1:
raise NotImplementedError(
"Multiple extensions in dosage not supported yet"
)

comb.add(drug_action)
extension = extensions[0]

for extension in dosage["extensions"]: # type: ignore # (extension is not None as per the if-block)
comb.add(
PointInTimeCriterion(
exclude=False, # extensions are always included (at least for now)
category=CohortCategory.INTERVENTION,
concept=extension["code"],
value=extension["value"],
)
if extension["type"] != "conditional":
raise NotImplementedError(
f"Extension type {extension['type']} not supported yet"
)

drug_action.exclude = False # reset the exclude flag, as it is now part of the combination

ext_criterion = PointInTimeCriterion(
exclude=False, # extensions are always included (at least for now)
category=CohortCategory.INTERVENTION,
concept=extension["code"],
value=extension["value"],
)

comb = NonCommutativeLogicalCriterionCombination.ConditionalFilter(
exclude=drug_action.exclude, # need to pull up the exclude flag from the criterion into the combination
category=CohortCategory.INTERVENTION,
left=ext_criterion,
right=drug_action,
)

drug_actions.append(comb)

if len(drug_actions) == 1:
Expand Down
3 changes: 3 additions & 0 deletions execution_engine/execution_graph/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from execution_engine.omop.criterion.combination.combination import CriterionCombination
from execution_engine.omop.criterion.combination.logical import (
LogicalCriterionCombination,
NonCommutativeLogicalCriterionCombination,
)
from execution_engine.omop.criterion.combination.temporal import (
TemporalIndicatorCombination,
Expand Down Expand Up @@ -381,6 +382,8 @@ def conjunction_from_combination(
f"Expected {LogicalCriterionCombination.Operator.AND}"
)
return logic.NonSimplifiableAnd
elif isinstance(comb, NonCommutativeLogicalCriterionCombination):
return logic.ConditionalFilter
elif comb.operator.operator == LogicalCriterionCombination.Operator.AND:
return logic.And
elif comb.operator.operator == LogicalCriterionCombination.Operator.OR:
Expand Down
143 changes: 142 additions & 1 deletion execution_engine/omop/criterion/combination/logical.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Union
from typing import Any, Dict, Iterator, Union

from execution_engine.constants import CohortCategory
from execution_engine.omop.criterion.abstract import Criterion
Expand Down Expand Up @@ -141,3 +141,144 @@ def AllOrNone(
category=category,
criteria=criteria,
)


class NonCommutativeLogicalCriterionCombination(LogicalCriterionCombination):
"""
A combination of criteria that is not commutative.
"""

_left: Union[Criterion, CriterionCombination]
_right: Union[Criterion, CriterionCombination]

class Operator(CriterionCombination.Operator):
"""Operators for criterion combinations."""

CONDITIONAL_FILTER = "CONDITIONAL_FILTER"

def __init__(self, operator: str, threshold: int | None = None):
assert operator in [
"CONDITIONAL_FILTER",
], f"Invalid operator {operator}"

self.operator = operator

def __init__(
self,
exclude: bool,
operator: "NonCommutativeLogicalCriterionCombination.Operator",
category: CohortCategory,
left: Union[Criterion, CriterionCombination] | None = None,
right: Union[Criterion, CriterionCombination] | None = None,
root_combination: bool = False,
):
"""
Initialize the criterion combination.
"""
super().__init__(exclude=exclude, operator=operator, category=category)

self._criteria = []
if left is not None:
self._left = left
if right is not None:
self._right = right

self._root = root_combination

@property
def left(self) -> Union[Criterion, CriterionCombination]:
"""
Get the left criterion.
"""
return self._left

@property
def right(self) -> Union[Criterion, CriterionCombination]:
"""
Get the right criterion.
"""
return self._right

def __str__(self) -> str:
"""
Get the string representation of the criterion combination.
"""
return f"{self.operator}({', '.join(str(c) for c in self._criteria)})"

def __repr__(self) -> str:
"""
Get the string representation of the criterion combination.
"""
return f'{self.__class__.__name__}("{self.operator}", {self._criteria})'

def __eq__(self, other: object) -> bool:
"""
Check if the criterion combination is equal to another criterion combination.
"""
if not isinstance(other, NonCommutativeLogicalCriterionCombination):
return NotImplemented
return (
self.operator == other.operator
and self._left == other._left
and self._right == other._right
and self.exclude == other.exclude
and self.category == other.category
)

def __iter__(self) -> Iterator[Union[Criterion, "CriterionCombination"]]:
"""
Iterate over the criteria in the combination.
"""
yield self._left
yield self._right

def dict(self) -> dict:
"""
Get the dictionary representation of the criterion combination.
"""
return {
"operator": self._operator.operator,
"exclude": self.exclude,
"category": self.category.value,
"left": self._left.dict(),
"right": self._right.dict(),
}

@classmethod
def from_dict(
cls, data: Dict[str, Any]
) -> "NonCommutativeLogicalCriterionCombination":
"""
Create a criterion combination from a dictionary.
"""

from execution_engine.omop.criterion.factory import (
criterion_factory, # needs to be here to avoid circular import
)

return cls(
exclude=data["exclude"],
operator=cls.Operator(data["operator"]),
category=CohortCategory(data["category"]),
left=criterion_factory(**data["left"]),
right=criterion_factory(**data["right"]),
)

@classmethod
def ConditionalFilter(
cls,
left: Union[Criterion, "CriterionCombination"],
right: Union[Criterion, "CriterionCombination"],
category: CohortCategory,
exclude: bool = False,
) -> "LogicalCriterionCombination":
"""
Create a CONDITIONAL_FILTER combination of criteria.
"""
return cls(
exclude=exclude,
operator=cls.Operator(cls.Operator.CONDITIONAL_FILTER),
category=category,
left=left,
right=right,
)
2 changes: 2 additions & 0 deletions execution_engine/omop/criterion/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from execution_engine.omop.criterion.combination.combination import CriterionCombination
from execution_engine.omop.criterion.combination.logical import (
LogicalCriterionCombination,
NonCommutativeLogicalCriterionCombination,
)
from execution_engine.omop.criterion.combination.temporal import (
TemporalIndicatorCombination,
Expand All @@ -29,6 +30,7 @@
"ConceptCriterion": ConceptCriterion,
"LogicalCriterionCombination": LogicalCriterionCombination,
"TemporalCriterionCombination": TemporalIndicatorCombination,
"NonCommutativeLogicalCriterionCombination": NonCommutativeLogicalCriterionCombination,
"ConditionOccurrence": ConditionOccurrence,
"DrugExposure": DrugExposure,
"Measurement": Measurement,
Expand Down
41 changes: 35 additions & 6 deletions execution_engine/task/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,9 @@ def run(
),
):
result = self.handle_binary_logical_operator(data)
elif isinstance(self.expr, logic.LeftDependentToggle):
elif isinstance(
self.expr, (logic.LeftDependentToggle, logic.ConditionalFilter)
):
result = self.handle_left_dependent_toggle(
left=data[0],
right=data[1],
Expand Down Expand Up @@ -344,9 +346,13 @@ def handle_left_dependent_toggle(
observation_window: TimeRange,
) -> PersonIntervals:
"""
Handles a left dependent toggle by merging the intervals of the left dependency with the intervals of the
right dependency according to the following rules:
Handles a left dependent toggle or a conditional filter by merging the intervals of the left dependency with the
intervals of the right dependency according to the following rules. The difference between the two is that
the conditional filter yields NEGATIVE if left dependence is NEGATIVE or NO_DATA, while the left dependent
toggle yields NOT_APPLICABLE in these cases.
LeftDependentToggle
-------------------
- If P is NEGATIVE or NO_DATA, the result is NOT_APPLICABLE (NO_DATA: because we cannot decide whether the
recommendation is applicable or not).
- If P is POSITIVE, the result is:
Expand All @@ -364,6 +370,24 @@ def handle_left_dependent_toggle(
| POSITIVE | NEGATIVE | NEGATIVE |
| POSITIVE | NO_DATA | NO_DATA |
ConditionalFilter
-------------------
- If P is NEGATIVE or NO_DATA, the result is NEGATIVE.
- If P is POSITIVE, the result is:
- POSITIVE if I is POSITIVE
- NEGATIVE if I is NEGATIVE
- NO_DATA if I is NO_DATA
In tabular form:
| P | I | Result |
|---|---|--------|
| NEGATIVE | * | NEGATIVE |
| NO_DATA | * | NEGATIVE |
| POSITIVE | POSITIVE | POSITIVE |
| POSITIVE | NEGATIVE | NEGATIVE |
| POSITIVE | NO_DATA | NO_DATA |
:param left: The intervals of the left dependency (the one that determines whether the right dependency is
returned).
:param right: The intervals of the right dependency (the one that is taken when the left dependency is
Expand All @@ -373,19 +397,24 @@ def handle_left_dependent_toggle(
:return: A DataFrame with the merged intervals.
"""
assert isinstance(
self.expr, logic.LeftDependentToggle
), "Dependency is not a LeftDependentToggle expression."
self.expr, (logic.LeftDependentToggle, logic.ConditionalFilter)
), "Dependency is not a LeftDependentToggle or ConditionalFilter expression."

# data[0] is the left dependency (i.e. P)
# data[1] is the right dependency (i.e. I)

data_p = process.select_type(left, IntervalType.POSITIVE)

if isinstance(self.expr, logic.LeftDependentToggle):
interval_type = IntervalType.NOT_APPLICABLE
elif isinstance(self.expr, logic.ConditionalFilter):
interval_type = IntervalType.NEGATIVE

result_not_p = process.complementary_intervals(
data_p,
reference=base_data,
observation_window=observation_window,
interval_type=IntervalType.NOT_APPLICABLE,
interval_type=interval_type,
)

result_p_and_i = process.intersect_intervals([data_p, right])
Expand Down
25 changes: 25 additions & 0 deletions execution_engine/util/cohort_logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -717,3 +717,28 @@ def left(self) -> Expr:
def right(self) -> Expr:
"""Returns the right operand"""
return self.args[1]


class ConditionalFilter(BooleanFunction):
"""
A ConditionalFilter object returns the right operand if the left operand is POSITIVE,
and NEGATIVE otherwise
"""

def __new__(
cls, left: BaseExpr, right: BaseExpr, **kwargs: Any
) -> "ConditionalFilter":
"""
Initialize a ConditionalFilter object.
"""
return cast(ConditionalFilter, super().__new__(cls, left, right, **kwargs))

@property
def left(self) -> Expr:
"""Returns the left operand"""
return self.args[0]

@property
def right(self) -> Expr:
"""Returns the right operand"""
return self.args[1]
Loading

0 comments on commit 9a28c1b

Please sign in to comment.