Skip to content

Commit

Permalink
Merge branch 'main' into aj/add-poke-api-integ-tests
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronsteers authored Mar 22, 2024
2 parents 413f364 + 8e3441b commit d98f6dd
Show file tree
Hide file tree
Showing 10 changed files with 320 additions and 154 deletions.
2 changes: 2 additions & 0 deletions airbyte/_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from airbyte import exceptions as exc
from airbyte._util.meta import is_windows
from airbyte._util.telemetry import EventState, log_install_state
from airbyte.sources.registry import ConnectorMetadata


Expand Down Expand Up @@ -238,6 +239,7 @@ def install(self) -> None:

# Assuming the installation succeeded, store the installed version
self.reported_version = self._get_installed_version(raise_on_error=False, recheck=True)
log_install_state(self.name, state=EventState.SUCCEEDED)
print(
f"Connector '{self.name}' installed successfully!\n"
f"For more information, see the {self.name} documentation:\n"
Expand Down
5 changes: 2 additions & 3 deletions airbyte/_util/meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,11 @@ def is_jupyter() -> bool:
@lru_cache
def get_notebook_name() -> str | None:
if is_colab():
session_info = None
response = None
session_info: dict | None = None
with suppress(Exception):
response = requests.get(COLAB_SESSION_URL)
if response.status_code == 200: # noqa: PLR2004 # Magic number
session_info = response.json()
session_info = response.json()[0]

if session_info and "name" in session_info:
return session_info["name"]
Expand Down
47 changes: 47 additions & 0 deletions airbyte/_util/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ class EventState(str, Enum):
class EventType(str, Enum):
INSTALL = "install"
SYNC = "sync"
VALIDATE = "validate"
CHECK = "check"


@dataclass
Expand Down Expand Up @@ -293,3 +295,48 @@ def send_telemetry(
"timestamp": datetime.datetime.utcnow().isoformat(), # noqa: DTZ003
},
)


def log_config_validation_result(
name: str,
state: EventState,
exception: Exception | None = None,
) -> None:
"""Log a config validation event."""
send_telemetry(
source=name,
cache=None,
state=state,
event_type=EventType.VALIDATE,
exception=exception,
)


def log_source_check_result(
name: str,
state: EventState,
exception: Exception | None = None,
) -> None:
"""Log a source `check` result."""
send_telemetry(
source=name,
cache=None,
state=state,
event_type=EventType.CHECK,
exception=exception,
)


def log_install_state(
name: str,
state: EventState,
exception: Exception | None = None,
) -> None:
"""Log an install event."""
send_telemetry(
source=name,
cache=None,
state=state,
event_type=EventType.INSTALL,
exception=exception,
)
8 changes: 8 additions & 0 deletions airbyte/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,14 @@ class AirbyteConnectorMissingSpecError(AirbyteConnectorError):
"""Connector did not return a spec."""


class AirbyteConnectorValidationFailedError(AirbyteConnectorError):
"""Connector config validation failed."""

guidance = (
"Please double-check your config and review the validation errors for more information."
)


class AirbyteConnectorCheckFailedError(AirbyteConnectorError):
"""Connector check failed."""

Expand Down
110 changes: 101 additions & 9 deletions airbyte/sources/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@
import tempfile
import warnings
from contextlib import contextmanager, suppress
from pathlib import Path
from typing import TYPE_CHECKING, Any, cast

import jsonschema
import pendulum
import yaml
from rich import print
from rich.syntax import Syntax
from typing_extensions import Literal

from airbyte_protocol.models import (
AirbyteCatalog,
Expand All @@ -29,12 +32,19 @@
from airbyte import exceptions as exc
from airbyte._util import protocol_util
from airbyte._util.name_normalizers import normalize_records
from airbyte._util.telemetry import EventState, EventType, send_telemetry
from airbyte._util.telemetry import (
EventState,
EventType,
log_config_validation_result,
log_source_check_result,
send_telemetry,
)
from airbyte.caches.util import get_default_cache
from airbyte.datasets._lazy import LazyDataset
from airbyte.progress import progress
from airbyte.results import ReadResult
from airbyte.strategies import WriteStrategy
from airbyte.warnings import PyAirbyteDataLossWarning


if TYPE_CHECKING:
Expand Down Expand Up @@ -150,7 +160,7 @@ def set_config(
self,
config: dict[str, Any],
*,
validate: bool = False,
validate: bool = True,
) -> None:
"""Set the config for the connector.
Expand Down Expand Up @@ -200,7 +210,28 @@ def validate_config(self, config: dict[str, Any] | None = None) -> None:
"""
spec = self._get_spec(force_refresh=False)
config = self._config if config is None else config
jsonschema.validate(config, spec.connectionSpecification)
try:
jsonschema.validate(config, spec.connectionSpecification)
log_config_validation_result(
name=self.name,
state=EventState.SUCCEEDED,
)
except jsonschema.ValidationError as ex:
validation_ex = exc.AirbyteConnectorValidationFailedError(
message="The provided config is not valid.",
context={
"error_message": ex.message,
"error_path": ex.path,
"error_instance": ex.instance,
"error_schema": ex.schema,
},
)
log_config_validation_result(
name=self.name,
state=EventState.FAILED,
exception=validation_ex,
)
raise validation_ex from ex

def get_available_streams(self) -> list[str]:
"""Get the available streams from the spec."""
Expand All @@ -227,6 +258,51 @@ def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification:
log_text=self._last_log_messages,
)

@property
def config_spec(self) -> dict[str, Any]:
"""Generate a configuration spec for this connector, as a JSON Schema definition.
This function generates a JSON Schema dictionary with configuration specs for the
current connector, as a dictionary.
Returns:
dict: The JSON Schema configuration spec as a dictionary.
"""
return self._get_spec(force_refresh=True).connectionSpecification

def print_config_spec(
self,
format: Literal["yaml", "json"] = "yaml", # noqa: A002
*,
output_file: Path | str | None = None,
) -> None:
"""Print the configuration spec for this connector.
Args:
- format: The format to print the spec in. Must be "yaml" or "json".
- output_file: Optional. If set, the spec will be written to the given file path. Otherwise,
it will be printed to the console.
"""
if format not in ["yaml", "json"]:
raise exc.AirbyteLibInputError(
message="Invalid format. Expected 'yaml' or 'json'",
input_value=format,
)
if isinstance(output_file, str):
output_file = Path(output_file)

if format == "yaml":
content = yaml.dump(self.config_spec, indent=2)
elif format == "json":
content = json.dumps(self.config_spec, indent=2)

if output_file:
output_file.write_text(content)
return

syntax_highlighted = Syntax(content, format)
print(syntax_highlighted)

@property
def _yaml_spec(self) -> str:
"""Get the spec as a yaml string.
Expand Down Expand Up @@ -414,8 +490,16 @@ def check(self) -> None:
if msg.type == Type.CONNECTION_STATUS and msg.connectionStatus:
if msg.connectionStatus.status != Status.FAILED:
print(f"Connection check succeeded for `{self.name}`.")
log_source_check_result(
name=self.name,
state=EventState.SUCCEEDED,
)
return

log_source_check_result(
name=self.name,
state=EventState.FAILED,
)
raise exc.AirbyteConnectorCheckFailedError(
help_url=self.docs_url,
context={
Expand Down Expand Up @@ -582,6 +666,7 @@ def read(
streams: str | list[str] | None = None,
write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
force_full_refresh: bool = False,
skip_validation: bool = False,
) -> ReadResult:
"""Read from the connector and write to the cache.
Expand All @@ -598,12 +683,16 @@ def read(
must be True when using the "replace" strategy.
"""
if write_strategy == WriteStrategy.REPLACE and not force_full_refresh:
raise exc.AirbyteLibInputError(
message="The replace strategy requires full refresh mode.",
context={
"write_strategy": write_strategy,
"force_full_refresh": force_full_refresh,
},
warnings.warn(
message=(
"Using `REPLACE` strategy without also setting `full_refresh_mode=True` "
"could result in data loss. "
"To silence this warning, use the following: "
'warnings.filterwarnings("ignore", '
'category="airbyte.warnings.PyAirbyteDataLossWarning")`'
),
category=PyAirbyteDataLossWarning,
stacklevel=1,
)
if cache is None:
cache = get_default_cache()
Expand Down Expand Up @@ -643,6 +732,9 @@ def read(
if not force_full_refresh
else None
)
if not skip_validation:
self.validate_config()

self._log_sync_start(cache=cache)
try:
cache.processor.process_airbyte_messages(
Expand Down
21 changes: 3 additions & 18 deletions airbyte/sources/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from airbyte import exceptions as exc
from airbyte._executor import PathExecutor, VenvExecutor
from airbyte._util.telemetry import EventState, EventType, send_telemetry
from airbyte._util.telemetry import EventState, log_install_state
from airbyte.sources.base import Source
from airbyte.sources.registry import ConnectorMetadata, get_connector_metadata

Expand Down Expand Up @@ -122,7 +122,7 @@ def get_source(
metadata = get_connector_metadata(name)
except exc.AirbyteConnectorNotRegisteredError as ex:
if not pip_url:
_log_install_state(name, state=EventState.FAILED, exception=ex)
log_install_state(name, state=EventState.FAILED, exception=ex)
# We don't have a pip url or registry entry, so we can't install the connector
raise

Expand All @@ -143,25 +143,10 @@ def get_source(
executor=executor,
)
except Exception as e:
_log_install_state(name, state=EventState.FAILED, exception=e)
log_install_state(name, state=EventState.FAILED, exception=e)
raise


__all__ = [
"get_source",
]


def _log_install_state(
name: str,
state: EventState,
exception: Exception | None = None,
) -> None:
"""Log an install event."""
send_telemetry(
source=name,
cache=None,
state=state,
event_type=EventType.INSTALL,
exception=exception,
)
12 changes: 12 additions & 0 deletions airbyte/warnings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""Warnings for the PyAirbyte library."""

from __future__ import annotations


class PyAirbyteDataLossWarning(Warning):
"""Warning for potential data loss.
Users can ignore this warning by running:
> warnings.filterwarnings("ignore", category="airbyte.exceptions.PyAirbyteDataLossWarning")
"""
Loading

0 comments on commit d98f6dd

Please sign in to comment.