Skip to content

Commit

Permalink
Fixed indentation and updated language file
Browse files (browse the repository at this point in the history)
  • Loading branch information
ammar92 committed Oct 4, 2024
2 parents 01b7acf + a3f0d1f commit 2c07c41
Show file tree
Hide file tree
Showing 522 changed files with 21,374 additions and 13,897 deletions.
16 changes: 12 additions & 4 deletions boefjes/.ci/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ services:
command: sh -c 'python -m pytest -v tests/integration'
depends_on:
- ci_katalogus-db
- ci_katalogus
env_file:
- .ci/.env.test
volumes:
Expand Down Expand Up @@ -84,7 +85,7 @@ services:
- .ci/.env.test

ci_xtdb:
image: "ghcr.io/dekkers/xtdb-http-multinode:v1.0.8"
image: "ghcr.io/dekkers/xtdb-http-multinode:v1.1.0"

ci_octopoes_api_worker:
build:
Expand All @@ -103,8 +104,15 @@ services:
hard: 262144

ci_katalogus:
image: "docker.io/wiremock/wiremock:2.34.0"
volumes:
- .ci/wiremock:/home/wiremock
build:
context: ..
dockerfile: boefjes/Dockerfile
args:
- ENVIRONMENT=dev
command: uvicorn boefjes.katalogus.root:app --host 0.0.0.0 --port 8080
depends_on:
- ci_katalogus-db
env_file:
- .ci/.env.test
volumes:
- .:/app/boefjes
15 changes: 0 additions & 15 deletions boefjes/.ci/wiremock/mappings/organisations.json

This file was deleted.

34 changes: 17 additions & 17 deletions boefjes/boefjes/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import multiprocessing
from datetime import datetime, timezone
from enum import Enum
from multiprocessing.context import ForkContext, ForkProcess
from uuid import UUID

import structlog
Expand All @@ -13,18 +14,20 @@
from boefjes.clients.bytes_client import BytesAPIClient
from boefjes.clients.scheduler_client import SchedulerAPIClient, TaskStatus
from boefjes.config import settings
from boefjes.dependencies.plugins import PluginService, get_plugin_service
from boefjes.job_handler import get_environment_settings, get_octopoes_api_connector
from boefjes.job_models import BoefjeMeta
from boefjes.local_repository import LocalPluginRepository, get_local_repository
from boefjes.models import PluginType
from boefjes.plugins.models import _default_mime_types
from octopoes.models import Reference
from octopoes.models.exception import ObjectNotFoundException

app = FastAPI(title="Boefje API")
logger = structlog.get_logger(__name__)
ctx: ForkContext = multiprocessing.get_context("fork")


class UvicornServer(multiprocessing.Process):
class UvicornServer(ForkProcess):
def __init__(self, config: Config):
super().__init__()
self.server = Server(config=config)
Expand Down Expand Up @@ -58,7 +61,7 @@ class StatusEnum(str, Enum):

class File(BaseModel):
name: str | None = None
content: str = Field(..., contentEncoding="base64")
content: str = Field(json_schema_extra={"contentEncoding": "base64"})
tags: list[str] | None = None


Expand Down Expand Up @@ -88,14 +91,15 @@ async def root():
def boefje_input(
task_id: UUID,
scheduler_client: SchedulerAPIClient = Depends(get_scheduler_client),
local_repository: LocalPluginRepository = Depends(get_local_repository),
plugin_service: PluginService = Depends(get_plugin_service),
):
task = get_task(task_id, scheduler_client)

if task.status is not TaskStatus.RUNNING:
raise HTTPException(status_code=403, detail="Task does not have status running")

boefje_meta = create_boefje_meta(task, local_repository)
plugin = plugin_service.by_plugin_id(task.data.boefje.id, task.data.organization)
boefje_meta = create_boefje_meta(task, plugin)

output_url = str(settings.api).rstrip("/") + f"/api/v0/tasks/{task_id}"
return BoefjeInput(task_id=task_id, output_url=output_url, boefje_meta=boefje_meta)
Expand All @@ -107,22 +111,23 @@ def boefje_output(
boefje_output: BoefjeOutput,
scheduler_client: SchedulerAPIClient = Depends(get_scheduler_client),
bytes_client: BytesAPIClient = Depends(get_bytes_client),
local_repository: LocalPluginRepository = Depends(get_local_repository),
plugin_service: PluginService = Depends(get_plugin_service),
):
task = get_task(task_id, scheduler_client)

if task.status is not TaskStatus.RUNNING:
raise HTTPException(status_code=403, detail="Task does not have status running")

boefje_meta = create_boefje_meta(task, local_repository)
plugin = plugin_service.by_plugin_id(task.data.boefje.id, task.data.organization)
boefje_meta = create_boefje_meta(task, plugin)
boefje_meta.started_at = task.modified_at
boefje_meta.ended_at = datetime.now(timezone.utc)

bytes_client.login()
bytes_client.save_boefje_meta(boefje_meta)

if boefje_output.files:
mime_types = _default_mime_types(task.data.boefje)
mime_types = _default_mime_types(boefje_meta.boefje)
for file in boefje_output.files:
raw = base64.b64decode(file.content)
# when supported, also save file.name to Bytes
Expand All @@ -148,15 +153,10 @@ def get_task(task_id, scheduler_client):
return task


def create_boefje_meta(task, local_repository):
boefje = task.data.boefje
boefje_resource = local_repository.by_id(boefje.id)
env_keys = boefje_resource.environment_keys
environment = get_environment_settings(task.data, env_keys) if env_keys else {}

def create_boefje_meta(task, plugin: PluginType) -> BoefjeMeta:
organization = task.data.organization
input_ooi = task.data.input_ooi
arguments = {"oci_arguments": boefje_resource.oci_arguments}
arguments = {"oci_arguments": plugin.oci_arguments}

if input_ooi:
reference = Reference.from_str(input_ooi)
Expand All @@ -169,10 +169,10 @@ def create_boefje_meta(task, local_repository):

boefje_meta = BoefjeMeta(
id=task.id,
boefje=boefje,
boefje=task.data.boefje,
input_ooi=input_ooi,
arguments=arguments,
organization=organization,
environment=environment,
environment=get_environment_settings(task.data, plugin.boefje_schema),
)
return boefje_meta
59 changes: 39 additions & 20 deletions boefjes/boefjes/app.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,30 @@
import multiprocessing as mp
import multiprocessing
import os
import signal
import sys
import time
from multiprocessing.context import ForkContext
from multiprocessing.process import BaseProcess
from queue import Queue

import structlog
from httpx import HTTPError
from pydantic import ValidationError
from sqlalchemy.orm import sessionmaker

from boefjes.clients.scheduler_client import SchedulerAPIClient, SchedulerClientInterface, Task, TaskStatus
from boefjes.config import Settings
from boefjes.dependencies.plugins import PluginService
from boefjes.job_handler import BoefjeHandler, NormalizerHandler, bytes_api_client
from boefjes.local import LocalBoefjeJobRunner, LocalNormalizerJobRunner
from boefjes.local_repository import get_local_repository
from boefjes.runtime_interfaces import Handler, WorkerManager
from boefjes.sql.config_storage import create_config_storage
from boefjes.sql.db import get_engine
from boefjes.sql.plugin_storage import create_plugin_storage

logger = structlog.get_logger(__name__)
ctx: ForkContext = multiprocessing.get_context("fork")


class SchedulerWorkerManager(WorkerManager):
Expand All @@ -31,11 +39,11 @@ def __init__(
self.scheduler_client = scheduler_client
self.settings = settings

manager = mp.Manager()
manager = ctx.Manager()

self.task_queue = manager.Queue() # multiprocessing.Queue() will not work on macOS, see mp.Queue.qsize()
self.handling_tasks = manager.dict()
self.workers: list[mp.Process] = []
self.workers: list[BaseProcess] = []

logger.setLevel(log_level)

Expand All @@ -45,13 +53,13 @@ def run(self, queue_type: WorkerManager.Queue) -> None:
logger.info("Created worker pool for queue '%s'", queue_type.value)

self.workers = [
mp.Process(target=_start_working, args=self._worker_args()) for _ in range(self.settings.pool_size)
ctx.Process(target=_start_working, args=self._worker_args()) for _ in range(self.settings.pool_size)
]
for worker in self.workers:
worker.start()

signal.signal(signal.SIGINT, lambda signum, _: self.exit(queue_type, signum))
signal.signal(signal.SIGTERM, lambda signum, _: self.exit(queue_type, signum))
signal.signal(signal.SIGINT, lambda signum, _: self.exit(signum))
signal.signal(signal.SIGTERM, lambda signum, _: self.exit(signum))

while True:
try:
Expand All @@ -68,7 +76,7 @@ def run(self, queue_type: WorkerManager.Queue) -> None:
# been called yet.
if not self.exited:
logger.exception("Exiting worker...")
self.exit(queue_type)
self.exit()

raise

Expand All @@ -93,18 +101,18 @@ def _fill_queue(self, task_queue: Queue, queue_type: WorkerManager.Queue):

all_queues_empty = True

for queue_type in queues:
logger.debug("Popping from queue %s", queue_type.id)
for queue in queues:
logger.debug("Popping from queue %s", queue.id)

try:
p_item = self.scheduler_client.pop_item(queue_type.id)
p_item = self.scheduler_client.pop_item(queue.id)
except (HTTPError, ValidationError):
logger.exception("Popping task from scheduler failed, sleeping 10 seconds")
time.sleep(10)
continue

if not p_item:
logger.debug("Queue %s empty", queue_type.id)
logger.debug("Queue %s empty", queue.id)
continue

all_queues_empty = False
Expand Down Expand Up @@ -153,13 +161,13 @@ def _check_workers(self) -> None:
self._cleanup_pending_worker_task(worker)
worker.close()

new_worker = mp.Process(target=_start_working, args=self._worker_args())
new_worker = ctx.Process(target=_start_working, args=self._worker_args())
new_worker.start()
new_workers.append(new_worker)

self.workers = new_workers

def _cleanup_pending_worker_task(self, worker: mp.Process) -> None:
def _cleanup_pending_worker_task(self, worker: BaseProcess) -> None:
if worker.pid not in self.handling_tasks:
logger.debug("No pending task found for Worker[pid=%s, %s]", worker.pid, _format_exit_code(worker.exitcode))
return
Expand All @@ -169,7 +177,7 @@ def _cleanup_pending_worker_task(self, worker: mp.Process) -> None:
try:
task = self.scheduler_client.get_task(handling_task_id)

if task.status is TaskStatus.DISPATCHED:
if task.status is TaskStatus.DISPATCHED or task.status is TaskStatus.RUNNING:
try:
self.scheduler_client.patch_task(task.id, TaskStatus.FAILED)
logger.warning("Set status to failed in the scheduler for task[id=%s]", handling_task_id)
Expand All @@ -181,7 +189,7 @@ def _cleanup_pending_worker_task(self, worker: mp.Process) -> None:
def _worker_args(self) -> tuple:
return self.task_queue, self.item_handler, self.scheduler_client, self.handling_tasks

def exit(self, queue_type: WorkerManager.Queue, signum: int | None = None):
def exit(self, signum: int | None = None):
try:
if signum:
logger.info("Received %s, exiting", signal.Signals(signum).name)
Expand All @@ -191,7 +199,7 @@ def exit(self, queue_type: WorkerManager.Queue, signum: int | None = None):

for p_item in items:
try:
self.scheduler_client.push_item(queue_type.value, p_item)
self.scheduler_client.push_item(p_item)
except HTTPError:
logger.exception("Rescheduling task failed[id=%s]", p_item.id)

Expand Down Expand Up @@ -226,7 +234,7 @@ def _format_exit_code(exitcode: int | None) -> str:


def _start_working(
task_queue: mp.Queue,
task_queue: multiprocessing.Queue,
handler: Handler,
scheduler_client: SchedulerClientInterface,
handling_tasks: dict[int, str],
Expand All @@ -239,6 +247,7 @@ def _start_working(
handling_tasks[os.getpid()] = str(p_item.id)

try:
scheduler_client.patch_task(p_item.id, TaskStatus.RUNNING)
handler.handle(p_item.data)
status = TaskStatus.COMPLETED
except Exception: # noqa
Expand All @@ -248,17 +257,27 @@ def _start_working(
raise
finally:
try:
scheduler_client.patch_task(p_item.id, status) # Note: implicitly, we have p_item.id == task_id
logger.info("Set status to %s in the scheduler for task[id=%s]", status, p_item.data.id)
if scheduler_client.get_task(p_item.id).status == TaskStatus.RUNNING:
# The docker runner could have handled this already
scheduler_client.patch_task(p_item.id, status) # Note that implicitly, we have p_item.id == task_id
logger.info("Set status to %s in the scheduler for task[id=%s]", status, p_item.data.id)
except HTTPError:
logger.exception("Could not patch scheduler task to %s", status.value)


def get_runtime_manager(settings: Settings, queue: WorkerManager.Queue, log_level: str) -> WorkerManager:
local_repository = get_local_repository()

session = sessionmaker(bind=get_engine())()
plugin_service = PluginService(
create_plugin_storage(session),
create_config_storage(session),
local_repository,
)

item_handler: Handler
if queue is WorkerManager.Queue.BOEFJES:
item_handler = BoefjeHandler(LocalBoefjeJobRunner(local_repository), local_repository, bytes_api_client)
item_handler = BoefjeHandler(LocalBoefjeJobRunner(local_repository), plugin_service, bytes_api_client)
else:
item_handler = NormalizerHandler(
LocalNormalizerJobRunner(local_repository), bytes_api_client, settings.scan_profile_whitelist
Expand Down
Loading

0 comments on commit 2c07c41

Please sign in to comment.