Upon connection error the ongoing tasks are not being marked as processed #8541

Open
14 of 18 tasks
acangiani opened this issue Sep 27, 2023 · 2 comments

acangiani commented Sep 27, 2023

Checklist

  • I have verified that the issue exists against the main branch of Celery.
  • This has already been asked to the discussions forum first.
  • I have read the relevant section in the
    contribution guide
    on reporting bugs.
  • I have checked the issues list
    for similar or identical bug reports.
  • I have checked the pull requests list
    for existing proposed fixes.
  • I have checked the commit log
    to find out if the bug was already fixed in the main branch.
  • I have included all related issues and possible duplicate issues
    in this issue (If there are none, check this box anyway).

Mandatory Debugging Information

  • I have included the output of celery -A proj report in the issue.
    (if you are not able to do this, then at least specify the Celery
    version affected).
  • I have verified that the issue exists against the main branch of Celery.
  • I have included the contents of pip freeze in the issue.
  • I have included all the versions of all the external dependencies required
    to reproduce this bug.

Optional Debugging Information

  • I have tried reproducing the issue on more than one Python version
    and/or implementation. Python 3.10 and 3.11.
  • I have tried reproducing the issue on more than one message broker and/or
    result backend.
  • I have tried reproducing the issue on more than one version of the message
    broker and/or result backend.
  • I have tried reproducing the issue on more than one operating system.
  • I have tried reproducing the issue on more than one workers pool. Same problem with prefork, event, processes and threads. It works with the solo pool.
  • I have tried reproducing the issue with autoscaling, retries,
    ETA/Countdown & rate limits disabled.
  • I have tried reproducing the issue after downgrading
    and/or upgrading Celery and its dependencies. Tested with Celery main, 5.3.4 and 5.3.1.

Related Issues and Possible Duplicates

Possible related Issues

These two seem somewhat related to the issue we are facing.

Possible Duplicates

  • None

Environment & Settings

Celery version: 5.3.4

celery report Output:

(.venv) /srv/sample # celery -A sample report

software -> celery:5.3.4 (emerald-rush) kombu:5.3.2 py:3.10.13
            billiard:4.1.0 py-amqp:5.1.1
platform -> system:Linux arch:64bit, ELF
            kernel version:6.3.13-linuxkit imp:CPython
loader   -> celery.loaders.app.AppLoader
settings -> transport:amqp results:disabled

ABSOLUTE_URL_OVERRIDES: {
 }
ADMINS: []
ALLOWED_HOSTS: []
APPEND_SLASH: True
AUTHENTICATION_BACKENDS: ['django.contrib.auth.backends.ModelBackend']
AUTH_PASSWORD_VALIDATORS: '********'
AUTH_USER_MODEL: 'auth.User'
BASE_DIR: PosixPath('/srv/sample')
CACHES: {
 'default': {'BACKEND': 'django.core.cache.backends.locmem.LocMemCache'}}
CACHE_MIDDLEWARE_ALIAS: 'default'
CACHE_MIDDLEWARE_KEY_PREFIX: '********'
CACHE_MIDDLEWARE_SECONDS: 600
CELERY_BROKER_URL: 'amqp://guest:********@rabbitmq:5672//'
CELERY_TASK_QUEUES: [<unbound Queue default_tasks -> <unbound Exchange ''(direct)> -> >]
CELERY_TASK_REJECT_ON_WORKER_LOST: True
CELERY_TASK_ROUTES: {
 'sample.tasks.long_running_task': {'queue': 'default_tasks'}}
CELERY_TASK_TRACK_STARTED: True
CELERY_WORKER_PREFETCH_MULTIPLIER: 1
CSRF_COOKIE_AGE: 31449600
CSRF_COOKIE_DOMAIN: None
CSRF_COOKIE_HTTPONLY: False
CSRF_COOKIE_MASKED: False
CSRF_COOKIE_NAME: 'csrftoken'
CSRF_COOKIE_PATH: '/'
CSRF_COOKIE_SAMESITE: 'Lax'
CSRF_COOKIE_SECURE: False
CSRF_FAILURE_VIEW: 'django.views.csrf.csrf_failure'
CSRF_HEADER_NAME: 'HTTP_X_CSRFTOKEN'
CSRF_TRUSTED_ORIGINS: []
CSRF_USE_SESSIONS: False
DATABASES: {
    'default': {   'ATOMIC_REQUESTS': False,
                   'AUTOCOMMIT': True,
                   'CONN_HEALTH_CHECKS': False,
                   'CONN_MAX_AGE': 0,
                   'ENGINE': 'django.db.backends.sqlite3',
                   'HOST': '',
                   'NAME': PosixPath('/srv/sample/db.sqlite3'),
                   'OPTIONS': {},
                   'PASSWORD': '********',
                   'PORT': '',
                   'TEST': {   'CHARSET': None,
                               'COLLATION': None,
                               'MIGRATE': True,
                               'MIRROR': None,
                               'NAME': None},
                   'TIME_ZONE': None,
                   'USER': ''}}
DATABASE_ROUTERS: '********'
DATA_UPLOAD_MAX_MEMORY_SIZE: 2621440
DATA_UPLOAD_MAX_NUMBER_FIELDS: 1000
DATA_UPLOAD_MAX_NUMBER_FILES: 100
DATETIME_FORMAT: 'N j, Y, P'
DATETIME_INPUT_FORMATS: ['%Y-%m-%d %H:%M:%S',
 '%Y-%m-%d %H:%M:%S.%f',
 '%Y-%m-%d %H:%M',
 '%m/%d/%Y %H:%M:%S',
 '%m/%d/%Y %H:%M:%S.%f',
 '%m/%d/%Y %H:%M',
 '%m/%d/%y %H:%M:%S',
 '%m/%d/%y %H:%M:%S.%f',
 '%m/%d/%y %H:%M']
DATE_FORMAT: 'N j, Y'
DATE_INPUT_FORMATS: ['%Y-%m-%d',
 '%m/%d/%Y',
 '%m/%d/%y',
 '%b %d %Y',
 '%b %d, %Y',
 '%d %b %Y',
 '%d %b, %Y',
 '%B %d %Y',
 '%B %d, %Y',
 '%d %B %Y',
 '%d %B, %Y']
DEBUG: True
DEBUG_PROPAGATE_EXCEPTIONS: False
DECIMAL_SEPARATOR: '.'
DEFAULT_AUTO_FIELD: 'django.db.models.BigAutoField'
DEFAULT_CHARSET: 'utf-8'
DEFAULT_EXCEPTION_REPORTER: 'django.views.debug.ExceptionReporter'
DEFAULT_EXCEPTION_REPORTER_FILTER: 'django.views.debug.SafeExceptionReporterFilter'
DEFAULT_FILE_STORAGE: 'django.core.files.storage.FileSystemStorage'
DEFAULT_FROM_EMAIL: 'webmaster@localhost'
DEFAULT_INDEX_TABLESPACE: ''
DEFAULT_TABLESPACE: ''
DISALLOWED_USER_AGENTS: []
EMAIL_BACKEND: 'django.core.mail.backends.smtp.EmailBackend'
EMAIL_HOST: 'localhost'
EMAIL_HOST_PASSWORD: '********'
EMAIL_HOST_USER: ''
EMAIL_PORT: 25
EMAIL_SSL_CERTFILE: None
EMAIL_SSL_KEYFILE: '********'
EMAIL_SUBJECT_PREFIX: '[Django] '
EMAIL_TIMEOUT: None
EMAIL_USE_LOCALTIME: False
EMAIL_USE_SSL: False
EMAIL_USE_TLS: False
FILE_UPLOAD_DIRECTORY_PERMISSIONS: None
FILE_UPLOAD_HANDLERS: ['django.core.files.uploadhandler.MemoryFileUploadHandler',
 'django.core.files.uploadhandler.TemporaryFileUploadHandler']
FILE_UPLOAD_MAX_MEMORY_SIZE: 2621440
FILE_UPLOAD_PERMISSIONS: 420
FILE_UPLOAD_TEMP_DIR: None
FIRST_DAY_OF_WEEK: 0
FIXTURE_DIRS: []
FORCE_SCRIPT_NAME: None
FORMAT_MODULE_PATH: None
FORM_RENDERER: 'django.forms.renderers.DjangoTemplates'
IGNORABLE_404_URLS: []
INSTALLED_APPS: ['django.contrib.admin',
 'django.contrib.auth',
 'django.contrib.contenttypes',
 'django.contrib.sessions',
 'django.contrib.messages',
 'django.contrib.staticfiles']
INTERNAL_IPS: []
LANGUAGES: [('af', 'Afrikaans'),
 ('ar', 'Arabic'),
 ('ar-dz', 'Algerian Arabic'),
 ('ast', 'Asturian'),
 ('az', 'Azerbaijani'),
 ('bg', 'Bulgarian'),
 ('be', 'Belarusian'),
 ('bn', 'Bengali'),
 ('br', 'Breton'),
 ('bs', 'Bosnian'),
 ('ca', 'Catalan'),
 ('ckb', 'Central Kurdish (Sorani)'),
 ('cs', 'Czech'),
 ('cy', 'Welsh'),
 ('da', 'Danish'),
 ('de', 'German'),
 ('dsb', 'Lower Sorbian'),
 ('el', 'Greek'),
 ('en', 'English'),
 ('en-au', 'Australian English'),
 ('en-gb', 'British English'),
 ('eo', 'Esperanto'),
 ('es', 'Spanish'),
 ('es-ar', 'Argentinian Spanish'),
 ('es-co', 'Colombian Spanish'),
 ('es-mx', 'Mexican Spanish'),
 ('es-ni', 'Nicaraguan Spanish'),
 ('es-ve', 'Venezuelan Spanish'),
 ('et', 'Estonian'),
 ('eu', 'Basque'),
 ('fa', 'Persian'),
 ('fi', 'Finnish'),
 ('fr', 'French'),
 ('fy', 'Frisian'),
 ('ga', 'Irish'),
 ('gd', 'Scottish Gaelic'),
 ('gl', 'Galician'),
 ('he', 'Hebrew'),
 ('hi', 'Hindi'),
 ('hr', 'Croatian'),
 ('hsb', 'Upper Sorbian'),
 ('hu', 'Hungarian'),
 ('hy', 'Armenian'),
 ('ia', 'Interlingua'),
 ('id', 'Indonesian'),
 ('ig', 'Igbo'),
 ('io', 'Ido'),
 ('is', 'Icelandic'),
 ('it', 'Italian'),
 ('ja', 'Japanese'),
 ('ka', 'Georgian'),
 ('kab', 'Kabyle'),
 ('kk', 'Kazakh'),
 ('km', 'Khmer'),
 ('kn', 'Kannada'),
 ('ko', 'Korean'),
 ('ky', 'Kyrgyz'),
 ('lb', 'Luxembourgish'),
 ('lt', 'Lithuanian'),
 ('lv', 'Latvian'),
 ('mk', 'Macedonian'),
 ('ml', 'Malayalam'),
 ('mn', 'Mongolian'),
 ('mr', 'Marathi'),
 ('ms', 'Malay'),
 ('my', 'Burmese'),
 ('nb', 'Norwegian Bokmål'),
 ('ne', 'Nepali'),
 ('nl', 'Dutch'),
 ('nn', 'Norwegian Nynorsk'),
 ('os', 'Ossetic'),
 ('pa', 'Punjabi'),
 ('pl', 'Polish'),
 ('pt', 'Portuguese'),
 ('pt-br', 'Brazilian Portuguese'),
 ('ro', 'Romanian'),
 ('ru', 'Russian'),
 ('sk', 'Slovak'),
 ('sl', 'Slovenian'),
 ('sq', 'Albanian'),
 ('sr', 'Serbian'),
 ('sr-latn', 'Serbian Latin'),
 ('sv', 'Swedish'),
 ('sw', 'Swahili'),
 ('ta', 'Tamil'),
 ('te', 'Telugu'),
 ('tg', 'Tajik'),
 ('th', 'Thai'),
 ('tk', 'Turkmen'),
 ('tr', 'Turkish'),
 ('tt', 'Tatar'),
 ('udm', 'Udmurt'),
 ('uk', 'Ukrainian'),
 ('ur', 'Urdu'),
 ('uz', 'Uzbek'),
 ('vi', 'Vietnamese'),
 ('zh-hans', 'Simplified Chinese'),
 ('zh-hant', 'Traditional Chinese')]
LANGUAGES_BIDI: ['he', 'ar', 'ar-dz', 'ckb', 'fa', 'ur']
LANGUAGE_CODE: 'en-us'
LANGUAGE_COOKIE_AGE: None
LANGUAGE_COOKIE_DOMAIN: None
LANGUAGE_COOKIE_HTTPONLY: False
LANGUAGE_COOKIE_NAME: 'django_language'
LANGUAGE_COOKIE_PATH: '/'
LANGUAGE_COOKIE_SAMESITE: None
LANGUAGE_COOKIE_SECURE: False
LOCALE_PATHS: []
LOGGING: {
 }
LOGGING_CONFIG: 'logging.config.dictConfig'
LOGIN_REDIRECT_URL: '/accounts/profile/'
LOGIN_URL: '/accounts/login/'
LOGOUT_REDIRECT_URL: None
MANAGERS: []
MEDIA_ROOT: ''
MEDIA_URL: '/'
MESSAGE_STORAGE: 'django.contrib.messages.storage.fallback.FallbackStorage'
MIDDLEWARE: ['django.middleware.security.SecurityMiddleware',
 'django.contrib.sessions.middleware.SessionMiddleware',
 'django.middleware.common.CommonMiddleware',
 'django.middleware.csrf.CsrfViewMiddleware',
 'django.contrib.auth.middleware.AuthenticationMiddleware',
 'django.contrib.messages.middleware.MessageMiddleware',
 'django.middleware.clickjacking.XFrameOptionsMiddleware']
MIGRATION_MODULES: {
 }
MONTH_DAY_FORMAT: 'F j'
NUMBER_GROUPING: 0
PASSWORD_HASHERS: '********'
PASSWORD_RESET_TIMEOUT: '********'
PREPEND_WWW: False
ROOT_URLCONF: 'sample.urls'
SECRET_KEY: '********'
SECRET_KEY_FALLBACKS: '********'
SECURE_CONTENT_TYPE_NOSNIFF: True
SECURE_CROSS_ORIGIN_OPENER_POLICY: 'same-origin'
SECURE_HSTS_INCLUDE_SUBDOMAINS: False
SECURE_HSTS_PRELOAD: False
SECURE_HSTS_SECONDS: 0
SECURE_PROXY_SSL_HEADER: None
SECURE_REDIRECT_EXEMPT: []
SECURE_REFERRER_POLICY: 'same-origin'
SECURE_SSL_HOST: None
SECURE_SSL_REDIRECT: False
SERVER_EMAIL: 'root@localhost'
SESSION_CACHE_ALIAS: 'default'
SESSION_COOKIE_AGE: 1209600
SESSION_COOKIE_DOMAIN: None
SESSION_COOKIE_HTTPONLY: True
SESSION_COOKIE_NAME: 'sessionid'
SESSION_COOKIE_PATH: '/'
SESSION_COOKIE_SAMESITE: 'Lax'
SESSION_COOKIE_SECURE: False
SESSION_ENGINE: 'django.contrib.sessions.backends.db'
SESSION_EXPIRE_AT_BROWSER_CLOSE: False
SESSION_FILE_PATH: None
SESSION_SAVE_EVERY_REQUEST: False
SESSION_SERIALIZER: 'django.contrib.sessions.serializers.JSONSerializer'
SETTINGS_MODULE: 'sample.settings'
SHORT_DATETIME_FORMAT: 'm/d/Y P'
SHORT_DATE_FORMAT: 'm/d/Y'
SIGNING_BACKEND: 'django.core.signing.TimestampSigner'
SILENCED_SYSTEM_CHECKS: []
STATICFILES_DIRS: []
STATICFILES_FINDERS: ['django.contrib.staticfiles.finders.FileSystemFinder',
 'django.contrib.staticfiles.finders.AppDirectoriesFinder']
STATICFILES_STORAGE: 'django.contrib.staticfiles.storage.StaticFilesStorage'
STATIC_ROOT: None
STATIC_URL: '/static/'
STORAGES: {
    'default': {'BACKEND': 'django.core.files.storage.FileSystemStorage'},
    'staticfiles': {   'BACKEND': 'django.contrib.staticfiles.storage.StaticFilesStorage'}}
TEMPLATES: [{'APP_DIRS': True,
  'BACKEND': 'django.template.backends.django.DjangoTemplates',
  'DIRS': [],
  'OPTIONS': {'context_processors': ['django.template.context_processors.debug',
                                     'django.template.context_processors.request',
                                     'django.contrib.auth.context_processors.auth',
                                     'django.contrib.messages.context_processors.messages']}}]
TEST_NON_SERIALIZED_APPS: []
TEST_RUNNER: 'django.test.runner.DiscoverRunner'
THOUSAND_SEPARATOR: ','
TIME_FORMAT: 'P'
TIME_INPUT_FORMATS: ['%H:%M:%S', '%H:%M:%S.%f', '%H:%M']
TIME_ZONE: 'UTC'
USE_DEPRECATED_PYTZ: False
USE_I18N: True
USE_L10N: True
USE_THOUSAND_SEPARATOR: False
USE_TZ: True
USE_X_FORWARDED_HOST: False
USE_X_FORWARDED_PORT: False
WSGI_APPLICATION: 'sample.wsgi.application'
X_FRAME_OPTIONS: 'DENY'
YEAR_MONTH_FORMAT: 'F Y'
is_overridden: <bound method Settings.is_overridden of <Settings "sample.settings">>
deprecated_settings: None
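For readers who want the Celery-relevant part of the dump above in isolation, it corresponds roughly to the following Django settings fragment. This is a reconstruction, not the PoC's actual file: it assumes the Celery app loads these settings with `config_from_object("django.conf:settings", namespace="CELERY")`, and the broker password (masked in the report) is assumed to be the default `guest`. `Queue("default_tasks")` with no explicit exchange binds to the default `''` direct exchange with an empty routing key, which matches the printed repr of `CELERY_TASK_QUEUES`.

```python
# settings.py: Celery-relevant subset reconstructed from the `celery report` output.
from kombu import Queue

# Assumption: default guest credentials (the report masks the password).
CELERY_BROKER_URL = "amqp://guest:guest@rabbitmq:5672//"

# No explicit exchange -> default '' direct exchange, empty routing key.
CELERY_TASK_QUEUES = [Queue("default_tasks")]

# Route key copied as printed in the report.
CELERY_TASK_ROUTES = {
    "sample.tasks.long_running_task": {"queue": "default_tasks"},
}

CELERY_TASK_REJECT_ON_WORKER_LOST = True
CELERY_TASK_TRACK_STARTED = True
CELERY_WORKER_PREFETCH_MULTIPLIER = 1
# CELERY_TASK_ACKS_LATE is left at its default (False).
```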

Steps to Reproduce

Required Dependencies

  • Minimal Python Version: N/A or Unknown
  • Minimal Celery Version: N/A or Unknown
  • Minimal Kombu Version: N/A or Unknown
  • Minimal Broker Version: N/A or Unknown
  • Minimal Result Backend Version: N/A or Unknown
  • Minimal OS and/or Kernel Version: Linux (Ubuntu) and macOS at least.
  • Minimal Broker Client Version: N/A or Unknown
  • Minimal Result Backend Client Version: N/A or Unknown

Python Packages

pip freeze Output:

> pip freeze
amqp==5.1.1
asgiref==3.7.2
billiard==4.1.0
celery==5.3.4
click==8.1.7
click-didyoumean==0.3.0
click-plugins==1.1.1
click-repl==0.3.0
Django==4.2.5
flower==2.0.1
humanize==4.8.0
kombu==5.3.2
prometheus-client==0.17.1
prompt-toolkit==3.0.39
python-dateutil==2.8.2
pytz==2023.3.post1
six==1.16.0
sqlparse==0.4.4
tornado==6.3.3
typing_extensions==4.8.0
tzdata==2023.3
vine==5.0.0
wcwidth==0.2.6

Other Dependencies

N/A

Minimally Reproducible Test Case

  1. Clone PoC repo.
  2. Start all of the components with docker compose up.
  3. Enqueue several instances of long_running_task by doing:
from sample.celery import long_running_task

# Each call enqueues one instance; run it several times to enqueue several.
long_running_task.delay()
  4. Restart RabbitMQ while one of the tasks is being processed, using docker restart <RabbitMQ container ID>.
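The steps above can also be scripted. The sketch below is a hypothetical helper, not part of the PoC repo: it takes the enqueue and broker-restart actions as callables, so it can be exercised without a live broker. The container name `sample-rabbitmq-1` is an assumption taken from the log prefixes further down, and the 5-second delay is an arbitrary choice meant to ensure that at least one 30-second task is mid-execution when the broker goes away.

```python
import subprocess
import time
from typing import Callable


def reproduce(
    enqueue: Callable[[], object],
    restart_broker: Callable[[], object],
    n_tasks: int = 4,
    restart_after: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Enqueue n_tasks, wait, then restart the broker. Returns tasks enqueued."""
    for _ in range(n_tasks):
        enqueue()
    # Give the worker time to start some tasks so the restart happens
    # while at least one of them is still running.
    sleep(restart_after)
    restart_broker()
    return n_tasks


def docker_restart(container: str) -> Callable[[], object]:
    """Return a callable that restarts `container` through the docker CLI."""
    return lambda: subprocess.run(["docker", "restart", container], check=True)
```

With the PoC running and `long_running_task` imported from `sample.celery` as in step 3, a call such as `reproduce(long_running_task.delay, docker_restart("sample-rabbitmq-1"))` should mirror the manual steps.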

Alternatively:

  1. Create a long-running task:
import time

# `app` is the project's Celery application instance (e.g. the one defined in sample/celery.py).
@app.task()
def long_running_task():
    print("long running task: start")
    time.sleep(30)
    print("long running task: end")
    return "long running task: done"
  2. Run several instances of long_running_task.
  3. Restart RabbitMQ while one of those tasks is being processed.

Expected Behavior

  1. The long-running task should be successfully processed. Because CELERY_TASK_ACKS_LATE=True is not set, the pool worker should not be restarted or stopped.
  2. The worker should respect the max concurrency setting.
  3. Tasks that started before the connection restart but completed after it should be counted as done, not as active.
  4. The worker main process should emit the task-succeeded event.
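Expectation 2 can be checked without Flower by comparing each worker's `inspect().active()` reply with the pool's `max-concurrency` value from `inspect().stats()`. The function below is a hypothetical helper that works on those two reply dictionaries. Whether the ghost task described here shows up in `active()` depends on where the stale state lives, which this report does not establish, so treat this as a sketch rather than a definitive detector.

```python
from typing import Dict, List, Optional, Tuple


def overcommitted_workers(
    active: Optional[Dict[str, List[dict]]],
    stats: Optional[Dict[str, dict]],
) -> Dict[str, Tuple[int, int]]:
    """Return {worker: (active_count, max_concurrency)} for every worker whose
    active-task list is longer than its pool's max-concurrency.

    `active` and `stats` are the replies of inspect().active() and
    inspect().stats(); either may be None when no worker replies.
    """
    active = active or {}
    stats = stats or {}
    result: Dict[str, Tuple[int, int]] = {}
    for worker, tasks in active.items():
        # stats()[worker]["pool"]["max-concurrency"] holds the pool size.
        limit = stats.get(worker, {}).get("pool", {}).get("max-concurrency")
        if isinstance(limit, int) and len(tasks) > limit:
            result[worker] = (len(tasks), limit)
    return result
```

Against a running worker this would be called as `insp = app.control.inspect()` followed by `overcommitted_workers(insp.active(), insp.stats())`; for the worker in this report, an overcount would appear as an entry like `{"celery@ccd33a7f7638": (3, 2)}`.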

Actual Behavior

  1. The long-running task is successfully processed.
  2. The worker does not respect the max concurrency setting: it reports 3 active tasks although concurrency is 2.
  3. Tasks that started before the connection restart but completed after it are still counted as active.
  4. The worker main process is not emitting the task-succeeded event.
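Items 3 and 4 describe a task whose `task-succeeded` event never arrives even though the pool process logged it as succeeded. The class below is a hypothetical helper, not a Celery API: it records the last lifecycle event seen for each task UUID and reports tasks that never reached a terminal state. Treating `task-succeeded`, `task-failed`, `task-rejected` and `task-revoked` as the terminal events is a simplifying assumption.

```python
from typing import Dict, Set


class TaskEventTracker:
    """Track the last task-* event per UUID and list tasks that never finished."""

    TERMINAL = {"task-succeeded", "task-failed", "task-rejected", "task-revoked"}

    def __init__(self) -> None:
        self.last_state: Dict[str, str] = {}

    def on_event(self, event: dict) -> None:
        etype = event.get("type", "")
        uuid = event.get("uuid")
        if not etype.startswith("task-") or uuid is None:
            return  # ignore worker-* heartbeats and events without a task id
        if self.last_state.get(uuid) in self.TERMINAL:
            return  # keep the terminal state if a late event arrives out of order
        self.last_state[uuid] = etype

    def unfinished(self) -> Set[str]:
        """UUIDs whose last recorded event is not terminal."""
        return {u for u, s in self.last_state.items() if s not in self.TERMINAL}
```

To feed it live events, Celery's event receiver can dispatch every event to `on_event`, e.g. `with app.connection() as conn: app.events.Receiver(conn, handlers={"*": tracker.on_event}).capture(limit=None, timeout=None, wakeup=True)`. This assumes task events are enabled on the worker (`-E` or `worker_send_task_events`; Flower normally enables them). A task that stays in `unfinished()` long after its log line reports success would match the behaviour described in this issue.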

Example of the actual behaviour with a documented case:

  1. Settings of worker celery@ccd33a7f7638 (concurrency=2):

Screenshot 2023-09-27 at 18 45 49

  2. The ghost task long_running_task with UUID 2cbdb8e8-aa29-45a1-aee1-e9afa244ea10 is still considered active after RabbitMQ restarted, even though the logs show the task succeeded:

Screenshot 2023-09-27 at 18 27 13
(notice that concurrency is not being respected: active tasks = 3 > 2)

  3. Same condition from the worker summary:
    Screenshot 2023-09-27 at 18 27 27

  4. All of the tasks that were started and completed entirely before or entirely after the restart are correctly processed and accounted for:
    Screenshot 2023-09-27 at 18 27 37

  5. The task-succeeded event is never sent:
    Screenshot 2023-09-27 at 18 27 49

  6. Summarized logs of task with UUID `2cbdb8e8-aa29-45a1-aee1-e9afa244ea10` that goes from `received` to `succeeded` with a `RabbitMQ` connection restart in between

sample-default-worker-1 | [2023-09-27 21:22:29,251: INFO/MainProcess] Task sample.celery.long_running_task[2cbdb8e8-aa29-45a1-aee1-e9afa244ea10] received
...
sample-default-worker-1 | [2023-09-27 21:23:12,642: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@rabbitmq:5672//: [Errno 111] Connection refused.
...
sample-default-worker-1 | Trying again in 4.00 seconds... (2/100)
...
sample-default-worker-1 | [2023-09-27 21:23:22,689: INFO/MainProcess] Connected to amqp://guest:**@rabbitmq:5672//
...
sample-default-worker-1 | [2023-09-27 21:23:28,310: INFO/ForkPoolWorker-2] Task sample.celery.long_running_task[2cbdb8e8-aa29-45a1-aee1-e9afa244ea10] succeeded in 30.0159327230067s: 'long running task: done'

  7. Full logs of task with UUID `2cbdb8e8-aa29-45a1-aee1-e9afa244ea10` that goes from `received` to `succeeded` with a `RabbitMQ` connection restart in between

sample-default-worker-1 | [2023-09-27 21:22:28,222: INFO/MainProcess] Task sample.celery.long_running_task[db72b3cc-0ee8-4854-bc02-4381ad66aa17] received
sample-default-worker-1 | [2023-09-27 21:22:28,232: WARNING/ForkPoolWorker-2] long running task: start
sample-default-worker-1 | [2023-09-27 21:22:28,768: INFO/MainProcess] Task sample.celery.long_running_task[b8d2b761-bebb-43bc-90b0-7cdfb9b8b595] received
sample-default-worker-1 | [2023-09-27 21:22:28,785: WARNING/ForkPoolWorker-1] long running task: start
sample-default-worker-1 | [2023-09-27 21:22:29,251: INFO/MainProcess] Task sample.celery.long_running_task[2cbdb8e8-aa29-45a1-aee1-e9afa244ea10] received
sample-default-worker-1 | [2023-09-27 21:22:29,698: INFO/MainProcess] Task sample.celery.long_running_task[b25e8de3-8b69-45a2-a9e6-9afca2b6e5da] received
sample-flower-1 | [W 230927 21:22:44 web:1869] 404 GET /worker/celery%40ccd33a7f7638 (192.168.65.1): Unknown worker 'celery@ccd33a7f7638'
sample-default-worker-1 | [2023-09-27 21:22:58,250: WARNING/ForkPoolWorker-2] long running task: end
sample-default-worker-1 | [2023-09-27 21:22:58,259: INFO/ForkPoolWorker-2] Task sample.celery.long_running_task[db72b3cc-0ee8-4854-bc02-4381ad66aa17] succeeded in 30.02931193000404s: 'long running task: done'
sample-default-worker-1 | [2023-09-27 21:22:58,294: WARNING/ForkPoolWorker-2] long running task: start
sample-default-worker-1 | [2023-09-27 21:22:58,294: INFO/MainProcess] Task sample.celery.long_running_task[ab9bc5b2-1eed-49c0-b9fd-48be02d72cb2] received
sample-default-worker-1 | [2023-09-27 21:22:58,809: WARNING/ForkPoolWorker-1] long running task: end
sample-default-worker-1 | [2023-09-27 21:22:58,826: INFO/ForkPoolWorker-1] Task sample.celery.long_running_task[b8d2b761-bebb-43bc-90b0-7cdfb9b8b595] succeeded in 30.04512922099093s: 'long running task: done'
sample-default-worker-1 | [2023-09-27 21:22:58,833: WARNING/ForkPoolWorker-1] long running task: start
sample-default-worker-1 | [2023-09-27 21:22:58,835: INFO/MainProcess] Task sample.celery.long_running_task[48c5534d-74ab-462b-8674-c72737080724] received
sample-rabbitmq-1 | 2023-09-27 21:23:10.579 [info] <0.60.0> SIGTERM received - shutting down
sample-rabbitmq-1 | 2023-09-27 21:23:10.583 [warning] <0.587.0> HTTP listener registry could not find context rabbitmq_prometheus_tls
sample-rabbitmq-1 | 2023-09-27 21:23:10.604 [warning] <0.587.0> HTTP listener registry could not find context rabbitmq_management_tls
sample-rabbitmq-1 | 2023-09-27 21:23:10.614 [info] <0.272.0> Peer discovery backend rabbit_peer_discovery_classic_config does not support registration, skipping unregistration.
sample-rabbitmq-1 | 2023-09-27 21:23:10.615 [info] <0.833.0> stopped TCP listener on [::]:5672
sample-rabbitmq-1 | 2023-09-27 21:23:10.616 [error] <0.850.0> Error on AMQP connection <0.850.0> (172.22.0.4:39010 -> 172.22.0.3:5672, vhost: '/', user: 'guest', state: running), channel 0:
sample-rabbitmq-1 | operation none caused a connection exception connection_forced: "broker forced connection closure with reason 'shutdown'"
sample-rabbitmq-1 | 2023-09-27 21:23:10.616 [error] <0.847.0> Error on AMQP connection <0.847.0> (172.22.0.4:39024 -> 172.22.0.3:5672, vhost: '/', user: 'guest', state: running), channel 0:
sample-rabbitmq-1 | operation none caused a connection exception connection_forced: "broker forced connection closure with reason 'shutdown'"
sample-rabbitmq-1 | 2023-09-27 21:23:10.616 [error] <0.844.0> Error on AMQP connection <0.844.0> (172.22.0.4:39000 -> 172.22.0.3:5672, vhost: '/', user: 'guest', state: running), channel 0:
sample-rabbitmq-1 | operation none caused a connection exception connection_forced: "broker forced connection closure with reason 'shutdown'"
sample-rabbitmq-1 | 2023-09-27 21:23:10.616 [error] <0.998.0> Error on AMQP connection <0.998.0> (172.22.0.5:47838 -> 172.22.0.3:5672, vhost: '/', user: 'guest', state: running), channel 0:
sample-rabbitmq-1 | operation none caused a connection exception connection_forced: "broker forced connection closure with reason 'shutdown'"
sample-rabbitmq-1 | 2023-09-27 21:23:10.617 [error] <0.937.0> Error on AMQP connection <0.937.0> (172.22.0.4:39068 -> 172.22.0.3:5672, vhost: '/', user: 'guest', state: running), channel 0:
sample-rabbitmq-1 | operation none caused a connection exception connection_forced: "broker forced connection closure with reason 'shutdown'"
sample-rabbitmq-1 | 2023-09-27 21:23:10.616 [error] <0.947.0> Error on AMQP connection <0.947.0> (172.22.0.4:39084 -> 172.22.0.3:5672, vhost: '/', user: 'guest', state: running), channel 0:
sample-rabbitmq-1 | operation none caused a connection exception connection_forced: "broker forced connection closure with reason 'shutdown'"
sample-rabbitmq-1 | 2023-09-27 21:23:10.616 [error] <0.940.0> Error on AMQP connection <0.940.0> (172.22.0.4:39072 -> 172.22.0.3:5672, vhost: '/', user: 'guest', state: running), channel 0:
sample-rabbitmq-1 | operation none caused a connection exception connection_forced: "broker forced connection closure with reason 'shutdown'"
sample-rabbitmq-1 | 2023-09-27 21:23:10.616 [error] <0.838.0> Error on AMQP connection <0.838.0> (172.22.0.4:38978 -> 172.22.0.3:5672, vhost: '/', user: 'guest', state: running), channel 0:
sample-rabbitmq-1 | operation none caused a connection exception connection_forced: "broker forced connection closure with reason 'shutdown'"
sample-rabbitmq-1 | 2023-09-27 21:23:10.617 [error] <0.933.0> Error on AMQP connection <0.933.0> (172.22.0.4:39056 -> 172.22.0.3:5672, vhost: '/', user: 'guest', state: running), channel 0:
sample-rabbitmq-1 | operation none caused a connection exception connection_forced: "broker forced connection closure with reason 'shutdown'"
sample-rabbitmq-1 | 2023-09-27 21:23:10.616 [error] <0.943.0> Error on AMQP connection <0.943.0> (172.22.0.4:39080 -> 172.22.0.3:5672, vhost: '/', user: 'guest', state: running), channel 0:
sample-rabbitmq-1 | operation none caused a connection exception connection_forced: "broker forced connection closure with reason 'shutdown'"
sample-rabbitmq-1 | 2023-09-27 21:23:10.617 [error] <0.1071.0> Error on AMQP connection <0.1071.0> (172.22.0.2:60990 -> 172.22.0.3:5672, vhost: '/', user: 'guest', state: running), channel 0:
sample-rabbitmq-1 | operation none caused a connection exception connection_forced: "broker forced connection closure with reason 'shutdown'"
sample-rabbitmq-1 | 2023-09-27 21:23:10.617 [error] <0.1012.0> Error on AMQP connection <0.1012.0> (172.22.0.5:47846 -> 172.22.0.3:5672, vhost: '/', user: 'guest', state: running), channel 0:
sample-rabbitmq-1 | operation none caused a connection exception connection_forced: "broker forced connection closure with reason 'shutdown'"
sample-rabbitmq-1 | 2023-09-27 21:23:10.617 [error] <0.991.0> Error on AMQP connection <0.991.0> (172.22.0.5:47826 -> 172.22.0.3:5672, vhost: '/', user: 'guest', state: running), channel 0:
sample-rabbitmq-1 | operation none caused a connection exception connection_forced: "broker forced connection closure with reason 'shutdown'"
sample-rabbitmq-1 | 2023-09-27 21:23:10.617 [error] <0.856.0> Error on AMQP connection <0.856.0> (172.22.0.4:39052 -> 172.22.0.3:5672, vhost: '/', user: 'guest', state: running), channel 0:
sample-rabbitmq-1 | operation none caused a connection exception connection_forced: "broker forced connection closure with reason 'shutdown'"
sample-rabbitmq-1 | 2023-09-27 21:23:10.616 [error] <0.853.0> Error on AMQP connection <0.853.0> (172.22.0.4:39040 -> 172.22.0.3:5672, vhost: '/', user: 'guest', state: running), channel 0:
sample-rabbitmq-1 | operation none caused a connection exception connection_forced: "broker forced connection closure with reason 'shutdown'"
sample-rabbitmq-1 | 2023-09-27 21:23:10.616 [error] <0.841.0> Error on AMQP connection <0.841.0> (172.22.0.4:38990 -> 172.22.0.3:5672, vhost: '/', user: 'guest', state: running), channel 0:
sample-rabbitmq-1 | operation none caused a connection exception connection_forced: "broker forced connection closure with reason 'shutdown'"
sample-rabbitmq-1 | 2023-09-27 21:23:10.622 [error] <0.950.0> Supervisor {<0.950.0>,rabbit_channel_sup_sup} had child channel_sup started with rabbit_channel_sup:start_link() at undefined exit with reason shutdown in context shutdown_error
sample-rabbitmq-1 | 2023-09-27 21:23:10.624 [error] <0.880.0> Supervisor {<0.880.0>,rabbit_channel_sup_sup} had child channel_sup started with rabbit_channel_sup:start_link() at undefined exit with reason shutdown in context shutdown_error
sample-flower-1 | [E 230927 21:23:10 events:191] Failed to capture events: '(0, 0): (320) CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'', trying again in 1 seconds.
sample-default-worker-1 | [2023-09-27 21:23:10,628: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
sample-default-worker-1 | Traceback (most recent call last):
sample-default-worker-1 | File "/usr/local/lib/python3.10/site-packages/celery/worker/consumer/consumer.py", line 340, in start
sample-default-worker-1 | blueprint.start(self)
sample-default-worker-1 | File "/usr/local/lib/python3.10/site-packages/celery/bootsteps.py", line 116, in start
sample-default-worker-1 | step.start(parent)
sample-default-worker-1 | File "/usr/local/lib/python3.10/site-packages/celery/worker/consumer/consumer.py", line 742, in start
sample-default-worker-1 | c.loop(*c.loop_args())
sample-default-worker-1 | File "/usr/local/lib/python3.10/site-packages/celery/worker/loops.py", line 97, in asynloop
sample-default-worker-1 | next(loop)
sample-default-worker-1 | File "/usr/local/lib/python3.10/site-packages/kombu/asynchronous/hub.py", line 373, in create_loop
sample-default-worker-1 | cb(*cbargs)
sample-default-worker-1 | File "/usr/local/lib/python3.10/site-packages/kombu/transport/base.py", line 248, in on_readable
sample-default-worker-1 | reader(loop)
sample-default-worker-1 | File "/usr/local/lib/python3.10/site-packages/kombu/transport/base.py", line 230, in _read
sample-default-worker-1 | drain_events(timeout=0)
sample-default-worker-1 | File "/usr/local/lib/python3.10/site-packages/amqp/connection.py", line 525, in drain_events
sample-default-worker-1 | while not self.blocking_read(timeout):
sample-default-worker-1 | File "/usr/local/lib/python3.10/site-packages/amqp/connection.py", line 531, in blocking_read
sample-default-worker-1 | return self.on_inbound_frame(frame)
sample-default-worker-1 | File "/usr/local/lib/python3.10/site-packages/amqp/method_framing.py", line 53, in on_frame
sample-default-worker-1 | callback(channel, method_sig, buf, None)
sample-default-worker-1 | File "/usr/local/lib/python3.10/site-packages/amqp/connection.py", line 537, in on_inbound_method
sample-default-worker-1 | return self.channels[channel_id].dispatch_method(
sample-default-worker-1 | File "/usr/local/lib/python3.10/site-packages/amqp/abstract_channel.py", line 156, in dispatch_method
sample-default-worker-1 | listener(*args)
sample-default-worker-1 | File "/usr/local/lib/python3.10/site-packages/amqp/connection.py", line 667, in _on_close
sample-default-worker-1 | raise error_for_code(reply_code, reply_text,
sample-default-worker-1 | amqp.exceptions.ConnectionForced: (0, 0): (320) CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'
sample-default-worker-1 | [2023-09-27 21:23:10,632: WARNING/MainProcess] /usr/local/lib/python3.10/site-packages/celery/worker/consumer/consumer.py:391: CPendingDeprecationWarning:
sample-default-worker-1 | In Celery 5.1 we introduced an optional breaking change which
sample-default-worker-1 | on connection loss cancels all currently executed tasks with late acknowledgement enabled.
sample-default-worker-1 | These tasks cannot be acknowledged as the connection is gone, and the tasks are automatically redelivered
sample-default-worker-1 | back to the queue. You can enable this behavior using the worker_cancel_long_running_tasks_on_connection_loss
sample-default-worker-1 | setting. In Celery 5.1 it is set to False by default. The setting will be set to True by default in Celery 6.0.
sample-default-worker-1 |
sample-default-worker-1 | warnings.warn(CANCEL_TASKS_BY_DEFAULT, CPendingDeprecationWarning)
sample-default-worker-1 |
sample-default-worker-1 | [2023-09-27 21:23:10,633: INFO/MainProcess] Temporarily reducing the prefetch count to 1 to avoid over-fetching since 2 tasks are currently being processed.
sample-default-worker-1 | The prefetch count will be gradually restored to 2 as the tasks complete processing.
sample-default-worker-1 | [2023-09-27 21:23:10,635: WARNING/MainProcess] /usr/local/lib/python3.10/site-packages/celery/worker/consumer/consumer.py:507: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
sample-default-worker-1 | whether broker connection retries are made during startup in Celery 6.0 and above.
sample-default-worker-1 | If you wish to retain the existing behavior for retrying connections on startup,
sample-default-worker-1 | you should set broker_connection_retry_on_startup to True.
sample-default-worker-1 | warnings.warn(
sample-default-worker-1 |
sample-rabbitmq-1 | 2023-09-27 21:23:10.636 [info] <0.468.0> Closing all connections in vhost '/' on node 'rabbit@rabbitmq' because the vhost is stopping
sample-default-worker-1 | [2023-09-27 21:23:10,639: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@rabbitmq:5672//: [Errno 111] Connection refused.
sample-default-worker-1 | Trying again in 2.00 seconds... (1/100)
sample-default-worker-1 |
sample-rabbitmq-1 | 2023-09-27 21:23:10.640 [info] <0.487.0> Stopping message store for directory '/var/lib/rabbitmq/mnesia/rabbit@rabbitmq/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_persistent'
sample-rabbitmq-1 | 2023-09-27 21:23:10.644 [info] <0.487.0> Message store for directory '/var/lib/rabbitmq/mnesia/rabbit@rabbitmq/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_persistent' is stopped
sample-rabbitmq-1 | 2023-09-27 21:23:10.644 [info] <0.483.0> Stopping message store for directory '/var/lib/rabbitmq/mnesia/rabbit@rabbitmq/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_transient'
sample-rabbitmq-1 | 2023-09-27 21:23:10.647 [info] <0.483.0> Message store for directory '/var/lib/rabbitmq/mnesia/rabbit@rabbitmq/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_transient' is stopped
sample-default-worker-1 | [2023-09-27 21:23:12,642: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@rabbitmq:5672//: [Errno 111] Connection refused.
sample-default-worker-1 | Trying again in 4.00 seconds... (2/100)
sample-default-worker-1 |
sample-rabbitmq-1 exited with code 0
sample-rabbitmq-1 exited with code 0
sample-rabbitmq-1 | Configuring logger redirection
sample-rabbitmq-1 | 2023-09-27 21:23:15.567 [debug] <0.287.0> Lager installed handler error_logger_lager_h into error_logger
sample-rabbitmq-1 | 2023-09-27 21:23:15.574 [debug] <0.317.0> Lager installed handler lager_forwarder_backend into rabbit_log_queue_lager_event
sample-rabbitmq-1 | 2023-09-27 21:23:15.575 [debug] <0.320.0> Lager installed handler lager_forwarder_backend into rabbit_log_ra_lager_event
sample-rabbitmq-1 | 2023-09-27 21:23:15.575 [debug] <0.323.0> Lager installed handler lager_forwarder_backend into rabbit_log_shovel_lager_event
sample-rabbitmq-1 | 2023-09-27 21:23:15.575 [debug] <0.326.0> Lager installed handler lager_forwarder_backend into rabbit_log_upgrade_lager_event
sample-rabbitmq-1 | 2023-09-27 21:23:15.575 [debug] <0.293.0> Lager installed handler lager_forwarder_backend into rabbit_log_lager_event
sample-rabbitmq-1 | 2023-09-27 21:23:15.575 [debug] <0.290.0> Lager installed handler lager_forwarder_backend into error_logger_lager_event
sample-rabbitmq-1 | 2023-09-27 21:23:15.575 [debug] <0.299.0> Lager installed handler lager_forwarder_backend into rabbit_log_connection_lager_event
sample-rabbitmq-1 | 2023-09-27 21:23:15.575 [debug] <0.296.0> Lager installed handler lager_forwarder_backend into rabbit_log_channel_lager_event
sample-rabbitmq-1 | 2023-09-27 21:23:15.575 [debug] <0.302.0> Lager installed handler lager_forwarder_backend into rabbit_log_feature_flags_lager_event
sample-rabbitmq-1 | 2023-09-27 21:23:15.575 [debug] <0.308.0> Lager installed handler lager_forwarder_backend into rabbit_log_ldap_lager_event
sample-rabbitmq-1 | 2023-09-27 21:23:15.575 [debug] <0.305.0> Lager installed handler lager_forwarder_backend into rabbit_log_federation_lager_event
sample-rabbitmq-1 | 2023-09-27 21:23:15.575 [debug] <0.311.0> Lager installed handler lager_forwarder_backend into rabbit_log_mirroring_lager_event
sample-rabbitmq-1 | 2023-09-27 21:23:15.575 [debug] <0.314.0> Lager installed handler lager_forwarder_backend into rabbit_log_prelaunch_lager_event
sample-rabbitmq-1 | 2023-09-27 21:23:15.580 [info] <0.44.0> Application lager started on node rabbit@rabbitmq
sample-rabbitmq-1 | 2023-09-27 21:23:16.067 [debug] <0.283.0> Lager installed handler lager_backend_throttle into lager_event
sample-rabbitmq-1 | 2023-09-27 21:23:16.256 [info] <0.44.0> Application mnesia started on node rabbit@rabbitmq
sample-rabbitmq-1 | 2023-09-27 21:23:16.257 [info] <0.272.0>
sample-rabbitmq-1 | Starting RabbitMQ 3.8.9 on Erlang 23.2.2
sample-rabbitmq-1 | Copyright (c) 2007-2020 VMware, Inc. or its affiliates.
sample-rabbitmq-1 | Licensed under the MPL 2.0. Website: https://rabbitmq.com
sample-rabbitmq-1 |
sample-rabbitmq-1 | ## ## RabbitMQ 3.8.9
sample-rabbitmq-1 | ## ##
sample-rabbitmq-1 | ########## Copyright (c) 2007-2020 VMware, Inc. or its affiliates.
sample-rabbitmq-1 | ###### ##
sample-rabbitmq-1 | ########## Licensed under the MPL 2.0. Website: https://rabbitmq.com
sample-rabbitmq-1 |
sample-rabbitmq-1 | Doc guides: https://rabbitmq.com/documentation.html
sample-rabbitmq-1 | Support: https://rabbitmq.com/contact.html
sample-rabbitmq-1 | Tutorials: https://rabbitmq.com/getstarted.html
sample-rabbitmq-1 | Monitoring: https://rabbitmq.com/monitoring.html
sample-rabbitmq-1 |
sample-rabbitmq-1 | Logs:
sample-rabbitmq-1 |
sample-rabbitmq-1 | Config file(s): /etc/rabbitmq/rabbitmq.conf
sample-rabbitmq-1 |
sample-rabbitmq-1 | Starting broker...2023-09-27 21:23:16.258 [info] <0.272.0>
sample-rabbitmq-1 | node : rabbit@rabbitmq
sample-rabbitmq-1 | home dir : /var/lib/rabbitmq
sample-rabbitmq-1 | config file(s) : /etc/rabbitmq/rabbitmq.conf
sample-rabbitmq-1 | cookie hash : gExh7HvSqyTfFbIFiMTJCw==
sample-rabbitmq-1 | log(s) :
sample-rabbitmq-1 | database dir : /var/lib/rabbitmq/mnesia/rabbit@rabbitmq
sample-rabbitmq-1 | 2023-09-27 21:23:16.262 [info] <0.272.0> Running boot step pre_boot defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.262 [info] <0.272.0> Running boot step rabbit_core_metrics defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.262 [info] <0.272.0> Running boot step rabbit_alarm defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.265 [info] <0.412.0> Memory high watermark set to 1573 MiB (1649627955 bytes) of 3933 MiB (4124069888 bytes) total
sample-rabbitmq-1 | 2023-09-27 21:23:16.267 [info] <0.419.0> Enabling free disk space monitoring
sample-rabbitmq-1 | 2023-09-27 21:23:16.267 [info] <0.419.0> Disk free limit set to 50MB
sample-rabbitmq-1 | 2023-09-27 21:23:16.269 [info] <0.272.0> Running boot step code_server_cache defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.269 [info] <0.272.0> Running boot step file_handle_cache defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.269 [info] <0.422.0> Limiting to approx 1048479 file handles (943629 sockets)
sample-rabbitmq-1 | 2023-09-27 21:23:16.269 [info] <0.423.0> FHC read buffering: OFF
sample-rabbitmq-1 | 2023-09-27 21:23:16.269 [info] <0.423.0> FHC write buffering: ON
sample-rabbitmq-1 | 2023-09-27 21:23:16.269 [info] <0.272.0> Running boot step worker_pool defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.270 [info] <0.373.0> Will use 2 processes for default worker pool
sample-rabbitmq-1 | 2023-09-27 21:23:16.270 [info] <0.373.0> Starting worker pool 'worker_pool' with 2 processes in it
sample-rabbitmq-1 | 2023-09-27 21:23:16.270 [info] <0.272.0> Running boot step database defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.270 [info] <0.272.0> Waiting for Mnesia tables for 30000 ms, 9 retries left
sample-rabbitmq-1 | 2023-09-27 21:23:16.271 [info] <0.272.0> Successfully synced tables from a peer
sample-rabbitmq-1 | 2023-09-27 21:23:16.271 [info] <0.272.0> Waiting for Mnesia tables for 30000 ms, 9 retries left
sample-rabbitmq-1 | 2023-09-27 21:23:16.271 [info] <0.272.0> Successfully synced tables from a peer
sample-rabbitmq-1 | 2023-09-27 21:23:16.278 [info] <0.272.0> Waiting for Mnesia tables for 30000 ms, 9 retries left
sample-rabbitmq-1 | 2023-09-27 21:23:16.278 [info] <0.272.0> Successfully synced tables from a peer
sample-rabbitmq-1 | 2023-09-27 21:23:16.278 [info] <0.272.0> Peer discovery backend rabbit_peer_discovery_classic_config does not support registration, skipping registration.
sample-rabbitmq-1 | 2023-09-27 21:23:16.278 [info] <0.272.0> Running boot step database_sync defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.279 [info] <0.272.0> Running boot step feature_flags defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.279 [info] <0.272.0> Running boot step codec_correctness_check defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.279 [info] <0.272.0> Running boot step external_infrastructure defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.279 [info] <0.272.0> Running boot step rabbit_registry defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.279 [info] <0.272.0> Running boot step rabbit_auth_mechanism_cr_demo defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.279 [info] <0.272.0> Running boot step rabbit_queue_location_random defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.279 [info] <0.272.0> Running boot step rabbit_event defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.279 [info] <0.272.0> Running boot step rabbit_auth_mechanism_amqplain defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.279 [info] <0.272.0> Running boot step rabbit_auth_mechanism_plain defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.279 [info] <0.272.0> Running boot step rabbit_exchange_type_direct defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.279 [info] <0.272.0> Running boot step rabbit_exchange_type_fanout defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.279 [info] <0.272.0> Running boot step rabbit_exchange_type_headers defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.280 [info] <0.272.0> Running boot step rabbit_exchange_type_topic defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.280 [info] <0.272.0> Running boot step rabbit_mirror_queue_mode_all defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.280 [info] <0.272.0> Running boot step rabbit_mirror_queue_mode_exactly defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.280 [info] <0.272.0> Running boot step rabbit_mirror_queue_mode_nodes defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.280 [info] <0.272.0> Running boot step rabbit_priority_queue defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.280 [info] <0.272.0> Priority queues enabled, real BQ is rabbit_variable_queue
sample-rabbitmq-1 | 2023-09-27 21:23:16.280 [info] <0.272.0> Running boot step rabbit_queue_location_client_local defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.280 [info] <0.272.0> Running boot step rabbit_queue_location_min_masters defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.280 [info] <0.272.0> Running boot step kernel_ready defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.280 [info] <0.272.0> Running boot step rabbit_sysmon_minder defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.280 [info] <0.272.0> Running boot step rabbit_epmd_monitor defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.281 [info] <0.443.0> epmd monitor knows us, inter-node communication (distribution) port: 25672
sample-rabbitmq-1 | 2023-09-27 21:23:16.281 [info] <0.272.0> Running boot step guid_generator defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.282 [info] <0.272.0> Running boot step rabbit_node_monitor defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.282 [info] <0.447.0> Starting rabbit_node_monitor
sample-rabbitmq-1 | 2023-09-27 21:23:16.282 [info] <0.272.0> Running boot step delegate_sup defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.283 [info] <0.272.0> Running boot step rabbit_memory_monitor defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.283 [info] <0.272.0> Running boot step core_initialized defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.283 [info] <0.272.0> Running boot step upgrade_queues defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.289 [info] <0.272.0> Running boot step rabbit_connection_tracking defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.289 [info] <0.272.0> Running boot step rabbit_connection_tracking_handler defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.289 [info] <0.272.0> Running boot step rabbit_exchange_parameters defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.289 [info] <0.272.0> Running boot step rabbit_mirror_queue_misc defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.289 [info] <0.272.0> Running boot step rabbit_policies defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.290 [info] <0.272.0> Running boot step rabbit_policy defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.290 [info] <0.272.0> Running boot step rabbit_queue_location_validator defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.290 [info] <0.272.0> Running boot step rabbit_quorum_memory_manager defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.290 [info] <0.272.0> Running boot step rabbit_vhost_limit defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.290 [info] <0.272.0> Running boot step recovery defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.291 [info] <0.479.0> Making sure data directory '/var/lib/rabbitmq/mnesia/rabbit@rabbitmq/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L' for vhost '/' exists
sample-rabbitmq-1 | 2023-09-27 21:23:16.293 [info] <0.479.0> Starting message stores for vhost '/'
sample-rabbitmq-1 | 2023-09-27 21:23:16.293 [info] <0.483.0> Message store "628WB79CIFDYO9LJI6DKMI09L/msg_store_transient": using rabbit_msg_store_ets_index to provide index
sample-rabbitmq-1 | 2023-09-27 21:23:16.298 [info] <0.479.0> Started message store of type transient for vhost '/'
sample-rabbitmq-1 | 2023-09-27 21:23:16.298 [info] <0.487.0> Message store "628WB79CIFDYO9LJI6DKMI09L/msg_store_persistent": using rabbit_msg_store_ets_index to provide index
sample-rabbitmq-1 | 2023-09-27 21:23:16.300 [info] <0.479.0> Started message store of type persistent for vhost '/'
sample-rabbitmq-1 | 2023-09-27 21:23:16.308 [info] <0.272.0> Running boot step empty_db_check defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.308 [info] <0.272.0> Will not seed default virtual host and user: have definitions to load...
sample-rabbitmq-1 | 2023-09-27 21:23:16.308 [info] <0.272.0> Running boot step rabbit_looking_glass defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.308 [info] <0.272.0> Running boot step rabbit_core_metrics_gc defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.309 [info] <0.272.0> Running boot step background_gc defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.309 [info] <0.272.0> Running boot step connection_tracking defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.309 [info] <0.272.0> Setting up a table for connection tracking on this node: tracked_connection_on_node_rabbit@rabbitmq
sample-rabbitmq-1 | 2023-09-27 21:23:16.309 [info] <0.272.0> Setting up a table for per-vhost connection counting on this node: tracked_connection_per_vhost_on_node_rabbit@rabbitmq
sample-rabbitmq-1 | 2023-09-27 21:23:16.309 [info] <0.272.0> Running boot step routing_ready defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.309 [info] <0.272.0> Running boot step pre_flight defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.309 [info] <0.272.0> Running boot step notify_cluster defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.309 [info] <0.272.0> Running boot step networking defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.309 [info] <0.272.0> Running boot step definition_import_worker_pool defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.310 [info] <0.373.0> Starting worker pool 'definition_import_pool' with 2 processes in it
sample-rabbitmq-1 | 2023-09-27 21:23:16.310 [info] <0.272.0> Running boot step cluster_name defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.310 [info] <0.272.0> Running boot step direct_client defined by app rabbit
sample-rabbitmq-1 | 2023-09-27 21:23:16.312 [info] <0.44.0> Application rabbit started on node rabbit@rabbitmq
sample-rabbitmq-1 | 2023-09-27 21:23:16.603 [info] <0.530.0> Feature flags: list of feature flags found:
sample-rabbitmq-1 | 2023-09-27 21:23:16.603 [info] <0.530.0> Feature flags: [ ] drop_unroutable_metric
sample-rabbitmq-1 | 2023-09-27 21:23:16.603 [info] <0.530.0> Feature flags: [ ] empty_basic_get_metric
sample-rabbitmq-1 | 2023-09-27 21:23:16.604 [info] <0.530.0> Feature flags: [x] implicit_default_bindings
sample-rabbitmq-1 | 2023-09-27 21:23:16.604 [info] <0.530.0> Feature flags: [x] maintenance_mode_status
sample-rabbitmq-1 | 2023-09-27 21:23:16.604 [info] <0.530.0> Feature flags: [x] quorum_queue
sample-rabbitmq-1 | 2023-09-27 21:23:16.604 [info] <0.530.0> Feature flags: [x] virtual_host_metadata
sample-rabbitmq-1 | 2023-09-27 21:23:16.604 [info] <0.530.0> Feature flags: feature flag states written to disk: yes
sample-default-worker-1 | [2023-09-27 21:23:16,647: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@rabbitmq:5672//: [Errno 111] Connection refused.
sample-default-worker-1 | Trying again in 6.00 seconds... (3/100)
sample-default-worker-1 |
sample-rabbitmq-1 | 2023-09-27 21:23:16.686 [info] <0.530.0> Running boot step rabbit_mgmt_db_handler defined by app rabbitmq_management_agent
sample-rabbitmq-1 | 2023-09-27 21:23:16.687 [info] <0.530.0> Management plugin: using rates mode 'basic'
sample-rabbitmq-1 | 2023-09-27 21:23:16.689 [info] <0.44.0> Application rabbitmq_management_agent started on node rabbit@rabbitmq
sample-rabbitmq-1 | 2023-09-27 21:23:16.694 [info] <0.44.0> Application cowlib started on node rabbit@rabbitmq
sample-rabbitmq-1 | 2023-09-27 21:23:16.700 [info] <0.44.0> Application cowboy started on node rabbit@rabbitmq
sample-rabbitmq-1 | 2023-09-27 21:23:16.706 [info] <0.44.0> Application rabbitmq_web_dispatch started on node rabbit@rabbitmq
sample-rabbitmq-1 | 2023-09-27 21:23:16.711 [info] <0.44.0> Application amqp_client started on node rabbit@rabbitmq
sample-rabbitmq-1 | 2023-09-27 21:23:16.716 [info] <0.530.0> Running boot step rabbit_mgmt_reset_handler defined by app rabbitmq_management
sample-rabbitmq-1 | 2023-09-27 21:23:16.716 [info] <0.530.0> Running boot step rabbit_management_load_definitions defined by app rabbitmq_management
sample-rabbitmq-1 | 2023-09-27 21:23:16.728 [info] <0.595.0> Management plugin: HTTP (non-TLS) listener started on port 15672
sample-rabbitmq-1 | 2023-09-27 21:23:16.728 [info] <0.701.0> Statistics database started.
sample-rabbitmq-1 | 2023-09-27 21:23:16.728 [info] <0.700.0> Starting worker pool 'management_worker_pool' with 3 processes in it
sample-rabbitmq-1 | 2023-09-27 21:23:16.728 [info] <0.44.0> Application rabbitmq_management started on node rabbit@rabbitmq
sample-rabbitmq-1 | 2023-09-27 21:23:16.744 [info] <0.44.0> Application prometheus started on node rabbit@rabbitmq
sample-rabbitmq-1 | 2023-09-27 21:23:16.751 [info] <0.714.0> Prometheus metrics: HTTP (non-TLS) listener started on port 15692
sample-rabbitmq-1 | 2023-09-27 21:23:16.751 [info] <0.530.0> Ready to start client connection listeners
sample-rabbitmq-1 | 2023-09-27 21:23:16.751 [info] <0.44.0> Application rabbitmq_prometheus started on node rabbit@rabbitmq
sample-rabbitmq-1 | 2023-09-27 21:23:16.752 [info] <0.833.0> started TCP listener on [::]:5672
sample-rabbitmq-1 | 2023-09-27 21:23:16.873 [info] <0.530.0> Server startup complete; 4 plugins started.
sample-rabbitmq-1 | * rabbitmq_prometheus
sample-rabbitmq-1 | * rabbitmq_management
sample-rabbitmq-1 | * rabbitmq_web_dispatch
sample-rabbitmq-1 | * rabbitmq_management_agent
sample-rabbitmq-1 | completed with 4 plugins.
sample-rabbitmq-1 | 2023-09-27 21:23:16.873 [info] <0.530.0> Resetting node maintenance status
sample-rabbitmq-1 | 2023-09-27 21:23:17.585 [info] <0.836.0> accepting AMQP connection <0.836.0> (172.22.0.4:38188 -> 172.22.0.3:5672)
sample-rabbitmq-1 | 2023-09-27 21:23:17.588 [info] <0.836.0> connection <0.836.0> (172.22.0.4:38188 -> 172.22.0.3:5672): user 'guest' authenticated and granted access to vhost '/'
sample-rabbitmq-1 | 2023-09-27 21:23:17.648 [info] <0.846.0> accepting AMQP connection <0.846.0> (172.22.0.4:38200 -> 172.22.0.3:5672)
sample-rabbitmq-1 | 2023-09-27 21:23:17.654 [info] <0.846.0> connection <0.846.0> (172.22.0.4:38200 -> 172.22.0.3:5672): user 'guest' authenticated and granted access to vhost '/'
sample-rabbitmq-1 | 2023-09-27 21:23:17.662 [info] <0.855.0> accepting AMQP connection <0.855.0> (172.22.0.4:38212 -> 172.22.0.3:5672)
sample-rabbitmq-1 | 2023-09-27 21:23:17.666 [info] <0.855.0> connection <0.855.0> (172.22.0.4:38212 -> 172.22.0.3:5672): user 'guest' authenticated and granted access to vhost '/'
sample-flower-1 | [I 230927 21:23:17 mixins:228] Connected to amqp://guest:**@rabbitmq:5672//
sample-rabbitmq-1 | 2023-09-27 21:23:17.674 [info] <0.869.0> accepting AMQP connection <0.869.0> (172.22.0.4:38218 -> 172.22.0.3:5672)
sample-rabbitmq-1 | 2023-09-27 21:23:17.675 [info] <0.869.0> connection <0.869.0> (172.22.0.4:38218 -> 172.22.0.3:5672): user 'guest' authenticated and granted access to vhost '/'
sample-rabbitmq-1 | 2023-09-27 21:23:22.675 [info] <0.880.0> accepting AMQP connection <0.880.0> (172.22.0.5:58834 -> 172.22.0.3:5672)
sample-rabbitmq-1 | 2023-09-27 21:23:22.687 [info] <0.880.0> connection <0.880.0> (172.22.0.5:58834 -> 172.22.0.3:5672): user 'guest' authenticated and granted access to vhost '/'
sample-default-worker-1 | [2023-09-27 21:23:22,689: INFO/MainProcess] Connected to amqp://guest:**@rabbitmq:5672//
sample-default-worker-1 | [2023-09-27 21:23:22,699: WARNING/MainProcess] /usr/local/lib/python3.10/site-packages/celery/worker/consumer/consumer.py:507: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
sample-default-worker-1 | whether broker connection retries are made during startup in Celery 6.0 and above.
sample-default-worker-1 | If you wish to retain the existing behavior for retrying connections on startup,
sample-default-worker-1 | you should set broker_connection_retry_on_startup to True.
sample-default-worker-1 | warnings.warn(
sample-default-worker-1 |
sample-rabbitmq-1 | 2023-09-27 21:23:22.702 [info] <0.887.0> accepting AMQP connection <0.887.0> (172.22.0.5:58844 -> 172.22.0.3:5672)
sample-rabbitmq-1 | 2023-09-27 21:23:22.704 [info] <0.887.0> connection <0.887.0> (172.22.0.5:58844 -> 172.22.0.3:5672): user 'guest' authenticated and granted access to vhost '/'
sample-default-worker-1 | [2023-09-27 21:23:22,709: INFO/MainProcess] mingle: searching for neighbors
sample-rabbitmq-1 | 2023-09-27 21:23:22.720 [info] <0.906.0> accepting AMQP connection <0.906.0> (172.22.0.5:58860 -> 172.22.0.3:5672)
sample-rabbitmq-1 | 2023-09-27 21:23:22.723 [info] <0.906.0> connection <0.906.0> (172.22.0.5:58860 -> 172.22.0.3:5672): user 'guest' authenticated and granted access to vhost '/'
sample-default-worker-1 | [2023-09-27 21:23:23,797: INFO/MainProcess] mingle: all alone
sample-default-worker-1 | [2023-09-27 21:23:23,827: INFO/MainProcess] Task sample.celery.long_running_task[ab9bc5b2-1eed-49c0-b9fd-48be02d72cb2] received
sample-default-worker-1 | [2023-09-27 21:23:28,300: WARNING/ForkPoolWorker-2] long running task: end
sample-default-worker-1 | [2023-09-27 21:23:28,310: INFO/ForkPoolWorker-2] Task sample.celery.long_running_task[2cbdb8e8-aa29-45a1-aee1-e9afa244ea10] succeeded in 30.0159327230067s: 'long running task: done'
sample-default-worker-1 | [2023-09-27 21:23:28,320: WARNING/ForkPoolWorker-2] long running task: start
sample-default-worker-1 | [2023-09-27 21:23:28,320: INFO/MainProcess] Resuming normal operations following a restart.
sample-default-worker-1 | Prefetch count has been restored to the maximum of 2
sample-default-worker-1 | [2023-09-27 21:23:28,323: INFO/MainProcess] Task sample.celery.long_running_task[48c5534d-74ab-462b-8674-c72737080724] received
sample-default-worker-1 | [2023-09-27 21:23:28,324: INFO/MainProcess] Task sample.celery.long_running_task[827dd637-df4c-4fbd-97ed-2738d8a2e6f1] received
sample-default-worker-1 | [2023-09-27 21:23:28,862: WARNING/ForkPoolWorker-1] long running task: end
sample-default-worker-1 | [2023-09-27 21:23:28,865: INFO/ForkPoolWorker-1] Task sample.celery.long_running_task[b25e8de3-8b69-45a2-a9e6-9afca2b6e5da] succeed


Possible solution

While debugging, we found a way to fix this, but we are not sure whether it is the right approach or whether it has unintended consequences.

  1. Do not clear the _cache
# celery/concurrency/asynpool.py:L1020
    def flush(self):
        if self._state == TERMINATE:
            return
        # cancel all tasks that haven't been accepted so that NACK is sent
        # if synack is enabled.
        if self.synack:
            for job in self._cache.values():
                if not job._accepted:
                    job._cancel()

        # clear the outgoing buffer as the tasks will be redelivered by
        # the broker anyway.
        if self.outbound_buffer:
            self.outbound_buffer.clear()

        self.maintain_pool()

        try:
            # ...but we must continue writing the payloads we already started
            # to keep message boundaries.
            # The messages may be NACK'ed later if synack is enabled.
            if self._state == RUN:
                # flush outgoing buffers
                intervals = fxrange(0.01, 0.1, 0.01, repeatlast=True)

                # TODO: Rewrite this as a dictionary comprehension once we drop support for Python 3.7
                #       This dict comprehension requires the walrus operator which is only available in 3.8.
                owned_by = {}
                for job in self._cache.values():
                    writer = _get_job_writer(job)
                    if writer is not None:
                        owned_by[writer] = job

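                if not self._active_writers:
                    # Proposed change: keep the cache so that in-flight jobs
                    # can still be found when they finish.
                    # self._cache.clear()
                    pass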
                else:
                    ...

Or directly avoid flushing the pool when the consumer disconnects:

# celery/worker/consumer/consumer.py:L445
    def on_close(self):
        # Clear internal queues to get rid of old messages.
        # They can't be acked anyway, as a delivery tag is specific
        # to the current channel.
        if self.controller and self.controller.semaphore:
            self.controller.semaphore.clear()
        if self.timer:
            self.timer.clear()
        for bucket in self.task_buckets.values():
            if bucket:
                bucket.clear_pending()
        for request_id in reserved_requests:
            if request_id in requests:
                del requests[request_id]
        reserved_requests.clear()
        # Proposed change: do not flush the pool when the connection closes.
        # if self.pool and self.pool.flush:
        #     self.pool.flush()

By doing either of these, the tasks are correctly counted as processed and are no longer reported as active, because looking up the job in _cache no longer raises a KeyError when the job finishes.
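To make the failure mode easier to reason about, here is a minimal stdlib-only model of the bookkeeping described above. It is a sketch under simplifying assumptions, not Celery's asynpool code: `MiniPool`, `apply_async`, `flush`, `on_job_ready` and `simulate_flush` are hypothetical names, and in this model the KeyError is caught so its effect on the counters is visible. A job that was running before the disconnect finishes after `flush()`; if the cache was cleared, its completion cannot be matched, so it is never counted as processed and stays "active".

```python
# Simplified, hypothetical model of the pool bookkeeping discussed in this
# issue. MiniPool and its methods are illustrative names only; this is NOT
# Celery's asynpool implementation.


class MiniPool:
    def __init__(self, clear_cache_on_flush: bool) -> None:
        self.clear_cache_on_flush = clear_cache_on_flush
        self._cache: dict[str, str] = {}  # job id -> task name
        self.active: set[str] = set()  # jobs reported as active
        self.processed = 0  # number of completed jobs
        self.lost: list[str] = []  # completions that found no cache entry

    def apply_async(self, job_id: str, name: str) -> None:
        # Register the job when it is handed to a child process.
        self._cache[job_id] = name
        self.active.add(job_id)

    def flush(self) -> None:
        # Called when the consumer loses its broker connection.
        if self.clear_cache_on_flush:
            self._cache.clear()

    def on_job_ready(self, job_id: str) -> None:
        # A child process reports that the job has finished.
        try:
            del self._cache[job_id]
        except KeyError:
            # The entry was wiped by flush(), so the bookkeeping below is
            # skipped: the job is never counted and stays "active".
            self.lost.append(job_id)
            return
        self.active.discard(job_id)
        self.processed += 1


def simulate_flush(clear_cache_on_flush: bool) -> MiniPool:
    pool = MiniPool(clear_cache_on_flush)
    pool.apply_async("job-1", "long_running_task")  # running before disconnect
    pool.flush()  # connection error -> consumer closes
    pool.on_job_ready("job-1")  # job finishes after reconnecting
    return pool


for clear in (True, False):
    p = simulate_flush(clear)
    print(f"clear_cache={clear}: processed={p.processed}, active={sorted(p.active)}")
# clear_cache=True: processed=0, active=['job-1']
# clear_cache=False: processed=1, active=[]
```

Keeping the cache entry (the first proposed change) is what lets the late completion be matched in this model.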

One thing that puzzles me: if we are not using acks_late, why do we need to flush the worker pool at all? My feeling is that the consumer connection and the worker pool should be as independent as possible.

  2. Regarding the task-succeeded event, we found this workaround:
# celery/events/dispatcher.py:L215
    def extend_buffer(self, other):
        """Copy the outbound buffer of another instance."""
        self._outbound_buffer.extend(other._outbound_buffer)
        self._group_buffer = other._group_buffer  # Keep the other/prev messages buffer

When the EventDispatcher is flushed, we need to keep the reference to the previous _group_buffer so that jobs that were already running before the connection restart still hold the correct reference.
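The reference problem can be illustrated with a small stdlib-only model. It assumes, as a simplification, that a job that was running before the reconnect still writes its events through the old dispatcher; `MiniDispatcher`, `buffer_event`, the `keep_groups` flag and `simulate_reconnect` are hypothetical names, not Celery's `EventDispatcher` API. If the new dispatcher starts with a fresh `_group_buffer`, the late task-succeeded event lands in a dict nobody will flush; sharing the old object keeps it visible.

```python
from collections import defaultdict

# Simplified, hypothetical model of an events dispatcher that groups buffered
# events by type. MiniDispatcher and its methods are illustrative names only;
# this is NOT Celery's EventDispatcher.


class MiniDispatcher:
    def __init__(self) -> None:
        self._outbound_buffer: list[str] = []
        self._group_buffer: defaultdict[str, list[str]] = defaultdict(list)

    def buffer_event(self, group: str, event: str) -> None:
        # Events go into whatever dict self._group_buffer currently points to.
        self._group_buffer[group].append(event)

    def extend_buffer(self, other: "MiniDispatcher", keep_groups: bool) -> None:
        # Copy pending outbound events from the previous instance.
        self._outbound_buffer.extend(other._outbound_buffer)
        if keep_groups:
            # Workaround under discussion: share the previous grouped buffer
            # object instead of starting with a fresh, empty one.
            self._group_buffer = other._group_buffer


def simulate_reconnect(keep_groups: bool) -> list[str]:
    old = MiniDispatcher()
    send_from_running_job = old.buffer_event  # captured before the disconnect
    new = MiniDispatcher()  # created after reconnecting
    new.extend_buffer(old, keep_groups)
    send_from_running_job("task", "task-succeeded")  # job finishes later
    # Events the new dispatcher would see when it flushes its groups.
    return new._group_buffer["task"]


print(simulate_reconnect(keep_groups=True))   # ['task-succeeded']
print(simulate_reconnect(keep_groups=False))  # []
```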

We are not sure whether 1 and 2 are real solutions or whether we are overlooking other use cases or side effects. Any help and feedback are more than welcome!

@acangiani
Author

@thedrow Upon further investigation, I think this is related to your PR #6863. If possible, could you take a look at this issue?

@auvipy
Member

auvipy commented Oct 2, 2023

Hey, Agustin! Can you come up with a PR with extensive test cases and a prospective fix? The old PR does not have any unit tests, so it is difficult to verify. And thanks for your investigation so far. We can discuss further on the draft/in-progress PR.
