From 79881c744f054202170e93348db8b28e38562144 Mon Sep 17 00:00:00 2001 From: puntonim Date: Tue, 15 Jan 2019 14:39:37 +0100 Subject: [PATCH] NEW Add lazy_init optional kwarg The lazy_init kwarg can be used to avoid a bug that leads to no metrics being sent by the ThreadedBackend when used in a Celery project with --max-memory-per-child param. See: https://github.com/kpn-digital/py-timeexecution/issues/37 --- tests/test_threaded_backend.py | 18 ++++++++++++++++++ time_execution/backends/threaded.py | 19 +++++++++++++++---- 2 files changed, 33 insertions(+), 4 deletions(-) diff --git a/tests/test_threaded_backend.py b/tests/test_threaded_backend.py index 33ab983..90a9078 100644 --- a/tests/test_threaded_backend.py +++ b/tests/test_threaded_backend.py @@ -17,6 +17,7 @@ class TestTimeExecution(TestBaseBackend): + LAZY_INIT = False def setUp(self): self.qsize = 10 @@ -31,6 +32,7 @@ def setUp(self): backend_kwargs=dict(key1='kwarg1', key2='kwarg2'), queue_maxsize=self.qsize, queue_timeout=self.qtimeout, + lazy_init=self.LAZY_INIT, ) self.backend.bulk_size = self.qsize / 2 self.backend.bulk_timeout = self.qtimeout * 2 @@ -49,6 +51,8 @@ def resume_worker(self, worker_limit=None, **kwargs): self.backend.start_worker() def test_thread_name(self): + if self.backend.lazy_init: + go() self.assertEquals(self.backend.thread.name, "TimeExecutionThread") def test_backend_args(self): @@ -130,6 +134,8 @@ def test_worker_sends_remainder(self): ) def test_worker_error(self): + if self.backend.lazy_init: + go() self.assertFalse(self.backend.thread is None) # simulate TypeError in queue.get with mock.patch.object(self.backend._queue, 'get', side_effect=TypeError): @@ -139,6 +145,8 @@ def test_worker_error(self): self.assertTrue(self.backend.thread is None) def test_producer_in_another_process(self): + if self.backend.lazy_init: + go() # assure worker is stopped self.stop_worker() @@ -151,6 +159,10 @@ def test_producer_in_another_process(self): self.assertEqual(self.backend._queue.qsize(), 1) +class TestTimeExecutionLazy(TestTimeExecution): + LAZY_INIT = True + + class TestThreaded(object): def test_calling_thread_waits_for_worker(self): """ @@ -166,6 +178,7 @@ def test_calling_thread_waits_for_worker(self): class TestElastic(TestBaseBackend, ElasticTestMixin): + LAZY_INIT = False def setUp(self): self.qtime = 0.1 @@ -174,6 +187,7 @@ def setUp(self): backend_args=('elasticsearch', ), backend_kwargs=dict(index='threaded-metrics'), queue_timeout=self.qtime, + lazy_init=self.LAZY_INIT, ) settings.configure(backends=[self.backend]) self._clear(self.backend.backend) @@ -183,3 +197,7 @@ def test_write_method(self): time.sleep(2 * self.backend.bulk_timeout) metrics = self._query_backend(self.backend.backend, go.fqn) self.assertEqual(metrics['hits']['total'], 1) + + +class TestElasticLazy(TestBaseBackend, ElasticTestMixin): + LAZY_INIT = True diff --git a/time_execution/backends/threaded.py b/time_execution/backends/threaded.py index f3c92aa..03e72b1 100644 --- a/time_execution/backends/threaded.py +++ b/time_execution/backends/threaded.py @@ -20,12 +20,13 @@ class ThreadedBackend(BaseMetricsBackend): def __init__(self, backend, backend_args=None, backend_kwargs=None, - queue_maxsize=1000, queue_timeout=0.5, worker_limit=None): + queue_maxsize=1000, queue_timeout=0.5, worker_limit=None, + lazy_init=False): if backend_args is None: backend_args = tuple() if backend_kwargs is None: backend_kwargs = dict() - self.parent_thread = threading.current_thread() + self.parent_thread = None self.queue_timeout = queue_timeout self.worker_limit = worker_limit self.thread = None @@ -33,10 +34,16 @@ def __init__(self, backend, backend_args=None, backend_kwargs=None, self.bulk_size = 50 self.bulk_timeout = 1 # second self.backend = backend(*backend_args, **backend_kwargs) - self._queue = Queue(maxsize=queue_maxsize) - self.start_worker() + self._queue = None + self.queue_maxsize = queue_maxsize + self.lazy_init = lazy_init + if not lazy_init: + self.start_worker() def write(self, name, **data): + if self.lazy_init and not self.thread: + self.start_worker() + data["timestamp"] = datetime.datetime.utcnow() try: self._queue.put_nowait((name, data)) @@ -46,6 +53,10 @@ def write(self, name, **data): def start_worker(self): if self.thread: return + + self.parent_thread = threading.current_thread() + if not self._queue: + self._queue = Queue(maxsize=self.queue_maxsize) self.fetched_items = 0 self.thread = threading.Thread( target=self.worker,