Skip to content

Commit

Permalink
Merge branch 'main' into feature/stylelint-fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Darwinkel authored Jul 13, 2023
2 parents fdd2d71 + 9f647b6 commit d17e15f
Show file tree
Hide file tree
Showing 47 changed files with 879 additions and 259 deletions.
1 change: 1 addition & 0 deletions .env-dist
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ RABBITMQ_DEFAULT_VHOST=kat
RABBITMQ_DEFAULT_USER={%QUEUE_USERNAME}
RABBITMQ_DEFAULT_PASS={%QUEUE_PASSWORD}

# Boefjes and Normalizers
QUEUE_NAME_BOEFJES=boefjes
QUEUE_NAME_NORMALIZERS=normalizers
QUEUE_URI=amqp://${RABBITMQ_DEFAULT_USER}:${RABBITMQ_DEFAULT_PASS}@rabbitmq:5672/${RABBITMQ_DEFAULT_VHOST}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/build-debian-docker-image.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on:
workflow_dispatch: {}
pull_request:
paths:
- "packaging"
- "packaging/**"
- ".github/workflows/build-debian-docker-image.yml"

env:
Expand Down
1 change: 1 addition & 0 deletions boefjes/.env-dist
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ BYTES_PASSWORD=secret

KATALOGUS_API=
KATALOGUS_DB_URI=
WORKER_HEARTBEAT=
38 changes: 19 additions & 19 deletions boefjes/boefjes/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import os
import signal
import time
from typing import Callable, Dict, List
from typing import Dict, List, Tuple

from pydantic import ValidationError
from requests import HTTPError
Expand All @@ -27,26 +27,27 @@ class SchedulerWorkerManager(WorkerManager):
def __init__(
self,
item_handler: Handler,
client_factory: Callable[[], SchedulerClientInterface],
scheduler_client: SchedulerClientInterface,
settings: Settings,
log_level: str, # TODO: (re)move?
):
self.item_handler = item_handler
self.client_factory = client_factory
self.scheduler_client = client_factory()
self.scheduler_client = scheduler_client
self.settings = settings

self.task_queue = mp.Queue()
self.handling_tasks = mp.Manager().dict()
manager = mp.Manager()

self.task_queue = manager.Queue() # multiprocessing.Queue() will not work on macOS, see mp.Queue.qsize()
self.handling_tasks = manager.dict()
self.workers = []

logger.setLevel(log_level)

def run(self, queue_type: WorkerManager.Queue) -> None:
logger.info("Created worker pool for queue '%s'", queue_type.value)

self.worker_args = (self.task_queue, self.item_handler, self.client_factory, self.handling_tasks)
self.workers = [
mp.Process(target=_start_working, args=self.worker_args) for _ in range(self.settings.pool_size)
mp.Process(target=_start_working, args=self._worker_args()) for _ in range(self.settings.pool_size)
]
for worker in self.workers:
worker.start()
Expand Down Expand Up @@ -132,18 +133,19 @@ def _check_workers(self) -> None:
new_workers = []

for worker in self.workers:
if worker.is_alive():
if not worker._closed and worker.is_alive():
new_workers.append(worker)
continue

logger.warning(
"Worker[pid=%s, %s] not alive, creating new worker...", worker.pid, _format_exit_code(worker.exitcode)
)

self._cleanup_pending_worker_task(worker)
worker.close()
if not worker._closed: # Closed workers do not have a pid, so cleaning up would fail
self._cleanup_pending_worker_task(worker)
worker.close()

new_worker = mp.Process(target=_start_working, args=self.worker_args)
new_worker = mp.Process(target=_start_working, args=self._worker_args())
new_worker.start()
new_workers.append(new_worker)

Expand All @@ -170,6 +172,9 @@ def _cleanup_pending_worker_task(self, worker: mp.Process) -> None:
except HTTPError:
logger.exception("Could not get scheduler task[id=%s]", handling_task_id)

def _worker_args(self) -> Tuple:
return self.task_queue, self.item_handler, self.scheduler_client, self.handling_tasks

def exit(self, queue_type: WorkerManager.Queue):
if not self.task_queue.empty():
items: List[QueuePrioritizedItem] = [self.task_queue.get() for _ in range(self.task_queue.qsize())]
Expand Down Expand Up @@ -200,10 +205,9 @@ def _format_exit_code(exitcode: int) -> str:
def _start_working(
task_queue: mp.Queue,
handler: Handler,
client_factory: Callable[[], SchedulerClientInterface],
scheduler_client: SchedulerClientInterface,
handling_tasks: Dict[int, str],
):
scheduler_client = client_factory()
logger.info("Started listening for tasks from worker[pid=%s]", os.getpid())

while True:
Expand All @@ -228,18 +232,14 @@ def _start_working(


def get_runtime_manager(settings: Settings, queue: WorkerManager.Queue, log_level: str) -> WorkerManager:
# Not a lambda since multiprocessing tries and fails to pickle lambda's
def client_factory():
return SchedulerAPIClient(settings.scheduler_api)

if queue is WorkerManager.Queue.BOEFJES:
item_handler = BoefjeHandler(LocalBoefjeJobRunner(get_local_repository()), get_local_repository())
else:
item_handler = NormalizerHandler(LocalNormalizerJobRunner(get_local_repository()))

return SchedulerWorkerManager(
item_handler,
client_factory, # Do not share a session between workers
SchedulerAPIClient(settings.scheduler_api), # Do not share a session between workers
settings,
log_level,
)
35 changes: 15 additions & 20 deletions boefjes/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import multiprocessing
import time
from datetime import datetime, timezone
from multiprocessing import Queue as MultiprocessingQueue
from multiprocessing import Manager
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union

Expand All @@ -15,10 +15,6 @@
from boefjes.runtime_interfaces import Handler, WorkerManager
from tests.stubs import get_dummy_data

_tasks = multiprocessing.Manager().dict()
_popped_items = multiprocessing.Manager().dict()
_pushed_items = multiprocessing.Manager().dict()


class MockSchedulerClient(SchedulerClientInterface):
def __init__(
Expand All @@ -29,7 +25,7 @@ def __init__(
log_path: Path,
raise_on_empty_queue: Exception = KeyboardInterrupt,
iterations_to_wait_for_exception: int = 0,
sleep_time: int = 0.05,
sleep_time: int = 0.1,
):
self.queue_response = queue_response
self.boefje_responses = boefje_responses
Expand All @@ -40,11 +36,12 @@ def __init__(
self.sleep_time = sleep_time

self._iterations = 0
self._tasks: Dict[str, Task] = _tasks
self._popped_items: Dict[str, QueuePrioritizedItem] = _popped_items
self._pushed_items: Dict[str, Tuple[str, QueuePrioritizedItem]] = _pushed_items
self._tasks: Dict[str, Task] = multiprocessing.Manager().dict()
self._popped_items: Dict[str, QueuePrioritizedItem] = multiprocessing.Manager().dict()
self._pushed_items: Dict[str, Tuple[str, QueuePrioritizedItem]] = multiprocessing.Manager().dict()

def get_queues(self) -> List[Queue]:
time.sleep(self.sleep_time)
return parse_raw_as(List[Queue], self.queue_response)

def pop_item(self, queue: str) -> Optional[QueuePrioritizedItem]:
Expand Down Expand Up @@ -97,7 +94,7 @@ def push_item(self, queue_id: str, p_item: QueuePrioritizedItem) -> None:
class MockHandler(Handler):
def __init__(self, exception=Exception):
self.sleep_time = 0
self.queue = MultiprocessingQueue()
self.queue = Manager().Queue()
self.exception = exception

def handle(self, item: Union[BoefjeMeta, NormalizerMeta]):
Expand All @@ -118,13 +115,11 @@ def item_handler(tmp_path: Path):

@pytest.fixture
def manager(item_handler: MockHandler, tmp_path: Path) -> SchedulerWorkerManager:
def client_factory():
return MockSchedulerClient(
get_dummy_data("scheduler/queues_response.json"),
2 * [get_dummy_data("scheduler/pop_response_boefje.json")]
+ [get_dummy_data("scheduler/should_crash.json")],
[get_dummy_data("scheduler/pop_response_normalizer.json")],
tmp_path / "patch_task_log",
)

return SchedulerWorkerManager(item_handler, client_factory, Settings(pool_size=1, poll_interval=0.01), "DEBUG")
scheduler_client = MockSchedulerClient(
get_dummy_data("scheduler/queues_response.json"),
2 * [get_dummy_data("scheduler/pop_response_boefje.json")] + [get_dummy_data("scheduler/should_crash.json")],
[get_dummy_data("scheduler/pop_response_normalizer.json")],
tmp_path / "patch_task_log",
)

return SchedulerWorkerManager(item_handler, scheduler_client, Settings(pool_size=1, poll_interval=0.01), "DEBUG")
12 changes: 4 additions & 8 deletions boefjes/tests/test_app.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import json
from multiprocessing import Queue
from multiprocessing import Manager
from pathlib import Path

import pytest
Expand Down Expand Up @@ -29,7 +29,7 @@ def test_one_process(manager: SchedulerWorkerManager, item_handler: MockHandler)

def test_two_processes(manager: SchedulerWorkerManager, item_handler: MockHandler) -> None:
manager.settings.pool_size = 2
manager.task_queue = Queue(maxsize=2)
manager.task_queue = Manager().Queue()

with pytest.raises(KeyboardInterrupt):
manager.run(WorkerManager.Queue.BOEFJES)
Expand All @@ -50,7 +50,6 @@ def test_two_processes_exception(manager: SchedulerWorkerManager, item_handler:
[get_dummy_data("scheduler/pop_response_normalizer.json")],
tmp_path / "patch_task_log",
)
manager.client_factory = lambda: manager.scheduler_client

manager.settings.pool_size = 2
with pytest.raises(KeyboardInterrupt):
Expand All @@ -67,10 +66,9 @@ def test_two_processes_handler_exception(manager: SchedulerWorkerManager, item_h
[get_dummy_data("scheduler/pop_response_normalizer.json")],
tmp_path / "patch_task_log",
)
manager.client_factory = lambda: manager.scheduler_client

manager.settings.pool_size = 2
manager.task_queue = Queue(maxsize=2)
manager.task_queue = Manager().Queue()
with pytest.raises(KeyboardInterrupt):
manager.run(WorkerManager.Queue.BOEFJES)

Expand Down Expand Up @@ -101,9 +99,8 @@ def test_two_processes_cleanup_unfinished_tasks(
[],
tmp_path / "patch_task_log",
)
manager.client_factory = lambda: manager.scheduler_client
manager.settings.pool_size = 2
manager.task_queue = Queue(maxsize=2)
manager.task_queue = Manager().Queue()

item_handler.sleep_time = 200

Expand Down Expand Up @@ -143,7 +140,6 @@ def test_null(manager: SchedulerWorkerManager, tmp_path: Path, item_handler: Moc
tmp_path / "patch_task_log",
iterations_to_wait_for_exception=2,
)
manager.client_factory = lambda: manager.scheduler_client

with pytest.raises(KeyboardInterrupt):
manager.run(WorkerManager.Queue.BOEFJES)
Expand Down
25 changes: 17 additions & 8 deletions bytes/bytes/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,37 @@ def __init__(self, queue_uri: str):
logger.info("Connected to RabbitMQ")

def publish(self, event: Event) -> None:
self._check_connection()

event_data = event.json()
logger.debug("Publishing event: %s", event_data)
queue_name = self._queue_name(event)

logger.debug("Publishing event: %s", event_data)
try:
self.channel.queue_declare(queue_name, durable=True)
except pika.exceptions.AMQPChannelError as e:
except pika.exceptions.AMQPError as e:
logger.info("Channel error %s, recreating channel", e)
self.channel = self.connection.channel()
self._check_connection()
self.channel.queue_declare(queue_name, durable=True)

try:
self.channel.basic_publish("", queue_name, event_data.encode())
except pika.exceptions.ConnectionClosed:
logger.exception("RabbitMQ connection was closed: retrying with a new connection.")
except pika.exceptions.AMQPError:
logger.info("RabbitMQ connection was closed: retrying with a new connection.")
self._check_connection()
self.channel.basic_publish("", queue_name, event_data.encode())

logger.info("Published event [event_id=%s] to queue %s", event.event_id, queue_name)

def _check_connection(self):
if self.connection.is_closed:
self.connection = pika.BlockingConnection(pika.URLParameters(self.queue_uri))
self.channel = self.connection.channel()
logger.warning("Reconnected to RabbitMQ because connection was closed")

self.channel.basic_publish("", queue_name, event_data.encode())

logger.info("Published event [event_id=%s] to queue %s", event.event_id, queue_name)
if self.channel.is_closed:
self.channel = self.connection.channel()
logger.warning("Recreated RabbitMQ channel because channel was closed")

@staticmethod
def _queue_name(event: Event) -> str:
Expand Down
25 changes: 13 additions & 12 deletions docs/source/developer_documentation/boefjes.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,18 +63,19 @@ will set the default value for the `TOP_PORTS` setting (used by the nmap Boefje)
This default value can be overridden by setting any value for `TOP_PORTS` in the KAT-alogus.


| Environment variable | Value | Description |
|----------------------------|------------------------------|---------------------------------------------------|
| QUEUE_NAME_BOEFJES | "boefjes" | Queue name for boefjes |
| QUEUE_NAME_NORMALIZERS | "normalizers" | Queue name for normalizers |
| QUEUE_HOST | "rabbitmq" | The RabbitMQ host |
| OCTOPOES_API | "http://octopoes_api:80" | URI for the Octopoes API |
| BYTES_API | "http://bytes:8000" | URI for the Bytes API |
| KATALOGUS_API | "http://katalogus:8000" | URI for the Katalogus API |
| KATALOGUS_DB_URI | "postgresql:// ..." | URI for the Postgresql DB |
| ENCRYPTION_MIDDLEWARE | "IDENTITY" or "NACL_SEALBOX" | Encryption to use for the katalogus settings |
| KATALOGUS_PRIVATE_KEY_B_64 | "..." | KATalogus NaCl Sealbox base-64 private key string |
| KATALOGUS_PUBLIC_KEY_B_64 | "..." | KATalogus NaCl Sealbox base-64 public key string |
| Environment variable | Value | Description |
|----------------------------|------------------------------|------------------------------------------------------------------|
| QUEUE_NAME_BOEFJES | "boefjes" | Queue name for boefjes |
| QUEUE_NAME_NORMALIZERS | "normalizers" | Queue name for normalizers |
| QUEUE_HOST | "rabbitmq" | The RabbitMQ host |
| WORKER_HEARTBEAT | "1.0" | Seconds to wait before checking the workers when queues are full |
| OCTOPOES_API | "http://octopoes_api:80" | URI for the Octopoes API |
| BYTES_API | "http://bytes:8000" | URI for the Bytes API |
| KATALOGUS_API | "http://katalogus:8000" | URI for the Katalogus API |
| KATALOGUS_DB_URI | "postgresql:// ..." | URI for the Postgresql DB |
| ENCRYPTION_MIDDLEWARE | "IDENTITY" or "NACL_SEALBOX" | Encryption to use for the katalogus settings |
| KATALOGUS_PRIVATE_KEY_B_64 | "..." | KATalogus NaCl Sealbox base-64 private key string |
| KATALOGUS_PUBLIC_KEY_B_64 | "..." | KATalogus NaCl Sealbox base-64 public key string |

## Design

Expand Down
5 changes: 5 additions & 0 deletions mula/.env-dist
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ SCHEDULER_MONITOR_ORGANISATIONS_INTERVAL=
# RabbitMQ host address
SCHEDULER_RABBITMQ_DSN=

# RabbitMQ prefetch_count for channel.basic_qos(), which is the number of unacknowledged messages on a channel.
# Also see https://www.rabbitmq.com/consumer-prefetch.html.

SCHEDULER_RABBITMQ_PREFETCH_COUNT=

# Host url's of external service connectors
KATALOGUS_API=
BYTES_API=
Expand Down
4 changes: 4 additions & 0 deletions mula/docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,8 @@ creation of their schedulers. Default is `60`.

`SCHEDULER_RABBITMQ_DSN` is the url of the RabbitMQ host.

`SCHEDULER_RABBITMQ_PREFETCH_COUNT` is the RabbitMQ prefetch count for
`channel.basic_qos()`, i.e. the number of unacknowledged messages on a channel.
Also see https://www.rabbitmq.com/consumer-prefetch.html.

`SCHEDULER_DB_DSN` is the locator of the database
2 changes: 0 additions & 2 deletions mula/scheduler/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ def create_normalizer_scheduler(self, org: Organisation) -> schedulers.Normalize
scheduler_id=identifier,
queue=queue,
ranker=ranker,
populate_queue_enabled=self.ctx.config.normalizer_populate,
organisation=org,
callback=self.remove_scheduler,
)
Expand Down Expand Up @@ -119,7 +118,6 @@ def create_boefje_scheduler(self, org: Organisation) -> schedulers.BoefjeSchedul
scheduler_id=identifier,
queue=queue,
ranker=ranker,
populate_queue_enabled=self.ctx.config.boefje_populate,
organisation=org,
callback=self.remove_scheduler,
)
Expand Down
1 change: 1 addition & 0 deletions mula/scheduler/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class Settings(BaseSettings):
host_octopoes: str = Field(..., env="OCTOPOES_API")
host_mutation: str = Field(..., env="SCHEDULER_RABBITMQ_DSN")
host_raw_data: str = Field(..., env="SCHEDULER_RABBITMQ_DSN")
queue_prefetch_count: int = Field(100, env="SCHEDULER_QUEUE_PREFETCH_COUNT")
host_normalizer_meta: str = Field(..., env="SCHEDULER_RABBITMQ_DSN")
span_export_grpc_endpoint: Optional[str] = Field(None, env="SPAN_EXPORT_GRPC_ENDPOINT")

Expand Down
3 changes: 2 additions & 1 deletion mula/scheduler/connectors/listeners/listeners.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ def dispatch(self, body: bytes) -> None:
"""Dispatch a message without a return value"""
raise NotImplementedError

def basic_consume(self, queue: str, durable: bool) -> None:
def basic_consume(self, queue: str, durable: bool, prefetch_count: int) -> None:
self.channel.queue_declare(queue=queue, durable=durable)
self.channel.basic_qos(prefetch_count=prefetch_count)
self.channel.basic_consume(queue, on_message_callback=self.callback)
self.channel.start_consuming()

Expand Down
Loading

0 comments on commit d17e15f

Please sign in to comment.