From 2e54bbe2a65045a3a2f60f6e5a161104a9700dda Mon Sep 17 00:00:00 2001 From: Sidhant Kohli Date: Fri, 13 Oct 2023 15:31:36 -0700 Subject: [PATCH] refactor Signed-off-by: Sidhant Kohli --- examples/source/simple-source/example.py | 22 +++++++++++++++++++++- pynumaflow/sourcer/_dtypes.py | 10 ++++------ 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/examples/source/simple-source/example.py b/examples/source/simple-source/example.py index f5854a14..47023f8a 100644 --- a/examples/source/simple-source/example.py +++ b/examples/source/simple-source/example.py @@ -12,11 +12,24 @@ class SimpleSource: + """ + SimpleSource is a class for User Defined Source implementation. + """ + def __init__(self): + """ + to_ack_set: Set to keep track of the offsets yet to be acknowledged + read_idx : the offset idx till where the messages have been read + """ self.to_ack_set = set() self.read_idx = 0 def read_handler(self, datum: ReadRequest) -> Iterable[Message]: + """ + read_handler is used to read the data from the source and send the data forward + for each read request we process num_records and increment the read_idx to indicate that + the message has been read and the same is added to the ack set + """ if self.to_ack_set: return @@ -27,13 +40,20 @@ def read_handler(self, datum: ReadRequest) -> Iterable[Message]: event_time=datetime.now(), ) self.to_ack_set.add(str(self.read_idx)) - self.read_idx = self.read_idx + 1 + self.read_idx += 1 def ack_handler(self, ack_request: AckRequest): + """ + The ack handler is used to acknowledge the offsets that have been read, and remove them + from the to_ack_set + """ for offset in ack_request.offset: self.to_ack_set.remove(str(offset.offset, "utf-8")) def pending_handler(self) -> PendingResponse: + """ + The simple source always returns zero to indicate there is no pending record. 
+ """ return PendingResponse(count=0) diff --git a/pynumaflow/sourcer/_dtypes.py b/pynumaflow/sourcer/_dtypes.py index b9a1a4f3..7f88c9c8 100644 --- a/pynumaflow/sourcer/_dtypes.py +++ b/pynumaflow/sourcer/_dtypes.py @@ -91,8 +91,7 @@ class ReadRequest: timeout_in_ms: the request timeout in milliseconds. >>> # Example usage >>> from pynumaflow.sourcer import ReadRequest - >>> from datetime import datetime, timezone - >>> datum = ReadRequest(num_records=10, timeout_in_ms=1000) + >>> read_request = ReadRequest(num_records=10, timeout_in_ms=1000) """ __slots__ = ("_num_records", "_timeout_in_ms") @@ -106,10 +105,10 @@ def __init__( timeout_in_ms: int, ): if not isinstance(num_records, int): - raise TypeError(f"Wrong data type: {type(num_records)} for Datum.num_records") + raise TypeError(f"Wrong data type: {type(num_records)} for ReadRequest.num_records") self._num_records = num_records if not isinstance(timeout_in_ms, int): - raise TypeError(f"Wrong data type: {type(timeout_in_ms)} for Datum.timeout_in_ms") + raise TypeError(f"Wrong data type: {type(timeout_in_ms)} for ReadRequest.timeout_in_ms") self._timeout_in_ms = timeout_in_ms @property @@ -132,7 +131,6 @@ class AckRequest: offsets: the offsets to be acknowledged. >>> # Example usage >>> from pynumaflow.sourcer import AckRequest, Offset - >>> from datetime import datetime, timezone >>> offset = Offset(offset=b"123", partition_id="0") >>> ack_request = AckRequest(offsets=[offset, offset]) """ @@ -169,7 +167,7 @@ def __init__(self, count: int): @property def count(self) -> int: - """Returns the count of the event""" + """Returns the count of pending records""" return self._count