Skip to content

Commit

Permalink
refactor: Additional config options, upgrade SDK, fix tests, run pip …
Browse files Browse the repository at this point in the history
…check (#23)

I put together a few fixes and new features aimed at giving users a bit
more control over the way wrapped sources execute. Now you can force
execution in docker.

Fixes:

* Fix existing tests that were broken
* Fix natively executable sources on Windows platforms

Updates:
* Update Meltano SDK
* Use `venv` instead of a direct subprocess call to create a virtual
environment.

New:
* Two new configuration options
* `skip_native_check` This allows users to opt out of the check for
natively executable sources. This could be preferable when you want to
force execution in docker containers.
* `native_source_python` This allows you to use a different version of
Python to create the virtual environment instead of just the one on your
PATH.
* Run a pip check after creating a native venv.
  • Loading branch information
SpaceCondor authored Aug 30, 2024
1 parent e1349aa commit c3d1620
Show file tree
Hide file tree
Showing 8 changed files with 720 additions and 2,607 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ jobs:
build:
strategy:
matrix:
py_version: ["3.8", "3.9", "3.10"]
py_version: ["3.9", "3.10"]
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Setup Python ${{ matrix.py_version }}
uses: actions/setup-python@v3
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.py_version }}
- name: Install Tap Airbyte Wrapper
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/project_add.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
runs-on: ubuntu-latest
if: ${{ github.actor != 'dependabot[bot]' }}
steps:
- uses: actions/add-to-project@v0.5.0
- uses: actions/add-to-project@v1.0.2
with:
project-url: https://github.com/orgs/MeltanoLabs/projects/3
github-token: ${{ secrets.MELTYBOT_PROJECT_ADD_PAT }}
1,568 changes: 530 additions & 1,038 deletions poetry.lock

Large diffs are not rendered by default.

75 changes: 48 additions & 27 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,44 +1,65 @@
[tool.poetry]
name = "tap-airbyte"
version = "0.6.0"
description = "`tap-airbyte` is a Singer tap for Airbyte, built with the Meltano Singer SDK."
version = "0.7.0"
description = "Singer tap for Airbyte, built with the Meltano Singer SDK."
readme = "README.md"
authors = ["Alex Butler"]
keywords = ["ELT", "Airbyte"]
license = "Apache 2.0"
keywords = [
"ELT",
"Airbyte",
]
classifiers = [
"Intended Audience :: Developers",
"Operating System :: OS Independent",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
]
license = "Apache-2.0"

[tool.poetry.dependencies]
python = "<3.12,>=3.8" # airbyte cdk is broken on 3.11 due to mutable defaults in dataclasses, go figure
requests = "^2.25.1"
singer-sdk = { version = "^0.36.0" }
fs-s3fs = { version = "^1.1.1", optional = true }
orjson = "^3.8.3"

[tool.poetry.dev-dependencies]
pytest = "^6.2.5"
tox = "^3.24.4"
flake8 = "^3.9.2"
black = ">=21"
pydocstyle = "^6.1.1"
mypy = "^0.910"
types-requests = "^2.26.1"
isort = "^5.10.1"
python = ">=3.9,<3.11"
singer-sdk = { version="~=0.39.0", extras = [] }
fs-s3fs = { version = "~=1.1.1", optional = true }
orjson = "^3.10.6"
virtualenv = "^20.26.3"

[tool.poetry.group.dev.dependencies]
pytest = ">=8"
singer-sdk = { version="~=0.39.0", extras = ["testing"] }

[tool.poetry.extras]
s3 = ["fs-s3fs"]

[tool.mypy]
python_version = "3.10"
warn_unused_configs = true

[tool.ruff]
src = ["tap_airbyte"]
target-version = "py39"

[tool.ruff.lint]
ignore = [
"ANN101", # missing-type-self
"ANN102", # missing-type-cls
"COM812", # missing-trailing-comma
"ISC001", # single-line-implicit-string-concatenation
]
select = ["ALL"]

[tool.ruff.lint.flake8-annotations]
allow-star-arg-any = true

[tool.ruff.lint.isort]
known-first-party = ["tap_airbyte"]

[tool.black] # https://black.readthedocs.io/en/stable/usage_and_configuration/the_basics.html#configuration-via-a-file
line-length = 100
target-version = ["py39"]
target-version = ["py310"]
preview = true

[tool.isort] # https://pycqa.github.io/isort/docs/configuration/options.html
color_output = true
line_length = 100
profile = "black"
src_paths = "tap_airbyte"

[build-system]
requires = ["poetry-core>=1.0.8"]
requires = ["poetry-core==1.9.0"]
build-backend = "poetry.core.masonry.api"

[tool.poetry.scripts]
Expand Down
139 changes: 98 additions & 41 deletions tap_airbyte/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import orjson
import requests
import singer_sdk._singerlib as singer
import virtualenv
from singer_sdk import Stream, Tap
from singer_sdk import typing as th
from singer_sdk.cli import common_options
Expand Down Expand Up @@ -163,10 +164,26 @@ class TapAirbyte(Tap):
" documentation"
),
),
th.Property(
"skip_native_check",
th.BooleanType,
required=False,
default=False,
description="Disables the check for natively executable sources. By default, AirByte sources are checked "
"to see if they are able to be executed natively without using containers. This disables that "
"check and forces them to run in containers.",
),
th.Property(
"native_source_python",
th.StringType,
required=False,
description="Path to Python executable to use.",
)

).to_dict()
airbyte_mount_dir: str = os.getenv("AIRBYTE_MOUNT_DIR", "/tmp")
pipe_status = None

eof_received = None
# Airbyte image to run
_image: t.Optional[str] = None # type: ignore
_tag: t.Optional[str] = None # type: ignore
Expand Down Expand Up @@ -212,14 +229,14 @@ def cli(cls) -> t.Callable:
context_settings={"help_option_names": ["--help"]},
)
def cli(
version: bool = False,
about: bool = False,
discover: bool = False,
test: bool = False,
config: tuple[str, ...] = (),
state: t.Optional[str] = None,
catalog: t.Optional[str] = None,
format: t.Optional[str] = None,
version: bool = False,
about: bool = False,
discover: bool = False,
test: bool = False,
config: tuple[str, ...] = (),
state: t.Optional[str] = None,
catalog: t.Optional[str] = None,
cli_format: t.Optional[str] = None,
) -> None:
if version:
cls.print_version()
Expand Down Expand Up @@ -255,13 +272,13 @@ def cli(
spec = tap.run_spec()["connectionSpecification"]
except Exception:
cls.logger.info("Tap-Airbyte instantiation failed. Printing basic about info.")
cls.print_about(output_format=format)
cls.print_about(output_format=cli_format)
else:
cls.logger.info(
"Tap-Airbyte instantiation succeeded. Printing spec-enriched about info."
)
cls.config_jsonschema["properties"]["airbyte_config"] = spec
cls.print_about(output_format=format)
cls.print_about(output_format=cli_format)
cls.print_spec_as_config(spec)
return
# End modification
Expand Down Expand Up @@ -310,20 +327,59 @@ def _ensure_oci(self) -> None:
sys.exit(1)
self.logger.info("Successfully executed %s version.", self.container_runtime)

def _ensure_installed(self) -> None:
"""Install the source connector from PyPI if necessary."""
if not self.venv.exists():
subprocess.run(
[sys.executable, "-m", "venv", self.venv],
check=True,
stdout=subprocess.PIPE,
)
if not (self.venv / "bin" / self.source_name).exists():
subprocess.run(
[self.venv / "bin" / "pip", "install", self._get_requirement_string()],
check=True,
stdout=subprocess.PIPE,
)
@property
def native_venv_path(self) -> Path:
"""Get the path to the virtual environment for the connector."""
return Path(__file__).parent.resolve() / f".venv-airbyte-{self.source_name}"

@property
def native_venv_bin_path(self) -> Path:
"""Get the path to the virtual environment for the connector bin."""
return self.native_venv_path / ("Scripts" if sys.platform == "win32" else "bin")

def setup_native_connector_venv(self) -> None:
"""Creates a virtual environment and installs the source connector via PyPI"""
if self.native_venv_path.exists():
self.logger.info("Virtual environment for source already exists.")
return

self.logger.info(
"Creating virtual environment at %s, using %s Python.",
self.native_venv_path,
self.config.get("native_source_python", "default")
)

# Construct the arguments list for virtualenv
args = []

if self.config.get("native_source_python"):
args.append(["-p", self.config["native_source_python"]])
args.append(str(self.native_venv_path))

# Run the virtualenv command
virtualenv.cli_run(args)

self.logger.info(
"Installing %s in the virtual environment..",
self._get_requirement_string()
)

subprocess.run(
[self.native_venv_bin_path / "pip", "install",
self._get_requirement_string()],
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT
)

def _run_pip_check(self) -> str:
process = subprocess.run(
[self.native_venv_bin_path / "pip", "check"],
capture_output=True,
text=True
)

return process.stdout

def _get_requirement_string(self) -> str:
"""Get the requirement string for the source connector."""
Expand All @@ -336,6 +392,8 @@ def _get_requirement_string(self) -> str:
def is_native(self) -> bool:
"""Check if the connector is available on PyPI and can be managed natively without Docker."""
is_native = False
if self.config.get("skip_native_check", False):
return is_native
try:
response = requests.get(
"https://connectors.airbyte.com/files/registries/v0/oss_registry.json",
Expand All @@ -352,13 +410,15 @@ def is_native(self) -> bool:
except Exception:
pass
if is_native:
self._ensure_installed()
self.setup_native_connector_venv()
pip_result = self._run_pip_check()
self.logger.info(f"pip check results: {pip_result}")
else:
self._ensure_oci()
return is_native

def to_command(
self, *airbyte_cmd: str, docker_args: t.Optional[t.List[str]] = None
self, *airbyte_cmd: str, docker_args: t.Optional[t.List[str]] = None
) -> t.List[t.Union[str, Path]]:
"""Construct the command to run the Airbyte connector."""
return (
Expand All @@ -369,7 +429,6 @@ def to_command(
"run",
*(docker_args or []),
f"{self.image}:{self.tag}",
"--",
*airbyte_cmd,
]
)
Expand Down Expand Up @@ -505,10 +564,8 @@ def run_connection_test(self) -> bool:
def run_read(self) -> t.Iterator[subprocess.Popen]:
"""Run the read command for the Airbyte connector."""
with TemporaryDirectory() as host_tmpdir:
with (
open(f"{host_tmpdir}/config.json", "wb") as config,
open(f"{host_tmpdir}/catalog.json", "wb") as catalog,
):
with open(f"{host_tmpdir}/config.json", "wb") as config, open(f"{host_tmpdir}/catalog.json",
"wb") as catalog:
config.write(orjson.dumps(self.config.get("airbyte_config", {})))
catalog.write(orjson.dumps(self.configured_airbyte_catalog))
if self.airbyte_state:
Expand Down Expand Up @@ -727,8 +784,8 @@ def sync_all(self) -> None:
)
stream_buffer.put_nowait(airbyte_message["record"]["data"])
elif airbyte_message["type"] in (
AirbyteMessage.LOG,
AirbyteMessage.TRACE,
AirbyteMessage.LOG,
AirbyteMessage.TRACE,
):
self._process_log_message(airbyte_message)
elif airbyte_message["type"] == AirbyteMessage.STATE:
Expand All @@ -747,7 +804,7 @@ def sync_all(self) -> None:
singer.write_message(singer.StateMessage(self.airbyte_state))
else:
self.logger.warning("Unhandled message: %s", airbyte_message)
# Daemon threads will be terminated when the main thread exits
# Daemon threads will be terminated when the main thread exits,
# so we do not need to wait on them to join after SIGPIPE
if TapAirbyte.pipe_status is not PIPE_CLOSED:
self.logger.info("Waiting for sync threads to finish...")
Expand Down Expand Up @@ -777,13 +834,13 @@ def discover_streams(self) -> t.List["AirbyteStream"]:
if "cursor_field" in stream and isinstance(stream["cursor_field"][0], str):
airbyte_stream.replication_key = stream["cursor_field"][0]
elif (
"source_defined_cursor" in stream
and isinstance(stream["source_defined_cursor"], bool)
and stream["source_defined_cursor"]
"source_defined_cursor" in stream
and isinstance(stream["source_defined_cursor"], bool)
and stream["source_defined_cursor"]
):
# The stream has a source defined cursor. Try using that
if "default_cursor_field" in stream and isinstance(
stream["default_cursor_field"][0], str
stream["default_cursor_field"][0], str
):
airbyte_stream.replication_key = stream["default_cursor_field"][0]
else:
Expand All @@ -797,7 +854,7 @@ def discover_streams(self) -> t.List["AirbyteStream"]:
if "primary_key" in stream and isinstance(stream["primary_key"][0], t.List):
airbyte_stream.primary_keys = stream["primary_key"][0]
elif "source_defined_primary_key" in stream and isinstance(
stream["source_defined_primary_key"][0], t.List
stream["source_defined_primary_key"][0], t.List
):
airbyte_stream.primary_keys = stream["source_defined_primary_key"][0]
except IndexError:
Expand Down Expand Up @@ -843,7 +900,7 @@ def buffer(self) -> Queue:
def get_records(self, context: t.Optional[dict]) -> t.Iterable[dict]:
"""Get records from the stream."""
while (
self.parent.eof_received is False or not self.buffer.empty()
self.parent.eof_received is False or not self.buffer.empty()
) and TapAirbyte.pipe_status is not PIPE_CLOSED:
try:
# The timeout permits the consumer to re-check the producer is alive
Expand Down
Loading

0 comments on commit c3d1620

Please sign in to comment.