Skip to content

Commit

Permalink
chore: batch ack req and sink resp
Browse files Browse the repository at this point in the history
Signed-off-by: Sidhant Kohli <[email protected]>
  • Loading branch information
kohlisid committed Oct 30, 2024
1 parent a1d3eb5 commit 2373a0f
Show file tree
Hide file tree
Showing 16 changed files with 98 additions and 82 deletions.
3 changes: 2 additions & 1 deletion examples/source/simple_source/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ async def ack_handler(self, ack_request: AckRequest):
The ack handler is used acknowledge the offsets that have been read, and remove them
from the to_ack_set
"""
self.to_ack_set.remove(str(ack_request.offset.offset, "utf-8"))
for req in ack_request.offsets:
self.to_ack_set.remove(str(req.offset, "utf-8"))

async def pending_handler(self) -> PendingResponse:
"""
Expand Down
2 changes: 1 addition & 1 deletion pynumaflow/proto/sinker/sink.proto
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ message SinkResponse {
// err_msg is the error message, set it if success is set to false.
string err_msg = 3;
}
Result result = 1;
repeated Result results = 1;
optional Handshake handshake = 2;
optional TransmissionStatus status = 3;
}
16 changes: 8 additions & 8 deletions pynumaflow/proto/sinker/sink_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions pynumaflow/proto/sinker/sink_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class TransmissionStatus(_message.Message):
def __init__(self, eot: bool = ...) -> None: ...

class SinkResponse(_message.Message):
__slots__ = ("result", "handshake", "status")
__slots__ = ("results", "handshake", "status")

class Result(_message.Message):
__slots__ = ("id", "status", "err_msg")
Expand All @@ -106,15 +106,15 @@ class SinkResponse(_message.Message):
status: _Optional[_Union[Status, str]] = ...,
err_msg: _Optional[str] = ...,
) -> None: ...
RESULT_FIELD_NUMBER: _ClassVar[int]
RESULTS_FIELD_NUMBER: _ClassVar[int]
HANDSHAKE_FIELD_NUMBER: _ClassVar[int]
STATUS_FIELD_NUMBER: _ClassVar[int]
result: SinkResponse.Result
results: _containers.RepeatedCompositeFieldContainer[SinkResponse.Result]
handshake: Handshake
status: TransmissionStatus
def __init__(
self,
result: _Optional[_Union[SinkResponse.Result, _Mapping]] = ...,
results: _Optional[_Iterable[_Union[SinkResponse.Result, _Mapping]]] = ...,
handshake: _Optional[_Union[Handshake, _Mapping]] = ...,
status: _Optional[_Union[TransmissionStatus, _Mapping]] = ...,
) -> None: ...
4 changes: 2 additions & 2 deletions pynumaflow/proto/sourcer/source.proto
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ message ReadResponse {
*/
message AckRequest {
message Request {
// Required field holding the offset to be acked
Offset offset = 1;
// Required field holding the offsets to be acked
repeated Offset offsets = 1;
}
// Required field holding the request. The list will be ordered and will have the same order as the original Read response.
Request request = 1;
Expand Down
42 changes: 21 additions & 21 deletions pynumaflow/proto/sourcer/source_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 6 additions & 4 deletions pynumaflow/proto/sourcer/source_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,12 @@ class AckRequest(_message.Message):
__slots__ = ("request", "handshake")

class Request(_message.Message):
__slots__ = ("offset",)
OFFSET_FIELD_NUMBER: _ClassVar[int]
offset: Offset
def __init__(self, offset: _Optional[_Union[Offset, _Mapping]] = ...) -> None: ...
__slots__ = ("offsets",)
OFFSETS_FIELD_NUMBER: _ClassVar[int]
offsets: _containers.RepeatedCompositeFieldContainer[Offset]
def __init__(
self, offsets: _Optional[_Iterable[_Union[Offset, _Mapping]]] = ...
) -> None: ...
REQUEST_FIELD_NUMBER: _ClassVar[int]
HANDSHAKE_FIELD_NUMBER: _ClassVar[int]
request: AckRequest.Request
Expand Down
5 changes: 2 additions & 3 deletions pynumaflow/sinker/servicer/async_servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ async def SinkFn(
await req_queue.put(STREAM_EOF)
await cur_task
ret = cur_task.result()
for r in ret:
yield sink_pb2.SinkResponse(result=r)
yield sink_pb2.SinkResponse(results=ret)
# send EOT after each finishing sink responses
yield sink_pb2.SinkResponse(status=sink_pb2.TransmissionStatus(eot=True))
cur_task = None
Expand All @@ -99,7 +98,7 @@ async def __invoke_sink(
except BaseException as err:
err_msg = f"UDSinkError: {repr(err)}"
_LOGGER.critical(err_msg, exc_info=True)
exit_on_error(context, err_msg)
# exit_on_error(context, err_msg)
raise err

async def IsReady(
Expand Down
3 changes: 1 addition & 2 deletions pynumaflow/sinker/servicer/sync_servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ def SinkFn(
if d.status and d.status.eot:
req_queue.put(STREAM_EOF)
ret = cur_task.join()
for resp in ret:
yield sink_pb2.SinkResponse(result=resp)
yield sink_pb2.SinkResponse(results=ret)
# send EOT after each finishing sink responses
yield sink_pb2.SinkResponse(status=sink_pb2.TransmissionStatus(eot=True))
cur_task = None
Expand Down
1 change: 0 additions & 1 deletion pynumaflow/sinker/servicer/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,5 @@ def _create_read_handshake_response() -> sink_pb2.SinkResponse:
sink_pb2.SinkResponse: A SinkResponse object indicating a successful handshake.
"""
return sink_pb2.SinkResponse(
result=sink_pb2.SinkResponse.Result(status=sink_pb2.SUCCESS),
handshake=sink_pb2.Handshake(sot=True),
)
20 changes: 10 additions & 10 deletions pynumaflow/sourcer/_dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,25 +149,25 @@ def timeout_in_ms(self) -> int:
class AckRequest:
"""
Class for defining the request for acknowledging datum.
It takes an offset that need to be acknowledged.
It takes a list of offsets that need to be acknowledged.
Args:
offset: the offset to be acknowledged.
offsets: the offsets to be acknowledged.
>>> # Example usage
>>> from pynumaflow.sourcer import AckRequest, Offset
>>> offset = Offset(offset=b"123", partition_id="0")
>>> ack_request = AckRequest(offsets=[offset, offset])
>>> offset_val = Offset(offset=b"123", partition_id=0)
>>> ack_request = AckRequest(offsets=[offset_val, offset_val])
"""

__slots__ = ("_offset",)
_offset: Offset
__slots__ = ("_offsets",)
_offsets: list[Offset]

def __init__(self, offset: Offset):
self._offset = offset
def __init__(self, offsets: list[Offset]):
self._offsets = offsets

@property
def offset(self) -> Offset:
def offsets(self) -> list[Offset]:
"""Returns the offsets to be acknowledged."""
return self._offset
return self._offsets

Check warning on line 170 in pynumaflow/sourcer/_dtypes.py

View check run for this annotation

Codecov / codecov/patch

pynumaflow/sourcer/_dtypes.py#L170

Added line #L170 was not covered by tests


@dataclass(init=False)
Expand Down
Loading

0 comments on commit 2373a0f

Please sign in to comment.