From 7b90d4d71f77b79ba4a9b6700e3bf84f12c0356e Mon Sep 17 00:00:00 2001 From: Keran Yang Date: Fri, 13 Oct 2023 16:43:32 -0400 Subject: [PATCH] chore: unify the event time of messages to drop (#115) Set the event time of messages to drop to epoch(0) - 1ms across all our SDKs. Signed-off-by: Keran Yang --- pynumaflow/_constants.py | 7 +++++++ pynumaflow/sourcetransformer/__init__.py | 8 ++------ pynumaflow/sourcetransformer/_dtypes.py | 5 ++--- tests/sourcetransform/test_messages.py | 10 ++++++++-- 4 files changed, 19 insertions(+), 11 deletions(-) diff --git a/pynumaflow/_constants.py b/pynumaflow/_constants.py index 80588705..40b368bf 100644 --- a/pynumaflow/_constants.py +++ b/pynumaflow/_constants.py @@ -1,3 +1,5 @@ +from datetime import datetime, timezone, timedelta + MAP_SOCK_PATH = "/var/run/numaflow/map.sock" MAP_STREAM_SOCK_PATH = "/var/run/numaflow/mapstream.sock" REDUCE_SOCK_PATH = "/var/run/numaflow/reduce.sock" @@ -16,3 +18,8 @@ STREAM_EOF = "EOF" DELIMITER = ":" DROP = "U+005C__DROP__" +# Watermark are at millisecond granularity, hence we use epoch(0) - 1 +# to indicate watermark is not available. +# EVENT_TIME_FOR_DROP is used to indicate that the message is dropped, +# hence excluded from watermark calculation. +EVENT_TIME_FOR_DROP = datetime(1970, 1, 1, tzinfo=timezone.utc) - timedelta(milliseconds=1) diff --git a/pynumaflow/sourcetransformer/__init__.py b/pynumaflow/sourcetransformer/__init__.py index 8778ab25..13bd5fc4 100644 --- a/pynumaflow/sourcetransformer/__init__.py +++ b/pynumaflow/sourcetransformer/__init__.py @@ -1,9 +1,4 @@ -from pynumaflow.sourcetransformer._dtypes import ( - Message, - Messages, - Datum, - DROP, -) +from pynumaflow.sourcetransformer._dtypes import Message, Messages, Datum, DROP, EVENT_TIME_FOR_DROP from pynumaflow.sourcetransformer.multiproc_server import MultiProcSourceTransformer from pynumaflow.sourcetransformer.server import SourceTransformer @@ -12,6 +7,7 @@ "Messages", "Datum", "DROP", + "EVENT_TIME_FOR_DROP", "SourceTransformer", "MultiProcSourceTransformer", ] diff --git a/pynumaflow/sourcetransformer/_dtypes.py b/pynumaflow/sourcetransformer/_dtypes.py index de429b86..8a8d12a9 100644 --- a/pynumaflow/sourcetransformer/_dtypes.py +++ b/pynumaflow/sourcetransformer/_dtypes.py @@ -4,7 +4,7 @@ from typing import TypeVar, Callable from warnings import warn -from pynumaflow._constants import DROP +from pynumaflow._constants import DROP, EVENT_TIME_FOR_DROP M = TypeVar("M", bound="Message") Ms = TypeVar("Ms", bound="Messages") @@ -44,7 +44,7 @@ def __init__( @classmethod def to_drop(cls: type[M]) -> M: - return cls(b"", datetime(1, 1, 1, 0, 0), None, [DROP]) + return cls(b"", EVENT_TIME_FOR_DROP, None, [DROP]) @property def event_time(self) -> datetime: @@ -115,7 +115,6 @@ class Datum: value: the payload of the event. event_time: the event time of the event. watermark: the watermark of the event. - metadata: the metadata of the event. >>> # Example usage >>> from pynumaflow.sourcetransformer import Datum >>> from datetime import datetime, timezone diff --git a/tests/sourcetransform/test_messages.py b/tests/sourcetransform/test_messages.py index 62dba76d..34d7af08 100644 --- a/tests/sourcetransform/test_messages.py +++ b/tests/sourcetransform/test_messages.py @@ -1,7 +1,7 @@ import unittest from datetime import datetime, timezone -from pynumaflow.sourcetransformer import Messages, Message, DROP +from pynumaflow.sourcetransformer import Messages, Message, DROP, EVENT_TIME_FOR_DROP def mock_message_t(): @@ -31,12 +31,18 @@ def test_Message_creation(self): self.assertEqual(mock_obj["Tags"], msgt.tags) def test_message_to_drop(self): - mock_obj = {"Keys": [], "Value": b"", "Tags": [DROP]} + mock_obj = { + "Keys": [], + "Value": b"", + "Tags": [DROP], + "EventTime": EVENT_TIME_FOR_DROP, + } msgt = Message(b"", mock_event_time()).to_drop() self.assertEqual(Message, type(msgt)) self.assertEqual(mock_obj["Keys"], msgt.keys) self.assertEqual(mock_obj["Value"], msgt.value) self.assertEqual(mock_obj["Tags"], msgt.tags) + self.assertEqual(mock_obj["EventTime"], msgt.event_time) class TestMessages(unittest.TestCase):