Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Refactor SDK to split into separate servers #105

Merged
merged 10 commits into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions .codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ coverage:

ignore:
- "examples/"
- "pynumaflow/function/proto/*"
- "pynumaflow/sink/proto/*"
- "pynumaflow/function/_udfunction_pb2.pyi"
- "pynumaflow/mapper/proto/*"
- "pynumaflow/sinker/proto/*"
- "pynumaflow/mapstreamer/proto/*"
- "pynumaflow/reducer/proto/*"
- "pynumaflow/sourcetransformer/proto/*"
- "pynumaflow/map/_udfunction_pb2.pyi"
- "pynumaflow/sink/_udsink_pb2.pyi"
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,9 @@ setup:
proto:
python3 -m grpc_tools.protoc -I=pynumaflow/function/proto --python_out=pynumaflow/function/proto --grpc_python_out=pynumaflow/function/proto pynumaflow/function/proto/*.proto
python3 -m grpc_tools.protoc -I=pynumaflow/sink/proto --python_out=pynumaflow/sink/proto --grpc_python_out=pynumaflow/sink/proto pynumaflow/sink/proto/*.proto
python3 -m grpc_tools.protoc -I=pynumaflow/map/proto --python_out=pynumaflow/map/proto --grpc_python_out=pynumaflow/map/proto pynumaflow/map/proto/*.proto
python3 -m grpc_tools.protoc -I=pynumaflow/mapstream/proto --python_out=pynumaflow/mapstream/proto --grpc_python_out=pynumaflow/mapstream/proto pynumaflow/mapstream/proto/*.proto
python3 -m grpc_tools.protoc -I=pynumaflow/reduce/proto --python_out=pynumaflow/reduce/proto --grpc_python_out=pynumaflow/reduce/proto pynumaflow/reduce/proto/*.proto
python3 -m grpc_tools.protoc -I=pynumaflow/sourcetransform/proto --python_out=pynumaflow/sourcetransform/proto --grpc_python_out=pynumaflow/sourcetransform/proto pynumaflow/sourcetransform/proto/*.proto

sed -i '' 's/^\(import.*_pb2\)/from . \1/' pynumaflow/*/proto/*.py
49 changes: 17 additions & 32 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pre-commit install
### Map

```python
from pynumaflow.function import Messages, Message, Datum, Server
from pynumaflow.mapper import Messages, Message, Datum, Mapper
def my_handler(keys: list[str], datum: Datum) -> Messages:
Expand All @@ -57,28 +57,28 @@ def my_handler(keys: list[str], datum: Datum) -> Messages:
if __name__ == "__main__":
grpc_server = Server(map_handler=my_handler)
grpc_server = Mapper(handler=my_handler)
grpc_server.start()
```
### MapT - Map with event time assignment capability
In addition to the regular Map function, MapT supports assigning a new event time to the message.
MapT is only supported at source vertex to enable (a) early data filtering and (b) watermark assignment by extracting new event time from the message payload.
### SourceTransformer - Map with event time assignment capability
In addition to the regular Map function, SourceTransformer supports assigning a new event time to the message.
SourceTransformer is only supported at source vertex to enable (a) early data filtering and (b) watermark assignment by extracting new event time from the message payload.
```python
from datetime import datetime
from pynumaflow.function import MessageTs, MessageT, Datum, Server
from pynumaflow.sourcetransformer import Messages, Message, Datum, SourceTransformer
def mapt_handler(keys: list[str], datum: Datum) -> MessageTs:
def transform_handler(keys: list[str], datum: Datum) -> Messages:
val = datum.value
new_event_time = datetime.now()
_ = datum.watermark
message_t_s = MessageTs(MessageT(val, event_time=new_event_time, keys=keys))
message_t_s = Messages(Message(val, event_time=new_event_time, keys=keys))
return message_t_s
if __name__ == "__main__":
grpc_server = Server(mapt_handler=mapt_handler)
grpc_server = SourceTransformer(handler=transform_handler)
grpc_server.start()
```
Expand All @@ -87,11 +87,11 @@ if __name__ == "__main__":
```python
import aiorun
from typing import Iterator, List
from pynumaflow.function import Messages, Message, Datum, Metadata, AsyncServer
from pynumaflow.reducer import Messages, Message, Datum, Metadata, AsyncReducer
async def my_handler(
keys: List[str], datums: Iterator[Datum], md: Metadata
keys: List[str], datums: Iterator[Datum], md: Metadata
) -> Messages:
interval_window = md.interval_window
counter = 0
Expand All @@ -105,19 +105,19 @@ async def my_handler(
if __name__ == "__main__":
grpc_server = AsyncServer(reduce_handler=my_handler)
grpc_server = AsyncReducer(handler=my_handler)
aiorun.run(grpc_server.start())
```
### Sample Image
A sample UDF [Dockerfile](examples/function/forward_message/Dockerfile) is provided
under [examples](examples/function/forward_message).
A sample UDF [Dockerfile](examples/map/forward_message/Dockerfile) is provided
under [examples](examples/map/forward_message).
## Implement a User Defined Sink (UDSink)
```python
from typing import Iterator
from pynumaflow.sink import Datum, Responses, Response, Sink
from pynumaflow.sinker import Datum, Responses, Response, Sinker
def my_handler(datums: Iterator[Datum]) -> Responses:
Expand All @@ -129,26 +129,11 @@ def my_handler(datums: Iterator[Datum]) -> Responses:
if __name__ == "__main__":
grpc_server = Sink(my_handler)
grpc_server = Sinker(my_handler)
grpc_server.start()
```
### Sample Image
A sample UDSink [Dockerfile](examples/sink/log/Dockerfile) is provided
under [examples](examples/sink/log).
### Datum Metadata
The Datum object contains the message payload and metadata. Currently, there are two fields
in metadata: the message ID, the message delivery count to indicate how many times the message
has been delivered. You can use these metadata to implement customized logic. For example,
```python
...
def my_handler(keys: list[str], datum: Datum) -> Messages:
num_delivered = datum.metadata.num_delivered
# Choose to do specific actions, if the message delivery count reaches a certain threshold.
if num_delivered > 3:
...
```
under [examples](examples/sink/log).
18 changes: 4 additions & 14 deletions examples/developer_guide/example.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,15 @@
import aiorun
from collections.abc import Iterator

from pynumaflow.function import (
from pynumaflow.reducer import (
Messages,
Message,
Datum,
Metadata,
AsyncServer,
AsyncReducer,
)


async def map_handler(keys: list[str], datum: Datum) -> Messages:
# forward a message
val = datum.value
_ = datum.event_time
_ = datum.watermark
messages = Messages()
messages.append(Message.to_vtx(keys, val))
return messages


async def my_handler(keys: list[str], datums: Iterator[Datum], md: Metadata) -> Messages:
# count the number of events
interval_window = md.interval_window
Expand All @@ -31,9 +21,9 @@ async def my_handler(keys: list[str], datums: Iterator[Datum], md: Metadata) ->
f"counter:{counter} interval_window_start:{interval_window.start} "
f"interval_window_end:{interval_window.end}"
)
return Messages(Message.to_vtx(keys, str.encode(msg)))
return Messages(Message(keys=keys, value=str.encode(msg)))


if __name__ == "__main__":
grpc_server = AsyncServer(map_handler=map_handler, reduce_handler=my_handler)
grpc_server = AsyncReducer(handler=my_handler)
aiorun.run(grpc_server.start())
54 changes: 0 additions & 54 deletions examples/function/flatmap/Dockerfile

This file was deleted.

54 changes: 0 additions & 54 deletions examples/function/flatmap_stream/Dockerfile

This file was deleted.

54 changes: 0 additions & 54 deletions examples/function/forward_message/Dockerfile

This file was deleted.

Loading