
celery workers randomly hang on IPC (causing all jobs to queue, but no longer run) #4185

Closed
chrismeyersfsu opened this issue Aug 7, 2017 · 121 comments


@chrismeyersfsu

chrismeyersfsu commented Aug 7, 2017

Fails in the same way on multiple celery versions:
celery version 3.1.7 billiard version 3.3.0.23
celery version 3.1.25 billiard version 3.3.0.23

celery worker -l debug --autoscale=50,4 -Ofair -Q tower_scheduler,tower_broadcast_all,tower,localhost -n celery@localhost

Steps to reproduce

Happens sporadically when running our nightly 8-hour integration tests on Ubuntu 14.04 and 16.04. Does not happen on RHEL 7 or CentOS 7. We are working on a smaller set of recreation steps.

Expected behavior

  • For the workers to continue processing work.

Actual behavior

The celery master process and a particular worker both block on the same file descriptor, performing a read(). The worker has finished a job and is ready for more work. The parent is "locked up", blocking on the read() system call shared by the worker.

Restarting celery "fixes" the issue. More surgically, sending a SIGUSR1 to the master process "fixes" the issue by breaking it out of the read() system call. The child then returns from the read and seems to process a/the pending message. The master process does NOT try to read() from the PIPE again.

https://github.com/celery/celery/blob/3.1/celery/concurrency/asynpool.py#L220 is where we are hanging. This is a call to __read__ obtained from https://github.com/celery/billiard/blob/3.3/Modules/_billiard/multiprocessing.c#L202
This read is blocking and non-asynchronous. I don't really understand how this code isn't susceptible to a deadlock/infinite-blocking scenario. The read is blocking and is called in such a way that it can block forever if the child dies.

We are now trying to recreate the problem with CELERY_RDBSIG="1" set so that we can jump into a remote debug session when the deadlock occurs.

Any advice would be helpful.

From an OS perspective, I can't reason how this could occur.

  • No write() being observed after the SIGUSR1.
  • 2 processes blocked on the same PIPE via a read(). This means that there is either no data in the pipe or not enough data.
  • One process "gives up" (because we sent the SIGUSR1)
  • Then the other process's read() proceeds with data ... without a write() observed.
@codeadict

codeadict commented Aug 9, 2017

I have the same issue here: the workers look like they are running, but they aren't processing messages.

[screenshot: screen shot 2017-08-08 at 6:55 PM]

@codeadict

I'm running on Python 3.6 and using Celery 4.1.0

@chrismeyersfsu
Author

@codeadict are you able to strace the two processes (worker and parent) to see if they are both stuck on the same file descriptor?

You can verify it's the same fd with sudo lsof | grep 53r where 53r is the number of the file descriptor the processes are blocked on.

@ryanpetrello
Contributor

ryanpetrello commented Aug 11, 2017

@codeadict as @chrismeyersfsu mentioned, you can strace the master celery process and see if it's stuck reading on a pipe:

$ sudo strace -p <pid-of-celery-master-process> -s 100
Process X attached
read(53

@ryanpetrello
Contributor

If you have python-dbg installed w/ Python symbols, you can also use gdb to see where the master is stuck:

$ gdb python -p <pid-of-master-process>
...
(gdb) py-list
 216            # header
 217
 218            while Hr < 4:
 219                try:
 220                    n = __read__(
>221                        fd, bufv[Hr:] if readcanbuf else bufv, 4 - Hr,

https://github.com/celery/celery/blob/v3.1.17/celery/concurrency/asynpool.py#L219

@harlov

harlov commented Oct 20, 2017

Same problem.

celery==3.1.23, billiard==3.3.0.23
Workers started with options: --time-limit=7200 --autoscale=64,2 --maxtasksperchild=1
python 2.7.3

After a few hours celery 'freezes'.
I see that the master and children wait forever on read() on pipes (b.t.w., different pipes).

So, this is master process:

root@vm1:~# strace -p 41840
Process 41840 attached - interrupt to quit
read(50, ^C <unfinished ...>
Process 41840 detached

root@vm1:~# sudo lsof | grep 50r
python     41840                 root   50r     FIFO                0,8        0t0     548796 pipe
python     46237                 root   50r      CHR                1,3        0t0       7604 /dev/null
python     46304                 root   50r     FIFO                0,8        0t0     548796 pipe


root@vm1:~# ls -lh /proc/41840/fd/50
lr-x------ 1 root root 64 Oct 20 10:02 /proc/41840/fd/50 -> pipe:[548796]

root@vm1:~# (find /proc -type l | xargs ls -l | fgrep 'pipe:[548796]') 2>/dev/null
lr-x------ 1 root    root    64 Oct 20 10:02 /proc/41840/fd/50 -> pipe:[548796]
lr-x------ 1 root    root    64 Oct 20 10:02 /proc/41840/fd/51 -> pipe:[548796]
lr-x------ 1 root    root    64 Oct 20 10:07 /proc/41840/task/41840/fd/50 -> pipe:[548796]
lr-x------ 1 root    root    64 Oct 20 10:07 /proc/41840/task/41840/fd/51 -> pipe:[548796]
l-wx------ 1 root    root    64 Oct 20 10:02 /proc/46237/fd/51 -> pipe:[548796]
l-wx------ 1 root    root    64 Oct 20 10:07 /proc/46237/task/46237/fd/51 -> pipe:[548796]
lr-x------ 1 root    root    64 Oct 20 10:02 /proc/46304/fd/50 -> pipe:[548796]
lr-x------ 1 root    root    64 Oct 20 10:02 /proc/46304/fd/51 -> pipe:[548796]
lr-x------ 1 root    root    64 Oct 20 10:07 /proc/46304/task/46304/fd/50 -> pipe:[548796]
lr-x------ 1 root    root    64 Oct 20 10:07 /proc/46304/task/46304/fd/51 -> pipe:[548796]

And one of the children:

root@vm1:~# strace -p 46304
Process 46304 attached - interrupt to quit
read(16, ^C <unfinished ...>
Process 46304 detached

root@vm1:~# ls -lh /proc/46304/fd/16
lr-x------ 1 root root 64 Oct 20 10:02 /proc/46304/fd/16 -> pipe:[482281]


root@vm1:~# (find /proc -type l | xargs ls -l | fgrep 'pipe:[482281]') 2>/dev/null
lr-x------ 1 root    root    64 Oct 20 10:02 /proc/41840/fd/16 -> pipe:[482281]
l-wx------ 1 root    root    64 Oct 20 10:02 /proc/41840/fd/17 -> pipe:[482281]
lr-x------ 1 root    root    64 Oct 20 10:07 /proc/41840/task/41840/fd/16 -> pipe:[482281]
l-wx------ 1 root    root    64 Oct 20 10:07 /proc/41840/task/41840/fd/17 -> pipe:[482281]
lr-x------ 1 root    root    64 Oct 20 10:02 /proc/46237/fd/16 -> pipe:[482281]
l-wx------ 1 root    root    64 Oct 20 10:02 /proc/46237/fd/17 -> pipe:[482281]
lr-x------ 1 root    root    64 Oct 20 10:07 /proc/46237/task/46237/fd/16 -> pipe:[482281]
l-wx------ 1 root    root    64 Oct 20 10:07 /proc/46237/task/46237/fd/17 -> pipe:[482281]
lr-x------ 1 root    root    64 Oct 20 10:02 /proc/46304/fd/16 -> pipe:[482281]
lr-x------ 1 root    root    64 Oct 20 10:07 /proc/46304/task/46304/fd/16 -> pipe:[482281]
root@vm1:~# 

And this is the current 'frozen' frame in the master process:

Thread 0x7efc1c101700
('Frame: ', 63347776)
  File "/usr/lib/python2.7/runpy.py", line 162, in _run_module_as_main
    "__main__", fname, loader, pkg_name)
  File "/usr/lib/python2.7/runpy.py", line 72, in _run_code
    exec code in run_globals
  File "/opt/waf/python/lib/python2.7/site-packages/celery/__main__.py", line 54, in <module>
    main()
  File "/opt/waf/python/lib/python2.7/site-packages/celery/__main__.py", line 30, in main
    main()
  File "/opt/waf/python/lib/python2.7/site-packages/celery/bin/celery.py", line 81, in main
    cmd.execute_from_commandline(argv)
  File "/opt/waf/python/lib/python2.7/site-packages/celery/bin/celery.py", line 793, in execute_from_commandline
    super(CeleryCommand, self).execute_from_commandline(argv)))
  File "/opt/waf/python/lib/python2.7/site-packages/celery/bin/base.py", line 311, in execute_from_commandline
    return self.handle_argv(self.prog_name, argv[1:])
  File "/opt/waf/python/lib/python2.7/site-packages/celery/bin/celery.py", line 785, in handle_argv
    return self.execute(command, argv)
  File "/opt/waf/python/lib/python2.7/site-packages/celery/bin/celery.py", line 717, in execute
    ).run_from_argv(self.prog_name, argv[1:], command=argv[0])
  File "/opt/waf/python/lib/python2.7/site-packages/celery/bin/worker.py", line 179, in run_from_argv
    return self(*args, **options)
  File "/opt/waf/python/lib/python2.7/site-packages/celery/bin/base.py", line 274, in __call__
    ret = self.run(*args, **kwargs)
  File "/opt/waf/python/lib/python2.7/site-packages/celery/bin/worker.py", line 212, in run
    state_db=self.node_format(state_db, hostname), **kwargs
  File "/opt/waf/python/lib/python2.7/site-packages/celery/worker/__init__.py", line 206, in start
    self.blueprint.start(self)
  File "/opt/waf/python/lib/python2.7/site-packages/celery/bootsteps.py", line 123, in start
    step.start(parent)
  File "/opt/waf/python/lib/python2.7/site-packages/celery/bootsteps.py", line 374, in start
    return self.obj.start()
  File "/opt/waf/python/lib/python2.7/site-packages/celery/worker/consumer.py", line 279, in start
    blueprint.start(self)
  File "/opt/waf/python/lib/python2.7/site-packages/celery/bootsteps.py", line 123, in start
    step.start(parent)
  File "/opt/waf/python/lib/python2.7/site-packages/celery/worker/consumer.py", line 838, in start
    c.loop(*c.loop_args())
  File "/opt/waf/python/lib/python2.7/site-packages/celery/worker/loops.py", line 76, in asynloop
    next(loop)
  File "/opt/waf/python/lib/python2.7/site-packages/kombu/async/hub.py", line 340, in create_loop
    cb(*cbargs)
  File "/opt/waf/python/lib/python2.7/site-packages/celery/concurrency/asynpool.py", line 279, in on_result_readable
    next(it)
  File "/opt/waf/python/lib/python2.7/site-packages/celery/concurrency/asynpool.py", line 221, in _recv_message
    fd, bufv[Hr:] if readcanbuf else bufv, 4 - Hr,
()

@korycins

@harlov @codeadict @chrismeyersfsu Did you find any solution or workaround?

@chrismeyersfsu
Author

No. We are upgrading to celery 4. We will run the same set of tests that triggers this error and see if we can recreate it on celery 4.

@harlov

harlov commented Nov 21, 2017

@korycins we removed --autoscale and --maxtasksperchild - all okay so far.

I guess the ongoing respawning of workers caused by autoscale or maxtasksperchild may cause this situation.

@korycins

@chrismeyersfsu I have the same issue with celery 4.1, so I guess switching to v4.1 will not help you.
Previously I used only the --autoscale option. Workers hung from time to time (around once per week), but when I added maxtasksperchild=20, workers hung around once per hour. I thought the problem might be maxtasksperchild, so I removed it.
I created a custom autoscaler to dynamically scale up/down based on free resources and the same issue occurs, so I guess the problem is connected with respawning workers (as @harlov said).

@auvipy
Member

auvipy commented Dec 21, 2017

Can anyone verify the issue on top of the latest master? Not sure if it's already fixed by the latest commit.

@ryanpetrello
Contributor

ryanpetrello commented Jan 3, 2018

@auvipy which commit(s) do you suspect might resolve this? I'm currently able to reproduce this issue on celery 4.1.0, billiard 3.5.0.3 from PyPI.

@auvipy
Member

auvipy commented Jan 15, 2018

I am not sure, so could you please try the master branch? If it still exists after 4.2 is released, work will be carried out to fix it.

@ryanpetrello
Contributor

@auvipy we could give it a try, but it may be hard to say for sure - personally, we only seem to encounter this hang once every few weeks, so it's pretty hard to narrow down.

@korycins

@ryanpetrello Try adding max_tasks_per_child and setting it to 1. http://docs.celeryproject.org/en/latest/userguide/workers.html#max-tasks-per-child-setting
This should increase the probability of the issue occurring.

@auvipy auvipy added this to the v5.0.0 milestone Jan 15, 2018
@trianglesis

Same issue; I cannot reproduce it right now. I thought it was a configuration issue, since a restart just un-hangs the worker, but it looks like it happens from time to time.

@kleptog

kleptog commented Mar 7, 2018

Just chiming in to say we also run into this issue. celery 4.1.0, kombu 4.1.0, billiard 3.5.0.3.
Command line: /usr/bin/python /usr/local/bin/celery worker --app myapp.celery -B --loglevel INFO --autoscale=5,1 --max-tasks-per-child=50 --max-memory-per-child=1000000 -O fair

With respect to blocking on file descriptors, we see the same as @harlov above: the master is in a deadlock with one of its workers.

Sending SIGUSR1 to the celery master "unsticks" the process and dumps the stack trace below. What is suspicious is that it gets stuck on a read inside a function called "on_result_readable". This feels like a classic race condition: poll() returns readable but the read() blocks. This can happen, which is why it is always recommended to set the FDs to non-blocking mode to make sure nothing goes wrong.

Relevant message on LKML: https://lkml.org/lkml/2011/6/18/105

I'm not sure whether it is easy to set the descriptors non-blocking, but since we have an environment with many celery masters and there are always a few stuck, if it's a simple change we may be able to test it quickly.
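
For reference, a minimal standalone sketch of that mitigation (illustrative Python only, not celery's code): mark the fd non-blocking with fcntl so that a spurious "readable" event yields EAGAIN instead of hanging the read.

import fcntl
import os

def set_nonblocking(fd):
    # Set O_NONBLOCK so a read on a not-actually-readable fd raises EAGAIN
    # instead of blocking the event loop forever.
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

r, w = os.pipe()
set_nonblocking(r)
try:
    os.read(r, 4)            # nothing has been written yet
except BlockingIOError:      # EAGAIN: control returns to the caller instead of a hang
    print("pipe not readable yet; keep looping")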

MainThread                                                                                                                                                                                                                            [636/660]
=================================================                                                                                                                                                    
  File "/usr/local/bin/celery", line 11, in <module>                                                                                                                                                 
    sys.exit(main())                                                                                                                                                                                 
  File "/usr/local/lib/python2.7/dist-packages/celery/__main__.py", line 14, in main                                                                                                             
    _main()                                                                                                                                                                                          
  File "/usr/local/lib/python2.7/dist-packages/celery/bin/celery.py", line 326, in main                                                                                                              
    cmd.execute_from_commandline(argv)                                                                                                                                                               
  File "/usr/local/lib/python2.7/dist-packages/celery/bin/celery.py", line 488, in execute_from_commandline                                                                                                            
    super(CeleryCommand, self).execute_from_commandline(argv)))                                                                                                                                      
  File "/usr/local/lib/python2.7/dist-packages/celery/bin/base.py", line 281, in execute_from_commandline                                                                                        
    return self.handle_argv(self.prog_name, argv[1:])                                                                                                              
  File "/usr/local/lib/python2.7/dist-packages/celery/bin/celery.py", line 480, in handle_argv                                                                                                       
    return self.execute(command, argv)                                                                                                                                                           
  File "/usr/local/lib/python2.7/dist-packages/celery/bin/celery.py", line 412, in execute                                                                                                           
    ).run_from_argv(self.prog_name, argv[1:], command=argv[0])                                                                                                                                   
  File "/usr/local/lib/python2.7/dist-packages/celery/bin/worker.py", line 221, in run_from_argv                                                                                                                       
    return self(*args, **options)                                                                                                                                                                    
  File "/usr/local/lib/python2.7/dist-packages/celery/bin/base.py", line 244, in __call__                                                                                                            
    ret = self.run(*args, **kwargs)                                                                                                                                                                  
  File "/usr/local/lib/python2.7/dist-packages/celery/bin/worker.py", line 256, in run                                                                                                          
    worker.start()                                                                                                                                                                                   
  File "/usr/local/lib/python2.7/dist-packages/celery/worker/worker.py", line 203, in start                                                                                                     
    self.blueprint.start(self)                                                                                                                                      
  File "/usr/local/lib/python2.7/dist-packages/celery/bootsteps.py", line 119, in start                                                                                                                                
    step.start(parent)                                                                                                                                                                               
  File "/usr/local/lib/python2.7/dist-packages/celery/bootsteps.py", line 370, in start                                                                                                              
    return self.obj.start()                                                                                                                                                                      
  File "/usr/local/lib/python2.7/dist-packages/celery/worker/consumer/consumer.py", line 320, in start                                                                                               
    blueprint.start(self)                                                                                                                                                                            
  File "/usr/local/lib/python2.7/dist-packages/celery/bootsteps.py", line 119, in start                                                                                                             
    step.start(parent)                                                                                                                                                                               
  File "/usr/local/lib/python2.7/dist-packages/celery/worker/consumer/consumer.py", line 596, in start                                                                                               
    c.loop(*c.loop_args())                                                                                                                                                                           
  File "/usr/local/lib/python2.7/dist-packages/celery/worker/loops.py", line 88, in asynloop                                                                                                         
    next(loop)                                                                                                                                                                                       
  File "/usr/local/lib/python2.7/dist-packages/kombu/async/hub.py", line 354, in create_loop                                                                        
    cb(*cbargs)                                                                                                                                                                                      
  File "/usr/local/lib/python2.7/dist-packages/celery/concurrency/asynpool.py", line 292, in on_result_readable                                                                                      
    next(it)
  File "/usr/local/lib/python2.7/dist-packages/celery/concurrency/asynpool.py", line 235, in _recv_message
    fd, bufv[Hr:] if readcanbuf else bufv, 4 - Hr,
  File "/usr/local/lib/python2.7/dist-packages/celery/apps/worker.py", line 349, in cry_handler
    safe_say(cry())
  File "/usr/local/lib/python2.7/dist-packages/celery/utils/debug.py", line 193, in cry
    traceback.print_stack(frame, file=out)
=================================================
LOCAL VARIABLES
=================================================
{'P': <functools.partial object at 0x7f29da0f1c58>,
 'frame': <frame object at 0x37137b0>,
 'out': <vine.five.WhateverIO object at 0x7f29da14e150>,
 'sep': u'=================================================',
 'sepchr': u'=',
 'seplen': 49,
 'thread': <_MainThread(MainThread, started 139818014390016)>,
 'threading': <module 'threading' from '/usr/lib/python2.7/threading.pyc'>,
 'tid': 139818014390016,
 'tmap': {139818014390016: <_MainThread(MainThread, started 139818014390016)>}}

@kleptog

kleptog commented Mar 13, 2018

Just an update. We spent some time testing things and trying to reproduce it.

  • Setting max_tasks_per_child to 1 makes it worse
  • Removing -O fair makes no difference
  • Removing the autoscale also doesn't help
  • We've so far only reproduced it on machines with kernel 4.9 (Stretch), not on machines with 3.16 (Jessie)

I verified that the file descriptors normally used by celery are all correctly marked non-blocking, and they are. So the question is: how does the code block if every descriptor is non-blocking?

The track I'm following now is that the epoll() object is getting confused about the descriptors it's supposed to be watching. If you strace celery you get lots of output like:

14659 14:32:56.665937 epoll_ctl(4, EPOLL_CTL_ADD, 43, {EPOLLIN|EPOLLERR|EPOLLHUP, {u32=43, u64=14879199376994992171}}) = -1 EEXIST (File exists)
14659 14:32:56.667084 epoll_ctl(4, EPOLL_CTL_ADD, 16, {EPOLLIN|EPOLLERR|EPOLLHUP, {u32=16, u64=14879199376994992144}}) = -1 EEXIST (File exists)
14659 14:32:56.667278 epoll_ctl(4, EPOLL_CTL_ADD, 26, {EPOLLIN|EPOLLERR|EPOLLHUP, {u32=26, u64=14879199376994992154}}) = -1 EEXIST (File exists)
14659 14:32:56.667440 epoll_ctl(4, EPOLL_CTL_ADD, 23, {EPOLLIN|EPOLLERR|EPOLLHUP, {u32=23, u64=14879199376994992151}}) = -1 EEXIST (File exists)
14659 14:32:56.667607 epoll_ctl(4, EPOLL_CTL_DEL, 37, 0x7ffd31c17f60) = -1 ENOENT (No such file or directory)
14659 14:32:56.667804 epoll_ctl(4, EPOLL_CTL_DEL, 38, 0x7ffd31c17f60) = -1 ENOENT (No such file or directory)

It's entirely plausible that at some point a file descriptor that once was used for one thing gets used for something else, the hub gets confused and calls the wrong method and boom!
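
As a small standalone illustration of the fd-reuse half of that hypothesis (plain Python, not celery code): the kernel hands out the lowest free descriptor number, so a number that was just closed is immediately recycled for whatever gets opened next, and any bookkeeping keyed on the bare number can silently end up pointing at a different object.

import os

r1, w1 = os.pipe()
os.close(r1)
r2, w2 = os.pipe()
# POSIX allocates the lowest available descriptor, so r2 reuses r1's number
# even though it now refers to a completely different pipe.
print(r1, r2, r1 == r2)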

@kleptog

kleptog commented Mar 14, 2018

Ok, so I've found the problem, and I have strace output capturing it failing. It's very subtle, and it has to do with the following completely non-obvious 'feature' of the epoll() interface. From the manpage:

Q6
Will closing a file descriptor cause it to be removed from all epoll sets automatically?
A6
Yes, but be aware of the following point. A file descriptor is a reference to an open file description (see open(2)). Whenever a file descriptor is duplicated via dup(2), dup2(2), fcntl(2) F_DUPFD, or fork(2), a new file descriptor referring to the same open file description is created. An open file description continues to exist until all file descriptors referring to it have been closed. A file descriptor is removed from an epoll set only after all the file descriptors referring to the underlying open file description have been closed (or before if the file descriptor is explicitly removed using epoll_ctl(2) EPOLL_CTL_DEL). This means that even after a file descriptor that is part of an epoll set has been closed, events may be reported for that file descriptor if other file descriptors referring to the same underlying file description remain open.

What is happening is that a file descriptor is being used by celery for communication, the file descriptor leaks to a worker, the master closes the file descriptor and then opens another for the sentinel, which is not marked non-blocking. The child process dies, causing the file descriptor to get marked as readable; the master tries to read it and blocks.

Yes, epoll() returns readability for a file descriptor that is no longer open in this process.
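
A standalone illustration of that manpage behaviour (a toy snippet, not celery code - the dup() here stands in for the copy of the descriptor that a forked worker keeps open):

import os
import select

r, w = os.pipe()
ep = select.epoll()
ep.register(r, select.EPOLLIN)

r_dup = os.dup(r)   # second fd for the same open file description, as fork() would create
os.close(r)         # close the registered fd; the description stays alive via r_dup

os.write(w, b"x")
print(ep.poll(1))   # epoll still reports an event for the already-closed fd number r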

Here I'm pasting captured strace output for just fd 49 as evidence (apologies for the multi-line output, strace was monitoring multiple processes):

7417  15:04:00.001511 pipe( <unfinished ...>
7417  15:04:00.001646 <... pipe resumed> [49, 50]) = 0
7417  15:04:00.001727 fcntl(49, F_GETFL <unfinished ...>
7417  15:04:00.002091 <... fcntl resumed> ) = 0 (flags O_RDONLY)
7417  15:04:00.002165 fcntl(49, F_SETFL, O_RDONLY|O_NONBLOCK <unfinished ...>
7417  15:04:00.002222 <... fcntl resumed> ) = 0
7417  15:04:00.003052 fcntl(49, F_GETFL <unfinished ...>
7417  15:04:00.003195 <... fcntl resumed> ) = 0x800 (flags O_RDONLY|O_NONBLOCK)
...
7417  15:04:00.237131 fcntl(49, F_GETFL) = 0x800 (flags O_RDONLY|O_NONBLOCK)
7417  15:04:00.237363 epoll_ctl(4, EPOLL_CTL_ADD, 49, {EPOLLIN|EPOLLERR|EPOLLHUP, {u32=49, u64=12923050543437316145}} <unfinished ...>
7417  15:04:00.237478 <... epoll_ctl resumed> ) = 0
...
7417  15:04:00.274482 read(49,  <unfinished ...>
7417  15:04:00.274558 <... read resumed> "\0\0\0\25", 4) = 4
7417  15:04:00.274670 read(49,  <unfinished ...>
7417  15:04:00.274734 <... read resumed> "(I15\n(I9395\ntp0\ntp1\n.", 21) = 21
7417  15:04:00.274851 epoll_ctl(4, EPOLL_CTL_ADD, 49, {EPOLLIN|EPOLLERR|EPOLLHUP, {u32=49, u64=12923050543437316145}} <unfinished ...>
7417  15:04:00.274970 <... epoll_ctl resumed> ) = -1 EEXIST (File exists)
...
... from here fd 49 is used normally for a while,  have not included all the output ...
...
7417  15:04:52.871646 epoll_wait(4, [{EPOLLIN, {u32=49, u64=12923050543437316145}}, {EPOLLIN, {u32=33, u64=12923050543437316129}}, {EPOLLOUT, {u32=48, u64=12923050543437316144}}, {EPOLLOUT, {u32=8, u64=12923050543437316104}}], 1023, 155) = 4
7417  15:04:52.871888 read(49, "\0\0\0\31", 4) = 4
7417  15:04:52.872079 read(49, "(I4\n(I9400\nI155\ntp0\ntp1\n.", 25) = 25
... here it uses a normal poll() just once?
7417  15:04:52.891818 poll([{fd=49, events=POLLERR}], 1, 10 <unfinished ...>
7417  15:04:52.902239 <... poll resumed> ) = 0 (Timeout)
7417  15:04:52.913780 close(49 <unfinished ...>
7417  15:04:52.914020 <... close resumed> ) = 0
... here we create a new pipe for a child process
7417  15:04:53.148195 pipe([49, 50])    = 0
7417  15:04:53.148996 clone(child_stack=NULL, flags=CLONE_CHILD_CLEARTID|CLONE_CHILD_SETTID|SIGCHLD, child_tidptr=0x7f2dee7f79d0) = 9482
... the write end got closed, now we dup it. This is the sentinel fd, so it is not marked non-blocking
7417  15:04:53.155409 dup(49)           = 50
7417  15:04:53.156030 epoll_ctl(4, EPOLL_CTL_ADD, 50, {EPOLLIN|EPOLLERR|EPOLLHUP, {u32=50, u64=12923050543437316146}}) = 0
... and here it breaks
7417  15:04:53.776974 epoll_wait(4,  <unfinished ...>
7417  15:04:53.785786 <... epoll_wait resumed> [{EPOLLHUP, {u32=49, u64=12923050543437316145}}], 1023, 1878) = 1
7417  15:04:53.785876 --- SIGCHLD {si_signo=SIGCHLD, si_code=CLD_EXITED, si_pid=9481, si_uid=905, si_status=155, si_utime=4, si_stime=3} ---
7417  15:04:53.786073 read(49,  <unfinished ...>
... discontinuity ...
7417  15:09:24.338109 <... read resumed> 0x23f4f10, 4) = ? ERESTARTSYS (To be restarted if SA_RESTART is set)
7417  15:09:24.345507 epoll_ctl(4, EPOLL_CTL_DEL, 49, 0x7ffeecbf7660) = -1 ENOENT (No such file or directory)

Note how epoll() admits that fd 49 is no longer in its fdset, but it returned the number nonetheless.

The way to fix this is to ensure that every file descriptor monitored by epoll() is closed in the worker. Currently workers close the unwanted file descriptors that correspond to themselves, but they don't touch the file descriptors they inherit that relate to other workers. Workarounds are to mark the sentinel fd as non-blocking too, or to simply use poll() instead of epoll(). I'm testing the latter locally.

@kleptog

kleptog commented Mar 16, 2018

Ok, as it turns out, using poll() doesn't solve the problem, as the actual issue is that a file descriptor is being closed while still registered with the hub. Once you realise that, it is easy to figure out exactly where it goes wrong, and the following hack fixes the issue for us. Without it we could reproduce the issue within an hour or two; with it we haven't been able to trigger it yet. The hack is not the correct fix, but it really requires someone who knows the code better to write a good fix.

--- /usr/local/lib/python2.7/dist-packages/celery/concurrency/asynpool.py~	2018-03-16 01:10:04.000000000 +0000
+++ /usr/local/lib/python2.7/dist-packages/celery/concurrency/asynpool.py	2018-03-16 12:08:58.058686003 +0000
@@ -735,6 +735,7 @@
             except KeyError:
                 pass
         self.on_inqueue_close = on_inqueue_close
+        self.hub_remove = hub_remove
 
         def schedule_writes(ready_fds, total_write_count=[0]):
             # Schedule write operation to ready file descriptor.
@@ -1241,6 +1242,7 @@
             if queue:
                 for sock in (queue._reader, queue._writer):
                     if not sock.closed:
+                        self.hub_remove(sock)
                         try:
                             sock.close()
                         except (IOError, OSError):

It looks like the write queue fd is properly removed from the hub (a few lines up), but the read queue fd is not.

@Olerdrive

No updates here?:(

@ryanpetrello
Contributor

@auvipy any thoughts on @kleptog's analysis above at #4185 (comment)?

@auvipy
Member

auvipy commented Mar 26, 2018

Sorry, I don't have enough time right now to investigate the issue.

@ryanpetrello
Contributor

ryanpetrello commented Mar 26, 2018

@kleptog thanks for so much legwork on this.

It seems like on_inqueue_close itself calls hub_remove(fd) - specifically on L744, similar to your example:

def on_inqueue_close(fd, proc):
    # Makes sure the fd is removed from tracking when
    # the connection is closed, this is essential as fds may be reused.
    busy_workers.discard(fd)
    try:
        if fileno_to_inq[fd] is proc:
            fileno_to_inq.pop(fd, None)
            active_writes.discard(fd)
            all_inqueues.discard(fd)
            hub_remove(fd)
    except KeyError:
        pass

...which is called in destroy_queues() on L1247, but only for queues[0].

removed = 0
try:
    self.on_inqueue_close(queues[0]._writer.fileno(), proc)
except IOError:
    pass

So is this statement actually accurate?

The hack is not the correct fix, but it really requires someone who knows the code better to write a good fix.

I'm not knowledgeable enough about celery internals to know why the code is currently only doing this for queues[0]. Might be worth a pull request w/ your diff, at least.

@kleptog

kleptog commented Mar 30, 2018

IIRC it was indeed the other queue that was being left out, so maybe it's a matter of doing an inqueue_close on the other queue? I won't have the opportunity to test it for a while though.

@ryanpetrello
Contributor

ryanpetrello commented Apr 19, 2018

@auvipy I'm sorry to be a nag, and I can sympathize with the often thankless task of doing free work on an open source project 😞 - I work on several in my spare time, and another one for my day job.

Do you know if you or any other celery maintainer has had a chance to take a peek at this one? We've got a notable number of celery users in this thread who have been encountering this debilitating issue since it was reported last summer, and even a few who have pitched in and possibly identified the area of code that's the culprit. Is there any sort of ETA on when this bug might be looked at?

@thedrow
Member

thedrow commented Aug 3, 2021

Ok, as it turns out, using poll() doesn't solve the problem, as the actual issue is that a file descriptor is being closed while still registered with the hub. Once you realise that, it is easy to figure out exactly where it goes wrong, and the following hack fixes the issue for us. Without it we could reproduce the issue within an hour or two; with it we haven't been able to trigger it yet. The hack is not the correct fix, but it really requires someone who knows the code better to write a good fix.

--- /usr/local/lib/python2.7/dist-packages/celery/concurrency/asynpool.py~	2018-03-16 01:10:04.000000000 +0000
+++ /usr/local/lib/python2.7/dist-packages/celery/concurrency/asynpool.py	2018-03-16 12:08:58.058686003 +0000
@@ -735,6 +735,7 @@
             except KeyError:
                 pass
         self.on_inqueue_close = on_inqueue_close
+        self.hub_remove = hub_remove
 
         def schedule_writes(ready_fds, total_write_count=[0]):
             # Schedule write operation to ready file descriptor.
@@ -1241,6 +1242,7 @@
             if queue:
                 for sock in (queue._reader, queue._writer):
                     if not sock.closed:
+                        self.hub_remove(sock)
                         try:
                             sock.close()
                         except (IOError, OSError):

It looks like the write queue fd is properly removed from the hub (a few lines up), but the read queue fd is not.

Hi everyone,

While there is a workaround which fixes the problem, @kleptog suggests it is a hack.
Since we cannot reproduce the issue ourselves, we'd like confirmation that this fix is not a hack but the proper solution.

@vlademy Did you encounter any problems while applying this patch?

@vlademy

vlademy commented Aug 3, 2021

This was a separate issue with a library that was using a separate polling mechanism.

@Anton-Shutik

Anton-Shutik commented Oct 21, 2022

Hi there, I've got the same issue. The worker was running fine for several days, but then suddenly stopped picking up and executing tasks, which led to queue growth. A restart fixed the problem. Here is what I found:

python 3.9.13
celery==4.4.7

env:

CELERY_REDIS_MAX_CONNECTIONS: "5"
CELERY_WORKER_CONCURRENCY: "8"
CELERY_WORKER_MAX_TASKS_PER_CHILD: "20"

Celery starts like this:

ddtrace-run celery -A core worker -l info --without-gossip -Ofair -Q analytics

A few hours after it stopped executing tasks, I found records in the logs like this:
[YYYY-MM-DD HH:MM:SS: INFO/MainProcess] sync with celery@<some worker name>

I didn't see such records before the incident. Interestingly, the workers were syncing with different workers; I'm not sure what the correct behaviour is (should it sync with celery beat?)

@auvipy auvipy modified the milestones: Future, 5.3.x Oct 22, 2022
@meensu23

meensu23 commented Jul 11, 2023

I encountered a similar scenario in production with celery 5.2.3. I logged issue celery/billiard#389 against billiard. I'm not sure if this is a problem with celery or billiard. If I kill the child, the task gets executed elsewhere. Strangely, the master seems to be fine processing other requests even after this stuck task on the child. Soft timeout/hard timeout don't trigger - probably the master did not see an ack from the child, as the child is still trying to read from the socket.

@quanqigu

The same issue on 5.2.7. I have configured MAX_TASK_PER_CHILD as 100, but we hit a verify_process_alive exception.

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/celery/worker/worker.py", line 203, in start
    self.blueprint.start(self)
  File "/usr/local/lib/python3.9/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/usr/local/lib/python3.9/site-packages/celery/bootsteps.py", line 365, in start
    return self.obj.start()
  File "/usr/local/lib/python3.9/site-packages/celery/worker/consumer/consumer.py", line 332, in start
    blueprint.start(self)
  File "/usr/local/lib/python3.9/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/usr/local/lib/python3.9/site-packages/celery/worker/consumer/consumer.py", line 628, in start
    c.loop(*c.loop_args())
  File "/usr/local/lib/python3.9/site-packages/celery/worker/loops.py", line 97, in asynloop
    next(loop)
  File "/usr/local/lib/python3.9/site-packages/kombu/asynchronous/hub.py", line 301, in create_loop
    poll_timeout = fire_timers(propagate=propagate) if scheduled else 1
  File "/usr/local/lib/python3.9/site-packages/kombu/asynchronous/hub.py", line 143, in fire_timers
    entry()
  File "/usr/local/lib/python3.9/site-packages/kombu/asynchronous/timer.py", line 64, in __call__
    return self.fun(*self.args, **self.kwargs)
  File "/usr/local/lib/python3.9/site-packages/celery/concurrency/asynpool.py", line 617, in verify_process_alive
    assert proc.outqR_fd in hub.readers
AssertionError

@ramksesh

Noticing the same issue in 5.2.3. The child's stack trace shows it's waiting on recv. Debugging the code further, I observed the code snippet in Celery's _write_job function, where an exception raised to the main worker process with errno values of EINTR or EAGAIN results in suspension using yield. Could this behavior be a potential cause for random failures in sending jobs from the supervisor to child workers?

@trianglesis

trianglesis commented Sep 26, 2023

Same issue, cannot reproduce right now, I thought It was configure issue, when restart just un-hang the worker, but looks like it happening from time to time.

And again.
I'm back from 2018 with the same issue!

software -> celery:5.3.4 (emerald-rush) kombu:5.3.2 py:3.11.2
            billiard:4.1.0 py-amqp:5.1.1
platform -> system:Linux arch:64bit, ELF
            kernel version:5.15.0-104.119.4.2.el9uek.x86_64 imp:CPython
loader   -> celery.loaders.app.AppLoader
settings -> transport:pyamqp results:db+mysql://celery_backend:**@localhost/lobster

broker_url: 'amqp://octo_user:********@localhost:5672/tentacle'
result_backend: 'db+mysql://celery_backend:********@localhost/lobster'
deprecated_settings: None
timezone: 'UTC'
enable_utc: True
accept_content: ['pickle']
task_serializer: 'pickle'
result_serializer: 'pickle'
result_extended: True
task_track_started: True
beat_scheduler: 'django_celery_beat.schedulers:DatabaseScheduler'
database_engine_options: {'pool_timeout': 90}
worker_prefetch_multiplier: 1
worker_concurrency: 1
worker_timer_precision: 1.0
broker_heartbeat: 10.0
broker_heartbeat_checkrate: 2.0
worker_max_memory_per_child: 102400
worker_max_tasks_per_child: 1000
worker_proc_alive_timeout: 5.0
worker_cancel_long_running_tasks_on_connection_loss: False
worker_pool_restarts: True
worker_enable_remote_control: True
worker_lost_wait: 20
broker_connection_retry_on_startup: True
broker_connection_retry: True
broker_connection_max_retries: 0
broker_connection_timeout: 4.0
broker_channel_error_retry: True
broker_pool_limit: 100
task_send_sent_event: True
worker_send_task_events: True
task_acks_on_failure_or_timeout: True
worker_disable_rate_limits: True
task_default_queue: 'default'
task_default_exchange: 'default'
task_default_routing_key: '********'
task_default_exchange_type: 'direct'
worker_direct: True
task_create_missing_queues: True
task_default_delivery_mode: 'transient'

python gdb:

(gdb) py-bt
Traceback (most recent call first):
  File "/usr/local/lib/python3.11/threading.py", line 1132, in _wait_for_tstate_lock
    if lock.acquire(block, timeout):
  File "/usr/local/lib/python3.11/threading.py", line 1112, in join
    self._wait_for_tstate_lock()
  File "/var/www/MYPROJECT/djnago_site1/test_executor.py", line 192, in run
    test_th.join()
  File "/var/www/MYPROJECT/CORE_MODULE/helpers/tasks_helpers.py", line 37, in wrapper
    return function(*args, **kwargs)
  File "/var/www/MYPROJECT/djnago_site1/tasks.py", line 94, in t_test_exec_threads
    return test_instance.run(**kwargs)  # Visible kwargs pass to be catched by exception!
  File "/var/www/MYPROJECT/CORE_MODULE/helpers/tasks_helpers.py", line 37, in wrapper
    return function(*args, **kwargs)
  File "/var/www/MYPROJECT/venv/lib/python3.11/site-packages/celery/app/trace.py", line 760, in __protected_call__
    return self.run(*args, **kwargs)
  File "/var/www/MYPROJECT/venv/lib/python3.11/site-packages/celery/app/trace.py", line 477, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/var/www/MYPROJECT/venv/lib/python3.11/site-packages/celery/app/trace.py", line 675, in fast_trace_task
    R, I, T, Rstr = tasks[task].__trace__(
  File "/var/www/MYPROJECT/venv/lib/python3.11/site-packages/billiard/pool.py", line 361, in workloop
    result = (True, prepare_result(fun(*args, **kwargs)))
  File "/var/www/MYPROJECT/venv/lib/python3.11/site-packages/billiard/pool.py", line 291, in __call__
    sys.exit(self.workloop(pid=pid))
  File "/var/www/MYPROJECT/venv/lib/python3.11/site-packages/billiard/process.py", line 110, in run
    self._target(*self._args, **self._kwargs)
  File "/var/www/MYPROJECT/venv/lib/python3.11/site-packages/billiard/process.py", line 323, in _bootstrap
    self.run()
  File "/var/www/MYPROJECT/venv/lib/python3.11/site-packages/billiard/popen_fork.py", line 77, in _launch
    code = process_obj._bootstrap()
  File "/var/www/MYPROJECT/venv/lib/python3.11/site-packages/billiard/popen_fork.py", line 22, in __init__
    self._launch(process_obj)
  File "/var/www/MYPROJECT/venv/lib/python3.11/site-packages/billiard/context.py", line 331, in _Popen
    return Popen(process_obj)
  File "/var/www/MYPROJECT/venv/lib/python3.11/site-packages/billiard/process.py", line 120, in start
    self._popen = self._Popen(self)
  File "/var/www/MYPROJECT/venv/lib/python3.11/site-packages/billiard/pool.py", line 1157, in _create_worker_process
    w.start()
  File "/var/www/MYPROJECT/venv/lib/python3.11/site-packages/celery/concurrency/asynpool.py", line 482, in _create_worker_process
    return super()._create_worker_process(i)
  File "/var/www/MYPROJECT/venv/lib/python3.11/site-packages/billiard/pool.py", line 1327, in _repopulate_pool
    self._create_worker_process(self._avail_index())
  File "/var/www/MYPROJECT/venv/lib/python3.11/site-packages/billiard/pool.py", line 1342, in _maintain_pool
    self._repopulate_pool(joined)
  File "/var/www/MYPROJECT/venv/lib/python3.11/site-packages/billiard/pool.py", line 1350, in maintain_pool
    self._maintain_pool()
  File "/var/www/MYPROJECT/venv/lib/python3.11/site-packages/celery/concurrency/asynpool.py", line 487, in _event_process_exit
    self.maintain_pool()
  File "/var/www/MYPROJECT/venv/lib/python3.11/site-packages/kombu/asynchronous/hub.py", line 373, in create_loop
    cb(*cbargs)
  <built-in method next of module object at remote 0x7fef9273b5f0>
  File "/var/www/MYPROJECT/venv/lib/python3.11/site-packages/celery/worker/loops.py", line 97, in asynloop
    next(loop)
  File "/var/www/MYPROJECT/venv/lib/python3.11/site-packages/celery/worker/consumer/consumer.py", line 742, in start
    c.loop(*c.loop_args())
  File "/var/www/MYPROJECT/venv/lib/python3.11/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/var/www/MYPROJECT/venv/lib/python3.11/site-packages/celery/worker/consumer/consumer.py", line 340, in start
    blueprint.start(self)
  File "/var/www/MYPROJECT/venv/lib/python3.11/site-packages/celery/bootsteps.py", line 365, in start
--Type <RET> for more, q to quit, c to continue without paging--c
    return self.obj.start()
  File "/var/www/MYPROJECT/venv/lib/python3.11/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/var/www/MYPROJECT/venv/lib/python3.11/site-packages/celery/worker/worker.py", line 202, in start
    self.blueprint.start(self)
  File "/var/www/MYPROJECT/venv/lib/python3.11/site-packages/celery/bin/worker.py", line 356, in worker
    worker.start()
  File "/var/www/MYPROJECT/venv/lib/python3.11/site-packages/celery/bin/base.py", line 134, in caller
    return f(ctx, *args, **kwargs)
  File "/var/www/MYPROJECT/venv/lib/python3.11/site-packages/click/decorators.py", line 33, in new_func
    return f(get_current_context(), *args, **kwargs)
  File "/var/www/MYPROJECT/venv/lib/python3.11/site-packages/click/core.py", line 783, in invoke
    return __callback(*args, **kwargs)
  File "/var/www/MYPROJECT/venv/lib/python3.11/site-packages/click/core.py", line 1434, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/var/www/MYPROJECT/venv/lib/python3.11/site-packages/click/core.py", line 1688, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/var/www/MYPROJECT/venv/lib/python3.11/site-packages/click/core.py", line 1078, in main
    rv = self.invoke(ctx)
  File "/var/www/MYPROJECT/venv/lib/python3.11/site-packages/click/core.py", line 1157, in __call__
    return self.main(*args, **kwargs)
  File "/var/www/MYPROJECT/venv/lib/python3.11/site-packages/celery/bin/celery.py", line 236, in main
    return celery(auto_envvar_prefix="CELERY")
  File "/var/www/MYPROJECT/venv/lib/python3.11/site-packages/celery/__main__.py", line 15, in main
    sys.exit(_main())
  File "/var/www/MYPROJECT/venv/lib/python3.11/site-packages/celery/__main__.py", line 19, in <module>
    main()
  <built-in method exec of module object at remote 0x7fef9273b5f0>
  File "/usr/local/lib/python3.11/runpy.py", line 88, in _run_code
    exec(code, run_globals)
  File "/usr/local/lib/python3.11/runpy.py", line 198, in _run_module_as_main
    return _run_code(code, main_globals, None,

Interestingly, the issue occurs right during thread exec:

for test_th in thread_list:
  test_th.join()

Threads overall:

(gdb) info threads
  Id   Target Id                                     Frame
* 1    Thread 0x7fef927f0740 (LWP 1593305) "python3" 0x00007fef9288f3aa in __futex_abstimed_wait_common () from /lib64/libc.so.6
  2    Thread 0x7fef89621640 (LWP 1593306) "python3" 0x00007fef9288f3aa in __futex_abstimed_wait_common () from /lib64/libc.so.6
  3    Thread 0x7fef83fff640 (LWP 1593308) "python3" 0x00007fef929359ef in poll () from /lib64/libc.so.6
  4    Thread 0x7fef837fe640 (LWP 1593309) "python3" 0x00007fef929359ef in poll () from /lib64/libc.so.6
(gdb) py-list
  83                           __cached__ = cached,
  84                           __doc__ = None,
  85                           __loader__ = loader,
  86                           __package__ = pkg_name,
  87                           __spec__ = mod_spec)
 >88        exec(code, run_globals)
  89        return run_globals
  90
  91    def _run_module_code(code, init_globals=None,
  92                        mod_name=None, mod_spec=None,
  93                        pkg_name=None, script_name=None):
(gdb) thread 2
[Switching to thread 2 (Thread 0x7fef89621640 (LWP 1593306))]
#0  0x00007fef9288f3aa in __futex_abstimed_wait_common () from /lib64/libc.so.6
(gdb) py-list
 315            self._waiters.append(waiter)
 316            saved_state = self._release_save()
 317            gotit = False
 318            try:    # restore state no matter what (e.g., KeyboardInterrupt)
 319                if timeout is None:
>320                    waiter.acquire()
 321                    gotit = True
 322                else:
 323                    if timeout > 0:
 324                        gotit = waiter.acquire(True, timeout)
 325                    else:
(gdb) thread 3
[Switching to thread 3 (Thread 0x7fef83fff640 (LWP 1593308))]
#0  0x00007fef929359ef in poll () from /lib64/libc.so.6
(gdb) py-list
 296            while n > 0:
 297                got_timeout = False
 298                if self.handshake_timed_out():
 299                    raise EOFError()
 300                try:
>301                    x = self.__socket.recv(n)
 302                    if len(x) == 0:
 303                        raise EOFError()
 304                    out += x
 305                    n -= len(x)
 306                except socket.timeout:
(gdb) thread 4
[Switching to thread 4 (Thread 0x7fef837fe640 (LWP 1593309))]
#0  0x00007fef929359ef in poll () from /lib64/libc.so.6
(gdb) py-list
 296            while n > 0:
 297                got_timeout = False
 298                if self.handshake_timed_out():
 299                    raise EOFError()
 300                try:
>301                    x = self.__socket.recv(n)
 302                    if len(x) == 0:
 303                        raise EOFError()
 304                    out += x
 305                    n -= len(x)
 306                except socket.timeout:

Or, to be honest, in the next thread (which is the 2nd) there is a place where my code is waiting for the paramiko ssh command to give me some output:

  File "/var/www/lobster/venv/lib/python3.11/site-packages/paramiko/file.py", line 275, in readline
    new_data = self._read(n)
  File "/var/www/lobster/venv/lib/python3.11/site-packages/paramiko/file.py", line 333, in readlines
    line = self.readline()

I'm still not sure about the root cause, and I'm just randomly changing the code and removing try blocks to let the thread fail with something more verbose.

I cannot get any deeper with gdb since I can't find an actual traceback or visible issues.
On the RabbitMQ side, only [email protected] died.

@auvipy
Member

auvipy commented Oct 2, 2023

Thanks for your detailed investigation.

@IdanHaim
Contributor

IdanHaim commented May 30, 2024

On celery 5.2.7 I can see this error again

[2024-05-07 07:34:27,747: CRITICAL/MainProcess/796] Unrecoverable error: AssertionError()
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/celery/worker/worker.py", line 203, in start
    self.blueprint.start(self)
  File "/usr/local/lib/python3.9/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/usr/local/lib/python3.9/site-packages/celery/bootsteps.py", line 365, in start
    return self.obj.start()
  File "/usr/local/lib/python3.9/site-packages/celery/worker/consumer/consumer.py", line 332, in start
    blueprint.start(self)
  File "/usr/local/lib/python3.9/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/usr/local/lib/python3.9/site-packages/celery/worker/consumer/consumer.py", line 628, in start
    c.loop(*c.loop_args())
  File "/usr/local/lib/python3.9/site-packages/celery/worker/loops.py", line 97, in asynloop
    next(loop)
  File "/usr/local/lib/python3.9/site-packages/kombu/asynchronous/hub.py", line 301, in create_loop
    poll_timeout = fire_timers(propagate=propagate) if scheduled else 1
  File "/usr/local/lib/python3.9/site-packages/kombu/asynchronous/hub.py", line 143, in fire_timers
    entry()
  File "/usr/local/lib/python3.9/site-packages/kombu/asynchronous/timer.py", line 64, in __call__
    return self.fun(*self.args, **self.kwargs)
  File "/usr/local/lib/python3.9/site-packages/celery/concurrency/asynpool.py", line 617, in verify_process_alive
    assert proc.outqR_fd in hub.readers
AssertionError

This happened when a worker timed out while starting, in verify_process_alive, because a worker starts up with the same reader fd as the write fd of a different worker.

example:

We have this ForkProcess(ForkPoolWorker-27) started with read fd=76 and write fd=75:
on_process_up: started <ForkProcess(ForkPoolWorker-27, started daemon)> with outqR_fd: 76, inqW_fd: 75
and we have this ForkProcess(ForkPoolWorker-7) started with read fd=77 and write fd=76:
on_process_up: started <ForkProcess(ForkPoolWorker-7, started daemon)> with outqR_fd: 77, inqW_fd: 76
When worker-7 starts we can see celery trying to remove the write fd because it finds it among the inactive fds:
INFO/MainProcess/14019] will try to remove fd=76
When worker-27 then fails on the timeout, the fd can no longer be found in hub.readers and we get the error.

ForkPoolWorker-7 started before ForkPoolWorker-27, and every hub.remove in
_write_job
destroy_queues
schedule_writes
_untrack_child_process
on_pool_start
will remove this fd from the hub because of diff(active_writes) once the fd is no longer active.
I was able to reproduce this with the suggestion in #4185 (comment)

The on_pool_start removal happened all the time because of the hub tick, but even after changing that hub.remove to hub.remove_writer
I still got this error because of the other functions mentioned here.

Is the assert proc.outqR_fd in hub.readers vital in this version?

recent reproduction:

[2024-05-30 12:46:55,978: INFO/MainProcess/20903] on_process_up: started <ForkProcess(ForkPoolWorker-132, started daemon)> with outqR_fd: 100, inqW_fd: 99, synqW_fd: None, synqR_fd: None
[2024-05-30 12:47:02,573: INFO/MainProcess/20903] on_process_up: started <ForkProcess(ForkPoolWorker-153, started daemon)> with outqR_fd: 99, inqW_fd: 97, synqW_fd: None, synqR_fd: None

[2024-05-30 12:46:25,052: ERROR/MainProcess/20903] called removed with fd=99, caller_name=_write_job
[2024-05-30 12:46:25,136: ERROR/MainProcess/20903] called removed with fd=99, caller_name=iterate_file_descriptors_safely
[2024-05-30 12:46:29,886: ERROR/MainProcess/20903] called removed with fd=99, caller_name=_write_job
[2024-05-30 12:47:02,580: ERROR/MainProcess/20903] called removed with fd=99, caller_name=schedule_writes

[2024-05-30 12:47:06,687: CRITICAL/MainProcess/20903] Unrecoverable error: AssertionError()

By changing the use of hub.remove to hub.remove_writer in the right places, I couldn't reproduce the issue anymore.
See #9055 - as far as I understood, those functions are meant to handle write fds.
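
For anyone following along, a rough sketch of the difference (a toy snippet against kombu's Hub as I understand it, not the actual patch - see the PR for the real change):

import os
from kombu.asynchronous.hub import Hub

hub = Hub()
r, w = os.pipe()

# Register the same fd number as both a reader and a writer, which is roughly
# what the fd-reuse scenario above produces.
hub.add_reader(r, lambda *args: None)
hub.add_writer(r, lambda *args: None)

hub.remove(r)                  # drops BOTH registrations for that fd number,
assert r not in hub.readers    # so a later `assert proc.outqR_fd in hub.readers` blows up

hub.add_reader(r, lambda *args: None)
hub.add_writer(r, lambda *args: None)
hub.remove_writer(r)           # drops only the writer side,
assert r in hub.readers        # the reader registration survives

os.close(r)
os.close(w)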

IdanHaim pushed a commit to IdanHaim/celery that referenced this issue Jun 14, 2024
- fix main process Unrecoverable error: AssertionError() when read fd is deleted
- see celery#4185 (comment)
- tests:
    - change hub.remove to hub.remove_writer in test_poll_write_generator and test_poll_write_generator_stopped
    - add 3 more tests for schedule_writes to assert only hub.writers is removed when hub.readers have the same fd id
@auvipy auvipy modified the milestones: 5.3.x, 5.4.x Jun 15, 2024
auvipy pushed a commit that referenced this issue Jun 16, 2024

- fix main process Unrecoverable error: AssertionError() when read fd is deleted
- see #4185 (comment)
- tests:
    - change hub.remove to hub.remove_writer in test_poll_write_generator and test_poll_write_generator_stopped
    - add 3 more tests for schedule_writes to assert only hub.writers is removed when hub.readers have the same fd id

Co-authored-by: Idan Haim Shalom <[email protected]>
Co-authored-by: Tomer Nosrati <[email protected]>
@thedrow
Member

thedrow commented Aug 27, 2024

This is now fixed. I'm closing the issue.

@thedrow thedrow closed this as completed Aug 27, 2024