Scheduler rate limit functionality #1611

Draft
wants to merge 51 commits into base: main

Commits
3248e35
Start with rate limit implementation
jpbruinsslot Jul 12, 2023
7444ef2
Start with rate limiting implementation
jpbruinsslot Jul 17, 2023
a6f5d62
Formatting
jpbruinsslot Jul 18, 2023
c6fa041
Start writing tests
jpbruinsslot Jul 18, 2023
a9aac16
Rename to delayed tasks, update tests
jpbruinsslot Jul 18, 2023
10a7f9f
Leverage tasks instead of in-memory data structure to store delayed tasks
jpbruinsslot Jul 19, 2023
0b552a0
Fix consuming of the rate limiter
jpbruinsslot Jul 19, 2023
fda6991
Add optional consume parameter
jpbruinsslot Jul 19, 2023
8ec4416
Fix rate limit tests
jpbruinsslot Jul 19, 2023
f5a71ea
Fix formatting and type hint suggestions
jpbruinsslot Jul 20, 2023
90786b9
Debugging and writing tests for edge cases
jpbruinsslot Jul 20, 2023
2c4a1c2
Add order_by argument for task store
jpbruinsslot Jul 24, 2023
46c1e6e
Add migration
jpbruinsslot Jul 24, 2023
83e8f30
Fix migration
jpbruinsslot Jul 24, 2023
f70c8fb
Merge branch 'main' into feature/mula/rate-limit
jpbruinsslot Jul 24, 2023
7f31f24
Fix tests
jpbruinsslot Jul 24, 2023
7b162ac
Update requirements.txt
jpbruinsslot Jul 24, 2023
8485870
Update tests timeout
jpbruinsslot Jul 24, 2023
fc17de7
Finally made the tests work
jpbruinsslot Jul 24, 2023
d659a16
Fixes for precommit
jpbruinsslot Jul 24, 2023
81223a8
Update tests
jpbruinsslot Jul 24, 2023
fe1197d
Implement lock
jpbruinsslot Jul 24, 2023
1362292
Remove comment
jpbruinsslot Jul 25, 2023
f92e3d3
Adding valueerror tests
jpbruinsslot Jul 25, 2023
3077aa6
Fix pre-commit suggestions
jpbruinsslot Jul 25, 2023
880fca8
Merge branch 'main' into feature/mula/rate-limit
jpbruinsslot Jul 25, 2023
56daa2e
Merge branch 'main' into feature/mula/rate-limit
jpbruinsslot Jul 27, 2023
3496344
Merge branch 'main' into feature/mula/rate-limit
jpbruinsslot Jul 31, 2023
2829e86
Implement and update rate limiting attributes from katalogus
jpbruinsslot Jul 31, 2023
443c31e
Merge branch 'main' into feature/mula/rate-limit
jpbruinsslot Aug 1, 2023
cae249d
Formatting
jpbruinsslot Aug 1, 2023
42b6b12
Added rate limit field in boefje model
ammar92 Aug 1, 2023
4194078
Added rate limit in boefje scheduler model and identifier template re…
ammar92 Aug 1, 2023
b6a7d31
A bit of refactoring
ammar92 Aug 1, 2023
0dd56d7
Fixed errors
ammar92 Aug 1, 2023
ad72318
Making tests work
jpbruinsslot Aug 1, 2023
cce498e
Make tests work
jpbruinsslot Aug 1, 2023
9bb95ac
Merge branch 'main' into feature/mula/rate-limit
jpbruinsslot Aug 2, 2023
ab7d4d1
Mypy
jpbruinsslot Aug 2, 2023
244ded1
Merge branch 'main' into feature/mula/rate-limit
jpbruinsslot Aug 7, 2023
0639733
Merge branch 'main' into feature/mula/rate-limit
jpbruinsslot Aug 8, 2023
4d39b20
Merge branch 'main' into feature/mula/rate-limit
jpbruinsslot Aug 8, 2023
1dd2c00
Merge branch 'main' into feature/mula/rate-limit-2
jpbruinsslot Aug 14, 2023
4f12d5f
Add organisational rate limiting support
jpbruinsslot Aug 14, 2023
10984c0
Add test for organisation rate limiting
jpbruinsslot Aug 14, 2023
a85001d
Merge branch 'main' into feature/mula/rate-limit-2
jpbruinsslot Aug 15, 2023
92a1e97
Trying to fix tests
jpbruinsslot Aug 15, 2023
6a2c6d3
Merge branch 'main' into feature/mula/rate-limit-2
jpbruinsslot Aug 22, 2023
b51ec09
Merge branch 'main' into feature/mula/rate-limit-2
jpbruinsslot Aug 23, 2023
9586009
Merge branch 'main' into feature/mula/rate-limit-2
jpbruinsslot Aug 23, 2023
eebb018
Merge branch 'main' into feature/mula/rate-limit-2
jpbruinsslot Sep 4, 2023
8 changes: 7 additions & 1 deletion boefjes/boefjes/katalogus/models.py
@@ -2,7 +2,7 @@
from enum import Enum
from typing import List, Literal, NewType, Optional, Set, Union

-from pydantic import AnyHttpUrl, BaseModel, Field
+from pydantic import AnyHttpUrl, BaseModel, Field, constr

RESERVED_LOCAL_ID = "LOCAL"

@@ -18,6 +18,11 @@ class Organisation(BaseModel):
    name: str


class RateLimit(BaseModel):
    interval: str
    identifier: constr(min_length=1)


class Plugin(BaseModel):
    id: str
    repository_id: str = RESERVED_LOCAL_ID
@@ -41,6 +46,7 @@ class Boefje(Plugin):
    produces: List[str] = Field(default_factory=list)
    options: Optional[List[str]]
    runnable_hash: Optional[str]
    rate_limit: Optional[RateLimit]


class Normalizer(Plugin):
1,024 changes: 528 additions & 496 deletions mula/poetry.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions mula/pyproject.toml
@@ -9,6 +9,7 @@ license = "EUPL"
python = "^3.8"
alembic = "^1.8.1"
celery = "^5.2.7"
limits = "^3.5.0"
fastapi = "^0.101.0"
mmh3 = "^4.0.0"
pika = "^1.2.0"
@@ -28,6 +29,7 @@ opentelemetry-instrumentation-fastapi = "^0.40b0"
opentelemetry-instrumentation-psycopg2 = "^0.40b0"
opentelemetry-instrumentation-requests = "^0.40b0"
certifi = "^2023.7.22"
jinja2 = "^3.1.2"

[tool.poetry.group.dev.dependencies]
factory_boy = "^3.2.1"
933 changes: 462 additions & 471 deletions mula/requirements-dev.txt

Large diffs are not rendered by default.

670 changes: 337 additions & 333 deletions mula/requirements.txt

Large diffs are not rendered by default.

@@ -0,0 +1,26 @@
"""Add DELAYED status for tasks

Revision ID: 0008
Revises: 0007_add_cancelled_status_for_tasks
Create Date: 2023-07-24 12:22:42.329666

"""
from alembic import op

# revision identifiers, used by Alembic.
revision = "0008"
down_revision = "0007"
branch_labels = None
depends_on = None


def upgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.execute("ALTER TYPE taskstatus ADD VALUE 'DELAYED'")
    # ### end Alembic commands ###


def downgrade():
    # PostgreSQL has no ALTER TYPE ... DROP VALUE; removing 'DELAYED' would
    # require recreating the taskstatus type, so this downgrade is a no-op.
    pass
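
PostgreSQL can add a value to an enum type but has no statement for removing a single value, so a reversible downgrade usually recreates the type. The following is only a sketch of that approach: the helper name `downgrade_statements`, the table name `tasks`, the column name `status`, and the fallback status are assumptions that do not appear in this diff, and the value list passed in must be the complete set of remaining statuses (only some of them are visible here).

```python
from typing import List, Sequence


def downgrade_statements(
    remaining_values: Sequence[str],
    table: str = "tasks",  # assumed table name, not shown in the diff
    column: str = "status",  # assumed column name, not shown in the diff
    type_name: str = "taskstatus",
    fallback: str = "PENDING",  # assumed status for rows that were DELAYED
) -> List[str]:
    """Build the SQL statements that recreate the enum without 'DELAYED'."""
    values = ", ".join(f"'{value}'" for value in remaining_values)
    old_type = f"{type_name}_old"
    return [
        # Rows using the value must be rewritten before the type is swapped.
        f"UPDATE {table} SET {column} = '{fallback}' WHERE {column} = 'DELAYED'",
        f"ALTER TYPE {type_name} RENAME TO {old_type}",
        f"CREATE TYPE {type_name} AS ENUM ({values})",
        f"ALTER TABLE {table} ALTER COLUMN {column} TYPE {type_name} "
        f"USING {column}::text::{type_name}",
        f"DROP TYPE {old_type}",
    ]


# Illustrative call; the real list must contain every remaining status.
statements = downgrade_statements(["PENDING", "QUEUED", "DISPATCHED", "RUNNING"])
```

Each statement could then be passed to `op.execute` in order. If the column has a default, it would likely need to be dropped before the type change and restored afterwards.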
6 changes: 6 additions & 0 deletions mula/scheduler/context/context.py
@@ -1,8 +1,10 @@
import json
import logging.config
import threading
from pathlib import Path
from types import SimpleNamespace

from limits import storage, strategies
from prometheus_client import CollectorRegistry, Gauge, Info

import scheduler
@@ -79,6 +81,10 @@ def __init__(self) -> None:
        self.task_store: stores.TaskStorer = sqlalchemy.TaskStore(datastore)
        self.pq_store: stores.PriorityQueueStorer = sqlalchemy.PriorityQueueStore(datastore)

        # Rate limiter
        self.rate_limiter = strategies.MovingWindowRateLimiter(storage=storage.MemoryStorage())
        self.rate_limiter_lock = threading.Lock()

        # Metrics collector registry
        self.metrics_registry: CollectorRegistry = CollectorRegistry()
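
The context pairs an in-memory moving-window rate limiter with a `threading.Lock`. To illustrate the idea without depending on the `limits` package, here is a standard-library sketch of a moving-window limiter whose check-and-record step is guarded by a lock. The class `MovingWindowSketch` and its `hit` method are hypothetical names for this example; they are not the `limits` API and not the scheduler's code.

```python
import threading
import time
from collections import deque
from typing import Callable, Deque, Dict


class MovingWindowSketch:
    """Allow at most `amount` hits per identifier in any `window_seconds` span."""

    def __init__(
        self,
        amount: int,
        window_seconds: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.amount = amount
        self.window_seconds = window_seconds
        self.clock = clock
        self.hits: Dict[str, Deque[float]] = {}
        self.lock = threading.Lock()

    def hit(self, identifier: str) -> bool:
        """Record a hit and return True, or return False if the limit is reached."""
        now = self.clock()
        # Checking and recording must happen atomically, otherwise two threads
        # could both see a free slot and exceed the limit together.
        with self.lock:
            window = self.hits.setdefault(identifier, deque())
            # Drop timestamps that have left the window.
            while window and now - window[0] >= self.window_seconds:
                window.popleft()
            if len(window) >= self.amount:
                return False
            window.append(now)
            return True
```

A caller that gets `False` back could, for example, mark the task as delayed and retry later instead of dispatching it right away.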

2 changes: 1 addition & 1 deletion mula/scheduler/models/__init__.py
@@ -1,5 +1,5 @@
from .base import Base
-from .boefje import Boefje, BoefjeMeta
+from .boefje import Boefje, BoefjeMeta, RateLimit
from .events import NormalizerMetaReceivedEvent, RawData, RawDataReceivedEvent
from .filter import Filter
from .health import ServiceHealth
8 changes: 7 additions & 1 deletion mula/scheduler/models/boefje.py
@@ -4,11 +4,17 @@
from pydantic import BaseModel, Field


class RateLimit(BaseModel):
    identifier: str
    interval: str


class Boefje(BaseModel):
    """Boefje representation."""

    id: str
-    version: Optional[str] = Field(default=None)
+    version: Optional[str]
    rate_limit: Optional[RateLimit]


class BoefjeMeta(BaseModel):
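
The `interval` field of `RateLimit` is a plain string, and this diff does not show which format it is expected to have. Assuming it follows a "count per unit" notation such as `10/minute` (similar to what the `limits` package accepts), a small parser could look like the sketch below. The function `parse_interval` and the supported units are assumptions made for illustration only.

```python
import re
from typing import Tuple

# Length of each supported unit in seconds (an assumed, minimal set).
_UNIT_SECONDS = {"second": 1, "minute": 60, "hour": 3600, "day": 86400}
_PATTERN = re.compile(r"^\s*(\d+)\s*(?:/|per)\s*(second|minute|hour|day)s?\s*$")


def parse_interval(interval: str) -> Tuple[int, int]:
    """Return (allowed hits, window length in seconds) for e.g. '10/minute'."""
    match = _PATTERN.match(interval.lower())
    if match is None:
        raise ValueError(f"unsupported rate limit interval: {interval!r}")
    return int(match.group(1)), _UNIT_SECONDS[match.group(2)]
```

Rejecting unknown formats with a `ValueError` at load time keeps a malformed plugin definition from silently disabling its rate limit.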
1 change: 1 addition & 0 deletions mula/scheduler/models/tasks.py
@@ -24,6 +25,7 @@ class TaskStatus(str, enum.Enum):
    """Status of a task."""

    PENDING = "pending"
    DELAYED = "delayed"
    QUEUED = "queued"
    DISPATCHED = "dispatched"
    RUNNING = "running"
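
The enum member is declared as `DELAYED = "delayed"`, while the migration adds `'DELAYED'` in upper case and `update_task_status` writes `status.name`. That is consistent if the database column stores member names rather than values, which is the default behaviour of SQLAlchemy's `Enum` type when it is given an enum class; the column definition itself is not part of this diff, so treat that as an assumption. The sketch below, using a shortened stand-in enum named `TaskStatusSketch`, shows the difference between the two.

```python
import enum


class TaskStatusSketch(str, enum.Enum):
    """Shortened stand-in for TaskStatus, for illustration only."""

    PENDING = "pending"
    DELAYED = "delayed"
    QUEUED = "queued"


status = TaskStatusSketch.DELAYED

# The member name is what would be written by `{"status": status.name}`.
member_name = status.name
# The member value is what the API serialises, since the enum subclasses str.
member_value = status.value
```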
14 changes: 14 additions & 0 deletions mula/scheduler/repositories/sqlalchemy/task_store.py
@@ -1,6 +1,8 @@
import datetime
from typing import List, Optional, Tuple

from sqlalchemy import asc, desc

from scheduler import models

from ..stores import TaskStorer # noqa: TID252
@@ -27,6 +29,7 @@ def get_tasks(
        status: Optional[str] = None,
        min_created_at: Optional[datetime.datetime] = None,
        max_created_at: Optional[datetime.datetime] = None,
        order_by: Optional[str] = None,
        filters: Optional[List[models.Filter]] = None,
    ) -> Tuple[List[models.Task], int]:
        with self.datastore.session.begin() as session:
@@ -49,6 +52,12 @@ def get_tasks(
            if max_created_at is not None:
                query = query.filter(models.TaskORM.created_at <= max_created_at)

            if order_by is not None:
                if order_by.startswith("-"):
                    query = query.order_by(desc(order_by[1:]))
                else:
                    query = query.order_by(asc(order_by))

            if filters is not None:
                for f in filters:
                    query = query.filter(models.TaskORM.p_item[f.get_field()].astext == f.value)
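
The `order_by` argument uses a leading `-` to request descending order. The same convention can be sketched on plain dictionaries with the standard library only. The helpers `parse_order_by` and `sort_rows` and the `allowed` field set are hypothetical names for this example, and the allowlist check is an addition that the diff itself does not perform.

```python
from typing import Any, Dict, Iterable, List, Set, Tuple


def parse_order_by(order_by: str, allowed: Set[str]) -> Tuple[str, bool]:
    """Split '-created_at' into ('created_at', True); True means descending."""
    descending = order_by.startswith("-")
    field = order_by[1:] if descending else order_by
    # Accepting only known field names avoids ordering by arbitrary input.
    if field not in allowed:
        raise ValueError(f"cannot order by {field!r}")
    return field, descending


def sort_rows(
    rows: Iterable[Dict[str, Any]], order_by: str, allowed: Set[str]
) -> List[Dict[str, Any]]:
    """Sort rows by the requested field in the requested direction."""
    field, descending = parse_order_by(order_by, allowed)
    return sorted(rows, key=lambda row: row[field], reverse=descending)


rows = [{"id": "a", "created_at": 2}, {"id": "b", "created_at": 1}]
oldest_first = sort_rows(rows, "created_at", {"created_at"})
newest_first = sort_rows(rows, "-created_at", {"created_at"})
```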
@@ -120,6 +129,11 @@ def update_task(self, task: models.Task) -> None:
        with self.datastore.session.begin() as session:
            (session.query(models.TaskORM).filter(models.TaskORM.id == task.id).update(task.dict()))

    @retry()
    def update_task_status(self, task_id: str, status: models.TaskStatus) -> None:
        with self.datastore.session.begin() as session:
            session.query(models.TaskORM).filter(models.TaskORM.id == task_id).update({"status": status.name})

    @retry()
    def cancel_tasks(self, scheduler_id: str, task_ids: List[str]) -> None:
        with self.datastore.session.begin() as session:
4 changes: 4 additions & 0 deletions mula/scheduler/repositories/stores.py
@@ -23,6 +23,7 @@ def get_tasks(
        status: Optional[str] = None,
        min_created_at: Optional[datetime.datetime] = None,
        max_created_at: Optional[datetime.datetime] = None,
        order_by: Optional[str] = None,
        filters: Optional[List[models.Filter]] = None,
    ):
        raise NotImplementedError
@@ -47,6 +48,9 @@ def create_task(self, task: models.Task) -> Optional[models.Task]:
    def update_task(self, task: models.Task) -> Optional[models.Task]:
        raise NotImplementedError

    def update_task_status(self, task_id: str, status: models.TaskStatus) -> None:
        raise NotImplementedError

    def cancel_tasks(self, scheduler_id: str, task_ids: List[str]) -> None:
        raise NotImplementedError
