{% translate "This is the OpenKAT report for organization" %} {{ organization.name }}
- {% translate "Created with data from:" %} {{ observed_at }} {{ TIME_ZONE }}
+ {% translate "Created with data from:" %} {{ report_ooi.observed_at }} {{ TIME_ZONE }}
- {% translate "Created on:" %} {{ created_at }}
+ {% translate "Created on:" %} {{ report_ooi.date_generated }}
{% translate "Created by:" %} {{ organization_member.user.full_name }}
diff --git a/rocky/reports/report_types/definitions.py b/rocky/reports/report_types/definitions.py
index e36661e3821..0aff9af1dfa 100644
--- a/rocky/reports/report_types/definitions.py
+++ b/rocky/reports/report_types/definitions.py
@@ -13,8 +13,21 @@
class ReportPlugins(TypedDict):
- required: list[str]
- optional: list[str]
+ required: set[str]
+ optional: set[str]
+
+
+def report_plugins_union(report_types: list[type["BaseReport"]]) -> ReportPlugins:
+ """Take the union of the required and optional plugin sets and remove optional plugins that are required"""
+
+ plugins: ReportPlugins = {"required": set(), "optional": set()}
+
+ for report_type in report_types:
+ plugins["required"].update(report_type.plugins["required"])
+ plugins["optional"].update(report_type.plugins["optional"])
+ plugins["optional"].difference_update(report_type.plugins["required"])
+
+ return plugins
class BaseReport:
diff --git a/rocky/reports/report_types/dns_report/report.py b/rocky/reports/report_types/dns_report/report.py
index 2ae8f888f49..0fd6b62eb52 100644
--- a/rocky/reports/report_types/dns_report/report.py
+++ b/rocky/reports/report_types/dns_report/report.py
@@ -18,7 +18,7 @@ class DNSReport(Report):
id = "dns-report"
name = _("DNS Report")
description = _("DNS reports focus on domain name system configuration and potential weaknesses.")
- plugins = {"required": ["dns-records", "dns-sec"], "optional": ["dns-zone"]}
+ plugins = {"required": {"dns-records", "dns-sec"}, "optional": {"dns-zone"}}
input_ooi_types = {Hostname}
template_path = "dns_report/report.html"
diff --git a/rocky/reports/report_types/findings_report/report.html b/rocky/reports/report_types/findings_report/report.html
index 5e9f476a23d..5eb65cef851 100644
--- a/rocky/reports/report_types/findings_report/report.html
+++ b/rocky/reports/report_types/findings_report/report.html
@@ -49,6 +49,8 @@
{% translate "Findings" %}
{% translate "Description" %}
{{ info.finding_type.description }}
+ {% translate "Source" %}
+ {{ info.finding_type.source }}
{% translate "Impact" %}
{{ info.finding_type.impact }}
{% translate "Recommendation" %}
diff --git a/rocky/reports/report_types/findings_report/report.py b/rocky/reports/report_types/findings_report/report.py
index 9b05e6231b9..9ca4951d9d2 100644
--- a/rocky/reports/report_types/findings_report/report.py
+++ b/rocky/reports/report_types/findings_report/report.py
@@ -3,20 +3,27 @@
from django.utils.translation import gettext_lazy as _
+import octopoes.models.ooi.reports as report_models
from octopoes.models import Reference
from octopoes.models.ooi.findings import Finding, FindingType, RiskLevelSeverity
+from octopoes.models.ooi.monitoring import Incident
+from octopoes.models.ooi.question import Question
+from octopoes.models.ooi.web import RESTAPI, ImageMetadata
from octopoes.models.types import ALL_TYPES
from reports.report_types.definitions import Report, ReportPlugins
TREE_DEPTH = 9
SEVERITY_OPTIONS = [severity.value for severity in RiskLevelSeverity]
+_EXCLUDE_OOI_TYPES = [Question, RESTAPI, Incident, ImageMetadata, report_models.ReportData, report_models.Report]
+_INPUT_OOI_TYPES = {ooi_type for ooi_type in ALL_TYPES if ooi_type not in _EXCLUDE_OOI_TYPES}
+
class FindingsReport(Report):
id = "findings-report"
name = _("Findings Report")
description = _("Shows all the finding types and their occurrences.")
- plugins: ReportPlugins = {"required": [], "optional": []}
+ plugins: ReportPlugins = {"required": set(), "optional": set()}
input_ooi_types = ALL_TYPES
template_path = "findings_report/report.html"
label_style = "3-light"
diff --git a/rocky/reports/report_types/helpers.py b/rocky/reports/report_types/helpers.py
index 4df28901cd2..1dbc1772935 100644
--- a/rocky/reports/report_types/helpers.py
+++ b/rocky/reports/report_types/helpers.py
@@ -1,7 +1,7 @@
from octopoes.models import OOI, Reference
from reports.report_types.aggregate_organisation_report.report import AggregateOrganisationReport
from reports.report_types.concatenated_report.report import ConcatenatedReport
-from reports.report_types.definitions import AggregateReport, MultiReport, Report
+from reports.report_types.definitions import AggregateReport, BaseReport, Report
from reports.report_types.dns_report.report import DNSReport
from reports.report_types.findings_report.report import FindingsReport
from reports.report_types.ipv6_report.report import IPv6Report
@@ -36,6 +36,8 @@
CONCATENATED_REPORTS = [ConcatenatedReport]
+ALL_REPORT_TYPES = REPORTS + AGGREGATE_REPORTS + MULTI_REPORTS + CONCATENATED_REPORTS
+
def get_ooi_types_with_report() -> set[type[OOI]]:
"""
@@ -44,7 +46,7 @@ def get_ooi_types_with_report() -> set[type[OOI]]:
return {ooi_type for report in REPORTS for ooi_type in report.input_ooi_types}
-def get_report_types_for_ooi(ooi_pk: str) -> list[type[Report]]:
+def get_report_types_for_ooi(ooi_pk: str) -> list[type[BaseReport]]:
"""
Get all report types that can be generated for a given OOI
"""
@@ -53,26 +55,26 @@ def get_report_types_for_ooi(ooi_pk: str) -> list[type[Report]]:
return [report for report in REPORTS if ooi_type in report.input_ooi_types]
-def get_report_types_for_oois(ooi_pks: list[str]) -> set[type[Report]]:
+def get_report_types_for_oois(oois: list[str]) -> set[type[BaseReport]]:
"""
Get all report types that can be generated for a given list of OOIs
"""
- return {report for ooi_pk in ooi_pks for report in get_report_types_for_ooi(ooi_pk)}
+
+ return {report for ooi_pk in oois for report in get_report_types_for_ooi(ooi_pk)}
-def get_report_by_id(report_id: str) -> type[Report] | type[MultiReport] | type[AggregateReport]:
+def get_report_by_id(report_id: str) -> type[BaseReport]:
"""
Get report type by id
"""
- if report_id is None:
- return ConcatenatedReport
- for report in REPORTS + MULTI_REPORTS + AGGREGATE_REPORTS + CONCATENATED_REPORTS:
+
+ for report in ALL_REPORT_TYPES:
if report.id == report_id:
return report
raise ValueError(f"Report with id {report_id} not found")
-def get_reports(report_ids: list[str]) -> list[type[Report] | type[MultiReport] | type[AggregateReport]]:
+def get_reports(report_ids: list[str]) -> list[type[BaseReport]]:
return [get_report_by_id(report_id) for report_id in report_ids]
diff --git a/rocky/reports/report_types/ipv6_report/report.py b/rocky/reports/report_types/ipv6_report/report.py
index ae864dca348..98ce048086c 100644
--- a/rocky/reports/report_types/ipv6_report/report.py
+++ b/rocky/reports/report_types/ipv6_report/report.py
@@ -20,7 +20,7 @@ class IPv6Report(Report):
id = "ipv6-report"
name = _("IPv6 Report")
description = _("Check whether hostnames point to IPv6 addresses.")
- plugins = {"required": ["dns-records"], "optional": []}
+ plugins = {"required": {"dns-records"}, "optional": set()}
input_ooi_types = {Hostname, IPAddressV4, IPAddressV6}
template_path = "ipv6_report/report.html"
label_style = "4-light"
diff --git a/rocky/reports/report_types/mail_report/report.py b/rocky/reports/report_types/mail_report/report.py
index 69f166a7ec9..0adf2ab9750 100644
--- a/rocky/reports/report_types/mail_report/report.py
+++ b/rocky/reports/report_types/mail_report/report.py
@@ -15,7 +15,7 @@ class MailReport(Report):
id = "mail-report"
name = _("Mail Report")
description = _("System specific Mail Report that focusses on IP addresses and hostnames.")
- plugins = {"required": ["dns-records"], "optional": []}
+ plugins = {"required": {"dns-records"}, "optional": set()}
input_ooi_types = {Hostname, IPAddressV4, IPAddressV6}
template_path = "mail_report/report.html"
label_style = "2-light"
diff --git a/rocky/reports/report_types/multi_organization_report/report.py b/rocky/reports/report_types/multi_organization_report/report.py
index b32146f3f55..6bc74209f32 100644
--- a/rocky/reports/report_types/multi_organization_report/report.py
+++ b/rocky/reports/report_types/multi_organization_report/report.py
@@ -23,7 +23,7 @@ class MultiOrganizationReport(MultiReport):
id = "multi-organization-report"
name = _("Multi Organization Report")
description = _("Multi Organization Report")
- plugins: ReportPlugins = {"required": [], "optional": []}
+ plugins: ReportPlugins = {"required": set(), "optional": set()}
input_ooi_types = {ReportData}
template_path = "multi_organization_report/report.html"
@@ -257,6 +257,6 @@ def collect_report_data(
):
report_data = {}
for ooi in [x for x in input_ooi_references if Reference.from_str(x).class_type == ReportData]:
- report_data[ooi] = connector.get(Reference.from_str(ooi), observed_at).dict()
+ report_data[ooi] = connector.get(Reference.from_str(ooi), observed_at).model_dump()
return report_data
diff --git a/rocky/reports/report_types/name_server_report/report.py b/rocky/reports/report_types/name_server_report/report.py
index 9398e728719..a0eb7f01606 100644
--- a/rocky/reports/report_types/name_server_report/report.py
+++ b/rocky/reports/report_types/name_server_report/report.py
@@ -52,12 +52,12 @@ class NameServerSystemReport(Report):
name = _("Name Server Report")
description = _("Name Server Report checks name servers on basic security standards.")
plugins = {
- "required": [
+ "required": {
"nmap",
"dns-records",
"dns-sec",
- ],
- "optional": [],
+ },
+ "optional": set(),
}
input_ooi_types = {Hostname, IPAddressV4, IPAddressV6}
template_path = "name_server_report/report.html"
diff --git a/rocky/reports/report_types/open_ports_report/report.py b/rocky/reports/report_types/open_ports_report/report.py
index e868237dad5..05fbd1e6c0d 100644
--- a/rocky/reports/report_types/open_ports_report/report.py
+++ b/rocky/reports/report_types/open_ports_report/report.py
@@ -14,8 +14,8 @@ class OpenPortsReport(Report):
name = _("Open Ports Report")
description = _("Find open ports of IP addresses")
plugins = {
- "required": ["nmap"],
- "optional": ["shodan", "nmap-udp", "nmap-ports", "nmap-ip-range", "masscan"],
+ "required": {"nmap"},
+ "optional": {"shodan", "nmap-udp", "nmap-ports", "nmap-ip-range", "masscan"},
}
input_ooi_types = {Hostname, IPAddressV4, IPAddressV6}
template_path = "open_ports_report/report.html"
diff --git a/rocky/reports/report_types/rpki_report/report.py b/rocky/reports/report_types/rpki_report/report.py
index ed2d05576e0..1e11541ec1f 100644
--- a/rocky/reports/report_types/rpki_report/report.py
+++ b/rocky/reports/report_types/rpki_report/report.py
@@ -22,7 +22,7 @@ class RPKIReport(Report):
"Shows whether the IP is covered by a valid RPKI ROA. For a hostname it shows "
"the IP addresses and whether they are covered by a valid RPKI ROA."
)
- plugins = {"required": ["dns-records", "rpki"], "optional": []}
+ plugins = {"required": {"dns-records", "rpki"}, "optional": set()}
input_ooi_types = {Hostname, IPAddressV4, IPAddressV6}
template_path = "rpki_report/report.html"
label_style = "4-light"
diff --git a/rocky/reports/report_types/safe_connections_report/report.py b/rocky/reports/report_types/safe_connections_report/report.py
index 13554d39562..f4192d7254d 100644
--- a/rocky/reports/report_types/safe_connections_report/report.py
+++ b/rocky/reports/report_types/safe_connections_report/report.py
@@ -20,8 +20,8 @@ class SafeConnectionsReport(Report):
name = _("Safe Connections Report")
description: str = _("Shows whether the IPService contains safe ciphers.")
plugins = {
- "required": ["dns-records", "testssl-sh-ciphers", "nmap"],
- "optional": [],
+ "required": {"dns-records", "testssl-sh-ciphers", "nmap"},
+ "optional": set(),
}
input_ooi_types = {Hostname, IPAddressV4, IPAddressV6}
template_path = "safe_connections_report/report.html"
diff --git a/rocky/reports/report_types/systems_report/report.py b/rocky/reports/report_types/systems_report/report.py
index db71c686271..af804290acb 100644
--- a/rocky/reports/report_types/systems_report/report.py
+++ b/rocky/reports/report_types/systems_report/report.py
@@ -53,7 +53,7 @@ class SystemReport(Report):
id = "systems-report"
name = _("System Report")
description = _("Combine IP addresses, hostnames and services into systems.")
- plugins = {"required": ["dns-records", "nmap"], "optional": ["nmap-udp"]}
+ plugins = {"required": {"dns-records", "nmap"}, "optional": {"nmap-udp"}}
input_ooi_types = {Hostname, IPAddressV4, IPAddressV6}
template_path = "systems_report/report.html"
label_style = "6-light"
diff --git a/rocky/reports/report_types/tls_report/report.py b/rocky/reports/report_types/tls_report/report.py
index e56cd5cf8ba..eae1802093d 100644
--- a/rocky/reports/report_types/tls_report/report.py
+++ b/rocky/reports/report_types/tls_report/report.py
@@ -20,7 +20,7 @@ class TLSReport(Report):
id = "tls-report"
name = _("TLS Report")
description: str = _("TLS Report assesses the security of data encryption and transmission protocols.")
- plugins = {"required": ["testssl-sh-ciphers"], "optional": []}
+ plugins = {"required": {"testssl-sh-ciphers"}, "optional": set()}
input_ooi_types = {IPService}
template_path = "tls_report/report.html"
label_style = "3-light"
diff --git a/rocky/reports/report_types/vulnerability_report/report.py b/rocky/reports/report_types/vulnerability_report/report.py
index 021e88978d5..d8b88888ed9 100644
--- a/rocky/reports/report_types/vulnerability_report/report.py
+++ b/rocky/reports/report_types/vulnerability_report/report.py
@@ -21,8 +21,8 @@ class VulnerabilityReport(Report):
name = _("Vulnerability Report")
description: str = _("Vulnerabilities found are grouped for each system.")
plugins = {
- "required": ["dns-records", "nmap", "webpage-analysis"],
- "optional": ["nmap-udp", "nmap-ports", "shodan"],
+ "required": {"dns-records", "nmap", "webpage-analysis"},
+ "optional": {"nmap-udp", "nmap-ports", "shodan"},
}
input_ooi_types = {Hostname, IPAddressV4, IPAddressV6}
template_path = "vulnerability_report/report.html"
diff --git a/rocky/reports/report_types/web_system_report/report.py b/rocky/reports/report_types/web_system_report/report.py
index 75aece24111..f55a3ebc51c 100644
--- a/rocky/reports/report_types/web_system_report/report.py
+++ b/rocky/reports/report_types/web_system_report/report.py
@@ -92,7 +92,7 @@ class WebSystemReport(Report):
name = _("Web System Report")
description = _("Web System Reports check web systems on basic security standards.")
plugins = {
- "required": [
+ "required": {
"nmap",
"dns-records",
"security_txt_downloader",
@@ -100,8 +100,8 @@ class WebSystemReport(Report):
"ssl-version",
"ssl-certificates",
"webpage-analysis",
- ],
- "optional": [],
+ },
+ "optional": set(),
}
input_ooi_types = {Hostname, IPAddressV4, IPAddressV6}
template_path = "web_system_report/report.html"
diff --git a/rocky/reports/runner/__init__.py b/rocky/reports/runner/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/rocky/reports/runner/local.py b/rocky/reports/runner/local.py
new file mode 100644
index 00000000000..a1e5a4e5fa2
--- /dev/null
+++ b/rocky/reports/runner/local.py
@@ -0,0 +1,48 @@
+from datetime import datetime, timezone
+
+from django.conf import settings
+from katalogus.client import KATalogusError, get_katalogus
+from tools.models import Organization
+
+from octopoes.connector.octopoes import OctopoesAPIConnector
+from octopoes.models import Reference
+from octopoes.models.ooi.reports import ReportRecipe
+from reports.report_types.helpers import get_report_by_id
+from reports.runner.models import JobRuntimeError, ReportJobRunner
+from reports.views.base import format_plugin_data, hydrate_plugins
+from reports.views.mixins import collect_reports, save_report_data
+from rocky.bytes_client import get_bytes_client
+from rocky.scheduler import ReportTask
+
+
+class LocalReportJobRunner(ReportJobRunner):
+ def run(self, report_task: ReportTask) -> None:
+ now = datetime.now(timezone.utc)
+ connector = OctopoesAPIConnector(settings.OCTOPOES_API, report_task.organisation_id)
+ recipe: ReportRecipe = connector.get(
+ Reference.from_str(f"ReportRecipe|{report_task.report_recipe_id}"), datetime.now(timezone.utc)
+ )
+ parsed_report_types = [get_report_by_id(report_type_id) for report_type_id in recipe.report_types]
+
+ error_reports, report_data = collect_reports(
+ now,
+ connector,
+ list(recipe.input_recipe["input_oois"]),
+ parsed_report_types,
+ )
+
+ try:
+ report_type_plugins = hydrate_plugins(parsed_report_types, get_katalogus(report_task.organisation_id))
+ plugins = format_plugin_data(report_type_plugins)
+ except KATalogusError as e:
+ raise JobRuntimeError("Failed to hydrate plugins from KATalogus") from e
+
+ save_report_data(
+ get_bytes_client(report_task.organisation_id),
+ now,
+ connector,
+ Organization.objects.get(code=report_task.organisation_id),
+ plugins,
+ report_data,
+ [(recipe.report_name_format, recipe.report_name_format)],
+ )
diff --git a/rocky/reports/runner/models.py b/rocky/reports/runner/models.py
new file mode 100644
index 00000000000..6ea2409cbd3
--- /dev/null
+++ b/rocky/reports/runner/models.py
@@ -0,0 +1,15 @@
+from octopoes.models.ooi.reports import ReportRecipe
+
+
+class ReportJobRunner:
+ def run(self, recipe: ReportRecipe) -> None:
+ raise NotImplementedError()
+
+
+class WorkerManager:
+ def run(self) -> None:
+ raise NotImplementedError()
+
+
+class JobRuntimeError(RuntimeError):
+ """Base exception class for exceptions raised during running of jobs"""
diff --git a/rocky/reports/runner/worker.py b/rocky/reports/runner/worker.py
new file mode 100644
index 00000000000..c411c365766
--- /dev/null
+++ b/rocky/reports/runner/worker.py
@@ -0,0 +1,262 @@
+import multiprocessing as mp
+import os
+import signal
+import sys
+import time
+from queue import Queue
+
+import structlog
+from django.conf import settings
+from httpx import HTTPError
+from pydantic import ValidationError
+
+from reports.runner.local import LocalReportJobRunner
+from reports.runner.models import ReportJobRunner, WorkerManager
+from rocky.scheduler import SchedulerClient, Task, TaskStatus, scheduler_client
+
+logger = structlog.get_logger(__name__)
+
+
+class SchedulerWorkerManager(WorkerManager):
+ def __init__(
+ self,
+ runner: ReportJobRunner,
+ scheduler: SchedulerClient,
+ pool_size: int,
+ poll_interval: int,
+ worker_heartbeat: int,
+ ):
+ self.runner = runner
+ self.scheduler = scheduler
+ self.pool_size = pool_size
+ self.poll_interval = poll_interval
+ self.worker_heartbeat = worker_heartbeat
+
+ manager = mp.Manager()
+
+ self.task_queue = manager.Queue() # multiprocessing.Queue() will not work on macOS, see mp.Queue.qsize()
+ self.handling_tasks = manager.dict()
+ self.workers: list[mp.Process] = []
+
+ self.exited = False
+
+ def run(self) -> None:
+ logger.info("Created worker pool for queue 'report'")
+
+ self.workers = [mp.Process(target=_start_working, args=self._worker_args()) for _ in range(self.pool_size)]
+ for worker in self.workers:
+ worker.start()
+
+ signal.signal(signal.SIGINT, lambda signum, _: self.exit(signum))
+ signal.signal(signal.SIGTERM, lambda signum, _: self.exit(signum))
+
+ while True:
+ try:
+ self._check_workers()
+ self._fill_queue(self.task_queue)
+ except Exception as e: # noqa
+ logger.exception("Unhandled Exception:")
+ logger.info("Continuing worker...")
+ continue
+ except: # noqa
+ # Calling sys.exit() in self.exit() will raise SystemExit. We
+ # should only log the exception and call self.exit() when the
+ # exception is caused by something else and self.exit() hasn't
+ # been called yet.
+ if not self.exited:
+ logger.exception("Exiting worker...")
+ self.exit()
+
+ raise
+
+ def _fill_queue(self, task_queue: Queue):
+ if task_queue.qsize() > self.pool_size:
+ time.sleep(self.worker_heartbeat)
+ return
+
+ try:
+ queues = self.scheduler.get_queues()
+ except HTTPError:
+ # Scheduler is having issues, so make note of it and try again
+ logger.exception("Getting the queues from the scheduler failed")
+ time.sleep(10 * self.poll_interval) # But not immediately
+ return
+
+ # We do not target a specific queue since we start one runtime for all organisations
+ # and queue ids contain the organisation_id
+ queues = [q for q in queues if q.id.startswith("report")]
+
+ logger.debug("Found queues: %s", [queue.id for queue in queues])
+
+ all_queues_empty = True
+
+ for queue in queues:
+ logger.debug("Popping from queue %s", queue.id)
+
+ try:
+ p_item = self.scheduler.pop_item(queue.id)
+ except (HTTPError, ValidationError):
+ logger.error("Popping task from scheduler failed")
+ time.sleep(self.poll_interval)
+ continue
+
+ if not p_item:
+ logger.debug("Queue %s empty", queue.id)
+ continue
+
+ all_queues_empty = False
+
+ logger.info("Handling task[%s]", p_item.id)
+
+ try:
+ task_queue.put(p_item)
+ logger.info("Dispatched task[%s]", p_item.id)
+ except: # noqa
+ logger.error("Exiting worker...")
+ logger.info("Patching scheduler task[id=%s] to %s", p_item.id, TaskStatus.FAILED.value)
+
+ try:
+ self.scheduler.patch_task(p_item.id, TaskStatus.FAILED)
+ logger.info("Set task status to %s in the scheduler for task[id=%s]", TaskStatus.FAILED, p_item.id)
+ except HTTPError:
+ logger.error("Could not patch scheduler task to %s", TaskStatus.FAILED.value)
+
+ raise
+
+ if all_queues_empty:
+ logger.debug("All queues empty, sleeping %f seconds", self.poll_interval)
+ time.sleep(self.poll_interval)
+
+ def _check_workers(self) -> None:
+ new_workers = []
+
+ for worker in self.workers:
+ closed = False
+
+ try:
+ if worker.is_alive():
+ new_workers.append(worker)
+ continue
+ except ValueError:
+ closed = True # worker is closed, so we create a new one
+
+ logger.warning(
+ "Worker[pid=%s, %s] not alive, creating new worker...", worker.pid, _format_exit_code(worker.exitcode)
+ )
+
+ if not closed: # Closed workers do not have a pid, so cleaning up would fail
+ self._cleanup_pending_worker_task(worker)
+ worker.close()
+
+ new_worker = mp.Process(target=_start_working, args=self._worker_args())
+ new_worker.start()
+ new_workers.append(new_worker)
+
+ self.workers = new_workers
+
+ def _cleanup_pending_worker_task(self, worker: mp.Process) -> None:
+ if worker.pid not in self.handling_tasks:
+ logger.debug("No pending task found for Worker[pid=%s, %s]", worker.pid, _format_exit_code(worker.exitcode))
+ return
+
+ handling_task_id = self.handling_tasks[worker.pid]
+
+ try:
+ task = self.scheduler.get_task_details(handling_task_id)
+
+ if task.status is TaskStatus.DISPATCHED or task.status is TaskStatus.RUNNING:
+ try:
+ self.scheduler.patch_task(task.id, TaskStatus.FAILED)
+ logger.warning("Set status to failed in the scheduler for task[id=%s]", handling_task_id)
+ except HTTPError:
+ logger.exception("Could not patch scheduler task to failed")
+ except HTTPError:
+ logger.exception("Could not get scheduler task[id=%s]", handling_task_id)
+
+ def _worker_args(self) -> tuple:
+ return self.task_queue, self.runner, self.scheduler, self.handling_tasks
+
+ def exit(self, signum: int | None = None):
+ try:
+ if signum:
+ logger.info("Received %s, exiting", signal.Signals(signum).name)
+
+ if not self.task_queue.empty():
+ tasks: list[Task] = [self.task_queue.get() for _ in range(self.task_queue.qsize())]
+
+ for task in tasks:
+ try:
+ self.scheduler.push_task(task)
+ except HTTPError:
+ logger.exception("Rescheduling task failed[id=%s]", task.id)
+
+ killed_workers = []
+
+ for worker in self.workers: # Send all signals before joining, speeding up shutdowns
+ try:
+ if worker.is_alive():
+ worker.kill()
+ killed_workers.append(worker)
+ except ValueError:
+ pass # worker is already closed
+
+ for worker in killed_workers:
+ worker.join()
+ self._cleanup_pending_worker_task(worker)
+ worker.close()
+ finally:
+ self.exited = True
+ # If we are called from the main run loop we are already in the
+ # process of exiting, so we only need to call sys.exit() in the
+ # signal handler.
+ if signum:
+ sys.exit()
+
+
+def _format_exit_code(exitcode: int | None) -> str:
+ if exitcode is None or exitcode >= 0:
+ return f"exitcode={exitcode}"
+
+ return f"signal={signal.Signals(-exitcode).name}"
+
+
+def _start_working(
+ task_queue: mp.Queue,
+ runner: ReportJobRunner,
+ scheduler: SchedulerClient,
+ handling_tasks: dict[int, str],
+):
+ logger.info("Started listening for tasks from worker[pid=%s]", os.getpid())
+
+ while True:
+ p_item = task_queue.get()
+ status = TaskStatus.FAILED
+ handling_tasks[os.getpid()] = str(p_item.id)
+
+ try:
+ scheduler.patch_task(p_item.id, TaskStatus.RUNNING)
+ runner.run(p_item.data)
+ status = TaskStatus.COMPLETED
+ except Exception: # noqa
+ logger.exception("An error occurred handling scheduler item[id=%s]", p_item.id)
+ except: # noqa
+ logger.exception("An unhandled error occurred handling scheduler item[id=%s]", p_item.id)
+ raise
+ finally:
+ try:
+ # The docker runner could have handled this already
+ if scheduler.get_task_details(p_item.id).status == TaskStatus.RUNNING:
+ scheduler.patch_task(p_item.id, status) # Note that implicitly, we have p_item.id == task_id
+ logger.info("Set status to %s in the scheduler for task[id=%s]", status, p_item.id)
+ except HTTPError:
+ logger.exception("Could not patch scheduler task to %s", status.value)
+
+
+def get_runtime_manager() -> WorkerManager:
+ return SchedulerWorkerManager(
+ LocalReportJobRunner(),
+ scheduler_client(None),
+ settings.POOL_SIZE,
+ settings.POLL_INTERVAL,
+ settings.WORKER_HEARTBEAT,
+ )
diff --git a/rocky/reports/templates/aggregate_report.html b/rocky/reports/templates/aggregate_report.html
index 9ba1d1d23a3..8cdb08d1f79 100644
--- a/rocky/reports/templates/aggregate_report.html
+++ b/rocky/reports/templates/aggregate_report.html
@@ -5,7 +5,7 @@
{% block content %}
{% include "header.html" %}
- {% include template with data=post_processed_data %}
+ {% include report_ooi.template with data=report_data %}
{% endblock content %}
{% block html_at_end_body %}
diff --git a/rocky/reports/templates/aggregate_report_pdf.html b/rocky/reports/templates/aggregate_report_pdf.html
index 4047414e922..f1e79ff06b0 100644
--- a/rocky/reports/templates/aggregate_report_pdf.html
+++ b/rocky/reports/templates/aggregate_report_pdf.html
@@ -14,7 +14,7 @@
{% endcompress %}
- {% include template with data=post_processed_data %}
+ {% include report_ooi.template with data=report_data %}
|