Skip to content

Commit

Permalink
Merge branch 'main' into feature/json-line-logging
Browse files (browse the repository at this point in the history)
  • Loading branch information
underdarknl authored Sep 16, 2024
2 parents 7e99081 + ae790c3 commit eb16de9
Show file tree
Hide file tree
Showing 175 changed files with 3,482 additions and 981 deletions.
16 changes: 12 additions & 4 deletions boefjes/.ci/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ services:
command: sh -c 'python -m pytest -v tests/integration'
depends_on:
- ci_katalogus-db
- ci_katalogus
env_file:
- .ci/.env.test
volumes:
Expand Down Expand Up @@ -84,7 +85,7 @@ services:
- .ci/.env.test

ci_xtdb:
image: "ghcr.io/dekkers/xtdb-http-multinode:v1.0.8"
image: "ghcr.io/dekkers/xtdb-http-multinode:v1.1.0"

ci_octopoes_api_worker:
build:
Expand All @@ -103,8 +104,15 @@ services:
hard: 262144

ci_katalogus:
image: "docker.io/wiremock/wiremock:2.34.0"
volumes:
- .ci/wiremock:/home/wiremock
build:
context: ..
dockerfile: boefjes/Dockerfile
args:
- ENVIRONMENT=dev
command: uvicorn boefjes.katalogus.root:app --host 0.0.0.0 --port 8080
depends_on:
- ci_katalogus-db
env_file:
- .ci/.env.test
volumes:
- .:/app/boefjes
15 changes: 0 additions & 15 deletions boefjes/.ci/wiremock/mappings/organisations.json

This file was deleted.

28 changes: 13 additions & 15 deletions boefjes/boefjes/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@
from boefjes.clients.bytes_client import BytesAPIClient
from boefjes.clients.scheduler_client import SchedulerAPIClient, TaskStatus
from boefjes.config import settings
from boefjes.dependencies.plugins import PluginService, get_plugin_service
from boefjes.job_handler import get_environment_settings, get_octopoes_api_connector
from boefjes.job_models import BoefjeMeta
from boefjes.local_repository import LocalPluginRepository, get_local_repository
from boefjes.models import PluginType
from boefjes.plugins.models import _default_mime_types
from octopoes.models import Reference
from octopoes.models.exception import ObjectNotFoundException
Expand Down Expand Up @@ -88,14 +89,15 @@ async def root():
def boefje_input(
task_id: UUID,
scheduler_client: SchedulerAPIClient = Depends(get_scheduler_client),
local_repository: LocalPluginRepository = Depends(get_local_repository),
plugin_service: PluginService = Depends(get_plugin_service),
):
task = get_task(task_id, scheduler_client)

if task.status is not TaskStatus.RUNNING:
raise HTTPException(status_code=403, detail="Task does not have status running")

boefje_meta = create_boefje_meta(task, local_repository)
plugin = plugin_service.by_plugin_id(task.data.boefje.id, task.data.organization)
boefje_meta = create_boefje_meta(task, plugin)

output_url = str(settings.api).rstrip("/") + f"/api/v0/tasks/{task_id}"
return BoefjeInput(task_id=task_id, output_url=output_url, boefje_meta=boefje_meta)
Expand All @@ -107,22 +109,23 @@ def boefje_output(
boefje_output: BoefjeOutput,
scheduler_client: SchedulerAPIClient = Depends(get_scheduler_client),
bytes_client: BytesAPIClient = Depends(get_bytes_client),
local_repository: LocalPluginRepository = Depends(get_local_repository),
plugin_service: PluginService = Depends(get_plugin_service),
):
task = get_task(task_id, scheduler_client)

if task.status is not TaskStatus.RUNNING:
raise HTTPException(status_code=403, detail="Task does not have status running")

boefje_meta = create_boefje_meta(task, local_repository)
plugin = plugin_service.by_plugin_id(task.data.boefje.id, task.data.organization)
boefje_meta = create_boefje_meta(task, plugin)
boefje_meta.started_at = task.modified_at
boefje_meta.ended_at = datetime.now(timezone.utc)

bytes_client.login()
bytes_client.save_boefje_meta(boefje_meta)

if boefje_output.files:
mime_types = _default_mime_types(task.data.boefje)
mime_types = _default_mime_types(boefje_meta.boefje).union(plugin.produces)
for file in boefje_output.files:
raw = base64.b64decode(file.content)
# when supported, also save file.name to Bytes
Expand All @@ -148,15 +151,10 @@ def get_task(task_id, scheduler_client):
return task


def create_boefje_meta(task, local_repository):
boefje = task.data.boefje
boefje_resource = local_repository.by_id(boefje.id)
env_keys = boefje_resource.environment_keys
environment = get_environment_settings(task.data, env_keys) if env_keys else {}

def create_boefje_meta(task, plugin: PluginType) -> BoefjeMeta:
organization = task.data.organization
input_ooi = task.data.input_ooi
arguments = {"oci_arguments": boefje_resource.oci_arguments}
arguments = {"oci_arguments": plugin.oci_arguments}

if input_ooi:
reference = Reference.from_str(input_ooi)
Expand All @@ -169,10 +167,10 @@ def create_boefje_meta(task, local_repository):

boefje_meta = BoefjeMeta(
id=task.id,
boefje=boefje,
boefje=task.data.boefje,
input_ooi=input_ooi,
arguments=arguments,
organization=organization,
environment=environment,
environment=get_environment_settings(task.data, plugin.schema),
)
return boefje_meta
24 changes: 20 additions & 4 deletions boefjes/boefjes/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,18 @@
import structlog
from httpx import HTTPError
from pydantic import ValidationError
from sqlalchemy.orm import sessionmaker

from boefjes.clients.scheduler_client import SchedulerAPIClient, SchedulerClientInterface, Task, TaskStatus
from boefjes.config import Settings
from boefjes.dependencies.plugins import PluginService
from boefjes.job_handler import BoefjeHandler, NormalizerHandler, bytes_api_client
from boefjes.local import LocalBoefjeJobRunner, LocalNormalizerJobRunner
from boefjes.local_repository import get_local_repository
from boefjes.runtime_interfaces import Handler, WorkerManager
from boefjes.sql.config_storage import create_config_storage
from boefjes.sql.db import get_engine
from boefjes.sql.plugin_storage import create_plugin_storage

logger = structlog.get_logger(__name__)

Expand Down Expand Up @@ -169,7 +174,7 @@ def _cleanup_pending_worker_task(self, worker: mp.Process) -> None:
try:
task = self.scheduler_client.get_task(handling_task_id)

if task.status is TaskStatus.DISPATCHED:
if task.status is TaskStatus.DISPATCHED or task.status is TaskStatus.RUNNING:
try:
self.scheduler_client.patch_task(task.id, TaskStatus.FAILED)
logger.warning("Set status to failed in the scheduler for task[id=%s]", handling_task_id)
Expand Down Expand Up @@ -239,6 +244,7 @@ def _start_working(
handling_tasks[os.getpid()] = str(p_item.id)

try:
scheduler_client.patch_task(p_item.id, TaskStatus.RUNNING)
handler.handle(p_item.data)
status = TaskStatus.COMPLETED
except Exception: # noqa
Expand All @@ -248,17 +254,27 @@ def _start_working(
raise
finally:
try:
scheduler_client.patch_task(p_item.id, status) # Note: implicitly, we have p_item.id == task_id
logger.info("Set status to %s in the scheduler for task[id=%s]", status, p_item.data.id)
if scheduler_client.get_task(p_item.id).status == TaskStatus.RUNNING:
# The docker runner could have handled this already
scheduler_client.patch_task(p_item.id, status) # Note that implicitly, we have p_item.id == task_id
logger.info("Set status to %s in the scheduler for task[id=%s]", status, p_item.data.id)
except HTTPError:
logger.exception("Could not patch scheduler task to %s", status.value)


def get_runtime_manager(settings: Settings, queue: WorkerManager.Queue, log_level: str) -> WorkerManager:
local_repository = get_local_repository()

session = sessionmaker(bind=get_engine())()
plugin_service = PluginService(
create_plugin_storage(session),
create_config_storage(session),
local_repository,
)

item_handler: Handler
if queue is WorkerManager.Queue.BOEFJES:
item_handler = BoefjeHandler(LocalBoefjeJobRunner(local_repository), local_repository, bytes_api_client)
item_handler = BoefjeHandler(LocalBoefjeJobRunner(local_repository), plugin_service, bytes_api_client)
else:
item_handler = NormalizerHandler(
LocalNormalizerJobRunner(local_repository), bytes_api_client, settings.scan_profile_whitelist
Expand Down
23 changes: 16 additions & 7 deletions boefjes/boefjes/clients/bytes_client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import typing
import uuid
from base64 import b64encode
from collections.abc import Callable, Set
from functools import wraps
from typing import Any
Expand Down Expand Up @@ -99,17 +100,25 @@ def get_normalizer_meta(self, normalizer_meta_id: uuid.UUID) -> NormalizerMeta:

@retry_with_login
def save_raw(self, boefje_meta_id: str, raw: str | bytes, mime_types: Set[str] = frozenset()) -> UUID:
headers = {"content-type": "application/octet-stream"}
headers.update(self.headers)
file_name = "raw" # The name provides a key for all ids returned, so this is arbitrary as we only upload 1 file

response = self._session.post(
"/bytes/raw",
content=raw,
headers=headers,
params={"mime_types": list(mime_types), "boefje_meta_id": boefje_meta_id},
json={
"files": [
{
"name": file_name,
"content": b64encode(raw if isinstance(raw, bytes) else raw.encode()).decode(),
"tags": list(mime_types),
}
]
},
headers=self.headers,
params={"boefje_meta_id": str(boefje_meta_id)},
)

self._verify_response(response)
return UUID(response.json()["id"])

return UUID(response.json()[file_name])

@retry_with_login
def get_raw(self, raw_data_id: str) -> bytes:
Expand Down
25 changes: 9 additions & 16 deletions boefjes/boefjes/dependencies/plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def clone_settings_to_organisation(self, from_organisation: str, to_organisation
self.set_enabled_by_id(plugin_id, to_organisation, enabled=True)

def upsert_settings(self, settings: dict, organisation_id: str, plugin_id: str):
self._assert_settings_match_schema(settings, organisation_id, plugin_id)
self._assert_settings_match_schema(settings, plugin_id)
self._put_boefje(plugin_id)

return self.config_storage.upsert(organisation_id, plugin_id, settings=settings)
Expand All @@ -122,14 +122,14 @@ def _put_boefje(self, boefje_id: str) -> None:

try:
self.plugin_storage.boefje_by_id(boefje_id)
except PluginNotFound:
except PluginNotFound as e:
try:
plugin = self.local_repo.by_id(boefje_id)
except KeyError:
raise
raise e

if plugin.type != "boefje":
raise
raise e
self.plugin_storage.create_boefje(plugin)

def _put_normalizer(self, normalizer_id: str) -> None:
Expand All @@ -150,12 +150,7 @@ def _put_normalizer(self, normalizer_id: str) -> None:
def delete_settings(self, organisation_id: str, plugin_id: str):
self.config_storage.delete(organisation_id, plugin_id)

try:
self._assert_settings_match_schema({}, organisation_id, plugin_id)
except SettingsNotConformingToSchema:
logger.warning("Making sure %s is disabled for %s because settings are deleted", plugin_id, organisation_id)

self.set_enabled_by_id(plugin_id, organisation_id, False)
# We don't check the schema anymore because we can provide entries through the global environment as well

def schema(self, plugin_id: str) -> dict | None:
try:
Expand Down Expand Up @@ -184,9 +179,7 @@ def description(self, plugin_id: str, organisation_id: str) -> str:
return ""

def set_enabled_by_id(self, plugin_id: str, organisation_id: str, enabled: bool):
if enabled:
all_settings = self.get_all_settings(organisation_id, plugin_id)
self._assert_settings_match_schema(all_settings, organisation_id, plugin_id)
# We don't check the schema anymore because we can provide entries through the global environment as well

try:
self._put_boefje(plugin_id)
Expand All @@ -195,14 +188,14 @@ def set_enabled_by_id(self, plugin_id: str, organisation_id: str, enabled: bool)

self.config_storage.upsert(organisation_id, plugin_id, enabled=enabled)

def _assert_settings_match_schema(self, all_settings: dict, organisation_id: str, plugin_id: str):
def _assert_settings_match_schema(self, all_settings: dict, plugin_id: str):
schema = self.schema(plugin_id)

if schema: # No schema means that there is nothing to assert
try:
validate(instance=all_settings, schema=schema)
except ValidationError as e:
raise SettingsNotConformingToSchema(organisation_id, plugin_id, e.message) from e
raise SettingsNotConformingToSchema(plugin_id, e.message) from e

def _set_plugin_enabled(self, plugin: PluginType, organisation_id: str) -> PluginType:
with contextlib.suppress(KeyError, NotFound):
Expand All @@ -211,7 +204,7 @@ def _set_plugin_enabled(self, plugin: PluginType, organisation_id: str) -> Plugi
return plugin


def get_plugin_service(organisation_id: str) -> Iterator[PluginService]:
def get_plugin_service() -> Iterator[PluginService]:
def closure(session: Session):
return PluginService(
create_plugin_storage(session),
Expand Down
1 change: 0 additions & 1 deletion boefjes/boefjes/docker_boefjes_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ def run(self) -> None:
stderr_mime_types = boefjes.plugins.models._default_mime_types(self.boefje_meta.boefje)

task_id = self.boefje_meta.id
self.scheduler_client.patch_task(task_id, TaskStatus.RUNNING)
self.boefje_meta.started_at = datetime.now(timezone.utc)

try:
Expand Down
Loading

0 comments on commit eb16de9

Please sign in to comment.