diff --git a/pynumaflow/reducer/servicer/async_servicer.py b/pynumaflow/reducer/servicer/async_servicer.py
index 39a75ff0..ceafae78 100644
--- a/pynumaflow/reducer/servicer/async_servicer.py
+++ b/pynumaflow/reducer/servicer/async_servicer.py
@@ -67,7 +67,8 @@ async def ReduceFn(
         # Create a task manager instance
         task_manager = TaskManager(handler=self.__reduce_handler)
 
-        # Start iterating through the request iterator
+        # Start iterating through the request iterator and create tasks
+        # based on the operation type received.
         try:
             async for request in datum_iterator:
                 # check whether the request is an open or append operation
@@ -82,13 +83,18 @@ async def ReduceFn(
             context.set_code(grpc.StatusCode.UNKNOWN)
             context.set_details(e.__str__())
             yield reduce_pb2.ReduceResponse()
+            raise e
 
         # send EOF to all the tasks once the request iterator is exhausted
+        # This will signal the tasks to stop reading the data on their
+        # respective iterators.
         await task_manager.stream_send_eof()
 
        # get the results from all the tasks
         res = task_manager.get_tasks()
         try:
+            # iterate through the tasks and yield the response
+            # once the task is completed.
             for task in res:
                 fut = task.future
                 await fut
@@ -99,6 +105,7 @@ async def ReduceFn(
             context.set_code(grpc.StatusCode.UNKNOWN)
             context.set_details(e.__str__())
             yield reduce_pb2.ReduceResponse()
+            raise e
 
     async def IsReady(
         self, request: _empty_pb2.Empty, context: NumaflowServicerContext
diff --git a/pynumaflow/reducer/servicer/task_manager.py b/pynumaflow/reducer/servicer/task_manager.py
index 608a08bc..901b06fd 100644
--- a/pynumaflow/reducer/servicer/task_manager.py
+++ b/pynumaflow/reducer/servicer/task_manager.py
@@ -27,30 +27,41 @@ def get_unique_key(keys, window):
 
 
 class TaskManager:
+    """
+    TaskManager is responsible for managing the reduce tasks.
+    It is created whenever a new reduce operation is requested.
+    """
+
     def __init__(self, handler: Union[ReduceAsyncCallable, _ReduceBuilderClass]):
+        # A dictionary to store the task information
         self.tasks = {}
+        # A set to store the background tasks to keep a reference to them
         self.background_tasks = set()
+        # Handler for the reduce operation
         self.__reduce_handler = handler
 
     def get_tasks(self):
+        """
+        Returns the list of reduce tasks that are
+        currently being processed
+        """
         return self.tasks.values()
 
-    def build_eof_responses(self):
-        window_set = set()
-        # Extract the windows from the tasks
-        for unified_key in self.tasks.keys():
-            window_set.add(self.tasks[unified_key].window)
-        # Build the response
-        resps = []
-        for window in window_set:
-            resps.append(reduce_pb2.ReduceResponse(window=window.to_proto(), eof=True))
-        return resps
-
     async def stream_send_eof(self):
+        """
+        Sends EOF to input streams of all the reduce
+        tasks that are currently being processed.
+        This is called when the input grpc stream is closed.
+        """
         for unified_key in self.tasks:
             await self.tasks[unified_key].iterator.put(STREAM_EOF)
 
     async def create_task(self, req):
+        """
+        Creates a new reduce task for the given request.
+        Based on the request we compute a unique key, and then
+        it creates a new task or appends the request to the existing task.
+        """
        # if len of windows in request != 1, raise error
         if len(req.windows) != 1:
             raise UDFError("reduce create operation error: invalid number of windows")
@@ -60,6 +71,7 @@ async def create_task(self, req):
         unified_key = get_unique_key(keys, req.windows[0])
         result = self.tasks.get(unified_key, None)
 
+        # If the task does not exist, create a new task
         if not result:
             niter = NonBlockingIterator()
             riter = niter.read_iterator()
@@ -77,6 +89,10 @@ async def create_task(self, req):
         await result.iterator.put(d)
 
     async def append_task(self, req):
+        """
+        Appends the request to the existing window reduce task.
+        If the task does not exist, create it.
+        """
         if len(req.windows) != 1:
             raise UDFError("reduce create operation error: invalid number of windows")
         d = req.payload
@@ -91,7 +107,14 @@ async def append_task(self, req):
     async def __invoke_reduce(
         self, keys: list[str], request_iterator: AsyncIterable[Datum], window: ReduceWindow
     ):
+        """
+        Invokes the UDF reduce handler with the given keys,
+        request iterator, and window. Returns the result of the
+        reduce operation.
+        """
         new_instance = self.__reduce_handler
+
+        # Convert the window to a datetime object
         start_dt = datetime.fromtimestamp(int(window.start.ToMilliseconds()) / 1e3, timezone.utc)
         end_dt = datetime.fromtimestamp(int(window.end.ToMilliseconds()) / 1e3, timezone.utc)
         interval_window = IntervalWindow(start_dt, end_dt)
diff --git a/tests/reduce/test_async_reduce.py b/tests/reduce/test_async_reduce.py
index a373a765..0bba57bf 100644
--- a/tests/reduce/test_async_reduce.py
+++ b/tests/reduce/test_async_reduce.py
@@ -49,7 +49,7 @@ def start_request() -> (Datum, tuple):
         watermark=watermark_timestamp,
     )
     operation = reduce_pb2.ReduceRequest.WindowOperation(
-        event=reduce_pb2.ReduceRequest.WindowOperation.Event.APPEND,
+        event=reduce_pb2.ReduceRequest.WindowOperation.Event.OPEN,
         windows=[window],
     )
     request = reduce_pb2.ReduceRequest(
@@ -145,29 +145,6 @@ def tearDownClass(cls) -> None:
         except Exception as e:
             LOGGER.error(e)
 
-    def test_reduce_invalid_metadata(self) -> None:
-        stub = self.__stub()
-        request, metadata = start_request()
-        invalid_metadata = {}
-        try:
-            generator_response = stub.ReduceFn(
-                request_iterator=request_generator(count=10, request=request),
-                metadata=invalid_metadata,
-            )
-            count = 0
-            for _ in generator_response:
-                count += 1
-        except grpc.RpcError as e:
-            self.assertEqual(grpc.StatusCode.INVALID_ARGUMENT, e.code())
-            self.assertEqual(
-                "Expected to have all key/window_start_time/window_end_time;"
-                " got start: None, end: None.",
-                e.details(),
-            )
-        except Exception as err:
-            self.fail("Expected an exception.")
-            logging.error(err)
-
     def test_reduce(self) -> None:
         stub = self.__stub()
         request, metadata = start_request()