Skip to content

Commit

Permalink
NEW Add lazy_init optional kwarg
Browse files Browse the repository at this point in the history
The lazy_init kwarg can be used to avoid a bug that leads to no metrics
being sent by the ThreadedBackend when used in a Celery project with
--max-memory-per-child param.
See: kpn#37
  • Loading branch information
puntonim committed Jan 15, 2019
1 parent 993b046 commit 79881c7
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 4 deletions.
18 changes: 18 additions & 0 deletions tests/test_threaded_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@


class TestTimeExecution(TestBaseBackend):
LAZY_INIT = False

def setUp(self):
self.qsize = 10
Expand All @@ -31,6 +32,7 @@ def setUp(self):
backend_kwargs=dict(key1='kwarg1', key2='kwarg2'),
queue_maxsize=self.qsize,
queue_timeout=self.qtimeout,
lazy_init=self.LAZY_INIT,
)
self.backend.bulk_size = self.qsize / 2
self.backend.bulk_timeout = self.qtimeout * 2
Expand All @@ -49,6 +51,8 @@ def resume_worker(self, worker_limit=None, **kwargs):
self.backend.start_worker()

def test_thread_name(self):
if self.backend.lazy_init:
go()
self.assertEquals(self.backend.thread.name, "TimeExecutionThread")

def test_backend_args(self):
Expand Down Expand Up @@ -130,6 +134,8 @@ def test_worker_sends_remainder(self):
)

def test_worker_error(self):
if self.backend.lazy_init:
go()
self.assertFalse(self.backend.thread is None)
# simulate TypeError in queue.get
with mock.patch.object(self.backend._queue, 'get', side_effect=TypeError):
Expand All @@ -139,6 +145,8 @@ def test_worker_error(self):
self.assertTrue(self.backend.thread is None)

def test_producer_in_another_process(self):
if self.backend.lazy_init:
go()
# assure worker is stopped
self.stop_worker()

Expand All @@ -151,6 +159,10 @@ def test_producer_in_another_process(self):
self.assertEqual(self.backend._queue.qsize(), 1)


class TestTimeExecutionLazy(TestTimeExecution):
LAZY_INIT = True


class TestThreaded(object):
def test_calling_thread_waits_for_worker(self):
"""
Expand All @@ -166,6 +178,7 @@ def test_calling_thread_waits_for_worker(self):


class TestElastic(TestBaseBackend, ElasticTestMixin):
LAZY_INIT = False

def setUp(self):
self.qtime = 0.1
Expand All @@ -174,6 +187,7 @@ def setUp(self):
backend_args=('elasticsearch', ),
backend_kwargs=dict(index='threaded-metrics'),
queue_timeout=self.qtime,
lazy_init=self.LAZY_INIT,
)
settings.configure(backends=[self.backend])
self._clear(self.backend.backend)
Expand All @@ -183,3 +197,7 @@ def test_write_method(self):
time.sleep(2 * self.backend.bulk_timeout)
metrics = self._query_backend(self.backend.backend, go.fqn)
self.assertEqual(metrics['hits']['total'], 1)


class TestElasticLazy(TestBaseBackend, ElasticTestMixin):
LAZY_INIT = True
19 changes: 15 additions & 4 deletions time_execution/backends/threaded.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,30 @@
class ThreadedBackend(BaseMetricsBackend):

def __init__(self, backend, backend_args=None, backend_kwargs=None,
queue_maxsize=1000, queue_timeout=0.5, worker_limit=None):
queue_maxsize=1000, queue_timeout=0.5, worker_limit=None,
lazy_init=False):
if backend_args is None:
backend_args = tuple()
if backend_kwargs is None:
backend_kwargs = dict()
self.parent_thread = threading.current_thread()
self.parent_thread = None
self.queue_timeout = queue_timeout
self.worker_limit = worker_limit
self.thread = None
self.fetched_items = 0
self.bulk_size = 50
self.bulk_timeout = 1 # second
self.backend = backend(*backend_args, **backend_kwargs)
self._queue = Queue(maxsize=queue_maxsize)
self.start_worker()
self._queue = None
self.queue_maxsize = queue_maxsize
self.lazy_init = lazy_init
if not lazy_init:
self.start_worker()

def write(self, name, **data):
if self.lazy_init and not self.thread:
self.start_worker()

data["timestamp"] = datetime.datetime.utcnow()
try:
self._queue.put_nowait((name, data))
Expand All @@ -46,6 +53,10 @@ def write(self, name, **data):
def start_worker(self):
if self.thread:
return

self.parent_thread = threading.current_thread()
if not self._queue:
self._queue = Queue(maxsize=self.queue_maxsize)
self.fetched_items = 0
self.thread = threading.Thread(
target=self.worker,
Expand Down

0 comments on commit 79881c7

Please sign in to comment.