Skip to content

Commit

Permalink
Feat: Introducing AIRBYTE_OFFLINE_MODE for air-gapped environments (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
niyasrad authored Oct 29, 2024
1 parent 02f5ede commit 7c703ac
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 10 deletions.
19 changes: 17 additions & 2 deletions airbyte/_executors/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from airbyte._executors.python import VenvExecutor
from airbyte._util.meta import which
from airbyte._util.telemetry import EventState, log_install_state # Non-public API
from airbyte.constants import TEMP_DIR_OVERRIDE
from airbyte.constants import AIRBYTE_OFFLINE_MODE, TEMP_DIR_OVERRIDE
from airbyte.sources.registry import ConnectorMetadata, InstallType, get_connector_metadata
from airbyte.version import get_version

Expand Down Expand Up @@ -115,7 +115,7 @@ def _get_local_executor(
)


def get_connector_executor( # noqa: PLR0912, PLR0913 # Too complex
def get_connector_executor( # noqa: PLR0912, PLR0913, PLR0915 # Too many branches/arugments/statements
name: str,
*,
version: str | None = None,
Expand Down Expand Up @@ -161,6 +161,21 @@ def get_connector_executor( # noqa: PLR0912, PLR0913 # Too complex
# Fail the install.
log_install_state(name, state=EventState.FAILED, exception=ex)
raise
except requests.exceptions.ConnectionError as ex:
if not AIRBYTE_OFFLINE_MODE:
# If the user has not enabled offline mode, raise an error.
raise exc.AirbyteConnectorRegistryError(
message="Failed to connect to the connector registry.",
context={"connector_name": name},
guidance=(
"\nThere was a problem connecting to the Airbyte connector registry. "
"Please check your internet connection and try again.\nTo operate "
"offline, set the `AIRBYTE_OFFLINE_MODE` environment variable to `1`."
"This will prevent errors related to registry connectivity and disable "
"telemetry. \nIf you have a custom registry, set `_REGISTRY_ENV_VAR` "
"environment variable to the URL of your custom registry."
),
) from ex

if install_method_count == 0:
# User has not specified how to install the connector.
Expand Down
5 changes: 3 additions & 2 deletions airbyte/_util/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
WriterRuntimeInfo,
)
from airbyte._util.hashing import one_way_hash
from airbyte.constants import AIRBYTE_OFFLINE_MODE
from airbyte.version import get_version


Expand Down Expand Up @@ -89,7 +90,7 @@ def _setup_analytics() -> str | bool:
anonymous_user_id: str | None = None
issues: list[str] = []

if os.environ.get(DO_NOT_TRACK):
if os.environ.get(DO_NOT_TRACK) or AIRBYTE_OFFLINE_MODE:
# User has opted out of tracking.
return False

Expand Down Expand Up @@ -207,7 +208,7 @@ def send_telemetry(
exception: Exception | None = None,
) -> None:
# If DO_NOT_TRACK is set, we don't send any telemetry
if os.environ.get(DO_NOT_TRACK):
if os.environ.get(DO_NOT_TRACK) or AIRBYTE_OFFLINE_MODE:
return

payload_props: dict[str, str | int | dict] = {
Expand Down
20 changes: 20 additions & 0 deletions airbyte/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,23 @@ def _str_to_bool(value: str) -> bool:
This value is read from the `AIRBYTE_TEMP_FILE_CLEANUP` environment variable. If the variable is
not set, the default value is `True`.
"""

AIRBYTE_OFFLINE_MODE = _str_to_bool(
os.getenv(
key="AIRBYTE_OFFLINE_MODE",
default="false",
)
)
"""Enable or disable offline mode.
When offline mode is enabled, PyAirbyte will attempt to fetch metadata for connectors from the
Airbyte registry but will not raise an error if the registry is unavailable. This can be useful in
environments without internet access or with air-gapped networks.
Offline mode also disables telemetry, similar to a `DO_NOT_TRACK` setting, ensuring no usage data
is sent from your environment. You may also specify a custom registry URL via the`_REGISTRY_ENV_VAR`
environment variable if you prefer to use a different registry source for metadata.
This setting helps you make informed choices about data privacy and operation in restricted and
air-gapped environments.
"""
28 changes: 22 additions & 6 deletions airbyte/sources/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

from airbyte import exceptions as exc
from airbyte._util.meta import is_docker_installed
from airbyte.constants import AIRBYTE_OFFLINE_MODE
from airbyte.logs import warn_once
from airbyte.version import get_version


Expand Down Expand Up @@ -180,6 +182,10 @@ def _get_registry_url() -> str:
return _REGISTRY_URL


def _is_registry_disabled(url: str) -> bool:
return url.upper() in {"0", "F", "FALSE"} or AIRBYTE_OFFLINE_MODE


def _registry_entry_to_connector_metadata(entry: dict) -> ConnectorMetadata:
name = entry["dockerRepository"].replace("airbyte/", "")
latest_version: str | None = entry.get("dockerImageTag")
Expand Down Expand Up @@ -233,6 +239,10 @@ def _get_registry_cache(*, force_refresh: bool = False) -> dict[str, ConnectorMe
return __cache

registry_url = _get_registry_url()

if _is_registry_disabled(registry_url):
return {}

if registry_url.startswith("http"):
response = requests.get(
registry_url,
Expand All @@ -256,23 +266,29 @@ def _get_registry_cache(*, force_refresh: bool = False) -> dict[str, ConnectorMe
new_cache[connector_metadata.name] = connector_metadata

if len(new_cache) == 0:
raise exc.PyAirbyteInternalError(
message="Connector registry is empty.",
context={
"registry_url": _get_registry_url(),
},
# This isn't necessarily fatal, since users can bring their own
# connector definitions.
warn_once(
message=f"Connector registry is empty: {registry_url}",
with_stack=False,
)

__cache = new_cache
return __cache


def get_connector_metadata(name: str) -> ConnectorMetadata:
def get_connector_metadata(name: str) -> None | ConnectorMetadata:
"""Check the cache for the connector.
If the cache is empty, populate by calling update_cache.
"""
registry_url = _get_registry_url()

if _is_registry_disabled(registry_url):
return None

cache = copy(_get_registry_cache())

if not cache:
raise exc.PyAirbyteInternalError(
message="Connector registry could not be loaded.",
Expand Down

0 comments on commit 7c703ac

Please sign in to comment.