Skip to content

Commit

Permalink
comments
Browse files Browse the repository at this point in the history
Signed-off-by: Sidhant Kohli <[email protected]>
  • Loading branch information
kohlisid committed Mar 5, 2024
1 parent 1748753 commit 4ecbb7e
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 36 deletions.
9 changes: 8 additions & 1 deletion pynumaflow/reducer/servicer/async_servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ async def ReduceFn(
# Create a task manager instance
task_manager = TaskManager(handler=self.__reduce_handler)

# Start iterating through the request iterator
# Start iterating through the request iterator and create tasks
# based on the operation type received.
try:
async for request in datum_iterator:
# check whether the request is an open or append operation
Expand All @@ -82,13 +83,18 @@ async def ReduceFn(
context.set_code(grpc.StatusCode.UNKNOWN)
context.set_details(e.__str__())
yield reduce_pb2.ReduceResponse()
raise e

# send EOF to all the tasks once the request iterator is exhausted
# This will signal the tasks to stop reading the data on their
# respective iterators.
await task_manager.stream_send_eof()

# get the results from all the tasks
res = task_manager.get_tasks()
try:
# iterate through the tasks and yield the response
# once the task is completed.
for task in res:
fut = task.future
await fut
Expand All @@ -99,6 +105,7 @@ async def ReduceFn(
context.set_code(grpc.StatusCode.UNKNOWN)
context.set_details(e.__str__())
yield reduce_pb2.ReduceResponse()
raise e

async def IsReady(
self, request: _empty_pb2.Empty, context: NumaflowServicerContext
Expand Down
45 changes: 34 additions & 11 deletions pynumaflow/reducer/servicer/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,30 +27,41 @@ def get_unique_key(keys, window):


class TaskManager:
"""
TaskManager is responsible for managing the reduce tasks.
It is created whenever a new reduce operation is requested.
"""

def __init__(self, handler: Union[ReduceAsyncCallable, _ReduceBuilderClass]):
    """Initialize the task manager with the user-provided reduce handler.

    Args:
        handler: the reduce UDF callable (or builder class) invoked for
            each window/key combination.
    """
    # Maps a unique key (derived from keys + window) to its task entry;
    # each entry presumably carries the task's iterator/future/window —
    # TODO(review): confirm entry shape against create_task.
    self.tasks = {}
    # Keeps strong references to spawned asyncio tasks so they are not
    # garbage-collected mid-flight.
    self.background_tasks = set()
    # The user's reduce handler, invoked per reduce task.
    self.__reduce_handler = handler

def get_tasks(self):
"""
Returns the list of reduce tasks that are
currently being processed
"""
return self.tasks.values()

def build_eof_responses(self):
window_set = set()
# Extract the windows from the tasks
for unified_key in self.tasks.keys():
window_set.add(self.tasks[unified_key].window)
# Build the response
resps = []
for window in window_set:
resps.append(reduce_pb2.ReduceResponse(window=window.to_proto(), eof=True))
return resps

async def stream_send_eof(self):
"""
Sends EOF to input streams of all the reduce
tasks that are currently being processed.
This is called when the input grpc stream is closed.
"""
for unified_key in self.tasks:
await self.tasks[unified_key].iterator.put(STREAM_EOF)

async def create_task(self, req):
"""
Creates a new reduce task for the given request.
Based on the request we compute a unique key, and then
it creates a new task or appends the request to the existing task.
"""
# if len of windows in request != 1, raise error
if len(req.windows) != 1:
raise UDFError("reduce create operation error: invalid number of windows")

Check warning on line 67 in pynumaflow/reducer/servicer/task_manager.py

View check run for this annotation

Codecov / codecov/patch

pynumaflow/reducer/servicer/task_manager.py#L67

Added line #L67 was not covered by tests
Expand All @@ -60,6 +71,7 @@ async def create_task(self, req):
unified_key = get_unique_key(keys, req.windows[0])
result = self.tasks.get(unified_key, None)

# If the task does not exist, create a new task
if not result:
niter = NonBlockingIterator()
riter = niter.read_iterator()
Expand All @@ -77,6 +89,10 @@ async def create_task(self, req):
await result.iterator.put(d)

async def append_task(self, req):
"""
Appends the request to the existing window reduce task.
If the task does not exist, create it.
"""
if len(req.windows) != 1:
raise UDFError("reduce create operation error: invalid number of windows")
d = req.payload
Expand All @@ -91,7 +107,14 @@ async def append_task(self, req):
async def __invoke_reduce(
self, keys: list[str], request_iterator: AsyncIterable[Datum], window: ReduceWindow
):
"""
Invokes the UDF reduce handler with the given keys,
request iterator, and window. Returns the result of the
reduce operation.
"""
new_instance = self.__reduce_handler

# Convert the window to a datetime object
start_dt = datetime.fromtimestamp(int(window.start.ToMilliseconds()) / 1e3, timezone.utc)
end_dt = datetime.fromtimestamp(int(window.end.ToMilliseconds()) / 1e3, timezone.utc)
interval_window = IntervalWindow(start_dt, end_dt)
Expand Down
25 changes: 1 addition & 24 deletions tests/reduce/test_async_reduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def start_request() -> (Datum, tuple):
watermark=watermark_timestamp,
)
operation = reduce_pb2.ReduceRequest.WindowOperation(
event=reduce_pb2.ReduceRequest.WindowOperation.Event.APPEND,
event=reduce_pb2.ReduceRequest.WindowOperation.Event.OPEN,
windows=[window],
)
request = reduce_pb2.ReduceRequest(
Expand Down Expand Up @@ -145,29 +145,6 @@ def tearDownClass(cls) -> None:
except Exception as e:
LOGGER.error(e)

def test_reduce_invalid_metadata(self) -> None:
    """Calling ReduceFn with empty metadata must surface INVALID_ARGUMENT.

    Streams requests with no key/window metadata and asserts the server
    rejects the call with the expected status code and details.
    """
    stub = self.__stub()
    request, metadata = start_request()
    invalid_metadata = {}
    try:
        generator_response = stub.ReduceFn(
            request_iterator=request_generator(count=10, request=request),
            metadata=invalid_metadata,
        )
        # Drain the stream; the RpcError is raised during iteration.
        count = 0
        for _ in generator_response:
            count += 1
    except grpc.RpcError as e:
        self.assertEqual(grpc.StatusCode.INVALID_ARGUMENT, e.code())
        self.assertEqual(
            "Expected to have all key/window_start_time/window_end_time;"
            " got start: None, end: None.",
            e.details(),
        )
    except Exception as err:
        # Bug fix: log BEFORE failing — self.fail() raises AssertionError,
        # so a log call placed after it would never execute.
        logging.error(err)
        self.fail("Expected a grpc.RpcError, got a different exception.")

def test_reduce(self) -> None:
stub = self.__stub()
request, metadata = start_request()
Expand Down

0 comments on commit 4ecbb7e

Please sign in to comment.