```
[2022-01-05 01:20:00,086: ERROR/ForkPoolWorker-21] Thread 'ResultHandler' crashed: ValueError('invalid file descriptor 20')
Traceback (most recent call last):
  File "/var/www/backend/venv/lib/python3.8/site-packages/billiard/pool.py", line 504, in run
    return self.body()
  File "/var/www/backend/venv/lib/python3.8/site-packages/billiard/pool.py", line 899, in body
    for _ in self._process_result(1.0):  # blocking
  File "/var/www/backend/venv/lib/python3.8/site-packages/billiard/pool.py", line 864, in _process_result
    ready, task = poll(timeout)
  File "/var/www/backend/venv/lib/python3.8/site-packages/billiard/pool.py", line 1370, in _poll_result
    if self._outqueue._reader.poll(timeout):
  File "/var/www/backend/venv/lib/python3.8/site-packages/billiard/connection.py", line 292, in poll
    return self._poll(timeout)
  File "/var/www/backend/venv/lib/python3.8/site-packages/billiard/connection.py", line 470, in _poll
    r = wait([self], timeout)
  File "/var/www/backend/venv/lib/python3.8/site-packages/billiard/connection.py", line 1003, in wait
    return _poll(object_list, timeout)
  File "/var/www/backend/venv/lib/python3.8/site-packages/billiard/connection.py", line 983, in _poll
    raise ValueError('invalid file descriptor %i' % fd)
ValueError: invalid file descriptor 20
[2022-01-05 01:20:00,550: ERROR/MainProcess] Process 'ForkPoolWorker-21' pid:155045 exited with 'exitcode 1'
[2022-01-05 01:20:10,978: ERROR/MainProcess] Task handler raised error: WorkerLostError('Worker exited prematurely: exitcode 1 Job: 74.')
Traceback (most recent call last):
  File "/var/www/backend/venv/lib/python3.8/site-packages/billiard/pool.py", line 1265, in mark_as_worker_lost
    raise WorkerLostError(
billiard.exceptions.WorkerLostError: Worker exited prematurely: exitcode 1 Job: 74.
```
How to reproduce
1. Start Celery with the startup configuration (supervisor + `run_celery.py`). `gevent.monkey.patch_all()` must be called.
2. Use `kill <pid>` to kill some worker processes. The main process creates new worker processes to replace the killed ones.
3. Run `xxx_task.delay()` in a Python shell to send some tasks.
4. As soon as any newly created worker process receives a task, it hits the `invalid file descriptor` error and crashes.

I'm not sure the above steps reproduce the issue every time, but I've reproduced it several times.
Analysis
`ResultHandler` is used by the main process to read the results of worker processes. It should run only in the main process, but here the worker processes executed it.
`billiard/pool.py`:
```python
class Pool(object):
    def __init__(self, ...):
        # ...
        self._setup_queues()
        # ...
        for i in range(self._processes):
            self._create_worker_process(i)
        # ...
        # Thread processing results in the outqueue.
        self._result_handler = self.create_result_handler()
        self.handle_result_event = self._result_handler.handle_event

        if threads:
            self._result_handler.start()
        # ...

    def create_result_handler(self, **extra_kwargs):
        return self.ResultHandler(
            self._outqueue, self._quick_get, self._cache,
            self._poll_result, self._join_exited_workers,
            self._putlock, self.restart_state, self.check_timeouts,
            self.on_job_ready, on_ready_counters=self._on_ready_counters,
            **extra_kwargs
        )

    # ...
    def _setup_queues(self):
        self._inqueue = self._ctx.SimpleQueue()
        self._outqueue = self._ctx.SimpleQueue()
        self._quick_put = self._inqueue._writer.send
        self._quick_get = self._outqueue._reader.recv

        def _poll_result(timeout):
            if self._outqueue._reader.poll(timeout):
                return True, self._quick_get()
            return False, None
        self._poll_result = _poll_result

    # ...
    def get_process_queues(self):
        return self._inqueue, self._outqueue, None

    def _create_worker_process(self, i):
        sentinel = self._ctx.Event() if self.allow_restart else None
        inq, outq, synq = self.get_process_queues()
        on_ready_counter = self._ctx.Value('i')
        w = self.WorkerProcess(self.Worker(
            inq, outq, synq, self._initializer, self._initargs,
            self._maxtasksperchild, sentinel, self._on_process_exit,
            # Need to handle all signals if using the ipc semaphore,
            # to make sure the semaphore is released.
            sigprotection=self.threads,
            wrap_exception=self._wrap_exception,
            max_memory_per_child=self._max_memory_per_child,
            on_ready_counter=on_ready_counter,
        ))
        self._pool.append(w)
        # ...
    # ...


class Worker(object):
    def after_fork(self):
        if hasattr(self.inq, '_writer'):
            self.inq._writer.close()
        if hasattr(self.outq, '_reader'):
            self.outq._reader.close()
        # ...
```
As the code above shows, on startup the main process creates `_outqueue` and forks to create the worker processes. After the fork, each worker process closes `_outqueue._reader` in `Worker.after_fork()`.
The main process then starts a thread to run `ResultHandler`, which calls `_poll_result` on `_outqueue._reader`.
If a worker process crashes, the main process forks a new worker process to replace it.
As the fork(2) Linux man page says, after a fork the child process keeps only the thread that called fork(); all other threads disappear:

> The child process is created with a single thread--the one that called fork(). The entire virtual address space of the parent is replicated in the child, including the states of mutexes, condition variables, and other pthreads objects; the use of pthread_atfork(3) may be helpful for dealing with problems that this can cause.

So, normally, newly forked worker processes have no `ResultHandler` thread.
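The normal (unpatched) behaviour can be checked with a small sketch. It assumes a POSIX system with `os.fork`; the thread named `ResultHandler` here is just a plain `threading.Thread` standing in for billiard's handler, not the real class. `threading.enumerate()` in the child reflects CPython's own bookkeeping, which it resets after fork because the other OS threads no longer exist there.

```python
import os
import threading

stop = threading.Event()

# A real OS thread standing in for the unpatched ResultHandler thread.
handler = threading.Thread(target=stop.wait, name='ResultHandler', daemon=True)
handler.start()
parent_threads = [t.name for t in threading.enumerate()]

r, w = os.pipe()
pid = os.fork()  # plays the role of the pool forking a replacement worker
if pid == 0:
    # Child: report the threads Python still tracks, then exit immediately.
    os.write(w, ','.join(t.name for t in threading.enumerate()).encode())
    os._exit(0)

os.close(w)
child_threads = os.read(r, 1024).decode().split(',')
os.waitpid(pid, 0)
os.close(r)
stop.set()
handler.join()

print('parent:', parent_threads)
print('child: ', child_threads)
```

The parent lists `ResultHandler`; the child does not. On Python 3.12+ the fork also emits a `DeprecationWarning` about forking a multi-threaded process, which is itself a hint that fork() plus threads is fragile.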
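When `gevent.monkey.patch_all()` is used, threads are replaced with coroutines (greenlets); I believe this is the case, but I'm not certain. `ResultHandler` then does not run in a separate OS thread, but as a greenlet inside the same OS thread that later calls fork().
Fork therefore copies the pending `_poll_result` call into the child. The new worker process keeps running `_poll_result` on `_outqueue._reader`, even though it has also closed `_outqueue._reader` in `after_fork()`.
Polling a closed file descriptor is invalid, so `_poll_result` raises the `invalid file descriptor` exception.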
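The failure mode can be reproduced in miniature without Celery or gevent. This sketch assumes a plain Python generator is a fair stand-in for the monkey-patched `ResultHandler` greenlet: both are just state in the forking thread's memory, so fork() copies them into the child. `poll_once` and `result_handler` are hypothetical, simplified helpers; as far as I can tell from `billiard/connection.py`, the real `_poll` raises the same `ValueError` when poll(2) reports `POLLNVAL` (the descriptor is not open). POSIX only.

```python
import os
import select


def poll_once(reader_fd, timeout_ms):
    """Simplified, hypothetical stand-in for billiard.connection._poll on one fd."""
    pollster = select.poll()
    pollster.register(reader_fd, select.POLLIN)
    for fd, event in pollster.poll(timeout_ms):
        if event & select.POLLNVAL:
            # poll(2) sets POLLNVAL when fd is not an open descriptor.
            raise ValueError('invalid file descriptor %i' % fd)
        return True
    return False


def result_handler(reader_fd):
    """Generator standing in for the monkey-patched ResultHandler greenlet."""
    while True:
        yield poll_once(reader_fd, 0)


reader, writer = os.pipe()          # plays the role of _outqueue
handler = result_handler(reader)
next(handler)                       # "greenlet" started in the parent

status_r, status_w = os.pipe()      # lets the child report what happened
pid = os.fork()                     # like the pool replacing a killed worker
if pid == 0:
    os.close(reader)                # like Worker.after_fork() closing outq._reader
    try:
        next(handler)               # the copied "greenlet" polls the closed fd
        msg = b'no error'
    except ValueError as exc:
        msg = str(exc).encode()
    os.write(status_w, msg)
    os._exit(0)

os.close(status_w)
child_result = os.read(status_r, 1024).decode()
os.waitpid(pid, 0)
for fd in (status_r, reader, writer):
    os.close(fd)
print(child_result)
```

The parent's poll on the open reader succeeds; the child's copy of the same pending poll fails once the descriptor has been closed, mirroring the traceback in the log.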
Thanks for digging this out. I had the same issue on celery==4.1.1.
This was breaking my --pool=prefork workers when I used --max-tasks-per-child. When I removed gevent monkey patching from my celery entry point, the prefork workers restarted correctly.
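One way to apply this workaround without losing gevent for gevent-pool workers is to monkey-patch only when the worker is started with the gevent pool. This is a hypothetical sketch for a custom entry point such as `run_celery.py`; `uses_gevent_pool` is an illustrative helper that only recognises the `-P`/`--pool` spellings listed in its docstring, not a Celery API.

```python
import sys


def uses_gevent_pool(argv):
    """Return True if a `celery worker` command line selects the gevent pool.

    Hypothetical helper: it only understands `-P gevent`, `-Pgevent`,
    `--pool gevent` and `--pool=gevent`.
    """
    for i, arg in enumerate(argv):
        if arg in ('-P', '--pool'):
            return i + 1 < len(argv) and argv[i + 1] == 'gevent'
        if arg.startswith('--pool='):
            return arg.split('=', 1)[1] == 'gevent'
        if arg.startswith('-P') and len(arg) > 2:
            return arg[2:] == 'gevent'
    return False


if uses_gevent_pool(sys.argv):
    # Patch only for the gevent pool; prefork workers keep real OS threads,
    # so the ResultHandler thread is not copied into re-forked children.
    from gevent import monkey
    monkey.patch_all()
```

For example, `uses_gevent_pool(['celery', 'worker', '--pool=prefork', '--max-tasks-per-child=10'])` is `False`, so a prefork worker started that way is left unpatched.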
Environment
os: Ubuntu 20.04 LTS
broker: redis 4.0.9
python: 3.8
celery: 5.1.2
billiard: 3.6.4.0
gevent: 21.1.2
Celery startup config
supervisor
run_celery.py