Skip to content

Commit

Permalink
EventLogging: Adding logging of module errors.
Browse files Browse the repository at this point in the history
  • Loading branch information
xsedla1o committed Jul 24, 2023
1 parent 826e4c0 commit e9cc807
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 13 deletions.
6 changes: 4 additions & 2 deletions dp3/snapshots/snapshooter.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,10 @@ def __init__(
self.worker_cnt = platform_config.num_processes
self.config = SnapShooterConfig.parse_obj(platform_config.config.get("snapshots"))

self._timeseries_hooks = SnapshotTimeseriesHookContainer(self.log, self.model_spec)
self._correlation_hooks = SnapshotCorrelationHookContainer(self.log, self.model_spec)
elog = task_executor.elog

self._timeseries_hooks = SnapshotTimeseriesHookContainer(self.log, self.model_spec, elog)
self._correlation_hooks = SnapshotCorrelationHookContainer(self.log, self.model_spec, elog)

queue = f"{platform_config.app_name}-worker-{platform_config.process_index}-snapshots"
self.snapshot_queue_reader = TaskQueueReader(
Expand Down
14 changes: 11 additions & 3 deletions dp3/snapshots/snapshot_hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
)
from dp3.common.config import ModelSpec
from dp3.common.task import DataPointTask
from dp3.task_processing.task_hooks import EventGroupType


class SnapshotTimeseriesHookContainer:
"""Container for timeseries analysis hooks"""

def __init__(self, log: logging.Logger, model_spec: ModelSpec):
def __init__(self, log: logging.Logger, model_spec: ModelSpec, elog: EventGroupType):
self.log = log.getChild("TimeseriesHooks")
self.elog = elog
self.model_spec = model_spec

self._hooks = defaultdict(list)
Expand Down Expand Up @@ -64,15 +66,17 @@ def run(
new_tasks = hook(entity_type, attr_type, attr_history)
tasks.extend(new_tasks)
except Exception as e:
self.elog.log("module_error")
self.log.error(f"Error during running hook {hook}: {e}")
return tasks


class SnapshotCorrelationHookContainer:
"""Container for data fusion and correlation hooks."""

def __init__(self, log: logging.Logger, model_spec: ModelSpec):
def __init__(self, log: logging.Logger, model_spec: ModelSpec, elog: EventGroupType):
self.log = log.getChild("CorrelationHooks")
self.elog = elog
self.model_spec = model_spec

self._hooks: defaultdict[str, list[tuple[str, Callable]]] = defaultdict(list)
Expand Down Expand Up @@ -263,7 +267,11 @@ def run(self, entities: dict):
for hook_id, hook, etype in hook_subset:
for eid, entity_values in entities_by_etype[etype].items():
self.log.debug("Running hook %s on entity %s", hook_id, eid)
hook(etype, entity_values)
try:
hook(etype, entity_values)
except Exception as e:
self.elog.log("module_error")
self.log.error(f"Error during running hook {hook_id}: {e}")

def _restore_hook_order(self, hooks: list[tuple[str, Callable]]):
topological_order = self._dependency_graph.topological_sort()
Expand Down
6 changes: 3 additions & 3 deletions dp3/task_processing/task_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,17 @@ def __init__(
)

# Hooks
self._task_generic_hooks = TaskGenericHooksContainer(self.log)
self._task_generic_hooks = TaskGenericHooksContainer(self.log, self.elog)
self._task_entity_hooks = {}
self._task_attr_hooks = {}

for entity in self.model_spec.entities:
self._task_entity_hooks[entity] = TaskEntityHooksContainer(entity, self.log)
self._task_entity_hooks[entity] = TaskEntityHooksContainer(entity, self.log, self.elog)

for entity, attr in self.model_spec.attributes:
attr_type = self.model_spec.attributes[entity, attr].t
self._task_attr_hooks[entity, attr] = TaskAttrHooksContainer(
entity, attr, attr_type, self.log
entity, attr, attr_type, self.log, self.elog
)

def register_task_hook(self, hook_type: str, hook: Callable):
Expand Down
26 changes: 22 additions & 4 deletions dp3/task_processing/task_hooks.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import logging
from typing import Callable
from typing import Callable, Union

from event_count_logger import DummyEventGroup, EventGroup

from dp3.common.attrspec import AttrType
from dp3.common.datapoint import DataPointBase
from dp3.common.task import DataPointTask

EventGroupType = Union[EventGroup, DummyEventGroup]


class TaskGenericHooksContainer:
"""Container for generic hooks
Expand All @@ -14,8 +18,9 @@ class TaskGenericHooksContainer:
- `on_task_start`: receives Task, no return value requirements
"""

def __init__(self, log: logging.Logger):
def __init__(self, log: logging.Logger, elog: EventGroupType):
self.log = log.getChild("genericHooks")
self.elog = elog

self._on_start = []

Expand All @@ -33,6 +38,7 @@ def run_on_start(self, task: DataPointTask):
try:
hook(task)
except Exception as e:
self.elog.log("module_error")
self.log.error(f"Error during running hook {hook}: {e}")


Expand All @@ -46,9 +52,10 @@ class TaskEntityHooksContainer:
- `on_entity_creation`: receives eid and Task, may return list of DataPointTasks
"""

def __init__(self, entity: str, log: logging.Logger):
def __init__(self, entity: str, log: logging.Logger, elog: EventGroupType):
self.entity = entity
self.log = log.getChild(f"entityHooks.{entity}")
self.elog = elog

self._allow_creation = []
self._on_creation = []
Expand All @@ -72,6 +79,7 @@ def run_allow_creation(self, eid: str, task: DataPointTask):
)
return False
except Exception as e:
self.elog.log("module_error")
self.log.error(f"Error during running hook {hook}: {e}")

return True
Expand All @@ -88,6 +96,7 @@ def run_on_creation(self, eid: str, task: DataPointTask):
if type(hook_new_tasks) is list:
new_tasks += hook_new_tasks
except Exception as e:
self.elog.log("module_error")
self.log.error(f"Error during running hook {hook}: {e}")

return new_tasks
Expand All @@ -102,10 +111,18 @@ class TaskAttrHooksContainer:
receives eid and DataPointBase, may return a list of DataPointTasks
"""

def __init__(self, entity: str, attr: str, attr_type: AttrType, log: logging.Logger):
def __init__(
self,
entity: str,
attr: str,
attr_type: AttrType,
log: logging.Logger,
elog: EventGroupType,
):
self.entity = entity
self.attr = attr
self.log = log.getChild(f"attributeHooks.{entity}.{attr}")
self.elog = elog

if attr_type == AttrType.PLAIN:
self.on_new_hook_type = "on_new_plain"
Expand Down Expand Up @@ -140,6 +157,7 @@ def run_on_new(self, eid: str, dp: DataPointBase):
if type(hook_new_tasks) is list:
new_tasks += hook_new_tasks
except Exception as e:
self.elog.log("module_error")
self.log.error(f"Error during running hook {hook}: {e}")

return new_tasks
6 changes: 5 additions & 1 deletion tests/test_common/test_snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import unittest
from functools import partial, update_wrapper

from event_count_logger import DummyEventGroup

from dp3.common.config import ModelSpec, PlatformConfig, read_config_dir
from dp3.common.task import Task
from dp3.snapshots.snapshooter import SnapShooter
Expand All @@ -26,7 +28,7 @@ def setUp(self) -> None:
)
self.model_spec = ModelSpec(config.get("db_entities"))
self.container = SnapshotCorrelationHookContainer(
log=logging.getLogger("TestLogger"), model_spec=self.model_spec
log=logging.getLogger("TestLogger"), model_spec=self.model_spec, elog=DummyEventGroup()
)

def test_basic_function(self):
Expand Down Expand Up @@ -143,6 +145,8 @@ def ack(self, msg_tag):


class MockTaskExecutor:
elog = DummyEventGroup()

def register_attr_hook(self, *args, **kwargs):
...

Expand Down

0 comments on commit e9cc807

Please sign in to comment.