Skip to content

Commit

Permalink
✨Feature(airbyte-cdk): File Transfer implementation (#47686)
Browse files Browse the repository at this point in the history
  • Loading branch information
aldogonzalez8 authored Oct 31, 2024
1 parent 711dfaa commit d166bc6
Show file tree
Hide file tree
Showing 19 changed files with 622 additions and 201 deletions.
5 changes: 3 additions & 2 deletions airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
#

from dataclasses import InitVar, dataclass
from typing import Annotated, Any, Dict, List, Mapping, Optional
from typing import Annotated, Any, Dict, List, Mapping, Optional, Union

from airbyte_cdk.models.file_transfer_record_message import AirbyteFileTransferRecordMessage
from airbyte_protocol_dataclasses.models import *
from serpyco_rs.metadata import Alias

Expand Down Expand Up @@ -76,7 +77,7 @@ class AirbyteMessage:
spec: Optional[ConnectorSpecification] = None # type: ignore [name-defined]
connectionStatus: Optional[AirbyteConnectionStatus] = None # type: ignore [name-defined]
catalog: Optional[AirbyteCatalog] = None # type: ignore [name-defined]
record: Optional[AirbyteRecordMessage] = None # type: ignore [name-defined]
record: Optional[Union[AirbyteFileTransferRecordMessage, AirbyteRecordMessage]] = None # type: ignore [name-defined]
state: Optional[AirbyteStateMessage] = None
trace: Optional[AirbyteTraceMessage] = None # type: ignore [name-defined]
control: Optional[AirbyteControlMessage] = None # type: ignore [name-defined]
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class AirbyteFileTransferRecordMessage:
    """Record message variant that carries a ``file`` mapping.

    ``stream``, ``file`` and ``emitted_at`` have no default and must be
    supplied by the caller; ``namespace`` and ``data`` default to ``None``.
    """

    # Required fields (no default).
    stream: str
    file: Dict[str, Any]
    emitted_at: int

    # Optional fields.
    namespace: Optional[str] = None
    data: Optional[Dict[str, Any]] = None
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,33 @@

import copy
from abc import abstractmethod
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Literal, Optional, Union

import dpath
from airbyte_cdk import OneOfOptionConfig
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.utils import schema_helpers
from pydantic.v1 import AnyUrl, BaseModel, Field


class DeliverRecords(BaseModel):
    # Delivery-method option whose `delivery_type` is pinned to "use_records_transfer".
    # Documented with `#` comments (not a docstring) to avoid any chance of
    # influencing the schema generated from this model.

    class Config(OneOfOptionConfig):
        # Option metadata; `discriminator` names the field declared below.
        title = "Replicate Records"
        description = "Recommended - Extract and load structured records into your destination of choice. This is the classic method of moving data in Airbyte. It allows for blocking and hashing individual fields or files from a structured schema. Data can be flattened, typed and deduped depending on the destination."
        discriminator = "delivery_type"

    # Constant field: the only accepted value is the default itself.
    delivery_type: Literal["use_records_transfer"] = Field(
        default="use_records_transfer",
        const=True,
    )


class DeliverRawFiles(BaseModel):
    # Delivery-method option whose `delivery_type` is pinned to "use_file_transfer".
    # Documented with `#` comments (not a docstring) to avoid any chance of
    # influencing the schema generated from this model.

    class Config(OneOfOptionConfig):
        # Option metadata; `discriminator` names the field declared below.
        title = "Copy Raw Files"
        description = "Copy raw files without parsing their contents. Bits are copied into the destination exactly as they appeared in the source. Recommended for use with unstructured text data, non-text and compressed files."
        discriminator = "delivery_type"

    # Constant field: the only accepted value is the default itself.
    delivery_type: Literal["use_file_transfer"] = Field(
        default="use_file_transfer",
        const=True,
    )


class AbstractFileBasedSpec(BaseModel):
"""
Used during spec; allows the developer to configure the cloud provider specific options
Expand All @@ -34,6 +53,17 @@ class AbstractFileBasedSpec(BaseModel):
order=10,
)

delivery_method: Union[DeliverRecords, DeliverRawFiles] = Field(
title="Delivery Method",
discriminator="delivery_type",
type="object",
order=7,
display_type="radio",
group="advanced",
default="use_records_transfer",
airbyte_hidden=True,
)

@classmethod
@abstractmethod
def documentation_url(cls) -> AnyUrl:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,7 @@ class CustomFileBasedException(AirbyteTracedException):
"""

pass


class FileSizeLimitError(CustomFileBasedException):
    """Dedicated ``CustomFileBasedException`` subtype for file-size-limit errors.

    Adds no behavior of its own. NOTE(review): presumably raised when a file
    is too large to be transferred - confirm against the raise sites.
    """
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import traceback
from abc import ABC
from collections import Counter
from typing import Any, Iterator, List, Mapping, MutableMapping, Optional, Tuple, Type, Union
from typing import Any, Iterator, List, Mapping, Optional, Tuple, Type, Union

from airbyte_cdk.logger import AirbyteLogFormatter, init_logger
from airbyte_cdk.models import (
Expand Down Expand Up @@ -127,10 +127,16 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) ->
if not isinstance(stream, AbstractFileBasedStream):
raise ValueError(f"Stream {stream} is not a file-based stream.")
try:
parsed_config = self._get_parsed_config(config)
availability_method = (
stream.availability_strategy.check_availability
if self._use_file_transfer(parsed_config)
else stream.availability_strategy.check_availability_and_parsability
)
(
stream_is_available,
reason,
) = stream.availability_strategy.check_availability_and_parsability(stream, logger, self)
) = availability_method(stream, logger, self)
except AirbyteTracedException as ate:
errors.append(f"Unable to connect to stream {stream.name} - {ate.message}")
tracebacks.append(traceback.format_exc())
Expand Down Expand Up @@ -217,11 +223,19 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
CursorField(DefaultFileBasedStream.ab_last_mod_col),
)
stream = FileBasedStreamFacade.create_from_stream(
self._make_default_stream(stream_config, cursor), self, self.logger, stream_state, cursor
stream=self._make_default_stream(
stream_config=stream_config, cursor=cursor, use_file_transfer=self._use_file_transfer(parsed_config)
),
source=self,
logger=self.logger,
state=stream_state,
cursor=cursor,
)
else:
cursor = self.cursor_cls(stream_config)
stream = self._make_default_stream(stream_config, cursor)
stream = self._make_default_stream(
stream_config=stream_config, cursor=cursor, use_file_transfer=self._use_file_transfer(parsed_config)
)

streams.append(stream)
return streams
Expand All @@ -230,7 +244,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
raise ConfigValidationError(FileBasedSourceError.CONFIG_VALIDATION_ERROR) from exc

def _make_default_stream(
self, stream_config: FileBasedStreamConfig, cursor: Optional[AbstractFileBasedCursor]
self, stream_config: FileBasedStreamConfig, cursor: Optional[AbstractFileBasedCursor], use_file_transfer: bool = False
) -> AbstractFileBasedStream:
return DefaultFileBasedStream(
config=stream_config,
Expand All @@ -242,6 +256,7 @@ def _make_default_stream(
validation_policy=self._validate_and_get_validation_policy(stream_config),
errors_collector=self.errors_collector,
cursor=cursor,
use_file_transfer=use_file_transfer,
)

def _get_stream_from_catalog(self, stream_config: FileBasedStreamConfig) -> Optional[AirbyteStream]:
Expand All @@ -264,7 +279,7 @@ def read(
logger: logging.Logger,
config: Mapping[str, Any],
catalog: ConfiguredAirbyteCatalog,
state: Optional[Union[List[AirbyteStateMessage], MutableMapping[str, Any]]] = None,
state: Optional[List[AirbyteStateMessage]] = None,
) -> Iterator[AirbyteMessage]:
yield from super().read(logger, config, catalog, state)
# emit all the errors collected
Expand Down Expand Up @@ -298,3 +313,10 @@ def _validate_and_get_validation_policy(self, stream_config: FileBasedStreamConf
def _validate_input_schema(self, stream_config: FileBasedStreamConfig) -> None:
    """Reject stream configs that set both ``schemaless`` and ``input_schema``.

    Raises:
        ValidationError: when both options are truthy on ``stream_config``.
    """
    if not stream_config.schemaless:
        return
    if not stream_config.input_schema:
        return
    raise ValidationError("`input_schema` and `schemaless` options cannot both be set", model=FileBasedStreamConfig)

@staticmethod
def _use_file_transfer(parsed_config: AbstractFileBasedSpec) -> bool:
    """Tell whether ``parsed_config`` selects the file-transfer delivery method.

    Returns ``True`` only when ``parsed_config.delivery_method`` exposes a
    ``delivery_type`` attribute equal to ``"use_file_transfer"``.
    """
    delivery_method = parsed_config.delivery_method
    # The default covers delivery_method values that carry no `delivery_type`
    # attribute at all; those are treated as "not file transfer".
    selected_type = getattr(delivery_method, "delivery_type", None)
    return selected_type == "use_file_transfer"
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,16 @@ def filter_files_by_globs_and_start_date(self, files: List[RemoteFile], globs: L
seen.add(file.uri)
yield file

@abstractmethod
def file_size(self, file: RemoteFile) -> int:
    """Return the size of the given remote file.

    Connectors that support writing to files must implement this.
    A subclass whose connector does not support writing files can
    simply `return 0`.
    """
    ...

@staticmethod
def file_matches_globs(file: RemoteFile, globs: List[str]) -> bool:
# Use the GLOBSTAR flag to enable recursive ** matching
Expand All @@ -105,3 +115,11 @@ def get_prefixes_from_globs(globs: List[str]) -> Set[str]:
"""
prefixes = {glob.split("*")[0] for glob in globs}
return set(filter(lambda x: bool(x), prefixes))

def use_file_transfer(self) -> bool:
    """Tell whether the reader's config selects the file-transfer delivery method.

    Returns ``False`` when ``self.config`` is falsy, or when its
    ``delivery_method`` has no ``delivery_type`` attribute equal to
    ``"use_file_transfer"``.
    """
    if not self.config:
        return False
    delivery_method = self.config.delivery_method
    # The default covers delivery_method values without a `delivery_type` attribute.
    return getattr(delivery_method, "delivery_type", None) == "use_file_transfer"
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from .jsonl_parser import JsonlParser
from .parquet_parser import ParquetParser
from .unstructured_parser import UnstructuredParser
from .blob_transfer import BlobTransfer

default_parsers: Mapping[Type[Any], FileTypeParser] = {
AvroFormat: AvroParser(),
Expand All @@ -24,4 +25,4 @@
UnstructuredFormat: UnstructuredParser(),
}

__all__ = ["AvroParser", "CsvParser", "ExcelParser", "JsonlParser", "ParquetParser", "UnstructuredParser", "default_parsers"]
__all__ = ["AvroParser", "CsvParser", "ExcelParser", "JsonlParser", "ParquetParser", "UnstructuredParser", "BlobTransfer", "default_parsers"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import logging
from io import IOBase
from typing import Any, Dict, Generator, Iterable, Optional, Tuple

from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader, FileReadMode
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_cdk.sources.file_based.writers.local_file_client import LocalFileTransferClient


class _FileReader:
    """Opens a remote file through a stream reader and exposes it with its size."""

    def read_data(
        self,
        config: FileBasedStreamConfig,
        file: RemoteFile,
        stream_reader: AbstractFileBasedStreamReader,
        logger: logging.Logger,
        file_read_mode: FileReadMode,
    ) -> Generator[Tuple[IOBase, int], None, None]:
        """Yield one ``(open file handle, file size)`` pair for ``file``.

        The handle is only valid while the generator is suspended at the
        yield; it is closed by the ``with`` block once the generator resumes
        or is closed. ``config`` is accepted but not used here.

        Any ``Exception`` raised while sizing or opening the file is logged
        through ``logger`` and NOT re-raised, so the generator then simply
        ends without yielding.
        """
        try:
            size = stream_reader.file_size(file)
            opened = stream_reader.open_file(file, file_read_mode, "UTF-8", logger)
            with opened as handle:
                yield handle, size
        except Exception as error:
            logger.error("An error has occurred while reading file: %s", str(error))


class BlobTransfer:
    """Hands opened remote files to ``LocalFileTransferClient.write``.

    The file reader is injectable; when none (or a falsy one) is given a
    default ``_FileReader`` is created.
    """

    def __init__(self, file_reader: Optional[_FileReader] = None):
        self._file_reader = file_reader if file_reader else _FileReader()

    def write_streams(
        self,
        config: FileBasedStreamConfig,
        file: RemoteFile,
        stream_reader: AbstractFileBasedStreamReader,
        logger: logging.Logger,
    ) -> Iterable[Dict[str, Any]]:
        """Yield the result of writing each opened file through the local client.

        For every ``(handle, size)`` pair produced by the file reader, calls
        ``LocalFileTransferClient.write(file.uri, handle, size, logger)`` and
        yields whatever that call returns.

        Raises:
            Exception: any error raised while setting up or writing is logged
                and re-raised unchanged.
        """
        # Pre-bind so the `finally` clause cannot hit an UnboundLocalError
        # (and thereby replace the real error) when creating the generator
        # itself fails.
        data_generator = None
        try:
            data_generator = self._file_reader.read_data(config, file, stream_reader, logger, self.file_read_mode)
            local_writer = LocalFileTransferClient()
            for file_opened, file_size in data_generator:
                yield local_writer.write(file.uri, file_opened, file_size, logger)
        except Exception as ex:
            logger.error("An error has occurred while writing file: %s", str(ex))
            raise
        finally:
            # Closing the reader generator releases the file handle it holds open.
            if data_generator is not None:
                data_generator.close()

    @property
    def file_read_mode(self) -> FileReadMode:
        """Mode passed to the file reader when opening the remote file."""
        return FileReadMode.READ
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
SchemaType = Mapping[str, Mapping[str, JsonSchemaSupportedType]]

# Object schema exposing a single object-typed "data" property.
schemaless_schema = {"type": "object", "properties": {"data": {"type": "object"}}}
# Object schema exposing object-typed "data" and "file" properties.
file_transfer_schema = {"type": "object", "properties": {"data": {"type": "object"}, "file": {"type": "object"}}}


@total_ordering
Expand Down
Loading

0 comments on commit d166bc6

Please sign in to comment.