Skip to content

Commit

Permalink
Fix duplicate state
Browse files Browse the repository at this point in the history
  • Loading branch information
tolik0 committed Oct 18, 2024
1 parent 5cb9927 commit 0bb92c9
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ def __init__(self, cursor: Cursor, stream_slices: Iterable[Optional[Mapping[str,
self._read_state_from_cursor = read_state_from_cursor
self._current_slice: Optional[StreamSlice] = None
self._finished_sync = False
self._previous_state = None

def next(self) -> Optional[Mapping[str, Any]]:
try:
Expand All @@ -110,7 +111,13 @@ def observe(self, new_state: Mapping[str, Any]) -> None:
pass

def get_checkpoint(self) -> Optional[Mapping[str, Any]]:
return self._cursor.get_stream_state()
# This is used to avoid sending duplicate state messages
new_state = self._cursor.get_stream_state()
if new_state != self._previous_state:
self._previous_state = new_state
return new_state
else:
return None

def _find_next_slice(self) -> StreamSlice:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -622,10 +622,11 @@ def test_substream_checkpoints_after_each_parent_partition():
]

expected_parent_state = [
{"start_time": "2024-04-27", "end_time": "2024-05-27"},
{"start_time": "2024-04-27", "end_time": "2024-05-27"},
{"start_time": "2024-05-27", "end_time": "2024-06-27"},
{"start_time": "2024-05-27", "end_time": "2024-06-27"},
{},
{"first_stream": {}},
{"first_stream": {}},
{"first_stream": {"start_time": "2024-04-27", "end_time": "2024-05-27"}},
{"first_stream": {"start_time": "2024-05-27", "end_time": "2024-06-27"}},
]

partition_router = SubstreamPartitionRouter(
Expand Down Expand Up @@ -655,8 +656,9 @@ def test_substream_checkpoints_after_each_parent_partition():
expected_counter = 0
for actual_slice in partition_router.stream_slices():
assert actual_slice == expected_slices[expected_counter]
assert partition_router._parent_state["first_stream"] == expected_parent_state[expected_counter]
assert partition_router._parent_state == expected_parent_state[expected_counter]
expected_counter += 1
assert partition_router._parent_state == expected_parent_state[expected_counter]


@pytest.mark.parametrize(
Expand All @@ -683,12 +685,13 @@ def test_substream_using_resumable_full_refresh_parent_stream(use_incremental_de
]

expected_parent_state = [
{"next_page_token": 2},
{"next_page_token": 2},
{"next_page_token": 3},
{"next_page_token": 3},
{"__ab_full_refresh_sync_complete": True},
{"__ab_full_refresh_sync_complete": True},
{},
{"persona_3_characters": {}},
{"persona_3_characters": {}},
{"persona_3_characters": {"next_page_token": 2}},
{"persona_3_characters": {"next_page_token": 2}},
{"persona_3_characters": {"next_page_token": 3}},
{"persona_3_characters": {"__ab_full_refresh_sync_complete": True}},
]

partition_router = SubstreamPartitionRouter(
Expand Down Expand Up @@ -728,8 +731,10 @@ def test_substream_using_resumable_full_refresh_parent_stream(use_incremental_de
for actual_slice in partition_router.stream_slices():
assert actual_slice == expected_slices[expected_counter]
if use_incremental_dependency:
assert partition_router._parent_state["persona_3_characters"] == expected_parent_state[expected_counter]
assert partition_router._parent_state == expected_parent_state[expected_counter]
expected_counter += 1
if use_incremental_dependency:
assert partition_router._parent_state == expected_parent_state[expected_counter]


@pytest.mark.parametrize(
Expand All @@ -756,12 +761,13 @@ def test_substream_using_resumable_full_refresh_parent_stream_slices(use_increme
]

expected_parent_state = [
{"next_page_token": 2},
{"next_page_token": 2},
{"next_page_token": 3},
{"next_page_token": 3},
{"__ab_full_refresh_sync_complete": True},
{"__ab_full_refresh_sync_complete": True},
{},
{"persona_3_characters": {}},
{"persona_3_characters": {}},
{"persona_3_characters": {"next_page_token": 2}},
{"persona_3_characters": {"next_page_token": 2}},
{"persona_3_characters": {"next_page_token": 3}},
{"persona_3_characters": {"__ab_full_refresh_sync_complete": True}},
]

expected_substream_state = {
Expand Down Expand Up @@ -822,10 +828,10 @@ def test_substream_using_resumable_full_refresh_parent_stream_slices(use_increme
assert actual_slice == expected_parent_slices[expected_counter]
# check for parent state
if use_incremental_dependency:
assert (
substream_cursor_slicer._partition_router._parent_state["persona_3_characters"] == expected_parent_state[expected_counter]
)
assert substream_cursor_slicer._partition_router._parent_state == expected_parent_state[expected_counter]
expected_counter += 1
if use_incremental_dependency:
assert substream_cursor_slicer._partition_router._parent_state == expected_parent_state[expected_counter]

# validate final state for closed substream slices
final_state = substream_cursor_slicer.get_stream_state()
Expand All @@ -841,7 +847,14 @@ def test_substream_using_resumable_full_refresh_parent_stream_slices(use_increme
(
[
ParentStreamConfig(
stream=MockStream([{}], [{"id": 1, "field_1": "value_1", "field_2": {"nested_field": "nested_value_1"}}, {"id": 2, "field_1": "value_2", "field_2": {"nested_field": "nested_value_2"}}], "first_stream"),
stream=MockStream(
[{}],
[
{"id": 1, "field_1": "value_1", "field_2": {"nested_field": "nested_value_1"}},
{"id": 2, "field_1": "value_2", "field_2": {"nested_field": "nested_value_2"}},
],
"first_stream",
),
parent_key="id",
partition_field="first_stream_id",
extra_fields=[["field_1"], ["field_2", "nested_field"]],
Expand All @@ -851,7 +864,7 @@ def test_substream_using_resumable_full_refresh_parent_stream_slices(use_increme
],
[
{"field_1": "value_1", "field_2.nested_field": "nested_value_1"},
{"field_1": "value_2", "field_2.nested_field": "nested_value_2"}
{"field_1": "value_2", "field_2.nested_field": "nested_value_2"},
],
),
(
Expand All @@ -865,16 +878,13 @@ def test_substream_using_resumable_full_refresh_parent_stream_slices(use_increme
config={},
)
],
[
{"field_1": "value_1"},
{"field_1": "value_2"}
],
)
[{"field_1": "value_1"}, {"field_1": "value_2"}],
),
],
ids=[
"test_with_nested_extra_keys",
"test_with_single_extra_key",
]
],
)
def test_substream_partition_router_with_extra_keys(parent_stream_configs, expected_slices):
partition_router = SubstreamPartitionRouter(parent_stream_configs=parent_stream_configs, parameters={}, config={})
Expand Down

0 comments on commit 0bb92c9

Please sign in to comment.