From 2266c3ae7e0c0f17d9918ccdbaf254906812d73c Mon Sep 17 00:00:00 2001 From: Maxime Carbonneau-Leclerc <3360483+maxi297@users.noreply.github.com> Date: Thu, 7 Nov 2024 12:37:23 -0500 Subject: [PATCH] =?UTF-8?q?bugfix:=20Move=20to=20CustomFormatConcurrentStr?= =?UTF-8?q?eamStateConverter=20on=20epoch=20as=20well=20a=E2=80=A6=20(#483?= =?UTF-8?q?89)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../parsers/model_to_component_factory.py | 20 +++++++---------- .../sources/streams/concurrent/cursor.py | 15 +++++++++++-- .../streams/concurrent/default_stream.py | 2 +- .../datetime_stream_state_converter.py | 2 +- .../test_model_to_component_factory.py | 2 +- .../sources/streams/concurrent/test_cursor.py | 22 +++++++++++++++++++ .../streams/concurrent/test_default_stream.py | 6 ++--- 7 files changed, 49 insertions(+), 20 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 01b76dcdb492..f0eb4f388854 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -174,7 +174,6 @@ from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import ( CustomFormatConcurrentStreamStateConverter, DateTimeStreamStateConverter, - EpochValueConcurrentStreamStateConverter, ) from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction from airbyte_cdk.sources.types import Config @@ -529,16 +528,13 @@ def create_concurrent_cursor_from_datetime_based_cursor( lookback_window = parse_duration(evaluated_lookback_window) connector_state_converter: DateTimeStreamStateConverter - if datetime_format == self.EPOCH_DATETIME_FORMAT: - connector_state_converter = EpochValueConcurrentStreamStateConverter(is_sequential_state=True) - else: - connector_state_converter = CustomFormatConcurrentStreamStateConverter( - datetime_format=datetime_format, - input_datetime_formats=datetime_based_cursor_model.cursor_datetime_formats, - is_sequential_state=True, - cursor_granularity=cursor_granularity, - # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice - ) + connector_state_converter = CustomFormatConcurrentStreamStateConverter( + datetime_format=datetime_format, + input_datetime_formats=datetime_based_cursor_model.cursor_datetime_formats, + is_sequential_state=True, + cursor_granularity=cursor_granularity, + # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice + ) start_date_runtime_value: Union[InterpolatedString, str, MinMaxDatetime] if isinstance(datetime_based_cursor_model.start_datetime, MinMaxDatetimeModel): @@ -579,7 +575,7 @@ def create_concurrent_cursor_from_datetime_based_cursor( ) # When step is not defined, default to a step size from the starting date to the present moment - step_length = datetime.datetime.now(tz=datetime.timezone.utc) - start_date + step_length = datetime.timedelta.max interpolated_step = ( InterpolatedString.create(datetime_based_cursor_model.step, parameters=datetime_based_cursor_model.parameters or {}) if datetime_based_cursor_model.step diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/cursor.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/cursor.py index 6864d563666a..e212693b0270 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/cursor.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/cursor.py @@ -344,7 +344,7 @@ def _split_per_slice_range( return lower = max(lower, self._start) if self._start else lower - if not self._slice_range or lower + self._slice_range >= upper: + if not self._slice_range or self._evaluate_upper_safely(lower, self._slice_range) >= upper: if self._cursor_granularity and not upper_is_end: yield lower, upper - self._cursor_granularity else: @@ -353,7 +353,7 @@ def _split_per_slice_range( stop_processing = False current_lower_boundary = lower while not stop_processing: - current_upper_boundary = min(current_lower_boundary + self._slice_range, upper) + current_upper_boundary = min(self._evaluate_upper_safely(current_lower_boundary, self._slice_range), upper) has_reached_upper_boundary = current_upper_boundary >= upper if self._cursor_granularity and (not upper_is_end or not has_reached_upper_boundary): yield current_lower_boundary, current_upper_boundary - self._cursor_granularity @@ -362,3 +362,14 @@ def _split_per_slice_range( current_lower_boundary = current_upper_boundary if current_upper_boundary >= upper: stop_processing = True + + def _evaluate_upper_safely(self, lower: CursorValueType, step: GapType) -> CursorValueType: + """ + Given that we set the default step at datetime.timedelta.max, we will generate an OverflowError when evaluating the next start_date + This method assumes that users would never enter a step that would generate an overflow. Given that would be the case, the code + would have broken anyway. + """ + try: + return lower + step + except OverflowError: + return self._end_provider() diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/default_stream.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/default_stream.py index 16b1c2777a6b..a48d897e191e 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/default_stream.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/default_stream.py @@ -73,7 +73,7 @@ def as_airbyte_stream(self) -> AirbyteStream: keys = self._primary_key if keys and len(keys) > 0: - stream.source_defined_primary_key = [keys] + stream.source_defined_primary_key = [[key] for key in keys] return stream diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/state_converters/datetime_stream_state_converter.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/state_converters/datetime_stream_state_converter.py index 6447a02d8acd..f6f181e6bbfc 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/state_converters/datetime_stream_state_converter.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/state_converters/datetime_stream_state_converter.py @@ -183,7 +183,7 @@ def __init__( self._parser = DatetimeParser() def output_format(self, timestamp: datetime) -> str: - return timestamp.strftime(self._datetime_format) + return self._parser.format(timestamp, self._datetime_format) def parse_timestamp(self, timestamp: str) -> datetime: for datetime_format in self._input_datetime_formats: diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index e28d60679e32..f47a890bbcaf 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -2667,7 +2667,7 @@ def test_create_concurrent_cursor_from_datetime_based_cursor_all_fields(stream_s "end_time": None, "cursor_granularity": None, "step": None, - }, "_slice_range", datetime.timedelta(days=61), None, id="test_uses_a_single_time_interval_when_no_specified_step_and_granularity"), + }, "_slice_range", datetime.timedelta.max, None, id="test_uses_a_single_time_interval_when_no_specified_step_and_granularity"), ] ) @freezegun.freeze_time("2024-10-01T00:00:00") diff --git a/airbyte-cdk/python/unit_tests/sources/streams/concurrent/test_cursor.py b/airbyte-cdk/python/unit_tests/sources/streams/concurrent/test_cursor.py index 3b255844214c..5da0a55b62d1 100644 --- a/airbyte-cdk/python/unit_tests/sources/streams/concurrent/test_cursor.py +++ b/airbyte-cdk/python/unit_tests/sources/streams/concurrent/test_cursor.py @@ -573,6 +573,28 @@ def test_non_continuous_slices_on_sequential_state_when_close_then_cursor_value_ _A_CURSOR_FIELD_KEY: 7 } + @freezegun.freeze_time(time_to_freeze=datetime.fromtimestamp(10, timezone.utc)) + def test_given_overflowing_slice_gap_when_generate_slices_then_cap_upper_bound_to_end_provider(self) -> None: + a_very_big_slice_range = timedelta.max + cursor = ConcurrentCursor( + _A_STREAM_NAME, + _A_STREAM_NAMESPACE, + {_A_CURSOR_FIELD_KEY: 0}, + self._message_repository, + self._state_manager, + EpochValueConcurrentStreamStateConverter(False), + CursorField(_A_CURSOR_FIELD_KEY), + _SLICE_BOUNDARY_FIELDS, + None, + EpochValueConcurrentStreamStateConverter.get_end_provider(), + _NO_LOOKBACK_WINDOW, + slice_range=a_very_big_slice_range, + ) + + slices = list(cursor.generate_slices()) + + assert slices == [(datetime.fromtimestamp(0, timezone.utc), datetime.fromtimestamp(10, timezone.utc))] + @freezegun.freeze_time(time_to_freeze=datetime(2024, 4, 1, 0, 0, 0, 0, tzinfo=timezone.utc)) @pytest.mark.parametrize( diff --git a/airbyte-cdk/python/unit_tests/sources/streams/concurrent/test_default_stream.py b/airbyte-cdk/python/unit_tests/sources/streams/concurrent/test_default_stream.py index a0d36e1683e4..bb06a7b75e65 100644 --- a/airbyte-cdk/python/unit_tests/sources/streams/concurrent/test_default_stream.py +++ b/airbyte-cdk/python/unit_tests/sources/streams/concurrent/test_default_stream.py @@ -88,7 +88,7 @@ def test_as_airbyte_stream_with_primary_key(self): self._name, json_schema, self._availability_strategy, - ["id"], + ["composite_key_1", "composite_key_2"], self._cursor_field, self._logger, FinalStateCursor(stream_name=self._name, stream_namespace=None, message_repository=self._message_repository), @@ -100,7 +100,7 @@ def test_as_airbyte_stream_with_primary_key(self): supported_sync_modes=[SyncMode.full_refresh], source_defined_cursor=None, default_cursor_field=None, - source_defined_primary_key=[["id"]], + source_defined_primary_key=[["composite_key_1"], ["composite_key_2"]], namespace=None, ) @@ -132,7 +132,7 @@ def test_as_airbyte_stream_with_composite_primary_key(self): supported_sync_modes=[SyncMode.full_refresh], source_defined_cursor=None, default_cursor_field=None, - source_defined_primary_key=[["id_a", "id_b"]], + source_defined_primary_key=[["id_a"], ["id_b"]], namespace=None, )