From 528e936f4484f52e86c2eb3242f6f1f08bd5bc8c Mon Sep 17 00:00:00 2001 From: Sidhant Kohli Date: Thu, 7 Mar 2024 13:42:24 -0800 Subject: [PATCH] try/catch Signed-off-by: Sidhant Kohli --- .../reducestreamer/servicer/task_manager.py | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/pynumaflow/reducestreamer/servicer/task_manager.py b/pynumaflow/reducestreamer/servicer/task_manager.py index 62fdb709..a948000a 100644 --- a/pynumaflow/reducestreamer/servicer/task_manager.py +++ b/pynumaflow/reducestreamer/servicer/task_manager.py @@ -197,15 +197,14 @@ async def producer(self, request_iterator: AsyncIterable[reduce_pb2.ReduceReques # Put the exception in the global result queue await self.global_result_queue.put(e) - # send EOF to all the tasks once the request iterator is exhausted - # This will signal the tasks to stop reading the data on their - # respective iterators. - await self.stream_send_eof() - - # get the list of reduce tasks that are currently being processed - res = self.get_tasks() - try: + # send EOF to all the tasks once the request iterator is exhausted + # This will signal the tasks to stop reading the data on their + # respective iterators. + await self.stream_send_eof() + + # get the list of reduce tasks that are currently being processed + res = self.get_tasks() # iterate through the tasks and wait for them to complete for task in res: # Once this is done, we know that the task has written all the results @@ -225,14 +224,14 @@ async def producer(self, request_iterator: AsyncIterable[reduce_pb2.ReduceReques # Send an EOF message to the global result queue # This will signal that window has been processed await self.global_result_queue.put(task.window) + + # Once all tasks are completed, senf EOF the global result queue + await self.global_result_queue.put(STREAM_EOF) except Exception as e: err_msg = "Reduce Streaming Error: %r" % e.__str__() _LOGGER.critical(err_msg, exc_info=True) await self.global_result_queue.put(e) - # Once all tasks are completed, senf EOF the global result queue - await self.global_result_queue.put(STREAM_EOF) - async def consumer( self, input_queue: NonBlockingIterator, output_queue: NonBlockingIterator, window ):