How to deal with priorities? #207

Open
anne-mrl opened this issue Sep 3, 2024 · 0 comments

anne-mrl commented Sep 3, 2024

Hello,

Is it possible to use Celery's priority options with celery-director?
The broker doesn't matter (Redis or RabbitMQ): I tried both and could not get priorities working with either.

docker-compose.yml file:

services:

  worker:
    image: myapp
    container_name: worker-myapp
    restart: always
    volumes:
      - ./config/director.env:/opt/director/workflows/.env
      - ./config/workflows.yml:/opt/director/workflows/workflows.yml
      - ./config/config.py:/opt/director/workflows/drivers/config.py
    command: director celery worker --hostname test@%h --concurrency=4 --queues highpriority:9,lowpriority:3, --loglevel=DEBUG

Notes:

  • The app has been running fine for many months; the configuration works without the priority setup.
  • 'MY_TASK' is defined in workflows.yml with the lowpriority queue.
  • I also tried with lowpriority:3: this failed too.
  • I also tried to set the queue priority in workflows.yml using the priority field: this failed too.
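
A note on the `--queues` value in the command above: the Celery CLI describes this option as a comma-separated list of queue names, with no priority syntax. Assuming the value is split on commas only (an assumption, sketched here with plain string handling rather than Celery's own parser), `highpriority:9` would be read as a queue literally named that way, and the trailing comma would leave an empty entry:

```python
# Plain-Python illustration; this is not Celery's actual option parser.
raw = "highpriority:9,lowpriority:3,"

# Splitting on commas keeps the ":9" / ":3" suffixes as part of the names.
queues = raw.split(",")
print(queues)  # ['highpriority:9', 'lowpriority:3', '']
```

If that assumption holds, the worker would not be listening on a queue named exactly `lowpriority` with this command line.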

Redis (based on https://docs.celeryq.dev/en/latest/userguide/routing.html#redis-message-priorities):

1. Since broker_transport_options is hard-coded in director.settings, I built a new celery-director Python package with the following custom CELERY_CONF:

        # Celery configuration
        self.CELERY_CONF = {
            "task_always_eager": False,
            "broker_url": env.str("DIRECTOR_BROKER_URI", "redis://localhost:6379/0"),
            "result_backend": env.str(
                "DIRECTOR_RESULT_BACKEND_URI", "redis://localhost:6379/1"
            ),
            "broker_transport_options": {
                "master_name": "director",
                "priority_steps": [0, 3, 6, 9],
                "sep": ":",
                "queue_order_strategy": "priority"
            },
        },

I hit the following error when building the Docker image for my project with this new package:

 => ERROR [10/10] RUN director dlassets                2.2s
------
 > [10/10] RUN director dlassets:
2.061 Traceback (most recent call last):
2.061   File "/usr/local/bin/director", line 8, in <module>
2.061     sys.exit(cli())
2.061   File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1157, in __call__
2.062     return self.main(*args, **kwargs)
2.062   File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1078, in main
2.062     rv = self.invoke(ctx)
2.062   File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1688, in invoke
2.062     return _process_result(sub_ctx.command.invoke(sub_ctx))
2.062   File "/usr/local/lib/python3.10/site-packages/click/core.py", line 1434, in invoke
2.063     return ctx.invoke(self.callback, **ctx.params)
2.063   File "/usr/local/lib/python3.10/site-packages/click/core.py", line 783, in invoke
2.063     return __callback(*args, **kwargs)
2.063   File "/usr/local/lib/python3.10/site-packages/click/decorators.py", line 81, in new_func
2.063     obj = ctx.ensure_object(object_type)
2.063   File "/usr/local/lib/python3.10/site-packages/click/core.py", line 643, in ensure_object
2.063     self.obj = rv = object_type()
2.063   File "/usr/local/lib/python3.10/site-packages/director/context.py", line 10, in __init__
2.063     self.app = create_app()
2.063   File "/usr/local/lib/python3.10/site-packages/director/__init__.py", line 80, in create_app
2.063     cel.init_app(app)
2.063   File "/usr/local/lib/python3.10/site-packages/director/extensions.py", line 102, in init_app
2.064     self.conf.update(app.config.get("CELERY_CONF", {}))
2.064   File "/usr/local/lib/python3.10/site-packages/celery/utils/collections.py", line 302, in update
2.064     result = self.changes.update(*args, **kwargs)
2.064   File "/usr/local/lib/python3.10/site-packages/celery/app/base.py", line 119, in update
2.064     self._data.update(*args, **kwargs)
2.064 ValueError: dictionary update sequence element #0 has length 4; 2 is required
------
Dockerfile:40
--------------------
  38 |     RUN mkdir -p ./workflows/static
  39 |     RUN chown director:director workflows/static
  40 | >>> RUN director dlassets
  41 |     
  42 |     USER director
--------------------
ERROR: failed to solve: process "/bin/sh -c director dlassets" did not complete successfully: exit code: 1
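
One possible reading of this ValueError, offered as a hedged sketch rather than a confirmed diagnosis: the CELERY_CONF snippet above ends with `},`, and a trailing comma after a dict literal in an assignment makes the value a one-element tuple. `dict.update` then treats that tuple as a sequence of key/value pairs, and its first element (a dict with four keys) has length 4 instead of 2. The names `conf` and `celery_conf` below are illustrative stand-ins, not celery-director code.

```python
# Stand-in for the mapping that Celery updates (illustrative only).
conf = {}

# Note the trailing comma after the closing brace: this binds a 1-tuple
# that contains the dict, not the dict itself.
celery_conf = {
    "task_always_eager": False,
    "broker_url": "redis://localhost:6379/0",
    "result_backend": "redis://localhost:6379/1",
    "broker_transport_options": {"master_name": "director"},
},

try:
    conf.update(celery_conf)
    error = None
except ValueError as exc:
    error = str(exc)

print(type(celery_conf).__name__)
print(error)

# Updating with the dict itself (what the assignment would hold without
# the trailing comma) succeeds.
conf.update(celery_conf[0])
print(sorted(conf))
```

If this is the cause, removing the comma after the final `}` of the assignment should let `director dlassets` get past this point.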

2. I also tried to set the custom configuration through the .env file:

DIRECTOR_BROKER_TRANSPORT_OPTIONS={"master_name":"director","priority_steps":[0, 3, 6, 9],"sep":":","queue_order_strategy":"priority"}

and

DIRECTOR_broker_transport_options={"master_name":"director","priority_steps":[0, 3, 6, 9],"sep":":","queue_order_strategy":"priority"}

But neither of them overrides the hard-coded broker_transport_options, as this task shows:

from celery import current_app
from director import task

@task(bind=True, name="MY_TASK", autoretry_for=(Exception,), retry_backoff=10, retry_kwargs={'max_retries': 3})
def my_task(self, *args, **kwargs):

    # Get payload info
    payload = kwargs["payload"]
    filename = payload["filename"]

    # Initialize context
    context = {
        "filename": filename,
    }

    # Access current Celery app's configuration
    app = current_app
    # Print broker transport options
    print(app.conf.broker_transport_options)
    # Print request info
    print(f"delivery_info={self.request.delivery_info},\nproperties={self.request.properties}")

    return context

Result:

[2024-09-03 13:37:10,831: WARNING/ForkPoolWorker-4] {'master_name': 'director'}
[2024-09-03 13:37:10,831: WARNING/ForkPoolWorker-4] delivery_info={'exchange': '', 'routing_key': 'lowpriority', 'priority': 0, 'redelivered': None},
properties={'correlation_id': '2021a758-19a4-4cf6-b94d-708688d816d1', 'reply_to': 'b2a244cc-6186-3317-bd7d-107d40a188d8', 'delivery_mode': 2, 'delivery_info': {'exchange': '', 'routing_key': 'lowpriority'}, 'priority': 0, 'body_encoding': 'base64', 'delivery_tag': '4430265b-606c-481c-9ac5-de55bab3f569'}
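
The printed options still contain only `master_name`, which suggests the environment variable is never read for this setting. As a hypothetical sketch of what such an override could look like in a settings module, the JSON value could be parsed and merged over the defaults. The function `load_transport_options` and its merge behaviour are assumptions made for illustration, not celery-director's actual code.

```python
import json
import os


def load_transport_options(defaults, env_name="DIRECTOR_BROKER_TRANSPORT_OPTIONS"):
    """Merge a JSON object read from the environment over the default options."""
    raw = os.environ.get(env_name)
    if not raw:
        # Nothing set: keep the defaults untouched.
        return dict(defaults)
    overrides = json.loads(raw)
    if not isinstance(overrides, dict):
        raise ValueError(f"{env_name} must contain a JSON object")
    # Keys from the environment win over the hard-coded defaults.
    return {**defaults, **overrides}


# Simulate the .env entry for this example.
os.environ["DIRECTOR_BROKER_TRANSPORT_OPTIONS"] = (
    '{"priority_steps": [0, 3, 6, 9], "sep": ":", "queue_order_strategy": "priority"}'
)
options = load_transport_options({"master_name": "director"})
print(options)
```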

3. I got the same result when I set DIRECTOR_BROKER_TRANSPORT_OPTIONS with ENV in the Dockerfile.

4. I got the same result when I tried to set the priority dynamically with Celery's apply_async().
Updated task:

from celery import current_app
from director import task

@task(bind=True, name="MY_TASK", autoretry_for=(Exception,), retry_backoff=10, retry_kwargs={'max_retries': 3})
def my_task(self, *args, **kwargs):

    # Get payload info
    payload = kwargs["payload"]
    filename = payload["filename"]

    # Dynamically set custom conf
    self.apply_async(routing_key="highpriority", priority=9)

    # Initialize context
    context = {
        "filename": filename,
    }

    # Access current Celery app's configuration
    app = current_app
    # Print broker transport options
    print(app.conf.broker_transport_options)
    # Print request info
    print(f"delivery_info={self.request.delivery_info},\nproperties={self.request.properties}")

    return context
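
For context on the `priority_steps` and `sep` options used in these Redis attempts: the Celery routing guide linked above describes Redis priorities as emulated with one list per priority step, and the Celery calling guide notes that Redis priorities are reversed (0 is the highest). The sketch below illustrates that naming scheme in plain Python. `step_for` and `list_name` are hypothetical helpers written for this illustration, and the round-down rule is an assumption about how a priority maps onto a step, not the transport's actual code.

```python
from bisect import bisect

PRIORITY_STEPS = [0, 3, 6, 9]
SEP = ":"


def step_for(priority, steps=PRIORITY_STEPS):
    """Round a non-negative priority down to the nearest configured step."""
    return steps[bisect(steps, priority) - 1]


def list_name(queue, priority, sep=SEP):
    """Name of the Redis list assumed to hold messages of this priority."""
    step = step_for(priority)
    # Step 0 is assumed to use the bare queue name, without a suffix.
    return f"{queue}{sep}{step}" if step else queue


names = [list_name("lowpriority", p) for p in (0, 3, 5, 9)]
print(names)  # ['lowpriority', 'lowpriority:3', 'lowpriority:3', 'lowpriority:9']
```

Under this reading, `priority=9` on Redis would select the list consumed last rather than first. Note also that calling `self.apply_async(...)` inside the running task publishes a new message for the same task; it does not change the priority of the message already being processed.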

RabbitMQ (based on https://docs.celeryq.dev/en/latest/userguide/routing.html#rabbitmq-message-priorities):

5. I tried to set the custom configuration through the .env file:

DIRECTOR_QUEUES=lowpriority,highpriority
DIRECTOR_BROKER_TRANSPORT_OPTIONS={"priority_steps":[0,3,6,9]}
DIRECTOR_QUEUE_PRIORITIES=lowpriority:3,highpriority:9

But as with the previous attempts, it does not override the hard-coded broker_transport_options.
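
On the RabbitMQ side, `priority_steps` is documented for the Redis transport. The RabbitMQ section of the routing guide linked above relies instead on queues being declared with a maximum priority, for which Celery has the `task_queue_max_priority` setting, alongside `task_default_priority`. Below is a minimal sketch of the extra keys, assuming they could be merged into a CELERY_CONF-style dict. Whether celery-director exposes a way to do that is not confirmed here, and the dict names, the broker URL, and the values 10 and 5 are example assumptions.

```python
# Both keys are Celery settings; the dict name and values are examples only.
rabbitmq_priority_conf = {
    "task_queue_max_priority": 10,  # declare queues with x-max-priority=10
    "task_default_priority": 5,     # priority used when a task sets none
}

# Hypothetical base configuration standing in for CELERY_CONF.
celery_conf = {"broker_url": "amqp://guest:guest@localhost:5672//"}
celery_conf.update(rabbitmq_priority_conf)
print(celery_conf)
```

RabbitMQ does not let the `x-max-priority` argument of an existing queue be changed, so queues that were already declared without it would need to be deleted and recreated for such a setting to take effect.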

Any help would be appreciated.
Many thanks
