Skip to content

Commit

Permalink
tests
Browse files Browse the repository at this point in the history
Signed-off-by: Sidhant Kohli <[email protected]>
  • Loading branch information
kohlisid committed Mar 5, 2024
1 parent 41ba107 commit 1748753
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 39 deletions.
4 changes: 4 additions & 0 deletions pynumaflow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,7 @@ class NoPublicConstructorError(TypeError):

class SocketError(Exception):
"""To raise an error while creating socket or setting its property"""


class UDFError(Exception):
"""To Raise an error while executing a UDF call"""
32 changes: 5 additions & 27 deletions pynumaflow/reducer/servicer/async_servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from pynumaflow.proto.reducer import reduce_pb2, reduce_pb2_grpc
from pynumaflow.reducer._dtypes import (
Datum,
Metadata,
ReduceAsyncCallable,
_ReduceBuilderClass,
ReduceRequest,
Expand Down Expand Up @@ -50,7 +49,7 @@ def __init__(
# Event loop only keeps a weak reference, which can cause it to
# get lost during execution.
self.background_tasks = set()
# The reduce handler can be a function or a builder class instance.
# The Reduce handler can be a function or a builder class instance.
self.__reduce_handler: Union[ReduceAsyncCallable, _ReduceBuilderClass] = handler

async def ReduceFn(
Expand Down Expand Up @@ -79,7 +78,10 @@ async def ReduceFn(
# append the task data to the existing task
await task_manager.append_task(request)
except Exception as e:
_LOGGER.critical(e.__str__())
_LOGGER.critical("Reduce Error", exc_info=True)
context.set_code(grpc.StatusCode.UNKNOWN)
context.set_details(e.__str__())
yield reduce_pb2.ReduceResponse()

# send EOF to all the tasks once the request iterator is exhausted
await task_manager.stream_send_eof()
Expand All @@ -98,30 +100,6 @@ async def ReduceFn(
context.set_details(e.__str__())
yield reduce_pb2.ReduceResponse()

async def __invoke_reduce(
self, keys: list[str], request_iterator: AsyncIterable[Datum], md: Metadata
):
new_instance = self.__reduce_handler
# If the reduce handler is a class instance, create a new instance of it.
# It is required for a new key to be processed by a
# new instance of the reducer for a given window
# Otherwise the function handler can be called directly
if isinstance(self.__reduce_handler, _ReduceBuilderClass):
new_instance = self.__reduce_handler.create()
try:
msgs = await new_instance(keys, request_iterator, md)
except Exception as err:
_LOGGER.critical("UDFError, re-raising the error", exc_info=True)
raise err

datum_responses = []
for msg in msgs:
datum_responses.append(
reduce_pb2.ReduceResponse.Result(keys=msg.keys, value=msg.value, tags=msg.tags)
)

return datum_responses

async def IsReady(
self, request: _empty_pb2.Empty, context: NumaflowServicerContext
) -> reduce_pb2.ReadyResponse:
Expand Down
22 changes: 16 additions & 6 deletions pynumaflow/reducer/servicer/task_manager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import asyncio
from datetime import datetime, timezone
from typing import Union
from collections.abc import AsyncIterable

from pynumaflow.exceptions import UDFError
from pynumaflow.proto.reducer import reduce_pb2
from pynumaflow.reducer.servicer.asynciter import NonBlockingIterator
from pynumaflow._constants import (
Expand All @@ -20,7 +23,7 @@


def get_unique_key(keys, window):
return f"{window.start}:{window.end}:{DELIMITER.join(keys)}"
return f"{window.start.ToMilliseconds()}:{window.end.ToMilliseconds()}:{DELIMITER.join(keys)}"


class TaskManager:
Expand Down Expand Up @@ -49,6 +52,9 @@ async def stream_send_eof(self):

async def create_task(self, req):
# if len of windows in request != 1, raise error
if len(req.windows) != 1:
raise UDFError("reduce create operation error: invalid number of windows")

d = req.payload
keys = d.keys()
unified_key = get_unique_key(keys, req.windows[0])
Expand All @@ -70,21 +76,25 @@ async def create_task(self, req):
# Put the request in the iterator
await result.iterator.put(d)

async def append_task(self, request):
d = request.payload
async def append_task(self, req):
if len(req.windows) != 1:
raise UDFError("reduce create operation error: invalid number of windows")
d = req.payload
keys = d.keys()
unified_key = get_unique_key(keys, request.windows[0])
unified_key = get_unique_key(keys, req.windows[0])
result = self.tasks.get(unified_key, None)
if not result:
await self.create_task(request)
await self.create_task(req)
else:
await result.iterator.put(d)

async def __invoke_reduce(
self, keys: list[str], request_iterator: AsyncIterable[Datum], window: ReduceWindow
):
new_instance = self.__reduce_handler
interval_window = IntervalWindow(window.start, window.end)
start_dt = datetime.fromtimestamp(int(window.start.ToMilliseconds()) / 1e3, timezone.utc)
end_dt = datetime.fromtimestamp(int(window.end.ToMilliseconds()) / 1e3, timezone.utc)
interval_window = IntervalWindow(start_dt, end_dt)
md = Metadata(interval_window=interval_window)
# If the reduce handler is a class instance, create a new instance of it.
# It is required for a new key to be processed by a
Expand Down
16 changes: 12 additions & 4 deletions tests/reduce/test_async_reduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,12 @@ def test_reduce(self) -> None:
),
r.result.value,
)
self.assertEqual(r.window.start.ToSeconds(), 1662998400)
self.assertEqual(r.window.end.ToSeconds(), 1662998460)
self.assertEqual(r.EOF, False)
else:
self.assertEqual(r.EOF, True)
eof_count += 1
self.assertEqual(r.window.start.ToSeconds(), 1662998400)
self.assertEqual(r.window.end.ToSeconds(), 1662998460)
# since there is only one key, the output count is 1
self.assertEqual(1, count)
self.assertEqual(1, eof_count)
Expand All @@ -212,6 +214,7 @@ def test_reduce_with_multiple_keys(self) -> None:
print(e)

count = 0
eof_count = 0

# capture the output from the ReduceFn generator and assert.
for r in generator_response:
Expand All @@ -225,9 +228,14 @@ def test_reduce_with_multiple_keys(self) -> None:
),
r.result.value,
)
self.assertEqual(r.window.start.ToSeconds(), 1662998400)
self.assertEqual(r.window.end.ToSeconds(), 1662998460)
self.assertEqual(r.EOF, False)
else:
eof_count += 1
self.assertEqual(r.EOF, True)
self.assertEqual(r.window.start.ToSeconds(), 1662998400)
self.assertEqual(r.window.end.ToSeconds(), 1662998460)
self.assertEqual(100, count)
self.assertEqual(100, eof_count)

def test_is_ready(self) -> None:
with grpc.insecure_channel("unix:///tmp/reduce.sock") as channel:
Expand Down
28 changes: 26 additions & 2 deletions tests/reduce/test_async_reduce_err.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def request_generator(count, request, resetkey: bool = False):
yield request


def start_request() -> (Datum, tuple):
def start_request(multiple_window: False) -> (Datum, tuple):
event_time_timestamp, watermark_timestamp = get_time_args()
window = reduce_pb2.Window(
start=mock_interval_window_start(),
Expand All @@ -50,6 +50,12 @@ def start_request() -> (Datum, tuple):
event=reduce_pb2.ReduceRequest.WindowOperation.Event.APPEND,
windows=[window],
)
if multiple_window:
operation = reduce_pb2.ReduceRequest.WindowOperation(
event=reduce_pb2.ReduceRequest.WindowOperation.Event.APPEND,
windows=[window, window],
)

request = reduce_pb2.ReduceRequest(
payload=payload,
operation=operation,
Expand Down Expand Up @@ -134,7 +140,7 @@ def tearDownClass(cls) -> None:

def test_reduce(self) -> None:
stub = self.__stub()
request, metadata = start_request()
request, metadata = start_request(multiple_window=False)
generator_response = None
try:
generator_response = stub.ReduceFn(
Expand All @@ -148,6 +154,24 @@ def test_reduce(self) -> None:
return
self.fail("Expected an exception.")

def test_reduce_window_len(self) -> None:
stub = self.__stub()
request, metadata = start_request(multiple_window=True)
generator_response = None
try:
generator_response = stub.ReduceFn(
request_iterator=request_generator(count=10, request=request)
)
counter = 0
for _ in generator_response:
counter += 1
except Exception as err:
self.assertTrue(
"reduce create operation error: invalid number of windows" in err.__str__()
)
return
self.fail("Expected an exception.")

def __stub(self):
return reduce_pb2_grpc.ReduceStub(_channel)

Expand Down

0 comments on commit 1748753

Please sign in to comment.