Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for new reduce proto #133

Merged
merged 12 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pynumaflow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,7 @@ class NoPublicConstructorError(TypeError):

class SocketError(Exception):
"""To raise an error while creating socket or setting its property"""


class UDFError(Exception):
"""To Raise an error while executing a UDF call"""
45 changes: 40 additions & 5 deletions pynumaflow/proto/reducer/reduce.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,57 @@ service Reduce {
* ReduceRequest represents a request element.
*/
message ReduceRequest {
repeated string keys = 1;
bytes value = 2;
google.protobuf.Timestamp event_time = 3;
google.protobuf.Timestamp watermark = 4;
// WindowOperation represents a window operation.
// For Aligned windows, OPEN, APPEND and CLOSE events are sent.
message WindowOperation {
enum Event {
OPEN = 0;
CLOSE = 1;
APPEND = 4;
}

Event event = 1;
repeated Window windows = 2;
}

// Payload represents a payload element.
message Payload {
repeated string keys = 1;
bytes value = 2;
google.protobuf.Timestamp event_time = 3;
google.protobuf.Timestamp watermark = 4;
}

Payload payload = 1;
WindowOperation operation = 2;
}

// Window represents a window.
// Since the client doesn't track keys, window doesn't have a keys field.
message Window {
google.protobuf.Timestamp start = 1;
google.protobuf.Timestamp end = 2;
string slot = 3;
}

/**
* ReduceResponse represents a response element.
*/
message ReduceResponse {
// Result represents a result element. It contains the result of the reduce function.
message Result {
repeated string keys = 1;
bytes value = 2;
repeated string tags = 3;
}
repeated Result results = 1;

Result result = 1;

// window represents a window to which the result belongs.
Window window = 2;

// EOF represents the end of the response for a window.
bool EOF = 3;
}

/**
Expand Down
28 changes: 18 additions & 10 deletions pynumaflow/proto/reducer/reduce_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

110 changes: 108 additions & 2 deletions pynumaflow/reducer/_dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from collections.abc import Iterator, Sequence, Awaitable
from dataclasses import dataclass
from datetime import datetime
from enum import IntEnum
from typing import TypeVar, Callable, Union
from collections.abc import AsyncIterable
from warnings import warn
Expand All @@ -14,6 +15,20 @@
Ms = TypeVar("Ms", bound="Messages")


class WindowOperation(IntEnum):
"""
Enumerate the type of Window operation received.
The operation can be one of the following:
- OPEN: A new window is opened.
- CLOSE: The window is closed.
- APPEND: The window is appended with new data.
"""

OPEN = (0,)
CLOSE = (1,)
APPEND = (4,)


@dataclass(init=False)
class Message:
"""
Expand Down Expand Up @@ -190,6 +205,43 @@ def end(self):
return self._end


@dataclass(init=False)
class ReduceWindow:
"""
Defines the window for a reduce operation which includes the
interval window along with the slot.
"""

__slots__ = ("_window", "_slot")

_window: IntervalWindow
_slot: str

def __init__(self, start: datetime, end: datetime, slot: str = ""):
self._window = IntervalWindow(start=start, end=end)
self._slot = slot

@property
def start(self):
"""Returns the start timestamp of the interval window."""
return self._window.start

@property
def end(self):
"""Returns the end timestamp of the interval window."""
return self._window.end

@property
def slot(self):
"""Returns the slot from the window"""
return self._slot

@property
def window(self):
"""Return the interval window"""
return self._window


@dataclass(init=False)
class Metadata:
"""Defines the metadata for the event."""
Expand All @@ -209,13 +261,21 @@ def interval_window(self):

@dataclass
class ReduceResult:
"""Defines the object to hold the result of reduce computation."""
"""
Defines the object to hold the result of reduce computation.
It contains the following
1. Future: The async awaitable result of computation
2. Iterator: The handle to the input queue
3. Key: The keys of the partition
4. Window: The window of the reduce operation
"""

__slots__ = ("_future", "_iterator", "_key")
__slots__ = ("_future", "_iterator", "_key", "_window")

_future: Task
_iterator: NonBlockingIterator
_key: list[str]
_window: ReduceWindow

@property
def future(self):
Expand All @@ -232,6 +292,52 @@ def keys(self) -> list[str]:
"""Returns the keys of the partition."""
return self._key

@property
def window(self) -> ReduceWindow:
""""""
return self._window


@dataclass
class ReduceRequest:
"""Defines the object to hold a request for the reduce operation."""

__slots__ = ("_operation", "_windows", "_payload")

_operation: WindowOperation
_windows: list[ReduceWindow]
_payload: Datum

def __init__(self, operation: WindowOperation, windows: list[ReduceWindow], payload: Datum):
self._operation = operation
self._windows = windows
self._payload = payload

@property
def operation(self) -> WindowOperation:
"""
Returns the operation of the reduce request.
The operation can be one of the following:
- OPEN: A new window is opened.
- CLOSE: The window is closed.
- APPEND: The window is appended with new data.
"""
return self._operation

@property
def windows(self) -> list[ReduceWindow]:
"""
Returns the windows of the reduce request.
"""
return self._windows

@property
def payload(self) -> Datum:
"""
Returns the payload of the reduce request.
"""
return self._payload


ReduceAsyncCallable = Callable[[list[str], AsyncIterable[Datum], Metadata], Awaitable[Messages]]

Expand Down
Loading
Loading