Skip to content

Commit

Permalink
Merge branch 'main' into uds
Browse files Browse the repository at this point in the history
  • Loading branch information
kohlisid authored Oct 13, 2023
2 parents 901783f + 7b90d4d commit c09f28d
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 11 deletions.
7 changes: 7 additions & 0 deletions pynumaflow/_constants.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from datetime import datetime, timezone, timedelta

MAP_SOCK_PATH = "/var/run/numaflow/map.sock"
MAP_STREAM_SOCK_PATH = "/var/run/numaflow/mapstream.sock"
REDUCE_SOCK_PATH = "/var/run/numaflow/reduce.sock"
Expand All @@ -17,3 +19,8 @@
STREAM_EOF = "EOF"
DELIMITER = ":"
DROP = "U+005C__DROP__"
# Watermark are at millisecond granularity, hence we use epoch(0) - 1
# to indicate watermark is not available.
# EVENT_TIME_FOR_DROP is used to indicate that the message is dropped,
# hence excluded from watermark calculation.
EVENT_TIME_FOR_DROP = datetime(1970, 1, 1, tzinfo=timezone.utc) - timedelta(milliseconds=1)
8 changes: 2 additions & 6 deletions pynumaflow/sourcetransformer/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
from pynumaflow.sourcetransformer._dtypes import (
Message,
Messages,
Datum,
DROP,
)
from pynumaflow.sourcetransformer._dtypes import Message, Messages, Datum, DROP, EVENT_TIME_FOR_DROP
from pynumaflow.sourcetransformer.multiproc_server import MultiProcSourceTransformer
from pynumaflow.sourcetransformer.server import SourceTransformer

Expand All @@ -12,6 +7,7 @@
"Messages",
"Datum",
"DROP",
"EVENT_TIME_FOR_DROP",
"SourceTransformer",
"MultiProcSourceTransformer",
]
5 changes: 2 additions & 3 deletions pynumaflow/sourcetransformer/_dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from typing import TypeVar, Callable
from warnings import warn

from pynumaflow._constants import DROP
from pynumaflow._constants import DROP, EVENT_TIME_FOR_DROP

M = TypeVar("M", bound="Message")
Ms = TypeVar("Ms", bound="Messages")
Expand Down Expand Up @@ -44,7 +44,7 @@ def __init__(

@classmethod
def to_drop(cls: type[M]) -> M:
return cls(b"", datetime(1, 1, 1, 0, 0), None, [DROP])
return cls(b"", EVENT_TIME_FOR_DROP, None, [DROP])

@property
def event_time(self) -> datetime:
Expand Down Expand Up @@ -115,7 +115,6 @@ class Datum:
value: the payload of the event.
event_time: the event time of the event.
watermark: the watermark of the event.
metadata: the metadata of the event.
>>> # Example usage
>>> from pynumaflow.sourcetransformer import Datum
>>> from datetime import datetime, timezone
Expand Down
10 changes: 8 additions & 2 deletions tests/sourcetransform/test_messages.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import unittest
from datetime import datetime, timezone

from pynumaflow.sourcetransformer import Messages, Message, DROP
from pynumaflow.sourcetransformer import Messages, Message, DROP, EVENT_TIME_FOR_DROP


def mock_message_t():
Expand Down Expand Up @@ -31,12 +31,18 @@ def test_Message_creation(self):
self.assertEqual(mock_obj["Tags"], msgt.tags)

def test_message_to_drop(self):
mock_obj = {"Keys": [], "Value": b"", "Tags": [DROP]}
mock_obj = {
"Keys": [],
"Value": b"",
"Tags": [DROP],
"EventTime": EVENT_TIME_FOR_DROP,
}
msgt = Message(b"", mock_event_time()).to_drop()
self.assertEqual(Message, type(msgt))
self.assertEqual(mock_obj["Keys"], msgt.keys)
self.assertEqual(mock_obj["Value"], msgt.value)
self.assertEqual(mock_obj["Tags"], msgt.tags)
self.assertEqual(mock_obj["EventTime"], msgt.event_time)


class TestMessages(unittest.TestCase):
Expand Down

0 comments on commit c09f28d

Please sign in to comment.