From 9e23b3f89bdf5f784a09739e610ca0364a5bda14 Mon Sep 17 00:00:00 2001 From: Brian Lai <51336873+brianjlai@users.noreply.github.com> Date: Thu, 11 Jul 2024 02:53:20 -0400 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20[airbyte-cdk]=20Fix=20bug=20wher?= =?UTF-8?q?e=20substreams=20depending=20on=20an=20RFR=20parent=20stream=20?= =?UTF-8?q?don't=20paginate=20or=20use=20existing=20state=20(#40671)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../substream_partition_router.py | 100 ++- .../airbyte_cdk/sources/streams/core.py | 37 +- .../airbyte_cdk/sources/streams/http/http.py | 29 +- .../test_per_partition_cursor_integration.py | 8 +- .../test_parent_state_stream.py | 606 ++++++++++++------ .../test_substream_partition_router.py | 275 +++++++- .../sources/streams/http/test_http.py | 222 ++++++- 7 files changed, 1011 insertions(+), 266 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py index d0f55b657962..1bdf2dcdc36e 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py @@ -1,16 +1,18 @@ # # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # - +import logging from dataclasses import InitVar, dataclass from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Mapping, Optional, Union import dpath -from airbyte_cdk.models import AirbyteMessage, SyncMode, Type +from airbyte_cdk.models import AirbyteMessage +from airbyte_cdk.models import Type as MessageType from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState +from airbyte_cdk.utils import AirbyteTracedException if TYPE_CHECKING: from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream @@ -131,40 +133,70 @@ def stream_slices(self) -> Iterable[StreamSlice]: parent_field = parent_stream_config.parent_key.eval(self.config) # type: ignore # parent_key is always casted to an interpolated string partition_field = parent_stream_config.partition_field.eval(self.config) # type: ignore # partition_field is always casted to an interpolated string incremental_dependency = parent_stream_config.incremental_dependency - for parent_stream_slice in parent_stream.stream_slices( - sync_mode=SyncMode.full_refresh, cursor_field=None, stream_state=None - ): - parent_partition = parent_stream_slice.partition if parent_stream_slice else {} - - # we need to read all records for slice to update the parent stream cursor - stream_slices_for_parent = [] - - # only stream_slice param is used in the declarative stream, stream state is set in PerPartitionCursor set_initial_state - for parent_record in parent_stream.read_records( - sync_mode=SyncMode.full_refresh, cursor_field=None, stream_slice=parent_stream_slice, stream_state=None - ): - # Skip non-records (eg AirbyteLogMessage) - if isinstance(parent_record, AirbyteMessage): - if parent_record.type == Type.RECORD: - parent_record = parent_record.record.data - else: - continue - elif 
isinstance(parent_record, Record):
-                    parent_record = parent_record.data
-                try:
-                    partition_value = dpath.get(parent_record, parent_field)
-                except KeyError:
-                    pass
+
+            stream_slices_for_parent = []
+            previous_associated_slice = None
+
+            # read_only_records() assumes the parent is not concurrent. This is currently okay since the concurrent CDK does
+            # not support either substreams or RFR, but something that needs to be considered once we do
+            for parent_record in parent_stream.read_only_records():
+                parent_partition = None
+                parent_associated_slice = None
+                # Skip non-records (eg AirbyteLogMessage)
+                if isinstance(parent_record, AirbyteMessage):
+                    self.logger.warning(
+                        f"Parent stream {parent_stream.name} returns records of type AirbyteMessage. This SubstreamPartitionRouter is not able to checkpoint incremental parent state."
+                    )
+                    if parent_record.type == MessageType.RECORD:
+                        parent_record = parent_record.record.data
                     else:
-                        stream_slices_for_parent.append(
-                            StreamSlice(partition={partition_field: partition_value, "parent_slice": parent_partition}, cursor_slice={})
+                        continue
+                elif isinstance(parent_record, Record):
+                    parent_partition = parent_record.associated_slice.partition if parent_record.associated_slice else {}
+                    parent_associated_slice = parent_record.associated_slice
+                    parent_record = parent_record.data
+                elif not isinstance(parent_record, Mapping):
+                    # The parent_record should only take the form of a Record, AirbyteMessage, or Mapping. Anything else is invalid
+                    raise AirbyteTracedException(message=f"Parent stream returned records as invalid type {type(parent_record)}")
+                try:
+                    partition_value = dpath.get(parent_record, parent_field)
+                except KeyError:
+                    pass
+                else:
+                    if incremental_dependency:
+                        if previous_associated_slice is None:
+                            previous_associated_slice = parent_associated_slice
+                        elif previous_associated_slice != parent_associated_slice:
+                            # Update the parent state, as the parent stream has read all records for the current
+                            # slice and its state is already updated.
+                            #
+                            # When the associated slice of the current record of the parent stream changes, this
+                            # indicates the parent stream has finished processing the current slice and has moved onto
+                            # the next. When this happens, we should update the partition router's current state and
+                            # flush the previous set of collected records and start a new set
+                            #
+                            # Note: One tricky aspect to take note of here is that parent_stream.state will actually
+                            # fetch the state for the previous record's slice, NOT the current record's slice.
+                            # This is because in the retriever, we only update stream state after yielding all the
+                            # records. And since we are in the middle of the current slice, parent_stream.state is
+                            # still set to the previous state. 
+ self._parent_state[parent_stream.name] = parent_stream.state + yield from stream_slices_for_parent + + # Reset stream_slices_for_parent after we've flushed parent records for the previous parent slice + stream_slices_for_parent = [] + previous_associated_slice = parent_associated_slice + stream_slices_for_parent.append( + StreamSlice( + partition={partition_field: partition_value, "parent_slice": parent_partition or {}}, cursor_slice={} ) + ) - # update the parent state, as parent stream read all record for current slice and state is already updated - if incremental_dependency: - self._parent_state[parent_stream.name] = parent_stream.state + # A final parent state update and yield of records is needed, so we don't skip records for the final parent slice + if incremental_dependency: + self._parent_state[parent_stream.name] = parent_stream.state - yield from stream_slices_for_parent + yield from stream_slices_for_parent def set_initial_state(self, stream_state: StreamState) -> None: """ @@ -215,3 +247,7 @@ def get_stream_state(self) -> Optional[Mapping[str, StreamState]]: } """ return self._parent_state + + @property + def logger(self) -> logging.Logger: + return logging.getLogger("airbyte.SubstreamPartitionRouter") diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py index 3d2b817c0e41..218095dc5b79 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py @@ -10,7 +10,7 @@ from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union import airbyte_cdk.sources.utils.casing as casing -from airbyte_cdk.models import AirbyteMessage, AirbyteStream, ConfiguredAirbyteStream, SyncMode +from airbyte_cdk.models import AirbyteMessage, AirbyteStream, ConfiguredAirbyteStream, DestinationSyncMode, SyncMode from airbyte_cdk.models import Type as MessageType from airbyte_cdk.sources.streams.checkpoint import ( CheckpointMode, @@ -24,7 +24,7 @@ # list of all possible HTTP methods which can be used for sending of request bodies from airbyte_cdk.sources.utils.schema_helpers import InternalConfig, ResourceSchemaLoader -from airbyte_cdk.sources.utils.slice_logger import SliceLogger +from airbyte_cdk.sources.utils.slice_logger import DebugSliceLogger, SliceLogger from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer from deprecated import deprecated @@ -156,6 +156,7 @@ def read( # type: ignore # ignoring typing for ConnectorStateManager because o except AttributeError: pass + should_checkpoint = bool(state_manager) checkpoint_reader = self._get_checkpoint_reader( logger=logger, cursor_field=cursor_field, sync_mode=sync_mode, stream_state=stream_state ) @@ -193,7 +194,7 @@ def read( # type: ignore # ignoring typing for ConnectorStateManager because o checkpoint_interval = self.state_checkpoint_interval checkpoint = checkpoint_reader.get_checkpoint() - if checkpoint_interval and record_counter % checkpoint_interval == 0 and checkpoint is not None: + if should_checkpoint and checkpoint_interval and record_counter % checkpoint_interval == 0 and checkpoint is not None: airbyte_state_message = self._checkpoint_state(checkpoint, state_manager=state_manager) yield airbyte_state_message @@ -201,17 +202,43 @@ def read( # type: ignore # ignoring typing for ConnectorStateManager because o break self._observe_state(checkpoint_reader) checkpoint_state = checkpoint_reader.get_checkpoint() - if checkpoint_state is not None: + 
if should_checkpoint and checkpoint_state is not None:
                 airbyte_state_message = self._checkpoint_state(checkpoint_state, state_manager=state_manager)
                 yield airbyte_state_message
 
             next_slice = checkpoint_reader.next()
 
         checkpoint = checkpoint_reader.get_checkpoint()
-        if checkpoint is not None:
+        if should_checkpoint and checkpoint is not None:
             airbyte_state_message = self._checkpoint_state(checkpoint, state_manager=state_manager)
             yield airbyte_state_message
 
+    def read_only_records(self, state: Optional[Mapping[str, Any]] = None) -> Iterable[StreamData]:
+        """
+        Helper method that performs a read on a stream with an optional state and emits records. If the stream supports
+        incremental syncs, this operation does not update the stream's internal state (if it uses the modern state setter/getter)
+        or emit state messages.
+        """
+
+        configured_stream = ConfiguredAirbyteStream(
+            stream=AirbyteStream(
+                name=self.name,
+                json_schema={},
+                supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental],
+            ),
+            sync_mode=SyncMode.incremental if state else SyncMode.full_refresh,
+            destination_sync_mode=DestinationSyncMode.append,
+        )
+
+        yield from self.read(
+            configured_stream=configured_stream,
+            logger=self.logger,
+            slice_logger=DebugSliceLogger(),
+            stream_state=dict(state) if state else {},  # read() expects MutableMapping instead of Mapping which is used more often
+            state_manager=None,
+            internal_config=InternalConfig(),
+        )
+
     @abstractmethod
     def read_records(
         self,
diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py
index 87452516863e..7599caf2ab36 100644
--- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py
+++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py
@@ -9,7 +9,8 @@
 from urllib.parse import urljoin
 
 import requests
-from airbyte_cdk.models import FailureType, SyncMode
+from airbyte_cdk.models import AirbyteMessage, FailureType, SyncMode
+from airbyte_cdk.models import Type as MessageType
 from airbyte_cdk.sources.message.repository import InMemoryMessageRepository
 from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
 from airbyte_cdk.sources.streams.call_rate import APIBudget
@@ -18,6 +19,7 @@
 from airbyte_cdk.sources.streams.http.error_handlers import BackoffStrategy, ErrorHandler, HttpStatusErrorHandler
 from airbyte_cdk.sources.streams.http.error_handlers.response_models import ErrorResolution, ResponseAction
 from airbyte_cdk.sources.streams.http.http_client import HttpClient
+from airbyte_cdk.sources.types import Record
 from airbyte_cdk.sources.utils.types import JsonType
 from deprecated import deprecated
 from requests.auth import AuthBase
@@ -380,19 +382,18 @@ def __init__(self, parent: HttpStream, **kwargs: Any):
     def stream_slices(
         self, sync_mode: SyncMode, cursor_field: Optional[List[str]] = None, stream_state: Optional[Mapping[str, Any]] = None
     ) -> Iterable[Optional[Mapping[str, Any]]]:
-        parent_stream_slices = self.parent.stream_slices(
-            sync_mode=SyncMode.full_refresh, cursor_field=cursor_field, stream_state=stream_state
-        )
-
-        # iterate over all parent stream_slices
-        for stream_slice in parent_stream_slices:
-            parent_records = self.parent.read_records(
-                sync_mode=SyncMode.full_refresh, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state
-            )
-
-            # iterate over all parent records with current stream_slice
-            for record in parent_records:
-                yield {"parent": record}
+        # read_only_records() assumes the parent is 
not concurrent. This is currently okay since the concurrent CDK does + # not support either substreams or RFR, but something that needs to be considered once we do + for parent_record in self.parent.read_only_records(stream_state): + # Skip non-records (eg AirbyteLogMessage) + if isinstance(parent_record, AirbyteMessage): + if parent_record.type == MessageType.RECORD: + parent_record = parent_record.record.data + else: + continue + elif isinstance(parent_record, Record): + parent_record = parent_record.data + yield {"parent": parent_record} @deprecated(version="3.0.0", reason="You should set backoff_strategies explicitly in HttpStream.get_backoff_strategy() instead.") diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/incremental/test_per_partition_cursor_integration.py b/airbyte-cdk/python/unit_tests/sources/declarative/incremental/test_per_partition_cursor_integration.py index a10aff2da236..1b3550a99861 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/incremental/test_per_partition_cursor_integration.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/incremental/test_per_partition_cursor_integration.py @@ -232,12 +232,12 @@ def test_substream_without_input_state(): stream_instance = test_source.streams({})[1] - stream_slice = StreamSlice(partition={"parent_id": "1"}, - cursor_slice={"start_time": "2022-01-01", "end_time": "2022-01-31"}) + parent_stream_slice = StreamSlice(partition={}, cursor_slice={"start_time": "2022-01-01", "end_time": "2022-01-31"}) + # This mocks the resulting records of the Rates stream which acts as the parent stream of the SubstreamPartitionRouter being tested with patch.object( - SimpleRetriever, "_read_pages", side_effect=[[Record({"id": "1", CURSOR_FIELD: "2022-01-15"}, stream_slice)], - [Record({"id": "2", CURSOR_FIELD: "2022-01-15"}, stream_slice)]] + SimpleRetriever, "_read_pages", side_effect=[[Record({"id": "1", CURSOR_FIELD: "2022-01-15"}, parent_stream_slice)], + [Record({"id": "2", CURSOR_FIELD: "2022-01-15"}, parent_stream_slice)]] ): slices = list(stream_instance.stream_slices(sync_mode=SYNC_MODE)) assert list(slices) == [ diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/partition_routers/test_parent_state_stream.py b/airbyte-cdk/python/unit_tests/sources/declarative/partition_routers/test_parent_state_stream.py index d394971fbdd7..773ed96571d0 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/partition_routers/test_parent_state_stream.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/partition_routers/test_parent_state_stream.py @@ -20,6 +20,208 @@ ) from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource +SUBSTREAM_MANIFEST: MutableMapping[str, Any] = { + "version": "0.51.42", + "type": "DeclarativeSource", + "check": {"type": "CheckStream", "stream_names": ["post_comment_votes"]}, + "definitions": { + "basic_authenticator": { + "type": "BasicHttpAuthenticator", + "username": "{{ config['credentials']['email'] + '/token' }}", + "password": "{{ config['credentials']['api_token'] }}", + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.example.com", + "http_method": "GET", + "authenticator": "#/definitions/basic_authenticator", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": { + "type": "DpathExtractor", + "field_path": ["{{ parameters.get('data_path') or parameters['name'] }}"], + }, + "schema_normalization": "Default", + }, + "paginator": { + 
"type": "DefaultPaginator", + "page_size_option": {"type": "RequestOption", "field_name": "per_page", "inject_into": "request_parameter"}, + "pagination_strategy": { + "type": "CursorPagination", + "page_size": 100, + "cursor_value": "{{ response.get('next_page', {}) }}", + "stop_condition": "{{ not response.get('next_page', {}) }}", + }, + "page_token_option": {"type": "RequestPath"}, + }, + }, + "cursor_incremental_sync": { + "type": "DatetimeBasedCursor", + "cursor_datetime_formats": ["%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S%z"], + "datetime_format": "%Y-%m-%dT%H:%M:%SZ", + "cursor_field": "{{ parameters.get('cursor_field', 'updated_at') }}", + "start_datetime": {"datetime": "{{ config.get('start_date')}}"}, + "start_time_option": {"inject_into": "request_parameter", "field_name": "start_time", "type": "RequestOption"}, + }, + "posts_stream": { + "type": "DeclarativeStream", + "name": "posts", + "primary_key": ["id"], + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": { + "id": {"type": "integer"}, + "updated_at": {"type": "string", "format": "date-time"}, + "title": {"type": "string"}, + "content": {"type": "string"}, + }, + "type": "object", + }, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.example.com", + "path": "/community/posts", + "http_method": "GET", + "authenticator": "#/definitions/basic_authenticator", + }, + "record_selector": "#/definitions/retriever/record_selector", + "paginator": "#/definitions/retriever/paginator", + }, + "incremental_sync": "#/definitions/cursor_incremental_sync", + "$parameters": { + "name": "posts", + "path": "community/posts", + "data_path": "posts", + "cursor_field": "updated_at", + "primary_key": "id", + }, + }, + "post_comments_stream": { + "type": "DeclarativeStream", + "name": "post_comments", + "primary_key": ["id"], + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": { + "id": {"type": "integer"}, + "updated_at": {"type": "string", "format": "date-time"}, + "post_id": {"type": "integer"}, + "comment": {"type": "string"}, + }, + "type": "object", + }, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.example.com", + "path": "/community/posts/{{ stream_slice.id }}/comments", + "http_method": "GET", + "authenticator": "#/definitions/basic_authenticator", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": ["comments"]}, + "record_filter": { + "condition": "{{ record['updated_at'] >= stream_state.get('updated_at', config.get('start_date')) }}" + }, + }, + "paginator": "#/definitions/retriever/paginator", + "partition_router": { + "type": "SubstreamPartitionRouter", + "parent_stream_configs": [ + { + "stream": "#/definitions/posts_stream", + "parent_key": "id", + "partition_field": "id", + "incremental_dependency": True, + } + ], + }, + }, + "incremental_sync": { + "type": "DatetimeBasedCursor", + "cursor_datetime_formats": ["%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S%z"], + "datetime_format": "%Y-%m-%dT%H:%M:%SZ", + "cursor_field": "{{ parameters.get('cursor_field', 'updated_at') }}", + "start_datetime": {"datetime": "{{ config.get('start_date') }}"}, + }, + "$parameters": { + "name": "post_comments", + "path": "community/posts/{{ stream_slice.id }}/comments", + "data_path": "comments", + 
"cursor_field": "updated_at", + "primary_key": "id", + }, + }, + "post_comment_votes_stream": { + "type": "DeclarativeStream", + "name": "post_comment_votes", + "primary_key": ["id"], + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": { + "id": {"type": "integer"}, + "created_at": {"type": "string", "format": "date-time"}, + "comment_id": {"type": "integer"}, + "vote": {"type": "number"}, + }, + "type": "object", + }, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.example.com", + "path": "/community/posts/{{ stream_slice.parent_slice.id }}/comments/{{ stream_slice.id }}/votes", + "http_method": "GET", + "authenticator": "#/definitions/basic_authenticator", + }, + "record_selector": "#/definitions/retriever/record_selector", + "paginator": "#/definitions/retriever/paginator", + "partition_router": { + "type": "SubstreamPartitionRouter", + "parent_stream_configs": [ + { + "stream": "#/definitions/post_comments_stream", + "parent_key": "id", + "partition_field": "id", + "incremental_dependency": True, + } + ], + }, + }, + "incremental_sync": "#/definitions/cursor_incremental_sync", + "$parameters": { + "name": "post_comment_votes", + "path": "community/posts/{{ stream_slice.parent_slice.id }}/comments/{{ stream_slice.id }}/votes", + "data_path": "votes", + "cursor_field": "created_at", + "primary_key": "id", + }, + }, + }, + "streams": [ + {"$ref": "#/definitions/posts_stream"}, + {"$ref": "#/definitions/post_comments_stream"}, + {"$ref": "#/definitions/post_comment_votes_stream"}, + ], +} + def _run_read( manifest: Mapping[str, Any], @@ -46,207 +248,7 @@ def _run_read( [ ( "test_incremental_parent_state", - { - "version": "0.51.42", - "type": "DeclarativeSource", - "check": {"type": "CheckStream", "stream_names": ["post_comment_votes"]}, - "definitions": { - "basic_authenticator": { - "type": "BasicHttpAuthenticator", - "username": "{{ config['credentials']['email'] + '/token' }}", - "password": "{{ config['credentials']['api_token'] }}", - }, - "retriever": { - "type": "SimpleRetriever", - "requester": { - "type": "HttpRequester", - "url_base": "https://api.example.com", - "http_method": "GET", - "authenticator": "#/definitions/basic_authenticator", - }, - "record_selector": { - "type": "RecordSelector", - "extractor": { - "type": "DpathExtractor", - "field_path": ["{{ parameters.get('data_path') or parameters['name'] }}"], - }, - "schema_normalization": "Default", - }, - "paginator": { - "type": "DefaultPaginator", - "page_size_option": {"type": "RequestOption", "field_name": "per_page", "inject_into": "request_parameter"}, - "pagination_strategy": { - "type": "CursorPagination", - "page_size": 100, - "cursor_value": "{{ response.get('next_page', {}) }}", - "stop_condition": "{{ not response.get('next_page', {}) }}", - }, - "page_token_option": {"type": "RequestPath"}, - }, - }, - "cursor_incremental_sync": { - "type": "DatetimeBasedCursor", - "cursor_datetime_formats": ["%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S%z"], - "datetime_format": "%Y-%m-%dT%H:%M:%SZ", - "cursor_field": "{{ parameters.get('cursor_field', 'updated_at') }}", - "start_datetime": {"datetime": "{{ config.get('start_date')}}"}, - "start_time_option": {"inject_into": "request_parameter", "field_name": "start_time", "type": "RequestOption"}, - }, - "posts_stream": { - "type": "DeclarativeStream", - "name": "posts", - "primary_key": ["id"], - "schema_loader": { - "type": 
"InlineSchemaLoader", - "schema": { - "$schema": "http://json-schema.org/schema#", - "properties": { - "id": {"type": "integer"}, - "updated_at": {"type": "string", "format": "date-time"}, - "title": {"type": "string"}, - "content": {"type": "string"}, - }, - "type": "object", - }, - }, - "retriever": { - "type": "SimpleRetriever", - "requester": { - "type": "HttpRequester", - "url_base": "https://api.example.com", - "path": "/community/posts", - "http_method": "GET", - "authenticator": "#/definitions/basic_authenticator", - }, - "record_selector": "#/definitions/retriever/record_selector", - "paginator": "#/definitions/retriever/paginator", - }, - "incremental_sync": "#/definitions/cursor_incremental_sync", - "$parameters": { - "name": "posts", - "path": "community/posts", - "data_path": "posts", - "cursor_field": "updated_at", - "primary_key": "id", - }, - }, - "post_comments_stream": { - "type": "DeclarativeStream", - "name": "post_comments", - "primary_key": ["id"], - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": { - "$schema": "http://json-schema.org/schema#", - "properties": { - "id": {"type": "integer"}, - "updated_at": {"type": "string", "format": "date-time"}, - "post_id": {"type": "integer"}, - "comment": {"type": "string"}, - }, - "type": "object", - }, - }, - "retriever": { - "type": "SimpleRetriever", - "requester": { - "type": "HttpRequester", - "url_base": "https://api.example.com", - "path": "/community/posts/{{ stream_slice.id }}/comments", - "http_method": "GET", - "authenticator": "#/definitions/basic_authenticator", - }, - "record_selector": { - "type": "RecordSelector", - "extractor": {"type": "DpathExtractor", "field_path": ["comments"]}, - "record_filter": { - "condition": "{{ record['updated_at'] >= stream_state.get('updated_at', config.get('start_date')) }}" - }, - }, - "paginator": "#/definitions/retriever/paginator", - "partition_router": { - "type": "SubstreamPartitionRouter", - "parent_stream_configs": [ - { - "stream": "#/definitions/posts_stream", - "parent_key": "id", - "partition_field": "id", - "incremental_dependency": True, - } - ], - }, - }, - "incremental_sync": { - "type": "DatetimeBasedCursor", - "cursor_datetime_formats": ["%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S%z"], - "datetime_format": "%Y-%m-%dT%H:%M:%SZ", - "cursor_field": "{{ parameters.get('cursor_field', 'updated_at') }}", - "start_datetime": {"datetime": "{{ config.get('start_date') }}"}, - }, - "$parameters": { - "name": "post_comments", - "path": "community/posts/{{ stream_slice.id }}/comments", - "data_path": "comments", - "cursor_field": "updated_at", - "primary_key": "id", - }, - }, - "post_comment_votes_stream": { - "type": "DeclarativeStream", - "name": "post_comment_votes", - "primary_key": ["id"], - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": { - "$schema": "http://json-schema.org/schema#", - "properties": { - "id": {"type": "integer"}, - "created_at": {"type": "string", "format": "date-time"}, - "comment_id": {"type": "integer"}, - "vote": {"type": "number"}, - }, - "type": "object", - }, - }, - "retriever": { - "type": "SimpleRetriever", - "requester": { - "type": "HttpRequester", - "url_base": "https://api.example.com", - "path": "/community/posts/{{ stream_slice.parent_slice.id }}/comments/{{ stream_slice.id }}/votes", - "http_method": "GET", - "authenticator": "#/definitions/basic_authenticator", - }, - "record_selector": "#/definitions/retriever/record_selector", - "paginator": "#/definitions/retriever/paginator", - "partition_router": { - 
"type": "SubstreamPartitionRouter", - "parent_stream_configs": [ - { - "stream": "#/definitions/post_comments_stream", - "parent_key": "id", - "partition_field": "id", - "incremental_dependency": True, - } - ], - }, - }, - "incremental_sync": "#/definitions/cursor_incremental_sync", - "$parameters": { - "name": "post_comment_votes", - "path": "community/posts/{{ stream_slice.parent_slice.id }}/comments/{{ stream_slice.id }}/votes", - "data_path": "votes", - "cursor_field": "created_at", - "primary_key": "id", - }, - }, - }, - "streams": [ - {"$ref": "#/definitions/posts_stream"}, - {"$ref": "#/definitions/post_comments_stream"}, - {"$ref": "#/definitions/post_comment_votes_stream"}, - ], - }, + SUBSTREAM_MANIFEST, [ # Fetch the first page of posts ( @@ -423,5 +425,205 @@ def test_incremental_parent_state(test_name, manifest, mock_requests, expected_r assert output_data == expected_records final_state = [message.state.stream.stream_state.dict() for message in output if message.state] - print("final_state", final_state[-1]) + assert final_state[-1] == expected_state + + +@pytest.mark.parametrize( + "test_name, manifest, mock_requests, expected_records, initial_state, expected_state", + [ + ( + "test_incremental_parent_state", + SUBSTREAM_MANIFEST, + [ + # Fetch the first page of posts + ( + "https://api.example.com/community/posts?per_page=100&start_time=2024-01-01T00:00:01Z", + { + "posts": [{"id": 1, "updated_at": "2024-01-30T00:00:00Z"}, {"id": 2, "updated_at": "2024-01-29T00:00:00Z"}], + "next_page": "https://api.example.com/community/posts?per_page=100&start_time=2024-01-05T00:00:00Z&page=2", + }, + ), + # Fetch the second page of posts + ( + "https://api.example.com/community/posts?per_page=100&start_time=2024-01-01T00:00:01Z&page=2", + {"posts": [{"id": 3, "updated_at": "2024-01-28T00:00:00Z"}]}, + ), + # Fetch the first page of comments for post 1 + ( + "https://api.example.com/community/posts/1/comments?per_page=100", + { + "comments": [ + {"id": 9, "post_id": 1, "updated_at": "2023-01-01T00:00:00Z"}, + {"id": 10, "post_id": 1, "updated_at": "2024-01-25T00:00:00Z"}, + {"id": 11, "post_id": 1, "updated_at": "2024-01-24T00:00:00Z"}, + ], + "next_page": "https://api.example.com/community/posts/1/comments?per_page=100&page=2", + }, + ), + # Fetch the second page of comments for post 1 + ( + "https://api.example.com/community/posts/1/comments?per_page=100&page=2", + {"comments": [{"id": 12, "post_id": 1, "updated_at": "2024-01-23T00:00:00Z"}]}, + ), + # Fetch the first page of votes for comment 10 of post 1 + ( + "https://api.example.com/community/posts/1/comments/10/votes?per_page=100&start_time=2024-01-02T00:00:00Z", + { + "votes": [{"id": 100, "comment_id": 10, "created_at": "2024-01-15T00:00:00Z"}], + "next_page": "https://api.example.com/community/posts/1/comments/10/votes?per_page=100&page=2&start_time=2024-01-01T00:00:01Z", + }, + ), + # Fetch the second page of votes for comment 10 of post 1 + ( + "https://api.example.com/community/posts/1/comments/10/votes?per_page=100&page=2&start_time=2024-01-01T00:00:01Z", + {"votes": [{"id": 101, "comment_id": 10, "created_at": "2024-01-14T00:00:00Z"}]}, + ), + # Fetch the first page of votes for comment 11 of post 1 + ( + "https://api.example.com/community/posts/1/comments/11/votes?per_page=100&start_time=2024-01-03T00:00:00Z", + {"votes": [{"id": 102, "comment_id": 11, "created_at": "2024-01-13T00:00:00Z"}]}, + ), + # Fetch the first page of votes for comment 12 of post 1 + 
("https://api.example.com/community/posts/1/comments/12/votes?per_page=100&start_time=2024-01-01T00:00:01Z", {"votes": []}), + # Fetch the first page of comments for post 2 + ( + "https://api.example.com/community/posts/2/comments?per_page=100", + { + "comments": [{"id": 20, "post_id": 2, "updated_at": "2024-01-22T00:00:00Z"}], + "next_page": "https://api.example.com/community/posts/2/comments?per_page=100&page=2", + }, + ), + # Fetch the second page of comments for post 2 + ( + "https://api.example.com/community/posts/2/comments?per_page=100&page=2", + {"comments": [{"id": 21, "post_id": 2, "updated_at": "2024-01-21T00:00:00Z"}]}, + ), + # Fetch the first page of votes for comment 20 of post 2 + ( + "https://api.example.com/community/posts/2/comments/20/votes?per_page=100&start_time=2024-01-01T00:00:01Z", + {"votes": [{"id": 200, "comment_id": 20, "created_at": "2024-01-12T00:00:00Z"}]}, + ), + # Fetch the first page of votes for comment 21 of post 2 + ( + "https://api.example.com/community/posts/2/comments/21/votes?per_page=100&start_time=2024-01-01T00:00:01Z", + {"votes": [{"id": 201, "comment_id": 21, "created_at": "2024-01-12T00:00:15Z"}]}, + ), + # Fetch the first page of comments for post 3 + ( + "https://api.example.com/community/posts/3/comments?per_page=100", + {"comments": [{"id": 30, "post_id": 3, "updated_at": "2024-01-09T00:00:00Z"}]}, + ), + # Fetch the first page of votes for comment 30 of post 3 + ( + "https://api.example.com/community/posts/3/comments/30/votes?per_page=100", + {"votes": [{"id": 300, "comment_id": 30, "created_at": "2024-01-10T00:00:00Z"}]}, + ), + ], + # Expected records + [ + {"id": 100, "comment_id": 10, "created_at": "2024-01-15T00:00:00Z"}, + {"id": 101, "comment_id": 10, "created_at": "2024-01-14T00:00:00Z"}, + {"id": 102, "comment_id": 11, "created_at": "2024-01-13T00:00:00Z"}, + {"id": 200, "comment_id": 20, "created_at": "2024-01-12T00:00:00Z"}, + {"id": 201, "comment_id": 21, "created_at": "2024-01-12T00:00:15Z"}, + {"id": 300, "comment_id": 30, "created_at": "2024-01-10T00:00:00Z"}, + ], + # Initial state + [ + AirbyteStateMessage( + type=AirbyteStateType.STREAM, + stream=AirbyteStreamState( + stream_descriptor=StreamDescriptor(name="post_comment_votes", namespace=None), + stream_state=AirbyteStateBlob.parse_obj( + { + # This should not happen since parent state is disabled, but I've added this to validate that and + # incoming parent_state is ignored when the parent stream's incremental_dependency is disabled + "parent_state": { + "post_comments": { + "states": [ + {"partition": {"id": 1, "parent_slice": {}}, "cursor": {"updated_at": "2023-01-04T00:00:00Z"}} + ], + "parent_state": {"posts": {"updated_at": "2024-01-05T00:00:00Z"}}, + } + }, + "states": [ + { + "partition": {"id": 10, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": "2024-01-02T00:00:00Z"}, + }, + { + "partition": {"id": 11, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": "2024-01-03T00:00:00Z"}, + }, + ], + } + ), + ), + ) + ], + # Expected state + { + "states": [ + { + "partition": {"id": 10, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": "2024-01-15T00:00:00Z"}, + }, + { + "partition": {"id": 11, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": "2024-01-13T00:00:00Z"}, + }, + { + "partition": {"id": 20, "parent_slice": {"id": 2, "parent_slice": {}}}, + "cursor": {"created_at": "2024-01-12T00:00:00Z"}, + }, + { + "partition": {"id": 21, "parent_slice": {"id": 2, 
"parent_slice": {}}}, + "cursor": {"created_at": "2024-01-12T00:00:15Z"}, + }, + { + "partition": {"id": 30, "parent_slice": {"id": 3, "parent_slice": {}}}, + "cursor": {"created_at": "2024-01-10T00:00:00Z"}, + }, + ], + }, + ), + ], +) +def test_incremental_parent_state_no_incremental_dependency( + test_name, + manifest, + mock_requests, + expected_records, + initial_state, + expected_state +): + """ + This is a pretty complicated test that syncs a low-code connector stream with three levels of substreams + - posts: (ids: 1, 2, 3) + - post comments: (parent post 1 with ids: 9, 10, 11, 12; parent post 2 with ids: 20, 21; parent post 3 with id: 30) + - post comment votes: (parent comment 10 with ids: 100, 101; parent comment 11 with id: 102; + parent comment 20 with id: 200; parent comment 21 with id: 201, parent comment 30 with id: 300) + + By setting incremental_dependency to false, parent streams will not use the incoming state and will not update state. + The post_comment_votes substream is incremental and will emit state messages We verify this by ensuring that mocked + parent stream requests use the incoming config as query parameters and the substream state messages does not + contain parent stream state. + """ + + _stream_name = "post_comment_votes" + config = {"start_date": "2024-01-01T00:00:01Z", "credentials": {"email": "email", "api_token": "api_token"}} + + # Disable incremental_dependency + manifest["definitions"]["post_comments_stream"]["retriever"]["partition_router"]["parent_stream_configs"][0]["incremental_dependency"] = False + manifest["definitions"]["post_comment_votes_stream"]["retriever"]["partition_router"]["parent_stream_configs"][0]["incremental_dependency"] = False + + with requests_mock.Mocker() as m: + for url, response in mock_requests: + m.get(url, json=response) + + output = _run_read(manifest, config, _stream_name, initial_state) + output_data = [message.record.data for message in output if message.record] + + assert output_data == expected_records + final_state = [message.state.stream.stream_state.dict() for message in output if message.state] assert final_state[-1] == expected_state diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py b/airbyte-cdk/python/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py index 9f25c627a76e..8f2f9dbe0a38 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py @@ -2,15 +2,19 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
# -from typing import Any, Iterable, List, Mapping, Optional, Union +from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union import pytest as pytest from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, SyncMode, Type from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream +from airbyte_cdk.sources.declarative.incremental import ResumableFullRefreshCursor from airbyte_cdk.sources.declarative.incremental.per_partition_cursor import StreamSlice +from airbyte_cdk.sources.declarative.interpolation import InterpolatedString from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import ParentStreamConfig, SubstreamPartitionRouter from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType +from airbyte_cdk.sources.streams.checkpoint import Cursor from airbyte_cdk.sources.types import Record +from airbyte_cdk.utils import AirbyteTracedException parent_records = [{"id": 1, "data": "data1"}, {"id": 2, "data": "data2"}] more_records = [{"id": 10, "data": "data10", "slice": "second_parent"}, {"id": 20, "data": "data20", "slice": "second_parent"}] @@ -31,12 +35,18 @@ class MockStream(DeclarativeStream): - def __init__(self, slices, records, name, cursor_field=None): + def __init__(self, slices, records, name, cursor_field="", cursor=None): + self.config = {} self._slices = slices self._records = records - self._cursor_field = cursor_field + self._stream_cursor_field = ( + InterpolatedString.create(cursor_field, parameters={}) + if isinstance(cursor_field, str) + else cursor_field + ) self._name = name self._state = {"states": []} + self._cursor = cursor @property def name(self) -> str: @@ -54,6 +64,13 @@ def state(self) -> Mapping[str, Any]: def state(self, value: Mapping[str, Any]) -> None: self._state = value + @property + def is_resumable(self) -> bool: + return bool(self._cursor) + + def get_cursor(self) -> Optional[Cursor]: + return self._cursor + def stream_slices( self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None ) -> Iterable[Optional[StreamSlice]]: @@ -76,13 +93,71 @@ def read_records( if not stream_slice: result = self._records else: - result = [r for r in self._records if r["slice"] == stream_slice["slice"]] + result = [Record(data=r, associated_slice=stream_slice) for r in self._records if r["slice"] == stream_slice["slice"]] yield from result # Update the state only after reading the full slice - if stream_slice and self._cursor_field and result: - self._state["states"].append({self._cursor_field: result[-1][self._cursor_field], "partition": stream_slice["slice"]}) + cursor_field = self._stream_cursor_field.eval(config=self.config) + if stream_slice and cursor_field and result: + self._state["states"].append({cursor_field: result[-1][cursor_field], "partition": stream_slice["slice"]}) + + +class MockIncrementalStream(MockStream): + def __init__(self, slices, records, name, cursor_field="", cursor=None, date_ranges=None): + super().__init__(slices, records, name, cursor_field, cursor) + if date_ranges is None: + date_ranges = [] + self._date_ranges = date_ranges + self._state = {} + + def read_records( + self, + sync_mode: SyncMode, + cursor_field: List[str] = None, + stream_slice: Mapping[str, Any] = None, + stream_state: Mapping[str, Any] = None, + ) -> Iterable[Mapping[str, Any]]: + results = [record for record in self._records if stream_slice["start_time"] <= record["updated_at"] <= 
stream_slice["end_time"]] + print(f"about to emit {results}") + yield from results + print(f"setting state to {stream_slice}") + self._state = stream_slice + + +class MockResumableFullRefreshStream(MockStream): + def __init__(self, slices, name, cursor_field="", cursor=None, record_pages: Optional[List[List[Mapping[str, Any]]]] = None): + super().__init__(slices, [], name, cursor_field, cursor) + if record_pages: + self._record_pages = record_pages + else: + self._record_pages = [] + self._state: MutableMapping[str, Any] = {} + + def read_records( + self, + sync_mode: SyncMode, + cursor_field: List[str] = None, + stream_slice: Mapping[str, Any] = None, + stream_state: Mapping[str, Any] = None, + ) -> Iterable[Mapping[str, Any]]: + page_number = self.state.get("next_page_token") or 1 + yield from self._record_pages[page_number - 1] + + cursor = self.get_cursor() + if page_number < len(self._record_pages): + cursor.close_slice(StreamSlice(cursor_slice={"next_page_token": page_number + 1}, partition={})) + else: + cursor.close_slice(StreamSlice(cursor_slice={"__ab_full_refresh_sync_complete": True}, partition={})) + + @property + def state(self) -> Mapping[str, Any]: + cursor = self.get_cursor() + return cursor.get_stream_state() if cursor else {} + + @state.setter + def state(self, value: Mapping[str, Any]) -> None: + self._state = value @pytest.mark.parametrize( @@ -218,7 +293,7 @@ def read_records( "test_dpath_extraction", ], ) -def test_substream_slicer(parent_stream_configs, expected_slices): +def test_substream_partition_router(parent_stream_configs, expected_slices): if expected_slices is None: try: SubstreamPartitionRouter(parent_stream_configs=parent_stream_configs, parameters={}, config={}) @@ -230,6 +305,23 @@ def test_substream_slicer(parent_stream_configs, expected_slices): assert slices == expected_slices +def test_substream_partition_router_invalid_parent_record_type(): + partition_router = SubstreamPartitionRouter( + parent_stream_configs=[ParentStreamConfig( + stream=MockStream([{}], [list()], "first_stream"), + parent_key="id", + partition_field="first_stream_id", + parameters={}, + config={}, + )], + parameters={}, + config={} + ) + + with pytest.raises(AirbyteTracedException): + _ = [s for s in partition_router.stream_slices()] + + @pytest.mark.parametrize( "parent_stream_request_parameters, expected_req_params, expected_headers, expected_body_json, expected_body_data", [ @@ -465,3 +557,172 @@ def test_given_record_is_record_object_when_stream_slices_then_use_record_data() slices = list(partition_router.stream_slices()) assert slices == [{"partition_field": "record value", "parent_slice": parent_slice}] + + +def test_substream_using_incremental_parent_stream(): + mock_slices = [ + StreamSlice(cursor_slice={"start_time": "2024-04-27", "end_time": "2024-05-27"}, partition={}), + StreamSlice(cursor_slice={"start_time": "2024-05-27", "end_time": "2024-06-27"}, partition={}), + ] + + expected_slices = [ + {"partition_field": "may_record_0", "parent_slice": {}}, + {"partition_field": "may_record_1", "parent_slice": {}}, + {"partition_field": "jun_record_0", "parent_slice": {}}, + {"partition_field": "jun_record_1", "parent_slice": {}}, + ] + + partition_router = SubstreamPartitionRouter( + parent_stream_configs=[ + ParentStreamConfig( + stream=MockIncrementalStream( + slices=mock_slices, + records=[ + Record({"id": "may_record_0", "updated_at": "2024-05-15"}, mock_slices[0]), + Record({"id": "may_record_1", "updated_at": "2024-05-16"}, mock_slices[0]), + Record({"id": 
"jun_record_0", "updated_at": "2024-06-15"}, mock_slices[1]), + Record({"id": "jun_record_1", "updated_at": "2024-06-16"}, mock_slices[1]), + ], + name="first_stream", + ), + parent_key="id", + partition_field="partition_field", + parameters={}, + config={}, + ) + ], + parameters={}, + config={}, + ) + + actual_slices = list(partition_router.stream_slices()) + assert actual_slices == expected_slices + + +def test_substream_checkpoints_after_each_parent_partition(): + """ + This test validates the specific behavior that when getting all parent records for a substream, + we are still updating state so that the parent stream's state is updated after we finish getting all + parent records for the parent slice (not just the substream) + """ + mock_slices = [ + StreamSlice(cursor_slice={"start_time": "2024-04-27", "end_time": "2024-05-27"}, partition={}), + StreamSlice(cursor_slice={"start_time": "2024-05-27", "end_time": "2024-06-27"}, partition={}), + ] + + expected_slices = [ + {"partition_field": "may_record_0", "parent_slice": {}}, + {"partition_field": "may_record_1", "parent_slice": {}}, + {"partition_field": "jun_record_0", "parent_slice": {}}, + {"partition_field": "jun_record_1", "parent_slice": {}}, + ] + + expected_parent_state = [ + {"start_time": "2024-04-27", "end_time": "2024-05-27"}, + {"start_time": "2024-04-27", "end_time": "2024-05-27"}, + {"start_time": "2024-05-27", "end_time": "2024-06-27"}, + {"start_time": "2024-05-27", "end_time": "2024-06-27"}, + ] + + partition_router = SubstreamPartitionRouter( + parent_stream_configs=[ + ParentStreamConfig( + stream=MockIncrementalStream( + slices=mock_slices, + records=[ + Record({"id": "may_record_0", "updated_at": "2024-05-15"}, mock_slices[0]), + Record({"id": "may_record_1", "updated_at": "2024-05-16"}, mock_slices[0]), + Record({"id": "jun_record_0", "updated_at": "2024-06-15"}, mock_slices[1]), + Record({"id": "jun_record_1", "updated_at": "2024-06-16"}, mock_slices[1]), + ], + name="first_stream", + ), + incremental_dependency=True, + parent_key="id", + partition_field="partition_field", + parameters={}, + config={}, + ) + ], + parameters={}, + config={}, + ) + + expected_counter = 0 + for actual_slice in partition_router.stream_slices(): + assert actual_slice == expected_slices[expected_counter] + assert partition_router._parent_state["first_stream"] == expected_parent_state[expected_counter] + expected_counter += 1 + + +@pytest.mark.parametrize( + "use_incremental_dependency", + [ + pytest.param(False, id="test_resumable_full_refresh_stream_without_parent_checkpoint"), + pytest.param(True, id="test_resumable_full_refresh_stream_with_use_incremental_dependency_for_parent_checkpoint"), + ] +) +def test_substream_using_resumable_full_refresh_parent_stream(use_incremental_dependency): + mock_slices = [ + StreamSlice(cursor_slice={}, partition={}), + StreamSlice(cursor_slice={"next_page_token": 2}, partition={}), + StreamSlice(cursor_slice={"next_page_token": 3}, partition={}), + ] + + expected_slices = [ + {"partition_field": "makoto_yuki", "parent_slice": {}}, + {"partition_field": "yukari_takeba", "parent_slice": {}}, + {"partition_field": "mitsuru_kirijo", "parent_slice": {}}, + {"partition_field": "akihiko_sanada", "parent_slice": {}}, + {"partition_field": "junpei_iori", "parent_slice": {}}, + {"partition_field": "fuuka_yamagishi", "parent_slice": {}}, + ] + + expected_parent_state = [ + {"next_page_token": 2}, + {"next_page_token": 2}, + {"next_page_token": 3}, + {"next_page_token": 3}, + 
{'__ab_full_refresh_sync_complete': True}, + {'__ab_full_refresh_sync_complete': True}, + ] + + partition_router = SubstreamPartitionRouter( + parent_stream_configs=[ + ParentStreamConfig( + stream=MockResumableFullRefreshStream( + slices=[StreamSlice(partition={}, cursor_slice={})], + cursor=ResumableFullRefreshCursor(parameters={}), + record_pages=[ + [ + Record(data={"id": "makoto_yuki"}, associated_slice=mock_slices[0]), + Record(data={"id": "yukari_takeba"}, associated_slice=mock_slices[0]), + ], + [ + Record(data={"id": "mitsuru_kirijo"}, associated_slice=mock_slices[1]), + Record(data={"id": "akihiko_sanada"}, associated_slice=mock_slices[1]), + ], + [ + Record(data={"id": "junpei_iori"}, associated_slice=mock_slices[2]), + Record(data={"id": "fuuka_yamagishi"}, associated_slice=mock_slices[2]), + ], + ], + name="persona_3_characters", + ), + incremental_dependency=use_incremental_dependency, + parent_key="id", + partition_field="partition_field", + parameters={}, + config={}, + ) + ], + parameters={}, + config={}, + ) + + expected_counter = 0 + for actual_slice in partition_router.stream_slices(): + assert actual_slice == expected_slices[expected_counter] + if use_incremental_dependency: + assert partition_router._parent_state["persona_3_characters"] == expected_parent_state[expected_counter] + expected_counter += 1 diff --git a/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py b/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py index 1953f7a7d4d3..7d6638368348 100644 --- a/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py +++ b/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py @@ -6,12 +6,14 @@ import json import logging from http import HTTPStatus -from typing import Any, Iterable, Mapping, Optional +from typing import Any, Callable, Iterable, List, Mapping, MutableMapping, Optional from unittest.mock import ANY, MagicMock, patch import pytest import requests -from airbyte_cdk.models import SyncMode +from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level, SyncMode, Type +from airbyte_cdk.sources.streams import CheckpointMixin +from airbyte_cdk.sources.streams.core import StreamData from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream from airbyte_cdk.sources.streams.http.error_handlers import ErrorHandler, HttpStatusErrorHandler from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction @@ -661,3 +663,219 @@ def test_duplicate_request_params_are_deduped(deduplicate_query_params, path, pa def test_connection_pool(): stream = StubBasicReadHttpStream(authenticator=TokenAuthenticator("test-token")) assert stream._http_client._session.adapters["https://"]._pool_connections == 20 + + +class StubParentHttpStream(HttpStream, CheckpointMixin): + primary_key = "primary_key" + + counter = 0 + + def __init__(self, records: List[Mapping[str, Any]]): + super().__init__() + self._records = records + self._state: MutableMapping[str, Any] = {} + + @property + def url_base(self) -> str: + return "https://airbyte.io/api/v1" + + def path(self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[Mapping[str, Any]] = None, + next_page_token: Optional[Mapping[str, Any]] = None) -> str: + return "/stub" + + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + return {"__ab_full_refresh_sync_complete": True} + + def _read_pages( + self, + records_generator_fn: Callable[ + [requests.PreparedRequest, requests.Response, 
Mapping[str, Any], Optional[Mapping[str, Any]]], Iterable[StreamData]
+        ],
+        stream_slice: Optional[Mapping[str, Any]] = None,
+        stream_state: Optional[Mapping[str, Any]] = None,
+    ) -> Iterable[StreamData]:
+        yield from self._records
+
+        self.state = {"__ab_full_refresh_sync_complete": True}
+
+    def parse_response(
+        self,
+        response: requests.Response,
+        *,
+        stream_state: Mapping[str, Any],
+        stream_slice: Optional[Mapping[str, Any]] = None,
+        next_page_token: Optional[Mapping[str, Any]] = None
+    ) -> Iterable[Mapping[str, Any]]:
+        return []
+
+    @property
+    def state(self) -> MutableMapping[str, Any]:
+        return self._state
+
+    @state.setter
+    def state(self, value: MutableMapping[str, Any]) -> None:
+        self._state = value
+
+
+class StubParentResumableFullRefreshStream(HttpStream, CheckpointMixin):
+    primary_key = "primary_key"
+
+    counter = 0
+
+    def __init__(self, record_pages: List[List[Mapping[str, Any]]]):
+        super().__init__()
+        self._record_pages = record_pages
+        self._state: MutableMapping[str, Any] = {}
+
+    @property
+    def url_base(self) -> str:
+        return "https://airbyte.io/api/v1"
+
+    def path(self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[Mapping[str, Any]] = None,
+             next_page_token: Optional[Mapping[str, Any]] = None) -> str:
+        return "/stub"
+
+    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
+        return {"__ab_full_refresh_sync_complete": True}
+
+    def read_records(
+        self,
+        sync_mode: SyncMode,
+        cursor_field: Optional[List[str]] = None,
+        stream_slice: Optional[Mapping[str, Any]] = None,
+        stream_state: Optional[Mapping[str, Any]] = None,
+    ) -> Iterable[StreamData]:
+        page_number = self.state.get("page") or 1
+        yield from self._record_pages[page_number - 1]
+
+        if page_number < len(self._record_pages):
+            self.state = {"page": page_number + 1}
+        else:
+            self.state = {"__ab_full_refresh_sync_complete": True}
+
+    def parse_response(
+        self,
+        response: requests.Response,
+        *,
+        stream_state: Mapping[str, Any],
+        stream_slice: Optional[Mapping[str, Any]] = None,
+        next_page_token: Optional[Mapping[str, Any]] = None
+    ) -> Iterable[Mapping[str, Any]]:
+        return []
+
+    @property
+    def state(self) -> MutableMapping[str, Any]:
+        return self._state
+
+    @state.setter
+    def state(self, value: MutableMapping[str, Any]) -> None:
+        self._state = value
+
+
+class StubHttpSubstream(HttpSubStream):
+    primary_key = "primary_key"
+
+    @property
+    def url_base(self) -> str:
+        return "https://airbyte.io/api/v1"
+
+    def path(self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[Mapping[str, Any]] = None,
+             next_page_token: Optional[Mapping[str, Any]] = None) -> str:
+        return "/stub"
+
+    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
+        return None
+
+    def _read_pages(
+        self,
+        records_generator_fn: Callable[
+            [requests.PreparedRequest, requests.Response, Mapping[str, Any], Optional[Mapping[str, Any]]], Iterable[StreamData]
+        ],
+        stream_slice: Optional[Mapping[str, Any]] = None,
+        stream_state: Optional[Mapping[str, Any]] = None,
+    ) -> Iterable[StreamData]:
+        return [
+            {"id": "abc", "parent": stream_slice.get("id")},
+            {"id": "def", "parent": stream_slice.get("id")},
+        ]
+
+    def parse_response(
+        self,
+        response: requests.Response,
+        *,
+        stream_state: Mapping[str, Any],
+        stream_slice: Optional[Mapping[str, Any]] = None,
+        next_page_token: Optional[Mapping[str, Any]] = None
+    ) -> Iterable[Mapping[str, Any]]:
+        return []
+
+
+def test_substream_with_incremental_parent():
+    expected_slices = [
+        {"parent": {"id": "abc"}},
+        {"parent": {"id": "def"}},
+    ]
+
+    parent_records = [
+        {"id": "abc"},
+        {"id": "def"},
+    ]
+
+    parent_stream = StubParentHttpStream(records=parent_records)
+    substream = StubHttpSubstream(parent=parent_stream)
+
+    actual_slices = [stream_slice for stream_slice in substream.stream_slices(sync_mode=SyncMode.full_refresh)]
+    assert actual_slices == expected_slices
+
+
+def test_substream_with_resumable_full_refresh_parent():
+    parent_pages = [
+        [
+            {"id": "page_1_abc"},
+            {"id": "page_1_def"},
+        ],
+        [
+            {"id": "page_2_abc"},
+            {"id": "page_2_def"},
+        ],
+        [
+            {"id": "page_3_abc"},
+            {"id": "page_3_def"},
+        ]
+    ]
+
+    expected_slices = [
+        {"parent": {"id": "page_1_abc"}},
+        {"parent": {"id": "page_1_def"}},
+        {"parent": {"id": "page_2_abc"}},
+        {"parent": {"id": "page_2_def"}},
+        {"parent": {"id": "page_3_abc"}},
+        {"parent": {"id": "page_3_def"}},
+    ]
+
+    parent_stream = StubParentResumableFullRefreshStream(record_pages=parent_pages)
+    substream = StubHttpSubstream(parent=parent_stream)
+
+    actual_slices = [stream_slice for stream_slice in substream.stream_slices(sync_mode=SyncMode.full_refresh)]
+    assert actual_slices == expected_slices
+
+
+def test_substream_skips_non_record_messages():
+    expected_slices = [
+        {"parent": {"id": "abc"}},
+        {"parent": {"id": "def"}},
+        {"parent": {"id": "ghi"}},
+    ]
+
+    parent_records = [
+        {"id": "abc"},
+        AirbyteMessage(type=Type.LOG, log=AirbyteLogMessage(level=Level.INFO, message="should_not_be_parent_record")),
+        {"id": "def"},
+        {"id": "ghi"},
+    ]
+
+    parent_stream = StubParentHttpStream(records=parent_records)
+    substream = StubHttpSubstream(parent=parent_stream)
+
+    actual_slices = [stream_slice for stream_slice in substream.stream_slices(sync_mode=SyncMode.full_refresh)]
+    assert actual_slices == expected_slices
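Note on the flush-on-slice-change pattern: the heart of the substream_partition_router.py change above is that partitions are buffered per parent slice, and the buffered batch (plus the parent state) is only flushed once a record arrives whose associated slice differs from the previous one, with a final flush after the loop. The sketch below is a minimal, self-contained illustration of that pattern; FakeRecord and the dict-based slices are simplified stand-ins for the CDK's Record and StreamSlice types, not the real classes.

# Minimal sketch of the flush-on-slice-change pattern used in stream_slices() above.
from dataclasses import dataclass
from typing import Any, Iterable, Mapping, Optional


@dataclass(frozen=True)
class FakeRecord:  # stand-in for airbyte_cdk.sources.types.Record
    data: Mapping[str, Any]
    associated_slice: Optional[Mapping[str, Any]]


def partitions_with_checkpoints(records: Iterable[FakeRecord]) -> Iterable[Mapping[str, Any]]:
    pending = []           # partitions accumulated for the parent slice in progress
    previous_slice = None  # sentinel: no sliced record has been seen yet

    for record in records:
        if previous_slice is None:
            previous_slice = record.associated_slice
        elif previous_slice != record.associated_slice:
            # The parent moved on to a new slice: this is the safe point to
            # checkpoint parent state and flush the partitions collected so far.
            yield from pending
            pending = []
            previous_slice = record.associated_slice
        pending.append({"partition": record.data["id"], "parent_slice": record.associated_slice or {}})

    # Final flush so the last slice's partitions are not dropped.
    yield from pending


if __name__ == "__main__":
    records = [
        FakeRecord({"id": 1}, {"page": 1}),
        FakeRecord({"id": 2}, {"page": 1}),
        FakeRecord({"id": 3}, {"page": 2}),
    ]
    for partition in partitions_with_checkpoints(records):
        print(partition)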
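Note on record normalization: both SubstreamPartitionRouter.stream_slices() and HttpSubStream.stream_slices() have to normalize what a parent read yields, since read_only_records() can emit plain mappings, Record objects, or AirbyteMessage envelopes, and anything else is an error. A minimal sketch of that normalization, using hypothetical stand-in types rather than the real CDK models:

from dataclasses import dataclass
from typing import Any, Iterable, Mapping, Optional, Union


@dataclass(frozen=True)
class FakeRecord:  # stand-in for airbyte_cdk.sources.types.Record
    data: Mapping[str, Any]


@dataclass(frozen=True)
class FakeMessage:  # stand-in for an AirbyteMessage envelope
    type: str  # "RECORD", "LOG", "TRACE", ...
    record_data: Optional[Mapping[str, Any]] = None


def normalize(parent_output: Iterable[Union[Mapping[str, Any], FakeRecord, FakeMessage]]) -> Iterable[Mapping[str, Any]]:
    for item in parent_output:
        if isinstance(item, FakeMessage):
            if item.type != "RECORD":
                continue  # skip logs, traces, and other non-record messages
            yield item.record_data or {}
        elif isinstance(item, FakeRecord):
            yield item.data
        elif isinstance(item, Mapping):
            yield item
        else:
            # mirrors the AirbyteTracedException raised by the router above
            raise TypeError(f"Parent stream returned records as invalid type {type(item)}")


assert list(normalize([{"id": 1}, FakeRecord({"id": 2}), FakeMessage("LOG"), FakeMessage("RECORD", {"id": 3})])) == [
    {"id": 1},
    {"id": 2},
    {"id": 3},
]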
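Note on the RFR pagination loop: the regression in the PR title (an RFR parent that never paginates) comes down to the page-token loop the new tests model with MockResumableFullRefreshStream and StubParentResumableFullRefreshStream: state carries the next page token between reads until a terminal sentinel is written. A standalone sketch of that loop follows; the function and variable names are illustrative, and only the "__ab_full_refresh_sync_complete" sentinel matches the CDK.

from typing import Any, Iterable, List, Mapping, MutableMapping

FULL_REFRESH_COMPLETE = "__ab_full_refresh_sync_complete"


def read_all_pages(pages: List[List[Mapping[str, Any]]]) -> Iterable[Mapping[str, Any]]:
    state: MutableMapping[str, Any] = {}
    while not state.get(FULL_REFRESH_COMPLETE):
        page_number = state.get("next_page_token") or 1
        yield from pages[page_number - 1]
        if page_number < len(pages):
            state = {"next_page_token": page_number + 1}  # checkpointable mid-sync
        else:
            state = {FULL_REFRESH_COMPLETE: True}          # terminal sentinel


if __name__ == "__main__":
    pages = [[{"id": "a"}, {"id": "b"}], [{"id": "c"}]]
    assert [r["id"] for r in read_all_pages(pages)] == ["a", "b", "c"]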