Skip to content

Commit

Permalink
try/catch
Browse files Browse the repository at this point in the history
Signed-off-by: Sidhant Kohli <[email protected]>
  • Loading branch information
kohlisid committed Mar 7, 2024
1 parent f41f41a commit 528e936
Showing 1 changed file with 10 additions and 11 deletions.
21 changes: 10 additions & 11 deletions pynumaflow/reducestreamer/servicer/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,15 +197,14 @@ async def producer(self, request_iterator: AsyncIterable[reduce_pb2.ReduceReques
# Put the exception in the global result queue
await self.global_result_queue.put(e)

# send EOF to all the tasks once the request iterator is exhausted
# This will signal the tasks to stop reading the data on their
# respective iterators.
await self.stream_send_eof()

# get the list of reduce tasks that are currently being processed
res = self.get_tasks()

try:
# send EOF to all the tasks once the request iterator is exhausted
# This will signal the tasks to stop reading the data on their
# respective iterators.
await self.stream_send_eof()

# get the list of reduce tasks that are currently being processed
res = self.get_tasks()
# iterate through the tasks and wait for them to complete
for task in res:
# Once this is done, we know that the task has written all the results
Expand All @@ -225,14 +224,14 @@ async def producer(self, request_iterator: AsyncIterable[reduce_pb2.ReduceReques
# Send an EOF message to the global result queue
# This will signal that window has been processed
await self.global_result_queue.put(task.window)

# Once all tasks are completed, senf EOF the global result queue
await self.global_result_queue.put(STREAM_EOF)
except Exception as e:
err_msg = "Reduce Streaming Error: %r" % e.__str__()
_LOGGER.critical(err_msg, exc_info=True)
await self.global_result_queue.put(e)

Check warning on line 233 in pynumaflow/reducestreamer/servicer/task_manager.py

View check run for this annotation

Codecov / codecov/patch

pynumaflow/reducestreamer/servicer/task_manager.py#L230-L233

Added lines #L230 - L233 were not covered by tests

# Once all tasks are completed, senf EOF the global result queue
await self.global_result_queue.put(STREAM_EOF)

async def consumer(
self, input_queue: NonBlockingIterator, output_queue: NonBlockingIterator, window
):
Expand Down

0 comments on commit 528e936

Please sign in to comment.