diff --git a/airbyte/_executors/util.py b/airbyte/_executors/util.py index 8d0412b3..7c7c150d 100644 --- a/airbyte/_executors/util.py +++ b/airbyte/_executors/util.py @@ -16,7 +16,7 @@ from airbyte._executors.python import VenvExecutor from airbyte._util.meta import which from airbyte._util.telemetry import EventState, log_install_state # Non-public API -from airbyte.constants import TEMP_DIR_OVERRIDE +from airbyte.constants import AIRBYTE_OFFLINE_MODE, TEMP_DIR_OVERRIDE from airbyte.sources.registry import ConnectorMetadata, InstallType, get_connector_metadata from airbyte.version import get_version @@ -115,7 +115,7 @@ def _get_local_executor( ) -def get_connector_executor( # noqa: PLR0912, PLR0913 # Too complex +def get_connector_executor( # noqa: PLR0912, PLR0913, PLR0915 # Too many branches/arugments/statements name: str, *, version: str | None = None, @@ -161,6 +161,21 @@ def get_connector_executor( # noqa: PLR0912, PLR0913 # Too complex # Fail the install. log_install_state(name, state=EventState.FAILED, exception=ex) raise + except requests.exceptions.ConnectionError as ex: + if not AIRBYTE_OFFLINE_MODE: + # If the user has not enabled offline mode, raise an error. + raise exc.AirbyteConnectorRegistryError( + message="Failed to connect to the connector registry.", + context={"connector_name": name}, + guidance=( + "\nThere was a problem connecting to the Airbyte connector registry. " + "Please check your internet connection and try again.\nTo operate " + "offline, set the `AIRBYTE_OFFLINE_MODE` environment variable to `1`." + "This will prevent errors related to registry connectivity and disable " + "telemetry. \nIf you have a custom registry, set `_REGISTRY_ENV_VAR` " + "environment variable to the URL of your custom registry." + ), + ) from ex if install_method_count == 0: # User has not specified how to install the connector. diff --git a/airbyte/_util/telemetry.py b/airbyte/_util/telemetry.py index 0e4a5a83..83ae694c 100644 --- a/airbyte/_util/telemetry.py +++ b/airbyte/_util/telemetry.py @@ -50,6 +50,7 @@ WriterRuntimeInfo, ) from airbyte._util.hashing import one_way_hash +from airbyte.constants import AIRBYTE_OFFLINE_MODE from airbyte.version import get_version @@ -89,7 +90,7 @@ def _setup_analytics() -> str | bool: anonymous_user_id: str | None = None issues: list[str] = [] - if os.environ.get(DO_NOT_TRACK): + if os.environ.get(DO_NOT_TRACK) or AIRBYTE_OFFLINE_MODE: # User has opted out of tracking. return False @@ -207,7 +208,7 @@ def send_telemetry( exception: Exception | None = None, ) -> None: # If DO_NOT_TRACK is set, we don't send any telemetry - if os.environ.get(DO_NOT_TRACK): + if os.environ.get(DO_NOT_TRACK) or AIRBYTE_OFFLINE_MODE: return payload_props: dict[str, str | int | dict] = { diff --git a/airbyte/constants.py b/airbyte/constants.py index 377d8d6b..a70f1ce1 100644 --- a/airbyte/constants.py +++ b/airbyte/constants.py @@ -89,3 +89,23 @@ def _str_to_bool(value: str) -> bool: This value is read from the `AIRBYTE_TEMP_FILE_CLEANUP` environment variable. If the variable is not set, the default value is `True`. """ + +AIRBYTE_OFFLINE_MODE = _str_to_bool( + os.getenv( + key="AIRBYTE_OFFLINE_MODE", + default="false", + ) +) +"""Enable or disable offline mode. + +When offline mode is enabled, PyAirbyte will attempt to fetch metadata for connectors from the +Airbyte registry but will not raise an error if the registry is unavailable. This can be useful in +environments without internet access or with air-gapped networks. + +Offline mode also disables telemetry, similar to a `DO_NOT_TRACK` setting, ensuring no usage data +is sent from your environment. You may also specify a custom registry URL via the`_REGISTRY_ENV_VAR` +environment variable if you prefer to use a different registry source for metadata. + +This setting helps you make informed choices about data privacy and operation in restricted and +air-gapped environments. +""" diff --git a/airbyte/sources/registry.py b/airbyte/sources/registry.py index c2308a0a..ecdb2ca6 100644 --- a/airbyte/sources/registry.py +++ b/airbyte/sources/registry.py @@ -16,6 +16,8 @@ from airbyte import exceptions as exc from airbyte._util.meta import is_docker_installed +from airbyte.constants import AIRBYTE_OFFLINE_MODE +from airbyte.logs import warn_once from airbyte.version import get_version @@ -180,6 +182,10 @@ def _get_registry_url() -> str: return _REGISTRY_URL +def _is_registry_disabled(url: str) -> bool: + return url.upper() in {"0", "F", "FALSE"} or AIRBYTE_OFFLINE_MODE + + def _registry_entry_to_connector_metadata(entry: dict) -> ConnectorMetadata: name = entry["dockerRepository"].replace("airbyte/", "") latest_version: str | None = entry.get("dockerImageTag") @@ -233,7 +239,12 @@ def _get_registry_cache(*, force_refresh: bool = False) -> dict[str, ConnectorMe return __cache registry_url = _get_registry_url() + + if _is_registry_disabled(registry_url): + return {} + if registry_url.startswith("http"): + response = requests.get( registry_url, headers={"User-Agent": f"PyAirbyte/{get_version()}"}, @@ -256,23 +267,29 @@ def _get_registry_cache(*, force_refresh: bool = False) -> dict[str, ConnectorMe new_cache[connector_metadata.name] = connector_metadata if len(new_cache) == 0: - raise exc.PyAirbyteInternalError( - message="Connector registry is empty.", - context={ - "registry_url": _get_registry_url(), - }, + # This isn't necessarily fatal, since users can bring their own + # connector definitions. + warn_once( + message=f"Connector registry is empty: {registry_url}", + with_stack=False, ) __cache = new_cache return __cache -def get_connector_metadata(name: str) -> ConnectorMetadata: +def get_connector_metadata(name: str) -> None | ConnectorMetadata: """Check the cache for the connector. If the cache is empty, populate by calling update_cache. """ + registry_url = _get_registry_url() + + if _is_registry_disabled(registry_url): + return None + cache = copy(_get_registry_cache()) + if not cache: raise exc.PyAirbyteInternalError( message="Connector registry could not be loaded.",