Skip to content

Commit

Permalink
Chore: Refactor new "Writers" interface for destinations and caches (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronsteers authored Sep 2, 2024
1 parent fc18d0a commit 7ca3541
Show file tree
Hide file tree
Showing 25 changed files with 526 additions and 434 deletions.
9 changes: 7 additions & 2 deletions airbyte/_connector_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,20 @@ def __init__(
If config is provided, it will be validated against the spec if validate is True.
"""
self.executor = executor
self.name = name
self._name = name
self._config_dict: dict[str, Any] | None = None
self._last_log_messages: list[str] = []
self._spec: ConnectorSpecification | None = None
self._selected_stream_names: list[str] = []
self._file_logger: logging.Logger = new_passthrough_file_logger(self.name)
self._file_logger: logging.Logger = new_passthrough_file_logger(self._name)
if config is not None:
self.set_config(config, validate=validate)

@property
def name(self) -> str:
"""Get the name of the connector."""
return self._name

def _print_info_message(
self,
message: str,
Expand Down
69 changes: 69 additions & 0 deletions airbyte/_future_cdk/catalog_providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@

from __future__ import annotations

import copy
from typing import TYPE_CHECKING, Any, final

from airbyte_protocol.models import (
ConfiguredAirbyteCatalog,
)

from airbyte import exceptions as exc
from airbyte.strategies import WriteMethod, WriteStrategy


if TYPE_CHECKING:
Expand Down Expand Up @@ -135,3 +137,70 @@ def from_read_result(
]
)
)

def get_primary_keys(
self,
stream_name: str,
) -> list[str]:
pks = self.get_configured_stream_info(stream_name).primary_key
if not pks:
return []

joined_pks = [".".join(pk) for pk in pks]
for pk in joined_pks:
if "." in pk:
msg = f"Nested primary keys are not yet supported. Found: {pk}"
raise NotImplementedError(msg)

return joined_pks

def get_cursor_key(
self,
stream_name: str,
) -> str | None:
return self.get_configured_stream_info(stream_name).cursor_field

def resolve_write_method(
self,
stream_name: str,
write_strategy: WriteStrategy,
) -> WriteMethod:
"""Return the write method for the given stream."""
has_pks: bool = bool(self.get_primary_keys(stream_name))
has_incremental_key: bool = bool(self.get_cursor_key(stream_name))
if write_strategy == WriteStrategy.MERGE and not has_pks:
raise exc.PyAirbyteInputError(
message="Cannot use merge strategy on a stream with no primary keys.",
context={
"stream_name": stream_name,
},
)

if write_strategy != WriteStrategy.AUTO:
return WriteMethod(write_strategy)

if has_pks:
return WriteMethod.MERGE

if has_incremental_key:
return WriteMethod.APPEND

return WriteMethod.REPLACE

def with_write_strategy(
self,
write_strategy: WriteStrategy,
) -> CatalogProvider:
"""Return a new catalog provider with the specified write strategy applied.
The original catalog provider is not modified.
"""
new_catalog: ConfiguredAirbyteCatalog = copy.deepcopy(self.configured_catalog)
for stream in new_catalog.streams:
write_method = self.resolve_write_method(
stream_name=stream.stream.name,
write_strategy=write_strategy,
)
stream.destination_sync_mode = write_method.destination_sync_mode

return CatalogProvider(new_catalog)
Loading

0 comments on commit 7ca3541

Please sign in to comment.