diff --git a/celery/concurrency/asynpool.py b/celery/concurrency/asynpool.py index e1912b05b7a..7f51307c6c4 100644 --- a/celery/concurrency/asynpool.py +++ b/celery/concurrency/asynpool.py @@ -772,7 +772,7 @@ def on_poll_start(): None, WRITE | ERR, consolidate=True) else: iterate_file_descriptors_safely( - inactive, all_inqueues, hub_remove) + inactive, all_inqueues, hub.remove_writer) self.on_poll_start = on_poll_start def on_inqueue_close(fd, proc): @@ -818,7 +818,7 @@ def schedule_writes(ready_fds, total_write_count=None): # worker is already busy with another task continue if ready_fd not in all_inqueues: - hub_remove(ready_fd) + hub.remove_writer(ready_fd) continue try: job = pop_message() @@ -829,7 +829,7 @@ def schedule_writes(ready_fds, total_write_count=None): # this may create a spinloop where the event loop # always wakes up. for inqfd in diff(active_writes): - hub_remove(inqfd) + hub.remove_writer(inqfd) break else: @@ -927,7 +927,7 @@ def _write_job(proc, fd, job): else: errors = 0 finally: - hub_remove(fd) + hub.remove_writer(fd) write_stats[proc.index] += 1 # message written, so this fd is now available active_writes.discard(fd) diff --git a/t/unit/concurrency/test_prefork.py b/t/unit/concurrency/test_prefork.py index 7690ef09a40..570d00eacb0 100644 --- a/t/unit/concurrency/test_prefork.py +++ b/t/unit/concurrency/test_prefork.py @@ -5,6 +5,7 @@ from unittest.mock import Mock, patch import pytest +from kombu.asynchronous import Hub import t.skip from celery.app.defaults import DEFAULTS @@ -354,6 +355,58 @@ def _fake_hub(*args, **kwargs): # Then: all items were removed from the managed data source assert fd_iter == {}, "Expected all items removed from managed dict" + def test_schedule_writes_hub_remove_writer_ready_fd_not_in_all_inqueues(self): + pool = asynpool.AsynPool(threads=False) + hub = Hub() + + writer = Mock(name='writer') + reader = Mock(name='reader') + + # add 2 fake fds with the same id + hub.add_reader(6, reader, 6) + hub.add_writer(6, writer, 6) + pool._all_inqueues.clear() + pool._create_write_handlers(hub) + + # check schedule_writes write fds remove not remove the reader one from the hub. + hub.consolidate_callback(ready_fds=[6]) + assert 6 in hub.readers + assert 6 not in hub.writers + + def test_schedule_writes_hub_remove_writers_from_active_writers_when_get_index_error(self): + pool = asynpool.AsynPool(threads=False) + hub = Hub() + + writer = Mock(name='writer') + reader = Mock(name='reader') + + # add 2 fake fds with the same id + hub.add_reader(6, reader, 6) + hub.add_reader(8, reader, 8) + hub.add_reader(9, reader, 9) + hub.add_writer(6, writer, 6) + hub.add_writer(8, writer, 8) + hub.add_writer(9, writer, 9) + + # add fake fd to pool _all_inqueues to make sure we try to read from outbound_buffer + # set active_writes to 6 to make sure we remove all write fds except 6 + pool._active_writes = set([6]) + pool._all_inqueues = set([2, 6, 8, 9]) + + pool._create_write_handlers(hub) + + # clear outbound_buffer to get IndexError when trying to pop any message + # in this case all active_writers fds will be removed from the hub + pool.outbound_buffer.clear() + + hub.consolidate_callback(ready_fds=[2]) + if {6, 8, 9} <= hub.readers.keys() and not {8, 9} <= hub.writers.keys(): + assert True + else: + assert False + + assert 6 in hub.writers + def test_register_with_event_loop__no_on_tick_dupes(self): """Ensure AsynPool's register_with_event_loop only registers on_poll_start in the event loop the first time it's called. This diff --git a/t/unit/worker/test_loops.py b/t/unit/worker/test_loops.py index 68e84562b4c..754a3a119c7 100644 --- a/t/unit/worker/test_loops.py +++ b/t/unit/worker/test_loops.py @@ -363,7 +363,7 @@ def test_poll_err_writable(self): def test_poll_write_generator(self): x = X(self.app) - x.hub.remove = Mock(name='hub.remove()') + x.hub.remove_writer = Mock(name='hub.remove_writer()') def Gen(): yield 1 @@ -376,7 +376,7 @@ def Gen(): with pytest.raises(socket.error): asynloop(*x.args) assert gen.gi_frame.f_lasti != -1 - x.hub.remove.assert_not_called() + x.hub.remove_writer.assert_not_called() def test_poll_write_generator_stopped(self): x = X(self.app) @@ -388,7 +388,7 @@ def Gen(): x.hub.add_writer(6, gen) x.hub.on_tick.add(x.close_then_error(Mock(name='tick'), 2)) x.hub.poller.poll.return_value = [(6, WRITE)] - x.hub.remove = Mock(name='hub.remove()') + x.hub.remove_writer = Mock(name='hub.remove_writer()') with pytest.raises(socket.error): asynloop(*x.args) assert gen.gi_frame is None