From 15f1af2d86274125232ca97837443257cffac372 Mon Sep 17 00:00:00 2001 From: Maxime Carbonneau-Leclerc Date: Mon, 21 Aug 2023 15:56:01 -0400 Subject: [PATCH] Do not stop processing file on parsing error (#29679) --- .../stream/default_file_based_stream.py | 5 +- .../stream/test_default_file_based_stream.py | 84 ++++++++++++++++++- 2 files changed, 85 insertions(+), 4 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py b/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py index fd9ff174637d..087ea525b3af 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py @@ -65,7 +65,7 @@ def compute_slices(self) -> Iterable[Optional[Mapping[str, Any]]]: slices = [{"files": list(group[1])} for group in itertools.groupby(sorted_files_to_read, lambda f: f.last_modified)] return slices - def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[Mapping[str, Any]]: + def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[AirbyteMessage]: """ Yield all records from all remote files in `list_files_for_this_sync`. @@ -127,9 +127,8 @@ def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[Mapping stack_trace=traceback.format_exc(), ), ) - break - else: + finally: if n_skipped: yield AirbyteMessage( type=MessageType.LOG, diff --git a/airbyte-cdk/python/unit_tests/sources/file_based/stream/test_default_file_based_stream.py b/airbyte-cdk/python/unit_tests/sources/file_based/stream/test_default_file_based_stream.py index 4889b85f1e8a..99b2ae789a4e 100644 --- a/airbyte-cdk/python/unit_tests/sources/file_based/stream/test_default_file_based_stream.py +++ b/airbyte-cdk/python/unit_tests/sources/file_based/stream/test_default_file_based_stream.py @@ -2,9 +2,20 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # -from typing import Any, Mapping +import unittest +from datetime import datetime, timezone +from typing import Any, Iterable, Iterator, Mapping +from unittest.mock import Mock import pytest +from airbyte_cdk.models import Level +from airbyte_cdk.sources.file_based.availability_strategy import AbstractFileBasedAvailabilityStrategy +from airbyte_cdk.sources.file_based.discovery_policy import AbstractDiscoveryPolicy +from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader +from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser +from airbyte_cdk.sources.file_based.remote_file import RemoteFile +from airbyte_cdk.sources.file_based.schema_validation_policies import AbstractSchemaValidationPolicy +from airbyte_cdk.sources.file_based.stream.cursor import AbstractFileBasedCursor from airbyte_cdk.sources.file_based.stream.default_file_based_stream import DefaultFileBasedStream @@ -46,3 +57,74 @@ ) def test_fill_nulls(input_schema: Mapping[str, Any], expected_output: Mapping[str, Any]) -> None: assert DefaultFileBasedStream._fill_nulls(input_schema) == expected_output + + +class DefaultFileBasedStreamTest(unittest.TestCase): + _FILE_TYPE = "file_type" + _NOW = datetime(2022, 10, 22, tzinfo=timezone.utc) + _A_RECORD = {"a_record": 1} + + def setUp(self) -> None: + self._stream_config = Mock() + self._stream_config.file_type = self._FILE_TYPE + self._stream_config.name = "a stream name" + self._catalog_schema = Mock() + self._stream_reader = Mock(spec=AbstractFileBasedStreamReader) + self._availability_strategy = Mock(spec=AbstractFileBasedAvailabilityStrategy) + self._discovery_policy = Mock(spec=AbstractDiscoveryPolicy) + self._parser = Mock(spec=FileTypeParser) + self._validation_policy = Mock(spec=AbstractSchemaValidationPolicy) + self._validation_policy.name = "validation policy name" + self._cursor = Mock(spec=AbstractFileBasedCursor) + + self._stream = DefaultFileBasedStream( + config=self._stream_config, + catalog_schema=self._catalog_schema, + stream_reader=self._stream_reader, + availability_strategy=self._availability_strategy, + discovery_policy=self._discovery_policy, + parsers={self._FILE_TYPE: self._parser}, + validation_policy=self._validation_policy, + cursor=self._cursor, + ) + + def test_when_read_records_from_slice_then_return_records(self) -> None: + self._parser.parse_records.return_value = [self._A_RECORD] + messages = list(self._stream.read_records_from_slice({"files": [RemoteFile(uri="uri", last_modified=self._NOW)]})) + assert list(map(lambda message: message.record.data["data"], messages)) == [self._A_RECORD] + + def test_given_exception_when_read_records_from_slice_then_do_process_other_files(self) -> None: + """ + The current behavior for source-s3 v3 does not fail sync on some errors and hence, we will keep this behaviour for now. One example + we can easily reproduce this is by having a file with gzip extension that is not actually a gzip file. The reader will fail to open + the file but the sync won't fail. + Ticket: https://github.com/airbytehq/airbyte/issues/29680 + """ + self._parser.parse_records.side_effect = [ValueError("An error"), [self._A_RECORD]] + + messages = list(self._stream.read_records_from_slice({"files": [ + RemoteFile(uri="invalid_file", last_modified=self._NOW), + RemoteFile(uri="valid_file", last_modified=self._NOW), + ]})) + + assert messages[0].log.level == Level.ERROR + assert messages[1].record.data["data"] == self._A_RECORD + + def test_given_exception_after_skipping_records_when_read_records_from_slice_then_send_warning(self) -> None: + self._stream_config.schemaless = False + self._validation_policy.record_passes_validation_policy.return_value = False + self._parser.parse_records.side_effect = [self._iter([self._A_RECORD, ValueError("An error")])] + + messages = list(self._stream.read_records_from_slice({"files": [ + RemoteFile(uri="invalid_file", last_modified=self._NOW), + RemoteFile(uri="valid_file", last_modified=self._NOW), + ]})) + + assert messages[0].log.level == Level.ERROR + assert messages[1].log.level == Level.WARN + + def _iter(self, x: Iterable[Any]) -> Iterator[Any]: + for item in x: + if isinstance(item, Exception): + raise item + yield item