Skip to content

Commit

Permalink
comments
Browse files Browse the repository at this point in the history
Signed-off-by: Sidhant Kohli <[email protected]>
  • Loading branch information
kohlisid committed Jan 24, 2024
1 parent d8ce1ce commit 59d6550
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 6 deletions.
2 changes: 1 addition & 1 deletion examples/reduce/counter/Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
.PHONY: image
image:
docker build --no-cache -t "quay.io/numaio/numaflow-python/reduce-counter:v0.7.0" .
docker build -t "quay.io/numaio/numaflow-python/reduce-counter:v0.7.0" .
# Github CI runner uses platform linux/amd64. If your local environment don't, the image built by command above might not work
# under the CI E2E test environment.
# To build an image that supports multiple platforms(linux/amd64,linux/arm64) and push to quay.io, use the following command
Expand Down
7 changes: 4 additions & 3 deletions pynumaflow/reducer/async_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
Reducer,
)

from pynumaflow.shared.server import NumaflowServer, start_async_server
from pynumaflow.shared.server import NumaflowServer, checkInstance, start_async_server


def get_handler(reducer_handler: ReduceCallable, init_args: tuple = (), init_kwargs: dict = None):
Expand All @@ -34,12 +34,13 @@ def get_handler(reducer_handler: ReduceCallable, init_args: tuple = (), init_kwa
raise TypeError("Cannot pass function handler with init args or kwargs")
# return the function handler
return reducer_handler
elif issubclass(reducer_handler, Reducer):
elif not checkInstance(reducer_handler, Reducer) and issubclass(reducer_handler, Reducer):
# if handler is type of Class Reducer, create a new instance of
# a ReducerBuilderClass
return _ReduceBuilderClass(reducer_handler, init_args, init_kwargs)
else:
raise TypeError("Invalid type passed")
_LOGGER.error("Invalid Type: please provide the handler or the class name")
raise TypeError("Inavlid Type: please provide the handler or the class name")


class ReduceAsyncServer(NumaflowServer):
Expand Down
14 changes: 14 additions & 0 deletions pynumaflow/shared/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,3 +235,17 @@ def _reserve_port(port_num: int) -> Iterator[int]:
yield sock.getsockname()[1]
finally:
sock.close()


def checkInstance(instance, callable_type) -> bool:
"""
Check if the given instance is of the given callable_type.
"""
try:
if not isinstance(instance, callable_type):
return False
else:
return True
except Exception as e:
_LOGGER.error(e)
return False
10 changes: 8 additions & 2 deletions tests/reduce/test_async_reduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ async def handler(
return Messages(Message(str.encode(msg), keys=keys))


async def err_handler(keys: list[str], datums: AsyncIterable[Datum], md: Metadata) -> Messages:
async def reduce_handler_func(
keys: list[str], datums: AsyncIterable[Datum], md: Metadata
) -> Messages:
interval_window = md.interval_window
counter = 0
async for _ in datums:
Expand Down Expand Up @@ -238,7 +240,11 @@ def test_error_init(self):
# Check that the init_args and init_kwargs are passed
# only with a Reducer class
with self.assertRaises(TypeError):
ReduceAsyncServer(err_handler, init_args=(0, 1))
ReduceAsyncServer(reduce_handler_func, init_args=(0, 1))
# Check that an instance is not passed instead of the class
# signature
with self.assertRaises(TypeError):
ReduceAsyncServer(ExampleClass(0))


if __name__ == "__main__":
Expand Down

0 comments on commit 59d6550

Please sign in to comment.