Skip to content

Commit

Permalink
Fix: Add "Destinations 1.0" handling to PyAirbyte: sync IDs, generati…
Browse files Browse the repository at this point in the history
…on IDs, and stream success statuses (#330)
  • Loading branch information
aaronsteers authored Aug 9, 2024
1 parent 821c699 commit 4c293d8
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 4 deletions.
19 changes: 18 additions & 1 deletion airbyte/_future_cdk/catalog_providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,24 @@ def __init__(
Since the catalog is passed by reference, the catalog manager may be updated with new
streams as they are discovered.
"""
self._catalog: ConfiguredAirbyteCatalog = configured_catalog
self._catalog: ConfiguredAirbyteCatalog = self.validate_catalog(configured_catalog)

@staticmethod
def validate_catalog(catalog: ConfiguredAirbyteCatalog) -> None:
"""Validate the catalog to ensure it is valid.
This requires ensuring that `generationId` and `minGenerationId` are both set. If
not, both values will be set to `1`.
"""
for stream in catalog.streams:
if stream.generation_id is None:
stream.generation_id = 1
if stream.minimum_generation_id is None:
stream.minimum_generation_id = 1
if stream.sync_id is None:
stream.sync_id = 1 # This should ideally increment monotonically with each sync.

return catalog

@property
def configured_catalog(self) -> ConfiguredAirbyteCatalog:
Expand Down
3 changes: 3 additions & 0 deletions airbyte/_message_iterators.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
)

from airbyte.constants import AB_EXTRACTED_AT_COLUMN
from airbyte.progress import _new_stream_success_message


if TYPE_CHECKING:
Expand Down Expand Up @@ -90,6 +91,8 @@ def generator() -> Generator[AirbyteMessage, None, None]:
state=state_provider.get_stream_state(stream_name),
)

yield _new_stream_success_message(stream_name)

return cls(generator())

@classmethod
Expand Down
46 changes: 43 additions & 3 deletions airbyte/progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,15 @@
from rich.markdown import Markdown as RichMarkdown
from typing_extensions import Literal

from airbyte_protocol.models import AirbyteStreamStatus, Type
from airbyte_protocol.models import (
AirbyteMessage,
AirbyteStreamStatus,
AirbyteStreamStatusTraceMessage,
AirbyteTraceMessage,
StreamDescriptor,
TraceType,
Type,
)

from airbyte._util import meta
from airbyte._util.telemetry import EventState, EventType, send_telemetry
Expand All @@ -40,8 +48,6 @@
from collections.abc import Generator, Iterable
from types import ModuleType

from airbyte_protocol.models import AirbyteMessage

from airbyte._message_iterators import AirbyteMessageIterator
from airbyte.caches.base import CacheBase
from airbyte.destinations.base import Destination
Expand All @@ -68,6 +74,24 @@
IS_NOTEBOOK = False


def _new_stream_success_message(stream_name: str) -> AirbyteMessage:
"""Return a new stream success message."""
return AirbyteMessage(
type=Type.TRACE,
trace=AirbyteTraceMessage(
type=TraceType.STREAM_STATUS,
stream=stream_name,
emitted_at=pendulum.now().float_timestamp,
stream_status=AirbyteStreamStatusTraceMessage(
stream_descriptor=StreamDescriptor(
name=stream_name,
),
status=AirbyteStreamStatus.COMPLETE,
),
),
)


class ProgressStyle(Enum):
"""An enum of progress bar styles."""

Expand Down Expand Up @@ -201,6 +225,8 @@ def __init__(
def tally_records_read(
self,
messages: Iterable[AirbyteMessage],
*,
auto_close_streams: bool = False,
) -> Generator[AirbyteMessage, Any, None]:
"""This method simply tallies the number of records processed and yields the messages."""
# Update the display before we start.
Expand Down Expand Up @@ -247,6 +273,11 @@ def tally_records_read(
# Update the display.
self._update_display()

if auto_close_streams:
for stream_name in self._unclosed_stream_names:
yield _new_stream_success_message(stream_name)
self._log_stream_read_end(stream_name)

def tally_pending_writes(
self,
messages: IO[str] | AirbyteMessageIterator,
Expand Down Expand Up @@ -342,6 +373,15 @@ def _log_stream_read_end(self, stream_name: str) -> None:
)
self.stream_read_end_times[stream_name] = time.time()

@property
def _unclosed_stream_names(self) -> list[str]:
"""Return a list of streams that have not yet been fully read."""
return [
stream_name
for stream_name in self.stream_read_counts
if stream_name not in self.stream_read_end_times
]

def log_success(
self,
) -> None:
Expand Down
59 changes: 59 additions & 0 deletions examples/run_bigquery_destination.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""
Usage:
poetry install
poetry run python examples/run_bigquery_destination.py
"""

from __future__ import annotations

import tempfile
import warnings
from typing import cast

import airbyte as ab
from airbyte.secrets.base import SecretString
from airbyte.secrets.google_gsm import GoogleGSMSecretManager

warnings.filterwarnings("ignore", message="Cannot create BigQuery Storage client")


AIRBYTE_INTERNAL_GCP_PROJECT = "dataline-integration-testing"
SECRET_NAME = "SECRET_DESTINATION-BIGQUERY_CREDENTIALS__CREDS"

bigquery_destination_secret: dict = cast(
SecretString,
GoogleGSMSecretManager(
project=AIRBYTE_INTERNAL_GCP_PROJECT,
credentials_json=ab.get_secret("GCP_GSM_CREDENTIALS"),
).get_secret(SECRET_NAME),
).parse_json()


def main() -> None:
source = ab.get_source(
"source-faker",
config={"count": 1000, "seed": 0, "parallelism": 1, "always_updated": False},
install_if_missing=True,
)
source.check()
source.select_all_streams()

with tempfile.NamedTemporaryFile(mode="w+", delete=False, encoding="utf-8") as temp:
# Write credentials to the temp file
temp.write(bigquery_destination_secret["credentials_json"])
temp.flush()
temp.close()

destination = ab.get_destination(
"destination-bigquery",
config={**bigquery_destination_secret, "dataset_id": "pyairbyte_tests"},
)
write_result = destination.write(
source,
# cache=False, # Toggle comment to test with/without caching
)


if __name__ == "__main__":
main()

0 comments on commit 4c293d8

Please sign in to comment.