-
Notifications
You must be signed in to change notification settings - Fork 249
/
app.py
27 lines (22 loc) · 807 Bytes
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from celery import Celery, Task
from flask import Flask
def celery_init_app(app: Flask) -> Celery:
    """Create a Celery application tied to the given Flask application.

    The Celery app is named after ``app.name``, configured from
    ``app.config["CELERY"]``, registered as the default Celery app via
    ``set_default()``, and stored in ``app.extensions["celery"]``.

    Args:
        app: Flask application whose config holds a ``"CELERY"`` entry
            with the Celery settings.

    Returns:
        The configured Celery application.

    Raises:
        KeyError: If ``app.config`` has no ``"CELERY"`` entry.
    """

    class FlaskTask(Task):
        """Task class that executes ``run`` inside ``app.app_context()``."""

        def __call__(self, *args: object, **kwargs: object) -> object:
            # Enter the Flask application context for the duration of the
            # task body, then delegate to the task's own ``run``.
            with app.app_context():
                return self.run(*args, **kwargs)

    celery_app = Celery(app.name, task_cls=FlaskTask)
    celery_app.config_from_object(app.config["CELERY"])
    celery_app.set_default()
    # Keep a reference on the Flask app so other code can look it up there.
    app.extensions["celery"] = celery_app
    return celery_app
if __name__ == '__main__':
    # Script entry point: build a Flask app with SQLite-backed Celery
    # settings and attach a Celery app to it.
    app = Flask(__name__)
    app.config.from_mapping(
        CELERY=dict(
            # The SQLAlchemy broker transport is selected with the "sqla+"
            # prefix; "db+" is only understood by the result backend.
            broker_url="sqla+sqlite:///celeryresults.sqlite3",
            # The database result backend requires the "db+" prefix in
            # front of the SQLAlchemy connection URL.
            result_backend="db+sqlite:///celeryresults.sqlite3",
            task_ignore_result=True,
        ),
    )
    celery_app = celery_init_app(app)