Skip to content

Commit

Permalink
Do not stop processing file on parsing error (airbytehq#29679)
Browse the repository at this point in the history
  • Loading branch information
maxi297 authored and harrytou committed Sep 1, 2023
1 parent 1f4c43a commit 15f1af2
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def compute_slices(self) -> Iterable[Optional[Mapping[str, Any]]]:
slices = [{"files": list(group[1])} for group in itertools.groupby(sorted_files_to_read, lambda f: f.last_modified)]
return slices

def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[Mapping[str, Any]]:
def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[AirbyteMessage]:
"""
Yield all records from all remote files in `list_files_for_this_sync`.
Expand Down Expand Up @@ -127,9 +127,8 @@ def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[Mapping
stack_trace=traceback.format_exc(),
),
)
break

else:
finally:
if n_skipped:
yield AirbyteMessage(
type=MessageType.LOG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,20 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from typing import Any, Mapping
import unittest
from datetime import datetime, timezone
from typing import Any, Iterable, Iterator, Mapping
from unittest.mock import Mock

import pytest
from airbyte_cdk.models import Level
from airbyte_cdk.sources.file_based.availability_strategy import AbstractFileBasedAvailabilityStrategy
from airbyte_cdk.sources.file_based.discovery_policy import AbstractDiscoveryPolicy
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_cdk.sources.file_based.schema_validation_policies import AbstractSchemaValidationPolicy
from airbyte_cdk.sources.file_based.stream.cursor import AbstractFileBasedCursor
from airbyte_cdk.sources.file_based.stream.default_file_based_stream import DefaultFileBasedStream


Expand Down Expand Up @@ -46,3 +57,74 @@
)
def test_fill_nulls(input_schema: Mapping[str, Any], expected_output: Mapping[str, Any]) -> None:
    """Each parametrized schema must be normalized by ``_fill_nulls`` into the expected form."""
    filled = DefaultFileBasedStream._fill_nulls(input_schema)
    assert filled == expected_output


class DefaultFileBasedStreamTest(unittest.TestCase):
    """Unit tests for ``DefaultFileBasedStream.read_records_from_slice`` error handling."""

    _FILE_TYPE = "file_type"
    _NOW = datetime(2022, 10, 22, tzinfo=timezone.utc)
    _A_RECORD = {"a_record": 1}

    def setUp(self) -> None:
        # Every collaborator is mocked; only the stream under test is real.
        self._catalog_schema = Mock()
        self._stream_reader = Mock(spec=AbstractFileBasedStreamReader)
        self._availability_strategy = Mock(spec=AbstractFileBasedAvailabilityStrategy)
        self._discovery_policy = Mock(spec=AbstractDiscoveryPolicy)
        self._parser = Mock(spec=FileTypeParser)
        self._cursor = Mock(spec=AbstractFileBasedCursor)

        self._validation_policy = Mock(spec=AbstractSchemaValidationPolicy)
        self._validation_policy.name = "validation policy name"

        self._stream_config = Mock()
        self._stream_config.file_type = self._FILE_TYPE
        self._stream_config.name = "a stream name"

        self._stream = DefaultFileBasedStream(
            config=self._stream_config,
            catalog_schema=self._catalog_schema,
            stream_reader=self._stream_reader,
            availability_strategy=self._availability_strategy,
            discovery_policy=self._discovery_policy,
            parsers={self._FILE_TYPE: self._parser},
            validation_policy=self._validation_policy,
            cursor=self._cursor,
        )

    def test_when_read_records_from_slice_then_return_records(self) -> None:
        """A parsable file yields its parsed records as record messages."""
        self._parser.parse_records.return_value = [self._A_RECORD]
        a_slice = {"files": [RemoteFile(uri="uri", last_modified=self._NOW)]}

        messages = list(self._stream.read_records_from_slice(a_slice))

        assert [message.record.data["data"] for message in messages] == [self._A_RECORD]

    def test_given_exception_when_read_records_from_slice_then_do_process_other_files(self) -> None:
        """
        The current behavior for source-s3 v3 does not fail the sync on some errors, so that behavior is kept for
        now. One easy way to reproduce this is a file with a gzip extension that is not actually gzip: the reader
        fails to open the file but the sync does not fail.
        Ticket: https://github.com/airbytehq/airbyte/issues/29680
        """
        # First file blows up during parsing; second file parses normally.
        self._parser.parse_records.side_effect = [ValueError("An error"), [self._A_RECORD]]
        a_slice = {
            "files": [
                RemoteFile(uri="invalid_file", last_modified=self._NOW),
                RemoteFile(uri="valid_file", last_modified=self._NOW),
            ]
        }

        messages = list(self._stream.read_records_from_slice(a_slice))

        # The failure is surfaced as an ERROR log, then processing continues with the next file.
        assert messages[0].log.level == Level.ERROR
        assert messages[1].record.data["data"] == self._A_RECORD

    def test_given_exception_after_skipping_records_when_read_records_from_slice_then_send_warning(self) -> None:
        """An error raised after records were skipped must still emit the skipped-records warning."""
        self._stream_config.schemaless = False
        self._validation_policy.record_passes_validation_policy.return_value = False
        # The parser emits one (skipped) record, then fails mid-file.
        self._parser.parse_records.side_effect = [self._iter([self._A_RECORD, ValueError("An error")])]
        a_slice = {
            "files": [
                RemoteFile(uri="invalid_file", last_modified=self._NOW),
                RemoteFile(uri="valid_file", last_modified=self._NOW),
            ]
        }

        messages = list(self._stream.read_records_from_slice(a_slice))

        assert messages[0].log.level == Level.ERROR
        assert messages[1].log.level == Level.WARN

    def _iter(self, items: Iterable[Any]) -> Iterator[Any]:
        # Yield elements in order, but raise any Exception element mid-iteration
        # to simulate a parser failing partway through a file.
        for element in items:
            if isinstance(element, Exception):
                raise element
            yield element

0 comments on commit 15f1af2

Please sign in to comment.