Skip to content

Commit

Permalink
fix: Fix command queue job timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
MarekSuchanek committed Sep 13, 2024
1 parent b3defbc commit b7a558d
Show file tree
Hide file tree
Showing 17 changed files with 174 additions and 80 deletions.
55 changes: 47 additions & 8 deletions packages/dsw-command-queue/dsw/command_queue/command_queue.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import abc
import datetime
import func_timeout
import logging
import os
import platform
Expand Down Expand Up @@ -45,21 +46,26 @@ class CommandWorker:
def work(self, payload: PersistentCommand):
pass

def process_exception(self, e: Exception):
def process_timeout(self, e: BaseException):
pass

def process_exception(self, e: BaseException):
pass


class CommandQueue:

def __init__(self, worker: CommandWorker, db: Database,
channel: str, component: str, timeout: float):
channel: str, component: str, wait_timeout: float,
work_timeout: int | None = None):
self.worker = worker
self.db = db
self.queries = CommandQueries(
channel=channel,
component=component
)
self.timeout = timeout
self.wait_timeout = wait_timeout
self.work_timeout = work_timeout

@tenacity.retry(
reraise=True,
Expand All @@ -84,15 +90,15 @@ def run(self):
self._fetch_and_process_queued()

LOG.debug('Waiting for notifications')
w = select.select(fds, [], [], self.timeout)
w = select.select(fds, [], [], self.wait_timeout)

if INTERRUPTED:
LOG.debug('Interrupt signal received, ending...')
break

if w == ([], [], []):
LOG.debug(f'Nothing received in this cycle '
f'(timeouted after {self.timeout} seconds)')
f'(timeouted after {self.wait_timeout} seconds)')
else:
notifications = 0
for n in psycopg.generators.notifies(queue_conn.connection.pgconn):
Expand Down Expand Up @@ -141,13 +147,46 @@ def fetch_and_process(self) -> bool:
LOG.debug(f'Previous state: {command.state}')
LOG.debug(f'Attempts: {command.attempts} / {command.max_attempts}')
LOG.debug(f'Last error: {command.last_error_message}')
attempt_number = command.attempts + 1

try:
self.worker.work(command)
self.db.execute_query(
query=self.queries.query_command_start(),
attempts=attempt_number,
updated_at=datetime.datetime.now(tz=datetime.UTC),
uuid=command.uuid,
)
self.db.conn_query.connection.commit()

def work():
self.worker.work(command)

if self.work_timeout is None:
LOG.info('Processing (without any timeout set)')
work()
else:
LOG.info(f'Processing (with timeout set to {self.work_timeout} seconds)')
func_timeout.func_timeout(
timeout=self.work_timeout,
func=work,
args=(),
kwargs=None,
)

self.db.execute_query(
query=self.queries.query_command_done(),
attempts=command.attempts + 1,
attempts=attempt_number,
updated_at=datetime.datetime.now(tz=datetime.UTC),
uuid=command.uuid,
)
except func_timeout.exceptions.FunctionTimedOut as e:
msg = f'Processing exceeded time limit ({self.work_timeout} seconds)'
LOG.warning(msg)
self.worker.process_timeout(e)
self.db.execute_query(
query=self.queries.query_command_error(),
attempts=attempt_number,
error_message=msg,
updated_at=datetime.datetime.now(tz=datetime.UTC),
uuid=command.uuid,
)
Expand All @@ -157,7 +196,7 @@ def fetch_and_process(self) -> bool:
self.worker.process_exception(e)
self.db.execute_query(
query=self.queries.query_command_error(),
attempts=command.attempts + 1,
attempts=attempt_number,
error_message=msg,
updated_at=datetime.datetime.now(tz=datetime.UTC),
uuid=command.uuid,
Expand Down
9 changes: 9 additions & 0 deletions packages/dsw-command-queue/dsw/command_queue/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,12 @@ def query_command_done() -> str:
updated_at = %(updated_at)s
WHERE uuid = %(uuid)s;
"""

@staticmethod
def query_command_start() -> str:
return """
UPDATE persistent_command
SET attempts = %(attempts)s,
updated_at = %(updated_at)s
WHERE uuid = %(uuid)s;
"""
1 change: 1 addition & 0 deletions packages/dsw-command-queue/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ classifiers = [
]
requires-python = '>=3.10, <4'
dependencies = [
'func-timeout',
# DSW
"dsw-database==4.9.4",
]
Expand Down
1 change: 1 addition & 0 deletions packages/dsw-command-queue/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
func_timeout==4.3.5
psycopg==3.2.1
psycopg-binary==3.2.1
PyYAML==6.0.1
Expand Down
5 changes: 3 additions & 2 deletions packages/dsw-config/dsw/config/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ def __init__(self, enabled: bool, workers_dsn: Optional[str],

class DatabaseConfig(ConfigModel):

def __init__(self, connection_string: str, connection_timeout: int, queue_timout: int):
def __init__(self, connection_string: str, connection_timeout: int,
queue_timeout: int):
self.connection_string = connection_string
self.connection_timeout = connection_timeout
self.queue_timout = queue_timout
self.queue_timeout = queue_timeout


class S3Config(ConfigModel):
Expand Down
2 changes: 1 addition & 1 deletion packages/dsw-config/dsw/config/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def db(self) -> DatabaseConfig:
return DatabaseConfig(
connection_string=self.get(self.keys.database.connection_string),
connection_timeout=self.get(self.keys.database.connection_timeout),
queue_timout=self.get(self.keys.database.queue_timeout),
queue_timeout=self.get(self.keys.database.queue_timeout),
)

@property
Expand Down
39 changes: 36 additions & 3 deletions packages/dsw-data-seeder/dsw/data_seeder/config.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,42 @@
from dsw.config import DSWConfigParser
from dsw.config.keys import ConfigKey, cast_str, cast_int
from dsw.config.model import DatabaseConfig, S3Config, \
from dsw.config.keys import ConfigKey, ConfigKeys, ConfigKeysContainer, \
cast_str, cast_int, cast_optional_int
from dsw.config.model import ConfigModel, DatabaseConfig, S3Config, \
LoggingConfig, SentryConfig, CloudConfig, GeneralConfig


class _ExperimentalKeys(ConfigKeysContainer):
job_timeout = ConfigKey(
yaml_path=['experimental', 'jobTimeout'],
var_names=['EXPERIMENTAL_JOB_TIMEOUT'],
default=None,
cast=cast_optional_int,
)


class MailerConfigKeys(ConfigKeys):
experimental = _ExperimentalKeys


class ExperimentalConfig(ConfigModel):

def __init__(self, job_timeout: int | None):
self.job_timeout = job_timeout


class SeederConfig:

def __init__(self, db: DatabaseConfig, s3: S3Config, log: LoggingConfig,
sentry: SentryConfig, cloud: CloudConfig, general: GeneralConfig,
extra_dbs: dict[str, DatabaseConfig]):
extra_dbs: dict[str, DatabaseConfig], experimental: ExperimentalConfig):
self.general = general
self.db = db
self.s3 = s3
self.log = log
self.sentry = sentry
self.cloud = cloud
self.extra_dbs = extra_dbs
self.experimental = experimental

def __str__(self):
return f'SeederConfig\n' \
Expand All @@ -26,11 +47,16 @@ def __str__(self):
f'{self.log}' \
f'{self.sentry}' \
f'{self.cloud}' \
f'{self.experimental}' \
f'====================\n'


class SeederConfigParser(DSWConfigParser):

def __init__(self):
super().__init__(keys=MailerConfigKeys)
self.keys = MailerConfigKeys # type: type[MailerConfigKeys]

@property
def extra_dbs(self) -> dict[str, DatabaseConfig]:
result = {}
Expand All @@ -57,6 +83,12 @@ def extra_dbs(self) -> dict[str, DatabaseConfig]:

return result

@property
def experimental(self) -> ExperimentalConfig:
return ExperimentalConfig(
job_timeout=self.get(self.keys.experimental.job_timeout),
)

@property
def config(self) -> SeederConfig:
return SeederConfig(
Expand All @@ -67,4 +99,5 @@ def config(self) -> SeederConfig:
cloud=self.cloud,
general=self.general,
extra_dbs=self.extra_dbs,
experimental=self.experimental,
)
11 changes: 10 additions & 1 deletion packages/dsw-data-seeder/dsw/data_seeder/seeder.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,8 @@ def _run_preparation(self, recipe_name: str) -> CommandQueue:
db=Context.get().app.db,
channel=CMD_CHANNEL,
component=CMD_COMPONENT,
timeout=Context.get().app.cfg.db.queue_timout,
wait_timeout=Context.get().app.cfg.db.queue_timeout,
work_timeout=Context.get().app.cfg.experimental.job_timeout,
)
return queue

Expand Down Expand Up @@ -288,6 +289,14 @@ def work(self, cmd: PersistentCommand):
Context.get().update_trace_id('-')
SentryReporter.set_context('cmd_uuid', '-')

def process_timeout(self, e: BaseException):
LOG.info('Failed with timeout')
SentryReporter.capture_exception(e)

def process_exception(self, e: BaseException):
LOG.info('Failed with unexpected error', exc_info=e)
SentryReporter.capture_exception(e)

@staticmethod
def _update_component_info():
built_at = dateutil.parser.parse(BUILD_INFO.built_at)
Expand Down
1 change: 1 addition & 0 deletions packages/dsw-data-seeder/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
certifi==2024.7.4
click==8.1.7
func_timeout==4.3.5
minio==7.2.7
psycopg==3.2.1
psycopg-binary==3.2.1
Expand Down
8 changes: 4 additions & 4 deletions packages/dsw-document-worker/dsw/document_worker/config.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import shlex
from typing import List, Optional, Type
from typing import List, Optional

from dsw.config import DSWConfigParser
from dsw.config.keys import ConfigKey, ConfigKeys, ConfigKeysContainer,\
from dsw.config.keys import ConfigKey, ConfigKeys, ConfigKeysContainer, \
cast_str, cast_optional_int
from dsw.config.model import GeneralConfig, SentryConfig, DatabaseConfig,\
from dsw.config.model import GeneralConfig, SentryConfig, DatabaseConfig, \
S3Config, LoggingConfig, CloudConfig, ConfigModel

from .consts import DocumentNamingStrategy
Expand Down Expand Up @@ -175,7 +175,7 @@ class DocumentWorkerConfigParser(DSWConfigParser):

def __init__(self):
super().__init__(keys=DocWorkerConfigKeys)
self.keys = DocWorkerConfigKeys # type: Type[DocWorkerConfigKeys]
self.keys = DocWorkerConfigKeys # type: type[DocWorkerConfigKeys]

@property
def documents(self) -> DocumentsConfig:
Expand Down
11 changes: 0 additions & 11 deletions packages/dsw-document-worker/dsw/document_worker/limits.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,3 @@ def check_size_usage(job_id: str, doc_size: int,
f'required {byte_size_format(doc_size)} but '
f'only {byte_size_format(remains)} remains.'
)

@staticmethod
def timeout_exceeded(job_id: str):
job_timeout = Context.get().app.cfg.experimental.job_timeout
if job_timeout is None:
return
raise JobException(
job_id=job_id,
msg=f'Document generation exceeded time limit '
f'({job_timeout} seconds).'
)
29 changes: 0 additions & 29 deletions packages/dsw-document-worker/dsw/document_worker/utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
import contextlib
import signal

from typing import Optional

_BYTE_SIZES = ["B", "kB", "MB", "GB", "TB", "PB", "EB", "ZB"]


Expand All @@ -16,27 +11,3 @@ def byte_size_format(num: float):
return f'{_round_size(num)} {unit}'
num /= 1000.0
return f'{_round_size(num)} YB'


class JobTimeoutError(TimeoutError):
pass


def _raise_timeout(signum, frame):
raise JobTimeoutError


@contextlib.contextmanager
def timeout(t: Optional[int]):
if t is not None:
signal.signal(signal.SIGALRM, _raise_timeout)
signal.alarm(t)
reached_timeout = False
try:
yield
except JobTimeoutError:
reached_timeout = True
finally:
signal.signal(signal.SIGALRM, signal.SIG_IGN)
if reached_timeout:
raise TimeoutError
Loading

0 comments on commit b7a558d

Please sign in to comment.