From d1b005a337fb6705aa804da6c26b7d9d477b62fc Mon Sep 17 00:00:00 2001 From: Pavel Perestoronin Date: Mon, 19 Feb 2018 21:29:55 +0100 Subject: [PATCH] FIX Make threaded backend multiprocess-safe --- tests/test_threaded_backend.py | 18 ++++++++++++++++-- time_execution/backends/threaded.py | 5 +++-- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/tests/test_threaded_backend.py b/tests/test_threaded_backend.py index a39ab4b..33ab983 100644 --- a/tests/test_threaded_backend.py +++ b/tests/test_threaded_backend.py @@ -2,6 +2,7 @@ import subprocess import time from datetime import datetime +from multiprocessing import Process import mock from freezegun import freeze_time @@ -60,14 +61,15 @@ def test_empty_queue(self): self.assertEqual(0, self.backend.fetched_items) def test_decorator(self): - with freeze_time('2016-08-01 00:00:00'): + now = datetime.now() + with freeze_time(now): go() # ensure worker thread catches up time.sleep(2 * self.backend.bulk_timeout) mocked_write = self.mocked_backend.bulk_write self.assertEqual(1, self.backend.fetched_items) mocked_write.assert_called_with([{ - 'timestamp': datetime(2016, 8, 1, 0, 0), + 'timestamp': now, 'hostname': SHORT_HOSTNAME, 'name': 'tests.conftest.go', 'value': 0.0, @@ -136,6 +138,18 @@ def test_worker_error(self): # assert thread stopped self.assertTrue(self.backend.thread is None) + def test_producer_in_another_process(self): + # assure worker is stopped + self.stop_worker() + + # fill in the queue + process = Process(target=go) + process.start() + process.join() + + # check the queue contains the item + self.assertEqual(self.backend._queue.qsize(), 1) + class TestThreaded(object): def test_calling_thread_waits_for_worker(self): diff --git a/time_execution/backends/threaded.py b/time_execution/backends/threaded.py index ce7dd5f..f3c92aa 100644 --- a/time_execution/backends/threaded.py +++ b/time_execution/backends/threaded.py @@ -4,13 +4,14 @@ import logging import threading import time +from multiprocessing import Queue from time_execution.backends.base import BaseMetricsBackend try: - from Queue import Queue, Empty, Full + from Queue import Empty, Full except ImportError: - from queue import Queue, Empty, Full + from queue import Empty, Full logger = logging.getLogger(__file__)