Skip to content

Commit

Permalink
comments
Browse files Browse the repository at this point in the history
Signed-off-by: Sidhant Kohli <[email protected]>
  • Loading branch information
kohlisid committed Mar 8, 2024
1 parent 004c69c commit d2f8e01
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 20 deletions.
13 changes: 5 additions & 8 deletions pynumaflow/reducestreamer/servicer/async_servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,6 @@ async def ReduceFn(
Applies a reduce function to a datum stream.
The pascal case function name comes from the proto reduce_pb2_grpc.py file.
"""
# Create an async iterator from the request iterator
datum_iterator = datum_generator(request_iterator=request_iterator)

# Create a task manager instance
task_manager = TaskManager(handler=self.__reduce_handler)

Expand All @@ -85,6 +82,9 @@ async def ReduceFn(
# We will read from the result queue and send the results to the client
consumer = task_manager.global_result_queue.read_iterator()

# Create an async iterator from the request iterator
datum_iterator = datum_generator(request_iterator=request_iterator)

# Create a producer task in the task manager, this would read from the datum iterator
# and then create the required tasks to process the data requests
# The results from these tasks are then sent to the result queue
Expand Down Expand Up @@ -112,11 +112,8 @@ async def ReduceFn(
if isinstance(msg, Exception):
await handle_error(context, msg)
raise msg
# If the message is a window, we send an EOF message to the
# client for the given window
elif isinstance(msg, reduce_pb2.Window):
yield reduce_pb2.ReduceResponse(window=msg, EOF=True)
# Else we send the result of the reduce function to the client
# Send window EOF response or Window result response
# back to the client
else:
yield msg
except Exception as e:
Expand Down
29 changes: 17 additions & 12 deletions pynumaflow/reducestreamer/servicer/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,17 @@
def build_unique_key_name(keys, window):
"""
Builds a unique key name for the given keys and window.
The key name is used to identify the reduce task.
The key name is used to identify the Reduce task.
The format is: start_time:end_time:key1:key2:...
"""
return f"{window.start.ToMilliseconds()}:{window.end.ToMilliseconds()}:{DELIMITER.join(keys)}"


def create_window_eof_response(window):
"""Create a Reduce response with EOF=True for a given window"""
return reduce_pb2.ReduceResponse(window=window, EOF=True)


class TaskManager:
"""
TaskManager is responsible for managing the Reduce tasks.
Expand Down Expand Up @@ -83,10 +88,10 @@ async def create_task(self, req):
d = req.payload
keys = d.keys()
unified_key = build_unique_key_name(keys, req.windows[0])
result = self.tasks.get(unified_key, None)
curr_task = self.tasks.get(unified_key, None)

# If the task does not exist, create a new task
if not result:
if not curr_task:
niter = NonBlockingIterator()
riter = niter.read_iterator()
# Create a new result queue for the current task
Expand Down Expand Up @@ -116,15 +121,15 @@ async def create_task(self, req):
task.add_done_callback(self.clean_background)

# Create a new ReduceResult object to store the task information
result = ReduceResult(task, niter, keys, req.windows[0], res_queue, consumer)
curr_task = ReduceResult(task, niter, keys, req.windows[0], res_queue, consumer)

# Save the result of the reduce operation to the task list
self.tasks[unified_key] = result
self.tasks[unified_key] = curr_task

# Put the request in the iterator
await result.iterator.put(d)
await curr_task.iterator.put(d)

async def append_task(self, req):
async def send_datum_to_task(self, req):
"""
Appends the request to the existing window reduce task.
If the task does not exist, create it.
Expand Down Expand Up @@ -188,7 +193,7 @@ async def producer(self, request_iterator: AsyncIterable[reduce_pb2.ReduceReques
elif request.operation is int(WindowOperation.APPEND):
# append the task data to the existing task
# if the task does not exist, create a new task
await self.append_task(request)
await self.send_datum_to_task(request)
# If there is an error in the reduce operation, log and
# then send the error to the result queue
except Exception as e:
Expand All @@ -204,9 +209,8 @@ async def producer(self, request_iterator: AsyncIterable[reduce_pb2.ReduceReques
await self.stream_send_eof()

# get the list of reduce tasks that are currently being processed
res = self.get_tasks()
# iterate through the tasks and wait for them to complete
for task in res:
for task in self.get_tasks():
# Once this is done, we know that the task has written all the results
# to the local result queue
fut = task.future
Expand All @@ -217,13 +221,14 @@ async def producer(self, request_iterator: AsyncIterable[reduce_pb2.ReduceReques
await task.result_queue.put(STREAM_EOF)

# Wait for the local queue to write
# all the results to the global result queue
# all the results of this task to the global result queue
con_future = task.consumer_future
await con_future

# Send an EOF message to the global result queue
# This will signal that window has been processed
await self.global_result_queue.put(task.window)
eof_window_msg = create_window_eof_response(window=task.window)
await self.global_result_queue.put(eof_window_msg)

# Once all tasks are completed, senf EOF the global result queue
await self.global_result_queue.put(STREAM_EOF)
Expand Down

0 comments on commit d2f8e01

Please sign in to comment.