Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
Signed-off-by: Sidhant Kohli <[email protected]>
  • Loading branch information
kohlisid committed Oct 13, 2023
1 parent c09f28d commit 2e54bbe
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 7 deletions.
22 changes: 21 additions & 1 deletion examples/source/simple-source/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,24 @@


class SimpleSource:
"""
SimpleSource is a class for User Defined Source implementation.
"""

def __init__(self):
"""
to_ack_set: Set to maintain a track of the offsets yet to be acknowledged
read_idx : the offset idx till where the messages have been read
"""
self.to_ack_set = set()
self.read_idx = 0

def read_handler(self, datum: ReadRequest) -> Iterable[Message]:
"""
read_handler is used to read the data from the source and send the data forward
for each read request we process num_records and increment the read_idx to indicate that
the message has been read and the same is added to the ack set
"""
if self.to_ack_set:
return

Expand All @@ -27,13 +40,20 @@ def read_handler(self, datum: ReadRequest) -> Iterable[Message]:
event_time=datetime.now(),
)
self.to_ack_set.add(str(self.read_idx))
self.read_idx = self.read_idx + 1
self.read_idx += 1

def ack_handler(self, ack_request: AckRequest):
"""
The ack handler is used acknowledge the offsets that have been read, and remove them
from the to_ack_set
"""
for offset in ack_request.offset:
self.to_ack_set.remove(str(offset.offset, "utf-8"))

def pending_handler(self) -> PendingResponse:
"""
The simple source always returns zero to indicate there is no pending record.
"""
return PendingResponse(count=0)


Expand Down
10 changes: 4 additions & 6 deletions pynumaflow/sourcer/_dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,7 @@ class ReadRequest:
timeout_in_ms: the request timeout in milliseconds.
>>> # Example usage
>>> from pynumaflow.sourcer import ReadRequest
>>> from datetime import datetime, timezone
>>> datum = ReadRequest(num_records=10, timeout_in_ms=1000)
>>> read_request = ReadRequest(num_records=10, timeout_in_ms=1000)
"""

__slots__ = ("_num_records", "_timeout_in_ms")
Expand All @@ -106,10 +105,10 @@ def __init__(
timeout_in_ms: int,
):
if not isinstance(num_records, int):
raise TypeError(f"Wrong data type: {type(num_records)} for Datum.num_records")
raise TypeError(f"Wrong data type: {type(num_records)} for ReadRequest.num_records")
self._num_records = num_records
if not isinstance(timeout_in_ms, int):
raise TypeError(f"Wrong data type: {type(timeout_in_ms)} for Datum.timeout_in_ms")
raise TypeError(f"Wrong data type: {type(timeout_in_ms)} for ReadRequest.timeout_in_ms")
self._timeout_in_ms = timeout_in_ms

@property
Expand All @@ -132,7 +131,6 @@ class AckRequest:
offsets: the offsets to be acknowledged.
>>> # Example usage
>>> from pynumaflow.sourcer import AckRequest, Offset
>>> from datetime import datetime, timezone
>>> offset = Offset(offset=b"123", partition_id="0")
>>> ack_request = AckRequest(offsets=[offset, offset])
"""
Expand Down Expand Up @@ -169,7 +167,7 @@ def __init__(self, count: int):

@property
def count(self) -> int:
"""Returns the count of the event"""
"""Returns the count of pending records"""
return self._count


Expand Down

0 comments on commit 2e54bbe

Please sign in to comment.