Skip to content

Commit

Permalink
comments
Browse files Browse the repository at this point in the history
Signed-off-by: Sidhant Kohli <[email protected]>
  • Loading branch information
kohlisid committed Mar 6, 2024
1 parent 0ef57ca commit 5bfd9ea
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 5 deletions.
4 changes: 2 additions & 2 deletions pynumaflow/reducer/servicer/async_servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,16 @@ async def ReduceFn(
# check whether the request is an open or append operation
if request.operation is int(WindowOperation.OPEN):
# create a new task for the open operation
# and inserts the request data into the task
await task_manager.create_task(request)
elif request.operation is int(WindowOperation.APPEND):
# append the task data to the existing task
# if the task does not exist, it will create a new task
await task_manager.append_task(request)
except Exception as e:
_LOGGER.critical("Reduce Error", exc_info=True)
context.set_code(grpc.StatusCode.UNKNOWN)
context.set_details(e.__str__())
yield reduce_pb2.ReduceResponse()
raise e

# send EOF to all the tasks once the request iterator is exhausted
Expand All @@ -126,7 +127,6 @@ async def ReduceFn(
except Exception as e:
context.set_code(grpc.StatusCode.UNKNOWN)
context.set_details(e.__str__())
yield reduce_pb2.ReduceResponse()
raise e

async def IsReady(
Expand Down
6 changes: 3 additions & 3 deletions pynumaflow/reducer/servicer/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
)


def get_unique_key(keys, window):
def build_unique_key_name(keys, window):
return f"{window.start.ToMilliseconds()}:{window.end.ToMilliseconds()}:{DELIMITER.join(keys)}"


Expand Down Expand Up @@ -70,7 +70,7 @@ async def create_task(self, req):

d = req.payload
keys = d.keys()
unified_key = get_unique_key(keys, req.windows[0])
unified_key = build_unique_key_name(keys, req.windows[0])
result = self.tasks.get(unified_key, None)

# If the task does not exist, create a new task
Expand Down Expand Up @@ -99,7 +99,7 @@ async def append_task(self, req):
raise UDFError("reduce create operation error: invalid number of windows")
d = req.payload
keys = d.keys()
unified_key = get_unique_key(keys, req.windows[0])
unified_key = build_unique_key_name(keys, req.windows[0])
result = self.tasks.get(unified_key, None)
if not result:
await self.create_task(req)
Expand Down

0 comments on commit 5bfd9ea

Please sign in to comment.