Skip to content

Commit

Permalink
Merge pull request kpn#33 from eigenein/fix/threaded-backend-multipro…
Browse files Browse the repository at this point in the history
…cessing

FIX Make threaded backend multiprocess-safe
  • Loading branch information
sergray authored Feb 20, 2018
2 parents 862e16f + d1b005a commit 3031b19
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 4 deletions.
18 changes: 16 additions & 2 deletions tests/test_threaded_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import subprocess
import time
from datetime import datetime
from multiprocessing import Process

import mock
from freezegun import freeze_time
Expand Down Expand Up @@ -60,14 +61,15 @@ def test_empty_queue(self):
self.assertEqual(0, self.backend.fetched_items)

def test_decorator(self):
with freeze_time('2016-08-01 00:00:00'):
now = datetime.now()
with freeze_time(now):
go()
# ensure worker thread catches up
time.sleep(2 * self.backend.bulk_timeout)
mocked_write = self.mocked_backend.bulk_write
self.assertEqual(1, self.backend.fetched_items)
mocked_write.assert_called_with([{
'timestamp': datetime(2016, 8, 1, 0, 0),
'timestamp': now,
'hostname': SHORT_HOSTNAME,
'name': 'tests.conftest.go',
'value': 0.0,
Expand Down Expand Up @@ -136,6 +138,18 @@ def test_worker_error(self):
# assert thread stopped
self.assertTrue(self.backend.thread is None)

def test_producer_in_another_process(self):
# assure worker is stopped
self.stop_worker()

# fill in the queue
process = Process(target=go)
process.start()
process.join()

# check the queue contains the item
self.assertEqual(self.backend._queue.qsize(), 1)


class TestThreaded(object):
def test_calling_thread_waits_for_worker(self):
Expand Down
5 changes: 3 additions & 2 deletions time_execution/backends/threaded.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@
import logging
import threading
import time
from multiprocessing import Queue

from time_execution.backends.base import BaseMetricsBackend

try:
from Queue import Queue, Empty, Full
from Queue import Empty, Full
except ImportError:
from queue import Queue, Empty, Full
from queue import Empty, Full


logger = logging.getLogger(__file__)
Expand Down

0 comments on commit 3031b19

Please sign in to comment.