Skip to content

Commit

Permalink
bugfix: Move to CustomFormatConcurrentStreamStateConverter on epoch a…
Browse files Browse the repository at this point in the history
…s well a… (#48389)
  • Loading branch information
maxi297 authored Nov 7, 2024
1 parent 497be1c commit 2266c3a
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@
from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import (
CustomFormatConcurrentStreamStateConverter,
DateTimeStreamStateConverter,
EpochValueConcurrentStreamStateConverter,
)
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction
from airbyte_cdk.sources.types import Config
Expand Down Expand Up @@ -529,16 +528,13 @@ def create_concurrent_cursor_from_datetime_based_cursor(
lookback_window = parse_duration(evaluated_lookback_window)

connector_state_converter: DateTimeStreamStateConverter
if datetime_format == self.EPOCH_DATETIME_FORMAT:
connector_state_converter = EpochValueConcurrentStreamStateConverter(is_sequential_state=True)
else:
connector_state_converter = CustomFormatConcurrentStreamStateConverter(
datetime_format=datetime_format,
input_datetime_formats=datetime_based_cursor_model.cursor_datetime_formats,
is_sequential_state=True,
cursor_granularity=cursor_granularity,
# type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice
)
connector_state_converter = CustomFormatConcurrentStreamStateConverter(
datetime_format=datetime_format,
input_datetime_formats=datetime_based_cursor_model.cursor_datetime_formats,
is_sequential_state=True,
cursor_granularity=cursor_granularity,
# type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice
)

start_date_runtime_value: Union[InterpolatedString, str, MinMaxDatetime]
if isinstance(datetime_based_cursor_model.start_datetime, MinMaxDatetimeModel):
Expand Down Expand Up @@ -579,7 +575,7 @@ def create_concurrent_cursor_from_datetime_based_cursor(
)

# When step is not defined, default to the largest possible step so that the whole range is covered by a single slice
step_length = datetime.datetime.now(tz=datetime.timezone.utc) - start_date
step_length = datetime.timedelta.max
interpolated_step = (
InterpolatedString.create(datetime_based_cursor_model.step, parameters=datetime_based_cursor_model.parameters or {})
if datetime_based_cursor_model.step
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ def _split_per_slice_range(
return

lower = max(lower, self._start) if self._start else lower
if not self._slice_range or lower + self._slice_range >= upper:
if not self._slice_range or self._evaluate_upper_safely(lower, self._slice_range) >= upper:
if self._cursor_granularity and not upper_is_end:
yield lower, upper - self._cursor_granularity
else:
Expand All @@ -353,7 +353,7 @@ def _split_per_slice_range(
stop_processing = False
current_lower_boundary = lower
while not stop_processing:
current_upper_boundary = min(current_lower_boundary + self._slice_range, upper)
current_upper_boundary = min(self._evaluate_upper_safely(current_lower_boundary, self._slice_range), upper)
has_reached_upper_boundary = current_upper_boundary >= upper
if self._cursor_granularity and (not upper_is_end or not has_reached_upper_boundary):
yield current_lower_boundary, current_upper_boundary - self._cursor_granularity
Expand All @@ -362,3 +362,14 @@ def _split_per_slice_range(
current_lower_boundary = current_upper_boundary
if current_upper_boundary >= upper:
stop_processing = True

def _evaluate_upper_safely(self, lower: CursorValueType, step: GapType) -> CursorValueType:
    """
    Return `lower + step`, falling back to the end provider's value when the addition overflows.

    The default step can be `datetime.timedelta.max`, and adding it to `lower` raises an OverflowError.
    A user-provided step is assumed never to overflow: if one did, slicing would have been broken
    regardless of this method.
    """
    try:
        upper = lower + step
    except OverflowError:
        # The sum is not representable, so use whatever the end provider returns as the upper bound.
        upper = self._end_provider()
    return upper
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def as_airbyte_stream(self) -> AirbyteStream:

keys = self._primary_key
if keys and len(keys) > 0:
stream.source_defined_primary_key = [keys]
stream.source_defined_primary_key = [[key] for key in keys]

return stream

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def __init__(
self._parser = DatetimeParser()

def output_format(self, timestamp: datetime) -> str:
    """
    Serialize `timestamp` to a string using the datetime format configured on this converter.

    Formatting is delegated to the converter's parser instead of calling `datetime.strftime` directly.

    :param timestamp: the datetime to serialize
    :return: `timestamp` rendered with `self._datetime_format`
    """
    # NOTE(review): presumably the parser handles epoch-style formats that plain strftime does not
    # support portably, which this converter now has to cover - confirm against DatetimeParser.format.
    return self._parser.format(timestamp, self._datetime_format)

def parse_timestamp(self, timestamp: str) -> datetime:
for datetime_format in self._input_datetime_formats:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2667,7 +2667,7 @@ def test_create_concurrent_cursor_from_datetime_based_cursor_all_fields(stream_s
"end_time": None,
"cursor_granularity": None,
"step": None,
}, "_slice_range", datetime.timedelta(days=61), None, id="test_uses_a_single_time_interval_when_no_specified_step_and_granularity"),
}, "_slice_range", datetime.timedelta.max, None, id="test_uses_a_single_time_interval_when_no_specified_step_and_granularity"),
]
)
@freezegun.freeze_time("2024-10-01T00:00:00")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,28 @@ def test_non_continuous_slices_on_sequential_state_when_close_then_cursor_value_
_A_CURSOR_FIELD_KEY: 7
}

@freezegun.freeze_time(time_to_freeze=datetime.fromtimestamp(10, timezone.utc))
def test_given_overflowing_slice_gap_when_generate_slices_then_cap_upper_bound_to_end_provider(self) -> None:
    """
    With the clock frozen at epoch second 10 and a state value of 0, a `timedelta.max` slice range
    is expected to yield exactly one slice going from epoch second 0 to epoch second 10.
    """
    cursor = ConcurrentCursor(
        _A_STREAM_NAME,
        _A_STREAM_NAMESPACE,
        {_A_CURSOR_FIELD_KEY: 0},
        self._message_repository,
        self._state_manager,
        EpochValueConcurrentStreamStateConverter(False),
        CursorField(_A_CURSOR_FIELD_KEY),
        _SLICE_BOUNDARY_FIELDS,
        None,
        EpochValueConcurrentStreamStateConverter.get_end_provider(),
        _NO_LOOKBACK_WINDOW,
        # Adding this gap to any datetime overflows, which is the situation under test.
        slice_range=timedelta.max,
    )

    generated_slices = list(cursor.generate_slices())

    slice_start = datetime.fromtimestamp(0, timezone.utc)
    slice_end = datetime.fromtimestamp(10, timezone.utc)
    assert generated_slices == [(slice_start, slice_end)]


@freezegun.freeze_time(time_to_freeze=datetime(2024, 4, 1, 0, 0, 0, 0, tzinfo=timezone.utc))
@pytest.mark.parametrize(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def test_as_airbyte_stream_with_primary_key(self):
self._name,
json_schema,
self._availability_strategy,
["id"],
["composite_key_1", "composite_key_2"],
self._cursor_field,
self._logger,
FinalStateCursor(stream_name=self._name, stream_namespace=None, message_repository=self._message_repository),
Expand All @@ -100,7 +100,7 @@ def test_as_airbyte_stream_with_primary_key(self):
supported_sync_modes=[SyncMode.full_refresh],
source_defined_cursor=None,
default_cursor_field=None,
source_defined_primary_key=[["id"]],
source_defined_primary_key=[["composite_key_1"], ["composite_key_2"]],
namespace=None,
)

Expand Down Expand Up @@ -132,7 +132,7 @@ def test_as_airbyte_stream_with_composite_primary_key(self):
supported_sync_modes=[SyncMode.full_refresh],
source_defined_cursor=None,
default_cursor_field=None,
source_defined_primary_key=[["id_a", "id_b"]],
source_defined_primary_key=[["id_a"], ["id_b"]],
namespace=None,
)

Expand Down

0 comments on commit 2266c3a

Please sign in to comment.