From b3829901f1b0f62199fc2e13010c7d66c8ae87f2 Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Thu, 11 Jul 2024 17:52:49 +0300 Subject: [PATCH] Add local filtering for Global Parent cursor --- .../sources/declarative/extractors/record_filter.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_filter.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_filter.py index 21022589bbe9..fd9972bde40a 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_filter.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_filter.py @@ -48,15 +48,18 @@ class ClientSideIncrementalRecordFilterDecorator(RecordFilter): """ def __init__( - self, date_time_based_cursor: DatetimeBasedCursor, per_partition_cursor: Optional[PerPartitionCursor] = None, **kwargs: Any + self, date_time_based_cursor: DatetimeBasedCursor, per_partition_cursor: Optional[PerPartitionCursor] = None, + is_global_parent_cursor: bool = False, **kwargs: Any ): super().__init__(**kwargs) self._date_time_based_cursor = date_time_based_cursor self._per_partition_cursor = per_partition_cursor + self._is_global_parent_cursor = is_global_parent_cursor @property def _cursor_field(self) -> str: - return self._date_time_based_cursor.cursor_field.eval(self._date_time_based_cursor.config) # type: ignore # eval returns a string in this context + return self._date_time_based_cursor.cursor_field.eval( + self._date_time_based_cursor.config) # type: ignore # eval returns a string in this context @property def _start_date_from_config(self) -> datetime.datetime: @@ -102,6 +105,10 @@ def _get_state_value(self, stream_state: StreamState, stream_slice: StreamSlice) # self._per_partition_cursor is the same object that DeclarativeStream uses to save/update stream_state partition_state = self._per_partition_cursor.select_state(stream_slice=stream_slice) return partition_state.get(self._cursor_field) if partition_state else None + + if self._is_global_parent_cursor: + return stream_state.get("state", {}).get(self._cursor_field) + return stream_state.get(self._cursor_field) def _get_filter_date(self, state_value: Optional[str]) -> datetime.datetime: