Skip to content

Commit

Permalink
Merge pull request #234 from ds-wizard/hotfix/4.10.5
Browse files Browse the repository at this point in the history
Hotfix 4.10.5 (develop)
  • Loading branch information
MarekSuchanek committed Sep 13, 2024
2 parents b567f81 + 9b42d92 commit 47967e9
Show file tree
Hide file tree
Showing 38 changed files with 254 additions and 107 deletions.
7 changes: 7 additions & 0 deletions packages/dsw-command-queue/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]


## [4.10.5]

### Fixed

- Fixed command queue job timeout (moved support from workers to queue, reliable timeout approach)

## [4.10.4]

Released for version consistency with other DSW tools.
Expand Down Expand Up @@ -256,3 +262,4 @@ Released for version consistency with other DSW tools.
[4.10.2]: /../../tree/v4.10.2
[4.10.3]: /../../tree/v4.10.3
[4.10.4]: /../../tree/v4.10.4
[4.10.5]: /../../tree/v4.10.5
55 changes: 47 additions & 8 deletions packages/dsw-command-queue/dsw/command_queue/command_queue.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import abc
import datetime
import func_timeout
import logging
import os
import platform
Expand Down Expand Up @@ -45,21 +46,26 @@ class CommandWorker:
def work(self, payload: PersistentCommand):
pass

def process_exception(self, e: Exception):
def process_timeout(self, e: BaseException):
pass

def process_exception(self, e: BaseException):
pass


class CommandQueue:

def __init__(self, worker: CommandWorker, db: Database,
channel: str, component: str, timeout: float):
channel: str, component: str, wait_timeout: float,
work_timeout: int | None = None):
self.worker = worker
self.db = db
self.queries = CommandQueries(
channel=channel,
component=component
)
self.timeout = timeout
self.wait_timeout = wait_timeout
self.work_timeout = work_timeout

@tenacity.retry(
reraise=True,
Expand All @@ -84,15 +90,15 @@ def run(self):
self._fetch_and_process_queued()

LOG.debug('Waiting for notifications')
w = select.select(fds, [], [], self.timeout)
w = select.select(fds, [], [], self.wait_timeout)

if INTERRUPTED:
LOG.debug('Interrupt signal received, ending...')
break

if w == ([], [], []):
LOG.debug(f'Nothing received in this cycle '
f'(timeouted after {self.timeout} seconds)')
f'(timeouted after {self.wait_timeout} seconds)')
else:
notifications = 0
for n in psycopg.generators.notifies(queue_conn.connection.pgconn):
Expand Down Expand Up @@ -141,13 +147,46 @@ def fetch_and_process(self) -> bool:
LOG.debug(f'Previous state: {command.state}')
LOG.debug(f'Attempts: {command.attempts} / {command.max_attempts}')
LOG.debug(f'Last error: {command.last_error_message}')
attempt_number = command.attempts + 1

try:
self.worker.work(command)
self.db.execute_query(
query=self.queries.query_command_start(),
attempts=attempt_number,
updated_at=datetime.datetime.now(tz=datetime.UTC),
uuid=command.uuid,
)
self.db.conn_query.connection.commit()

def work():
self.worker.work(command)

if self.work_timeout is None:
LOG.info('Processing (without any timeout set)')
work()
else:
LOG.info(f'Processing (with timeout set to {self.work_timeout} seconds)')
func_timeout.func_timeout(
timeout=self.work_timeout,
func=work,
args=(),
kwargs=None,
)

self.db.execute_query(
query=self.queries.query_command_done(),
attempts=command.attempts + 1,
attempts=attempt_number,
updated_at=datetime.datetime.now(tz=datetime.UTC),
uuid=command.uuid,
)
except func_timeout.exceptions.FunctionTimedOut as e:
msg = f'Processing exceeded time limit ({self.work_timeout} seconds)'
LOG.warning(msg)
self.worker.process_timeout(e)
self.db.execute_query(
query=self.queries.query_command_error(),
attempts=attempt_number,
error_message=msg,
updated_at=datetime.datetime.now(tz=datetime.UTC),
uuid=command.uuid,
)
Expand All @@ -157,7 +196,7 @@ def fetch_and_process(self) -> bool:
self.worker.process_exception(e)
self.db.execute_query(
query=self.queries.query_command_error(),
attempts=command.attempts + 1,
attempts=attempt_number,
error_message=msg,
updated_at=datetime.datetime.now(tz=datetime.UTC),
uuid=command.uuid,
Expand Down
9 changes: 9 additions & 0 deletions packages/dsw-command-queue/dsw/command_queue/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,12 @@ def query_command_done() -> str:
updated_at = %(updated_at)s
WHERE uuid = %(uuid)s;
"""

@staticmethod
def query_command_start() -> str:
return """
UPDATE persistent_command
SET attempts = %(attempts)s,
updated_at = %(updated_at)s
WHERE uuid = %(uuid)s;
"""
5 changes: 3 additions & 2 deletions packages/dsw-command-queue/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = 'setuptools.build_meta'

[project]
name = 'dsw-command-queue'
version = "4.10.4"
version = "4.10.5"
description = 'Library for working with command queue and persistent commands'
readme = 'README.md'
keywords = ['dsw', 'subscriber', 'publisher', 'database', 'queue', 'processing']
Expand All @@ -24,8 +24,9 @@ classifiers = [
]
requires-python = '>=3.10, <4'
dependencies = [
'func-timeout',
# DSW
"dsw-database==4.10.4",
"dsw-database==4.10.5",
]

[project.urls]
Expand Down
1 change: 1 addition & 0 deletions packages/dsw-command-queue/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
func_timeout==4.3.5
psycopg==3.2.1
psycopg-binary==3.2.1
PyYAML==6.0.2
Expand Down
5 changes: 5 additions & 0 deletions packages/dsw-config/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]


## [4.10.5]

Released for version consistency with other DSW tools.

## [4.10.4]

Released for version consistency with other DSW tools.
Expand Down Expand Up @@ -268,3 +272,4 @@ Released for version consistency with other DSW tools.
[4.10.2]: /../../tree/v4.10.2
[4.10.3]: /../../tree/v4.10.3
[4.10.4]: /../../tree/v4.10.4
[4.10.5]: /../../tree/v4.10.5
5 changes: 3 additions & 2 deletions packages/dsw-config/dsw/config/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ def __init__(self, enabled: bool, workers_dsn: Optional[str],

class DatabaseConfig(ConfigModel):

def __init__(self, connection_string: str, connection_timeout: int, queue_timout: int):
def __init__(self, connection_string: str, connection_timeout: int,
queue_timeout: int):
self.connection_string = connection_string
self.connection_timeout = connection_timeout
self.queue_timout = queue_timout
self.queue_timeout = queue_timeout


class S3Config(ConfigModel):
Expand Down
2 changes: 1 addition & 1 deletion packages/dsw-config/dsw/config/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def db(self) -> DatabaseConfig:
return DatabaseConfig(
connection_string=self.get(self.keys.database.connection_string),
connection_timeout=self.get(self.keys.database.connection_timeout),
queue_timout=self.get(self.keys.database.queue_timeout),
queue_timeout=self.get(self.keys.database.queue_timeout),
)

@property
Expand Down
2 changes: 1 addition & 1 deletion packages/dsw-config/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = 'setuptools.build_meta'

[project]
name = 'dsw-config'
version = "4.10.4"
version = "4.10.5"
description = 'Library for DSW config manipulation'
readme = 'README.md'
keywords = ['dsw', 'config', 'yaml', 'parser']
Expand Down
7 changes: 7 additions & 0 deletions packages/dsw-data-seeder/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]


## [4.10.5]

### Fixed

- Fixed command queue job timeout (moved support from workers to queue, reliable timeout approach)

## [4.10.4]

Released for version consistency with other DSW tools.
Expand Down Expand Up @@ -314,3 +320,4 @@ Released for version consistency with other DSW tools.
[4.10.2]: /../../tree/v4.10.2
[4.10.3]: /../../tree/v4.10.3
[4.10.4]: /../../tree/v4.10.4
[4.10.5]: /../../tree/v4.10.5
39 changes: 36 additions & 3 deletions packages/dsw-data-seeder/dsw/data_seeder/config.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,42 @@
from dsw.config import DSWConfigParser
from dsw.config.keys import ConfigKey, cast_str, cast_int
from dsw.config.model import DatabaseConfig, S3Config, \
from dsw.config.keys import ConfigKey, ConfigKeys, ConfigKeysContainer, \
cast_str, cast_int, cast_optional_int
from dsw.config.model import ConfigModel, DatabaseConfig, S3Config, \
LoggingConfig, SentryConfig, CloudConfig, GeneralConfig


class _ExperimentalKeys(ConfigKeysContainer):
job_timeout = ConfigKey(
yaml_path=['experimental', 'jobTimeout'],
var_names=['EXPERIMENTAL_JOB_TIMEOUT'],
default=None,
cast=cast_optional_int,
)


class MailerConfigKeys(ConfigKeys):
experimental = _ExperimentalKeys


class ExperimentalConfig(ConfigModel):

def __init__(self, job_timeout: int | None):
self.job_timeout = job_timeout


class SeederConfig:

def __init__(self, db: DatabaseConfig, s3: S3Config, log: LoggingConfig,
sentry: SentryConfig, cloud: CloudConfig, general: GeneralConfig,
extra_dbs: dict[str, DatabaseConfig]):
extra_dbs: dict[str, DatabaseConfig], experimental: ExperimentalConfig):
self.general = general
self.db = db
self.s3 = s3
self.log = log
self.sentry = sentry
self.cloud = cloud
self.extra_dbs = extra_dbs
self.experimental = experimental

def __str__(self):
return f'SeederConfig\n' \
Expand All @@ -26,11 +47,16 @@ def __str__(self):
f'{self.log}' \
f'{self.sentry}' \
f'{self.cloud}' \
f'{self.experimental}' \
f'====================\n'


class SeederConfigParser(DSWConfigParser):

def __init__(self):
super().__init__(keys=MailerConfigKeys)
self.keys = MailerConfigKeys # type: type[MailerConfigKeys]

@property
def extra_dbs(self) -> dict[str, DatabaseConfig]:
result = {}
Expand All @@ -57,6 +83,12 @@ def extra_dbs(self) -> dict[str, DatabaseConfig]:

return result

@property
def experimental(self) -> ExperimentalConfig:
return ExperimentalConfig(
job_timeout=self.get(self.keys.experimental.job_timeout),
)

@property
def config(self) -> SeederConfig:
return SeederConfig(
Expand All @@ -67,4 +99,5 @@ def config(self) -> SeederConfig:
cloud=self.cloud,
general=self.general,
extra_dbs=self.extra_dbs,
experimental=self.experimental,
)
2 changes: 1 addition & 1 deletion packages/dsw-data-seeder/dsw/data_seeder/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
DEFAULT_PLACEHOLDER = '<<|TENANT-ID|>>'
NULL_UUID = '00000000-0000-0000-0000-000000000000'
PROG_NAME = 'dsw-data-seeder'
VERSION = '4.10.4'
VERSION = '4.10.5'

VAR_APP_CONFIG_PATH = 'APPLICATION_CONFIG_PATH'
VAR_WORKDIR_PATH = 'WORKDIR_PATH'
Expand Down
11 changes: 10 additions & 1 deletion packages/dsw-data-seeder/dsw/data_seeder/seeder.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,8 @@ def _run_preparation(self, recipe_name: str) -> CommandQueue:
db=Context.get().app.db,
channel=CMD_CHANNEL,
component=CMD_COMPONENT,
timeout=Context.get().app.cfg.db.queue_timout,
wait_timeout=Context.get().app.cfg.db.queue_timeout,
work_timeout=Context.get().app.cfg.experimental.job_timeout,
)
return queue

Expand Down Expand Up @@ -288,6 +289,14 @@ def work(self, cmd: PersistentCommand):
Context.get().update_trace_id('-')
SentryReporter.set_context('cmd_uuid', '-')

def process_timeout(self, e: BaseException):
LOG.info('Failed with timeout')
SentryReporter.capture_exception(e)

def process_exception(self, e: BaseException):
LOG.info('Failed with unexpected error', exc_info=e)
SentryReporter.capture_exception(e)

@staticmethod
def _update_component_info():
built_at = dateutil.parser.parse(BUILD_INFO.built_at)
Expand Down
10 changes: 5 additions & 5 deletions packages/dsw-data-seeder/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = 'setuptools.build_meta'

[project]
name = 'dsw-data-seeder'
version = "4.10.4"
version = "4.10.5"
description = 'Worker for seeding DSW data'
readme = 'README.md'
keywords = ['data', 'database', 'seed', 'storage']
Expand All @@ -29,10 +29,10 @@ dependencies = [
'sentry-sdk',
'tenacity',
# DSW
"dsw-command-queue==4.10.4",
"dsw-config==4.10.4",
"dsw-database==4.10.4",
"dsw-storage==4.10.4",
"dsw-command-queue==4.10.5",
"dsw-config==4.10.5",
"dsw-database==4.10.5",
"dsw-storage==4.10.5",
]

[project.urls]
Expand Down
1 change: 1 addition & 0 deletions packages/dsw-data-seeder/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
certifi==2024.7.4
click==8.1.7
func_timeout==4.3.5
minio==7.2.8
psycopg==3.2.1
psycopg-binary==3.2.1
Expand Down
5 changes: 5 additions & 0 deletions packages/dsw-database/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]


## [4.10.5]

Released for version consistency with other DSW tools.

## [4.10.4]

Released for version consistency with other DSW tools.
Expand Down Expand Up @@ -279,3 +283,4 @@ Released for version consistency with other DSW tools.
[4.10.2]: /../../tree/v4.10.2
[4.10.3]: /../../tree/v4.10.3
[4.10.4]: /../../tree/v4.10.4
[4.10.5]: /../../tree/v4.10.5
Loading

0 comments on commit 47967e9

Please sign in to comment.