Skip to content

Commit

Permalink
Add local filtering for Global Parent cursor
Browse files Browse the repository at this point in the history
  • Loading branch information
tolik0 committed Jul 11, 2024
1 parent 91f241b commit b382990
Showing 1 changed file with 9 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,18 @@ class ClientSideIncrementalRecordFilterDecorator(RecordFilter):
"""

def __init__(
self, date_time_based_cursor: DatetimeBasedCursor, per_partition_cursor: Optional[PerPartitionCursor] = None, **kwargs: Any
self, date_time_based_cursor: DatetimeBasedCursor, per_partition_cursor: Optional[PerPartitionCursor] = None,
is_global_parent_cursor: bool = False, **kwargs: Any
):
super().__init__(**kwargs)
self._date_time_based_cursor = date_time_based_cursor
self._per_partition_cursor = per_partition_cursor
self._is_global_parent_cursor = is_global_parent_cursor

@property
def _cursor_field(self) -> str:
return self._date_time_based_cursor.cursor_field.eval(self._date_time_based_cursor.config) # type: ignore # eval returns a string in this context
return self._date_time_based_cursor.cursor_field.eval(
self._date_time_based_cursor.config) # type: ignore # eval returns a string in this context

@property
def _start_date_from_config(self) -> datetime.datetime:
Expand Down Expand Up @@ -102,6 +105,10 @@ def _get_state_value(self, stream_state: StreamState, stream_slice: StreamSlice)
# self._per_partition_cursor is the same object that DeclarativeStream uses to save/update stream_state
partition_state = self._per_partition_cursor.select_state(stream_slice=stream_slice)
return partition_state.get(self._cursor_field) if partition_state else None

if self._is_global_parent_cursor:
return stream_state.get("state", {}).get(self._cursor_field)

return stream_state.get(self._cursor_field)

def _get_filter_date(self, state_value: Optional[str]) -> datetime.datetime:
Expand Down

0 comments on commit b382990

Please sign in to comment.