Skip to content

Commit

Permalink
feat: streaming sink (#193)
Browse files Browse the repository at this point in the history
Signed-off-by: Sidhant Kohli <[email protected]>
  • Loading branch information
kohlisid authored Oct 18, 2024
1 parent b1b2d94 commit ec00728
Show file tree
Hide file tree
Showing 18 changed files with 622 additions and 259 deletions.
2 changes: 1 addition & 1 deletion examples/sink/async_log/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ description = ""
authors = ["Numaflow developers"]

[tool.poetry.dependencies]
python = "~3.10"
python = ">=3.10,<3.13"
pynumaflow = { path = "../../../"}

[tool.poetry.dev-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion examples/sink/log/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ description = ""
authors = ["Numaflow developers"]

[tool.poetry.dependencies]
python = "~3.10"
python = ">=3.10,<3.13"
pynumaflow = { path = "../../../"}

[tool.poetry.dev-dependencies]
Expand Down
43 changes: 35 additions & 8 deletions pynumaflow/proto/sinker/sink.proto
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
syntax = "proto3";

import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";

Expand All @@ -7,7 +8,7 @@ package sink.v1;

service Sink {
// SinkFn writes the request to a user defined sink.
rpc SinkFn(stream SinkRequest) returns (SinkResponse);
rpc SinkFn(stream SinkRequest) returns (stream SinkResponse);

// IsReady is the heartbeat endpoint for gRPC.
rpc IsReady(google.protobuf.Empty) returns (ReadyResponse);
Expand All @@ -17,12 +18,29 @@ service Sink {
* SinkRequest represents a request element.
*/
message SinkRequest {
repeated string keys = 1;
bytes value = 2;
google.protobuf.Timestamp event_time = 3;
google.protobuf.Timestamp watermark = 4;
string id = 5;
map<string, string> headers = 6;
message Request {
repeated string keys = 1;
bytes value = 2;
google.protobuf.Timestamp event_time = 3;
google.protobuf.Timestamp watermark = 4;
string id = 5;
map<string, string> headers = 6;
}
// Required field indicating the request.
Request request = 1;
// Required field indicating the status of the request.
// If eot is set to true, it indicates the end of transmission.
TransmissionStatus status = 2;
// Optional field carrying the handshake message.
optional Handshake handshake = 3;
}

/*
* Handshake message between client and server to indicate the start of transmission.
*/
message Handshake {
// Required field indicating the start of transmission.
bool sot = 1;
}

/**
Expand All @@ -32,6 +50,13 @@ message ReadyResponse {
bool ready = 1;
}

/**
* TransmissionStatus is the status of the transmission.
*/
message TransmissionStatus {
// eot is set to true to indicate the end of transmission.
bool eot = 1;
}

/*
* Status is the status of the response.
*/
Expand All @@ -53,5 +78,7 @@ message SinkResponse {
// err_msg is the error message, set it if success is set to false.
string err_msg = 3;
}
repeated Result results = 1;
Result result = 1;
optional Handshake handshake = 2;
optional TransmissionStatus status = 3;
}
38 changes: 22 additions & 16 deletions pynumaflow/proto/sinker/sink_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

92 changes: 63 additions & 29 deletions pynumaflow/proto/sinker/sink_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -25,45 +25,72 @@ FAILURE: Status
FALLBACK: Status

class SinkRequest(_message.Message):
__slots__ = ("keys", "value", "event_time", "watermark", "id", "headers")
__slots__ = ("request", "status", "handshake")

class HeadersEntry(_message.Message):
__slots__ = ("key", "value")
KEY_FIELD_NUMBER: _ClassVar[int]
class Request(_message.Message):
__slots__ = ("keys", "value", "event_time", "watermark", "id", "headers")

class HeadersEntry(_message.Message):
__slots__ = ("key", "value")
KEY_FIELD_NUMBER: _ClassVar[int]
VALUE_FIELD_NUMBER: _ClassVar[int]
key: str
value: str
def __init__(self, key: _Optional[str] = ..., value: _Optional[str] = ...) -> None: ...
KEYS_FIELD_NUMBER: _ClassVar[int]
VALUE_FIELD_NUMBER: _ClassVar[int]
key: str
value: str
def __init__(self, key: _Optional[str] = ..., value: _Optional[str] = ...) -> None: ...
KEYS_FIELD_NUMBER: _ClassVar[int]
VALUE_FIELD_NUMBER: _ClassVar[int]
EVENT_TIME_FIELD_NUMBER: _ClassVar[int]
WATERMARK_FIELD_NUMBER: _ClassVar[int]
ID_FIELD_NUMBER: _ClassVar[int]
HEADERS_FIELD_NUMBER: _ClassVar[int]
keys: _containers.RepeatedScalarFieldContainer[str]
value: bytes
event_time: _timestamp_pb2.Timestamp
watermark: _timestamp_pb2.Timestamp
id: str
headers: _containers.ScalarMap[str, str]
EVENT_TIME_FIELD_NUMBER: _ClassVar[int]
WATERMARK_FIELD_NUMBER: _ClassVar[int]
ID_FIELD_NUMBER: _ClassVar[int]
HEADERS_FIELD_NUMBER: _ClassVar[int]
keys: _containers.RepeatedScalarFieldContainer[str]
value: bytes
event_time: _timestamp_pb2.Timestamp
watermark: _timestamp_pb2.Timestamp
id: str
headers: _containers.ScalarMap[str, str]
def __init__(
self,
keys: _Optional[_Iterable[str]] = ...,
value: _Optional[bytes] = ...,
event_time: _Optional[_Union[_timestamp_pb2.Timestamp, _Mapping]] = ...,
watermark: _Optional[_Union[_timestamp_pb2.Timestamp, _Mapping]] = ...,
id: _Optional[str] = ...,
headers: _Optional[_Mapping[str, str]] = ...,
) -> None: ...
REQUEST_FIELD_NUMBER: _ClassVar[int]
STATUS_FIELD_NUMBER: _ClassVar[int]
HANDSHAKE_FIELD_NUMBER: _ClassVar[int]
request: SinkRequest.Request
status: TransmissionStatus
handshake: Handshake
def __init__(
self,
keys: _Optional[_Iterable[str]] = ...,
value: _Optional[bytes] = ...,
event_time: _Optional[_Union[_timestamp_pb2.Timestamp, _Mapping]] = ...,
watermark: _Optional[_Union[_timestamp_pb2.Timestamp, _Mapping]] = ...,
id: _Optional[str] = ...,
headers: _Optional[_Mapping[str, str]] = ...,
request: _Optional[_Union[SinkRequest.Request, _Mapping]] = ...,
status: _Optional[_Union[TransmissionStatus, _Mapping]] = ...,
handshake: _Optional[_Union[Handshake, _Mapping]] = ...,
) -> None: ...

class Handshake(_message.Message):
__slots__ = ("sot",)
SOT_FIELD_NUMBER: _ClassVar[int]
sot: bool
def __init__(self, sot: bool = ...) -> None: ...

class ReadyResponse(_message.Message):
__slots__ = ("ready",)
READY_FIELD_NUMBER: _ClassVar[int]
ready: bool
def __init__(self, ready: bool = ...) -> None: ...

class TransmissionStatus(_message.Message):
__slots__ = ("eot",)
EOT_FIELD_NUMBER: _ClassVar[int]
eot: bool
def __init__(self, eot: bool = ...) -> None: ...

class SinkResponse(_message.Message):
__slots__ = ("results",)
__slots__ = ("result", "handshake", "status")

class Result(_message.Message):
__slots__ = ("id", "status", "err_msg")
Expand All @@ -79,8 +106,15 @@ class SinkResponse(_message.Message):
status: _Optional[_Union[Status, str]] = ...,
err_msg: _Optional[str] = ...,
) -> None: ...
RESULTS_FIELD_NUMBER: _ClassVar[int]
results: _containers.RepeatedCompositeFieldContainer[SinkResponse.Result]
RESULT_FIELD_NUMBER: _ClassVar[int]
HANDSHAKE_FIELD_NUMBER: _ClassVar[int]
STATUS_FIELD_NUMBER: _ClassVar[int]
result: SinkResponse.Result
handshake: Handshake
status: TransmissionStatus
def __init__(
self, results: _Optional[_Iterable[_Union[SinkResponse.Result, _Mapping]]] = ...
self,
result: _Optional[_Union[SinkResponse.Result, _Mapping]] = ...,
handshake: _Optional[_Union[Handshake, _Mapping]] = ...,
status: _Optional[_Union[TransmissionStatus, _Mapping]] = ...,
) -> None: ...
6 changes: 3 additions & 3 deletions pynumaflow/proto/sinker/sink_pb2_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def __init__(self, channel):
Args:
channel: A grpc.Channel.
"""
self.SinkFn = channel.stream_unary(
self.SinkFn = channel.stream_stream(
"/sink.v1.Sink/SinkFn",
request_serializer=sink__pb2.SinkRequest.SerializeToString,
response_deserializer=sink__pb2.SinkResponse.FromString,
Expand Down Expand Up @@ -45,7 +45,7 @@ def IsReady(self, request, context):

def add_SinkServicer_to_server(servicer, server):
rpc_method_handlers = {
"SinkFn": grpc.stream_unary_rpc_method_handler(
"SinkFn": grpc.stream_stream_rpc_method_handler(
servicer.SinkFn,
request_deserializer=sink__pb2.SinkRequest.FromString,
response_serializer=sink__pb2.SinkResponse.SerializeToString,
Expand Down Expand Up @@ -77,7 +77,7 @@ def SinkFn(
timeout=None,
metadata=None,
):
return grpc.experimental.stream_unary(
return grpc.experimental.stream_stream(
request_iterator,
target,
"/sink.v1.Sink/SinkFn",
Expand Down
24 changes: 4 additions & 20 deletions pynumaflow/reducestreamer/servicer/async_servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from collections.abc import AsyncIterable
from typing import Union

import grpc
from google.protobuf import empty_pb2 as _empty_pb2

from pynumaflow.proto.reducer import reduce_pb2, reduce_pb2_grpc
Expand All @@ -13,7 +12,7 @@
ReduceRequest,
)
from pynumaflow.reducestreamer.servicer.task_manager import TaskManager
from pynumaflow.shared.server import exit_on_error, handle_error
from pynumaflow.shared.server import handle_async_error
from pynumaflow.types import NumaflowServicerContext


Expand Down Expand Up @@ -95,35 +94,20 @@ async def ReduceFn(
async for msg in consumer:
# If the message is an exception, we raise the exception
if isinstance(msg, BaseException):
handle_error(context, msg)
await asyncio.gather(
context.abort(grpc.StatusCode.UNKNOWN, details=repr(msg)),
return_exceptions=True,
)
exit_on_error(
err=repr(msg), parent=False, context=context, update_context=False
)
await handle_async_error(context, msg)
return
# Send window EOF response or Window result response
# back to the client
else:
yield msg
except BaseException as e:
handle_error(context, e)
await asyncio.gather(
context.abort(grpc.StatusCode.UNKNOWN, details=repr(e)), return_exceptions=True
)
exit_on_error(err=repr(e), parent=False, context=context, update_context=False)
await handle_async_error(context, e)
return
# Wait for the process_input_stream task to finish for a clean exit
try:
await producer
except BaseException as e:
handle_error(context, e)
await asyncio.gather(
context.abort(grpc.StatusCode.UNKNOWN, details=repr(e)), return_exceptions=True
)
exit_on_error(err=repr(e), parent=False, context=context, update_context=False)
await handle_async_error(context, e)
return

async def IsReady(
Expand Down
4 changes: 2 additions & 2 deletions pynumaflow/shared/asynciter.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ class NonBlockingIterator:

__slots__ = "_queue"

def __init__(self):
self._queue = asyncio.Queue()
def __init__(self, size=0):
self._queue = asyncio.Queue(maxsize=size)

async def read_iterator(self):
item = await self._queue.get()
Expand Down
17 changes: 16 additions & 1 deletion pynumaflow/shared/server.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import contextlib
import io
import multiprocessing
Expand Down Expand Up @@ -266,7 +267,10 @@ def exit_on_error(
p.kill()


def handle_error(context: NumaflowServicerContext, e: BaseException):
def update_context_err(context: NumaflowServicerContext, e: BaseException):
"""
Update the context with the error and log the exception.
"""
trace = get_exception_traceback_str(e)
_LOGGER.critical(trace)
_LOGGER.critical(e.__str__())
Expand All @@ -278,3 +282,14 @@ def get_exception_traceback_str(exc) -> str:
file = io.StringIO()
traceback.print_exception(exc, value=exc, tb=exc.__traceback__, file=file)
return file.getvalue().rstrip()


async def handle_async_error(context: NumaflowServicerContext, exception: BaseException):
    """
    Report an exception raised inside an async servicer, then exit.

    The steps run in a fixed order:
      1. ``update_context_err`` logs the exception and updates the context.
      2. The RPC is aborted with ``grpc.StatusCode.UNKNOWN``, using the
         exception's ``repr`` as the details string.
      3. ``exit_on_error`` is called with ``parent=False`` and
         ``update_context=False``.

    Args:
        context: the servicer context of the RPC that failed.
        exception: the exception to report.
    """
    update_context_err(context, exception)

    details = repr(exception)
    abort_call = context.abort(grpc.StatusCode.UNKNOWN, details=details)
    # return_exceptions=True makes gather hand back anything raised by the
    # abort call as a result instead of propagating it, so the exit step
    # below is still reached.
    await asyncio.gather(abort_call, return_exceptions=True)

    # update_context=False: the context was already updated in the first step.
    exit_on_error(err=details, parent=False, context=context, update_context=False)
Loading

0 comments on commit ec00728

Please sign in to comment.