Skip to content

Commit

Permalink
chore: Refactor SDK to split into separate servers (#105)
Browse files Browse the repository at this point in the history
Signed-off-by: Sidhant Kohli <[email protected]>
  • Loading branch information
kohlisid authored Sep 6, 2023
1 parent b7c130f commit 3720c95
Show file tree
Hide file tree
Showing 167 changed files with 5,142 additions and 3,561 deletions.
9 changes: 6 additions & 3 deletions .codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ coverage:

ignore:
- "examples/"
- "pynumaflow/function/proto/*"
- "pynumaflow/sink/proto/*"
- "pynumaflow/function/_udfunction_pb2.pyi"
- "pynumaflow/mapper/proto/*"
- "pynumaflow/sinker/proto/*"
- "pynumaflow/mapstreamer/proto/*"
- "pynumaflow/reducer/proto/*"
- "pynumaflow/sourcetransformer/proto/*"
- "pynumaflow/map/_udfunction_pb2.pyi"
- "pynumaflow/sink/_udsink_pb2.pyi"
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,9 @@ setup:
proto:
python3 -m grpc_tools.protoc -I=pynumaflow/function/proto --python_out=pynumaflow/function/proto --grpc_python_out=pynumaflow/function/proto pynumaflow/function/proto/*.proto
python3 -m grpc_tools.protoc -I=pynumaflow/sink/proto --python_out=pynumaflow/sink/proto --grpc_python_out=pynumaflow/sink/proto pynumaflow/sink/proto/*.proto
python3 -m grpc_tools.protoc -I=pynumaflow/map/proto --python_out=pynumaflow/map/proto --grpc_python_out=pynumaflow/map/proto pynumaflow/map/proto/*.proto
python3 -m grpc_tools.protoc -I=pynumaflow/mapstream/proto --python_out=pynumaflow/mapstream/proto --grpc_python_out=pynumaflow/mapstream/proto pynumaflow/mapstream/proto/*.proto
python3 -m grpc_tools.protoc -I=pynumaflow/reduce/proto --python_out=pynumaflow/reduce/proto --grpc_python_out=pynumaflow/reduce/proto pynumaflow/reduce/proto/*.proto
python3 -m grpc_tools.protoc -I=pynumaflow/sourcetransform/proto --python_out=pynumaflow/sourcetransform/proto --grpc_python_out=pynumaflow/sourcetransform/proto pynumaflow/sourcetransform/proto/*.proto

sed -i '' 's/^\(import.*_pb2\)/from . \1/' pynumaflow/*/proto/*.py
49 changes: 17 additions & 32 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pre-commit install
### Map

```python
from pynumaflow.function import Messages, Message, Datum, Server
from pynumaflow.mapper import Messages, Message, Datum, Mapper
def my_handler(keys: list[str], datum: Datum) -> Messages:
Expand All @@ -57,28 +57,28 @@ def my_handler(keys: list[str], datum: Datum) -> Messages:
if __name__ == "__main__":
grpc_server = Server(map_handler=my_handler)
grpc_server = Mapper(handler=my_handler)
grpc_server.start()
```
### MapT - Map with event time assignment capability
In addition to the regular Map function, MapT supports assigning a new event time to the message.
MapT is only supported at source vertex to enable (a) early data filtering and (b) watermark assignment by extracting new event time from the message payload.
### SourceTransformer - Map with event time assignment capability
In addition to the regular Map function, SourceTransformer supports assigning a new event time to the message.
SourceTransformer is only supported at source vertex to enable (a) early data filtering and (b) watermark assignment by extracting new event time from the message payload.
```python
from datetime import datetime
from pynumaflow.function import MessageTs, MessageT, Datum, Server
from pynumaflow.sourcetransformer import Messages, Message, Datum, SourceTransformer
def mapt_handler(keys: list[str], datum: Datum) -> MessageTs:
def transform_handler(keys: list[str], datum: Datum) -> Messages:
val = datum.value
new_event_time = datetime.now()
_ = datum.watermark
message_t_s = MessageTs(MessageT(val, event_time=new_event_time, keys=keys))
message_t_s = Messages(Message(val, event_time=new_event_time, keys=keys))
return message_t_s
if __name__ == "__main__":
grpc_server = Server(mapt_handler=mapt_handler)
grpc_server = SourceTransformer(handler=transform_handler)
grpc_server.start()
```
Expand All @@ -87,11 +87,11 @@ if __name__ == "__main__":
```python
import aiorun
from typing import Iterator, List
from pynumaflow.function import Messages, Message, Datum, Metadata, AsyncServer
from pynumaflow.reducer import Messages, Message, Datum, Metadata, AsyncReducer
async def my_handler(
keys: List[str], datums: Iterator[Datum], md: Metadata
keys: List[str], datums: Iterator[Datum], md: Metadata
) -> Messages:
interval_window = md.interval_window
counter = 0
Expand All @@ -105,19 +105,19 @@ async def my_handler(
if __name__ == "__main__":
grpc_server = AsyncServer(reduce_handler=my_handler)
grpc_server = AsyncReducer(handler=my_handler)
aiorun.run(grpc_server.start())
```
### Sample Image
A sample UDF [Dockerfile](examples/function/forward_message/Dockerfile) is provided
under [examples](examples/function/forward_message).
A sample UDF [Dockerfile](examples/map/forward_message/Dockerfile) is provided
under [examples](examples/map/forward_message).
## Implement a User Defined Sink (UDSink)
```python
from typing import Iterator
from pynumaflow.sink import Datum, Responses, Response, Sink
from pynumaflow.sinker import Datum, Responses, Response, Sinker
def my_handler(datums: Iterator[Datum]) -> Responses:
Expand All @@ -129,26 +129,11 @@ def my_handler(datums: Iterator[Datum]) -> Responses:
if __name__ == "__main__":
grpc_server = Sink(my_handler)
grpc_server = Sinker(my_handler)
grpc_server.start()
```
### Sample Image
A sample UDSink [Dockerfile](examples/sink/log/Dockerfile) is provided
under [examples](examples/sink/log).
### Datum Metadata
The Datum object contains the message payload and metadata. Currently, there are two fields
in metadata: the message ID, the message delivery count to indicate how many times the message
has been delivered. You can use these metadata to implement customized logic. For example,
```python
...
def my_handler(keys: list[str], datum: Datum) -> Messages:
num_delivered = datum.metadata.num_delivered
# Choose to do specific actions, if the message delivery count reaches a certain threshold.
if num_delivered > 3:
...
```
under [examples](examples/sink/log).
18 changes: 4 additions & 14 deletions examples/developer_guide/example.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,15 @@
import aiorun
from collections.abc import Iterator

from pynumaflow.function import (
from pynumaflow.reducer import (
Messages,
Message,
Datum,
Metadata,
AsyncServer,
AsyncReducer,
)


async def map_handler(keys: list[str], datum: Datum) -> Messages:
# forward a message
val = datum.value
_ = datum.event_time
_ = datum.watermark
messages = Messages()
messages.append(Message.to_vtx(keys, val))
return messages


async def my_handler(keys: list[str], datums: Iterator[Datum], md: Metadata) -> Messages:
# count the number of events
interval_window = md.interval_window
Expand All @@ -31,9 +21,9 @@ async def my_handler(keys: list[str], datums: Iterator[Datum], md: Metadata) ->
f"counter:{counter} interval_window_start:{interval_window.start} "
f"interval_window_end:{interval_window.end}"
)
return Messages(Message.to_vtx(keys, str.encode(msg)))
return Messages(Message(keys=keys, value=str.encode(msg)))


if __name__ == "__main__":
grpc_server = AsyncServer(map_handler=map_handler, reduce_handler=my_handler)
grpc_server = AsyncReducer(handler=my_handler)
aiorun.run(grpc_server.start())
54 changes: 0 additions & 54 deletions examples/function/flatmap/Dockerfile

This file was deleted.

54 changes: 0 additions & 54 deletions examples/function/flatmap_stream/Dockerfile

This file was deleted.

54 changes: 0 additions & 54 deletions examples/function/forward_message/Dockerfile

This file was deleted.

Loading

0 comments on commit 3720c95

Please sign in to comment.