From 53246a3b8377f8cbf7281f9719f52e65979c3ac6 Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Sun, 6 Oct 2024 17:19:49 -0700 Subject: [PATCH] Feat: Add new PyAirbyte CLI for connector validation and benchmarking; add helper functions `get_noop_destination()` and `get_benchmark_source()` (#411) --- airbyte/__init__.py | 2 + airbyte/cli.py | 411 +++++++++++++++++++++++++++++++ airbyte/destinations/__init__.py | 2 + airbyte/destinations/util.py | 27 ++ airbyte/sources/__init__.py | 6 +- airbyte/sources/util.py | 49 ++++ examples/run_perf_test_reads.py | 126 ++++------ pyproject.toml | 3 +- 8 files changed, 549 insertions(+), 77 deletions(-) create mode 100644 airbyte/cli.py diff --git a/airbyte/__init__.py b/airbyte/__init__.py index 93037fea..ef6b1b1e 100644 --- a/airbyte/__init__.py +++ b/airbyte/__init__.py @@ -125,6 +125,7 @@ from airbyte import ( caches, + # cli, # Causes circular import if included cloud, constants, datasets, @@ -156,6 +157,7 @@ __all__ = [ # Modules "caches", + # "cli", # Causes circular import if included "cloud", "constants", "datasets", diff --git a/airbyte/cli.py b/airbyte/cli.py new file mode 100644 index 00000000..5d3082c1 --- /dev/null +++ b/airbyte/cli.py @@ -0,0 +1,411 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""CLI for PyAirbyte. + +The PyAirbyte CLI provides a command-line interface for testing connectors and running benchmarks. + +PyAirbyte CLI can be invoked with the `pyairbyte` CLI executable, or the +shorter `pyab` alias. 
+
+These are equivalent:
+
+    ```bash
+    python -m airbyte.cli --help
+    pyairbyte --help
+    pyab --help
+    ```
+
+You can also use the fast and powerful `uv` tool to run the CLI without pre-installing:
+
+    ```
+    # Install `uv` if you haven't already:
+    brew install uv
+
+    # Run the PyAirbyte CLI using `uvx`:
+    uvx --from=airbyte pyab --help
+    ```
+"""
+
+from __future__ import annotations
+
+import json
+from pathlib import Path
+from typing import TYPE_CHECKING, Any
+
+import click
+import yaml
+
+from airbyte.destinations.util import get_destination, get_noop_destination
+from airbyte.exceptions import PyAirbyteInputError
+from airbyte.secrets.util import get_secret
+from airbyte.sources.util import get_benchmark_source, get_source
+
+
+if TYPE_CHECKING:
+    from airbyte.destinations.base import Destination
+    from airbyte.sources.base import Source
+
+
+CLI_GUIDANCE = """
+----------------------
+
+PyAirbyte CLI Guidance
+
+Providing connector configuration:
+
+When providing configuration via `--config`, you can provide any of the following:
+
+1. A path to a configuration file, in yaml or json format.
+
+2. An inline yaml string, e.g. `--config='{key: value}'`, --config='{key: {nested: value}}'.
+
+When providing an inline yaml string, it is recommended to use single quotes to avoid shell
+interpolation.
+
+Providing secrets:
+
+You can provide secrets in your configuration file by prefixing the secret value with `SECRET:`.
+For example, --config='{password: "SECRET:my_password"}' will look for a secret named `my_password`
+in the secret store. By default, PyAirbyte will look for secrets in environment variables and
+dotenv (.env) files. If a secret is not found, you'll be prompted to provide the secret value
+interactively in the terminal.
+
+It is highly recommended to use secrets when using inline yaml strings, in order to avoid
+exposing secrets in plain text in the terminal history. Secrets provided interactively will
+not be echoed to the terminal.
+""" + +# Add the CLI guidance to the module docstring. +globals()["__doc__"] = globals().get("__doc__", "") + CLI_GUIDANCE + +CONFIG_HELP = ( + "Either a path to a configuration file for the named source or destination, " + "or an inline yaml string. If providing an inline yaml string, use single quotes " + "to avoid shell interpolation. For example, --config='{key: value}' or " + "--config='{key: {nested: value}}'. \n" + "PyAirbyte secrets can be accessed by prefixing the secret name with 'SECRET:'. " + """For example, --config='{password: "SECRET:MY_PASSWORD"}'.""" +) + + +def _resolve_config( + config: str, +) -> dict[str, Any]: + """Resolve the configuration file into a dictionary.""" + + def _inject_secrets(config_dict: dict[str, Any]) -> None: + """Inject secrets into the configuration dictionary.""" + for key, value in config_dict.items(): + if isinstance(value, dict): + _inject_secrets(value) + elif isinstance(value, str) and value.startswith("SECRET:"): + config_dict[key] = get_secret(value.removeprefix("SECRET:").strip()) + + config_dict: dict[str, Any] + if config.startswith("{"): + # Treat this as an inline yaml string: + config_dict = yaml.safe_load(config) + else: + # Treat this as a path to a config file: + config_path = Path(config) + if not config_path.exists(): + raise PyAirbyteInputError( + message="Config file not found.", + input_value=str(config_path), + ) + config_dict = json.loads(config_path.read_text(encoding="utf-8")) + + _inject_secrets(config_dict) + return config_dict + + +def _resolve_source_job( + *, + source: str | None = None, + config: str | None = None, + streams: str | None = None, +) -> Source: + """Resolve the source job into a configured Source object. + + Args: + source: The source name, with an optional version declaration. + If a path is provided, the source will be loaded from the local path. + If the string `'.'` is provided, the source will be loaded from the current + working directory. 
+ config: The path to a configuration file for the named source or destination. + streams: A comma-separated list of stream names to select for reading. If set to "*", + all streams will be selected. If not provided, all streams will be selected. + """ + source_obj: Source + if source and (source.startswith(".") or "/" in source): + # Treat the source as a path. + source_executable = Path(source) + if not source_executable.exists(): + raise PyAirbyteInputError( + message="Source executable not found.", + context={ + "source": source, + }, + ) + source_obj = get_source( + name=source_executable.stem, + local_executable=source_executable, + ) + return source_obj + if not config: + raise PyAirbyteInputError( + message="No configuration found.", + ) + if not source or not source.startswith("source-"): + raise PyAirbyteInputError( + message="Expected a source name or path to executable.", + input_value=source, + ) + + source_name: str = source + streams_list: str | list[str] = streams or "*" + if isinstance(streams, str) and streams != "*": + streams_list = [stream.strip() for stream in streams.split(",")] + + return get_source( + name=source_name, + config=_resolve_config(config) if config else {}, + streams=streams_list, + ) + + +def _resolve_destination_job( + *, + destination: str, + config: str | None = None, +) -> Destination: + """Resolve the destination job into a configured Destination object. + + Args: + destination: The destination name, with an optional version declaration. + If a path is provided, the destination will be loaded from the local path. + If the string `'.'` is provided, the destination will be loaded from the current + working directory. + config: The path to a configuration file for the named source or destination. 
+ """ + if not config: + raise PyAirbyteInputError( + message="No configuration found.", + ) + + config_dict = _resolve_config(config) + + if destination and (destination.startswith(".") or "/" in destination): + # Treat the destination as a path. + destination_executable = Path(destination) + if not destination_executable.exists(): + raise PyAirbyteInputError( + message="Destination executable not found.", + context={ + "destination": destination, + }, + ) + return get_destination( + name=destination_executable.stem, + local_executable=destination_executable, + config=config_dict, + ) + + # else: # Treat the destination as a name. + + return get_destination( + name=destination, + config=config_dict, + ) + + +@click.command( + help=( + "Validate the connector has a valid CLI and is able to run `spec`. " + "If 'config' is provided, we will also run a `check` on the connector " + "with the provided config.\n\n" + CLI_GUIDANCE + ), +) +@click.option( + "--connector", + type=str, + help="The connector name or a path to the local executable.", +) +@click.option( + "--config", + type=str, + required=False, + help=CONFIG_HELP, +) +@click.option( + "--install", + is_flag=True, + default=False, + help=( + "Whether to install the connector if it is not available locally. " + "Defaults to False, meaning the connector is expected to be already be installed." + ), +) +def validate( + connector: str | None = None, + config: str | None = None, + *, + install: bool = False, +) -> None: + """Validate the connector.""" + local_executable: Path | None = None + if not connector: + raise PyAirbyteInputError( + message="No connector provided.", + ) + if connector.startswith(".") or "/" in connector: + # Treat the connector as a path. 
+ local_executable = Path(connector) + if not local_executable.exists(): + raise PyAirbyteInputError( + message="Connector executable not found.", + context={ + "connector": connector, + }, + ) + connector_name = local_executable.stem + else: + connector_name = connector + + if not connector_name.startswith("source-") and not connector_name.startswith("destination-"): + raise PyAirbyteInputError( + message=( + "Expected a connector name or path to executable. " + "Connector names are expected to begin with 'source-' or 'destination-'." + ), + input_value=connector, + ) + + connector_obj: Source | Destination + if connector_name.startswith("source-"): + connector_obj = get_source( + name=connector_name, + local_executable=local_executable, + install_if_missing=install, + ) + else: # destination + connector_obj = get_destination( + name=connector_name, + local_executable=local_executable, + install_if_missing=install, + ) + + print("Getting `spec` output from connector...") + connector_obj.print_config_spec() + + if config: + print("Running connector check...") + config_dict: dict[str, Any] = _resolve_config(config) + connector_obj.set_config(config_dict) + connector_obj.check() + + +@click.command() +@click.option( + "--source", + type=str, + help=( + "The source name, with an optional version declaration. " + "If a path is provided, it will be interpreted as a path to the local executable. " + ), +) +@click.option( + "--streams", + type=str, + default="*", + help=( + "A comma-separated list of stream names to select for reading. If set to '*', all streams " + "will be selected. Defaults to '*'." + ), +) +@click.option( + "--num-records", + type=str, + default="5e5", + help=( + "The number of records to generate for the benchmark. Ignored if a source is provided. " + "You can specify the number of records to generate using scientific notation. " + "For example, `5e6` will generate 5 million records. By default, 500,000 records will " + "be generated (`5e5` records). 
If underscores are provided within a numeric string, "
+        "they will be ignored."
+    ),
+)
+@click.option(
+    "--destination",
+    type=str,
+    help=(
+        "The destination name, with an optional version declaration. "
+        "If a path is provided, it will be interpreted as a path to the local executable. "
+    ),
+)
+@click.option(
+    "--config",
+    type=str,
+    help=CONFIG_HELP,
+)
+def benchmark(
+    source: str | None = None,
+    streams: str = "*",
+    num_records: int | str = "5e5",  # 500,000 records
+    destination: str | None = None,
+    config: str | None = None,
+) -> None:
+    """Run benchmarks.
+
+    You can provide either a source or a destination, but not both. If a destination is being
+    benchmarked, you can use `--num-records` to specify the number of records to generate for the
+    benchmark.
+
+    If a source is being benchmarked, you can provide a configuration file or a job
+    definition file to run the source job.
+    """
+    if source and destination:
+        raise PyAirbyteInputError(
+            message="For benchmarking, source or destination can be provided, but not both.",
+        )
+    destination_obj: Destination
+    source_obj: Source
+
+    source_obj = (
+        _resolve_source_job(
+            source=source,
+            config=config,
+            streams=streams,
+        )
+        if source
+        else get_benchmark_source(
+            num_records=num_records,
+        )
+    )
+    destination_obj = (
+        _resolve_destination_job(
+            destination=destination,
+            config=config,
+        )
+        if destination
+        else get_noop_destination()
+    )
+
+    click.echo("Running benchmarks...")
+    destination_obj.write(
+        source_data=source_obj,
+        cache=False,
+        state_cache=False,
+    )
+
+
+@click.group()
+def cli() -> None:
+    """PyAirbyte CLI."""
+    pass
+
+
+cli.add_command(validate)
+cli.add_command(benchmark)
+
+if __name__ == "__main__":
+    cli()
diff --git a/airbyte/destinations/__init__.py b/airbyte/destinations/__init__.py
index c35d1786..8a03ab75 100644
--- a/airbyte/destinations/__init__.py
+++ b/airbyte/destinations/__init__.py
@@ -79,6 +79,7 @@ from airbyte.destinations.base import Destination
from airbyte.destinations.util import ( get_destination, + get_noop_destination, ) @@ -87,6 +88,7 @@ "util", # Methods "get_destination", + "get_noop_destination", # Classes "Destination", ] diff --git a/airbyte/destinations/util.py b/airbyte/destinations/util.py index f3b2f33a..0531af4a 100644 --- a/airbyte/destinations/util.py +++ b/airbyte/destinations/util.py @@ -68,3 +68,30 @@ def get_destination( install_if_missing=install_if_missing, ), ) + + +def get_noop_destination() -> Destination: + """Get a devnull (no-op) destination. + + This is useful for performance benchmarking of sources, without + adding the overhead of writing data to a real destination. + """ + return get_destination( + "destination-dev-null", + config={ + "test_destination": { + "test_destination_type": "LOGGING", + "logging_config": { + "logging_type": "FirstN", + "max_entry_count": 100, + }, + } + }, + docker_image=True, + ) + + +__all__ = [ + "get_destination", + "get_noop_destination", +] diff --git a/airbyte/sources/__init__.py b/airbyte/sources/__init__.py index 8af539e3..f29c6ea7 100644 --- a/airbyte/sources/__init__.py +++ b/airbyte/sources/__init__.py @@ -10,7 +10,10 @@ get_available_connectors, get_connector_metadata, ) -from airbyte.sources.util import get_source +from airbyte.sources.util import ( + get_benchmark_source, + get_source, +) __all__ = [ @@ -19,6 +22,7 @@ "util", # Factories "get_source", + "get_benchmark_source", # Helper Functions "get_available_connectors", "get_connector_metadata", diff --git a/airbyte/sources/util.py b/airbyte/sources/util.py index 35837444..b2c01939 100644 --- a/airbyte/sources/util.py +++ b/airbyte/sources/util.py @@ -4,9 +4,11 @@ from __future__ import annotations import warnings +from decimal import Decimal, InvalidOperation from typing import TYPE_CHECKING, Any from airbyte._executors.util import get_connector_executor +from airbyte.exceptions import PyAirbyteInputError from airbyte.sources.base import Source @@ -116,6 +118,53 @@ def 
get_source(  # noqa: PLR0913  # Too many arguments
     )
 
 
+def get_benchmark_source(
+    num_records: int | str = "5e5",
+) -> Source:
+    """Get a source for benchmarking.
+
+    This source will generate dummy records for performance benchmarking purposes.
+    You can specify the number of records to generate using the `num_records` parameter.
+    The `num_records` parameter can be an integer or a string in scientific notation.
+    For example, `"5e6"` will generate 5 million records. If underscores are provided
+    within a numeric string, they will be ignored.
+
+    Args:
+        num_records (int | str): The number of records to generate. Defaults to "5e5", or
+            500,000 records.
+            Can be an integer (`1000`) or a string in scientific notation.
+            For example, `"5e6"` will generate 5 million records.
+
+    Returns:
+        Source: The source object for benchmarking.
+    """
+    if isinstance(num_records, str):
+        try:
+            num_records = int(Decimal(num_records.replace("_", "")))
+        except InvalidOperation as ex:
+            raise PyAirbyteInputError(
+                message="Invalid number format.",
+                original_exception=ex,
+                input_value=str(num_records),
+            ) from None
+
+    return get_source(
+        name="source-e2e-test",
+        docker_image=True,
+        # docker_image="airbyte/source-e2e-test:latest",
+        config={
+            "type": "BENCHMARK",
+            "schema": "FIVE_STRING_COLUMNS",
+            "terminationCondition": {
+                "type": "MAX_RECORDS",
+                "max": num_records,
+            },
+        },
+        streams="*",
+    )
+
+
 __all__ = [
     "get_source",
+    "get_benchmark_source",
 ]
diff --git a/examples/run_perf_test_reads.py b/examples/run_perf_test_reads.py
index 4d5ce6cf..f4a07893 100644
--- a/examples/run_perf_test_reads.py
+++ b/examples/run_perf_test_reads.py
@@ -2,80 +2,77 @@
 """
 Simple script to get performance profile of read throughput.
 
-This script accepts a single argument `-e=SCALE` as a power of 10.
+This script accepts a single argument `-n=NUM_RECORDS` with record count
+provided as a regular number or in scientific notation.
 
--e=2 is equivalent to 500 records.
--e=3 is equivalent to 5_000 records. --e=4 is equivalent to 50_000 records. --e=5 is equivalent to 500_000 records. --e=6 is equivalent to 5_000_000 records. +When providing in scientific notation: -Use smaller values of `e` (2-3) to understand read and overhead costs. -Use larger values of `e` (4-5) to understand write throughput at scale. +-n=5e2 is equivalent to 500 records. +-n=5e3 is equivalent to 5_000 records. +-n=5e4 is equivalent to 50_000 records. +-n=5e5 is equivalent to 500_000 records. +-n=5e6 is equivalent to 5_000_000 records. For performance profiling, use `viztracer` to generate a flamegraph: ``` -poetry run viztracer --open -- ./examples/run_perf_test_reads.py -e=3 -poetry run viztracer --open -- ./examples/run_perf_test_reads.py -e=5 +poetry run viztracer --open -- ./examples/run_perf_test_reads.py -n=1e3 +poetry run viztracer --open -- ./examples/run_perf_test_reads.py -n=1e5 ``` To run without profiling, prefix script name with `poetry run python`: + ``` # Run with 5_000 records -poetry run python ./examples/run_perf_test_reads.py -e=3 +poetry run python ./examples/run_perf_test_reads.py -n=1e3 # Run with 500_000 records -poetry run python ./examples/run_perf_test_reads.py -e=5 +poetry run python ./examples/run_perf_test_reads.py -n=1e5 # Load 5_000 records to Snowflake -poetry run python ./examples/run_perf_test_reads.py -e=3 --cache=snowflake +poetry run python ./examples/run_perf_test_reads.py -n=1e3 --cache=snowflake # Load 5_000 records to BigQuery -poetry run python ./examples/run_perf_test_reads.py -e=3 --cache=bigquery +poetry run python ./examples/run_perf_test_reads.py -n=1e3 --cache=bigquery ``` You can also use this script to test destination load performance: ```bash # Load 5_000 records to BigQuery -poetry run python ./examples/run_perf_test_reads.py -e=5 --destination=e2e +poetry run python ./examples/run_perf_test_reads.py -n=1e3 --destination=e2e ``` Testing raw PyAirbyte throughput with and without caching: ```bash # Test raw 
PyAirbyte throughput with caching (Source->Cache): -poetry run python ./examples/run_perf_test_reads.py -e=5 +poetry run python ./examples/run_perf_test_reads.py -n=1e3 # Test raw PyAirbyte throughput without caching (Source->Destination): -poetry run python ./examples/run_perf_test_reads.py -e=5 --destination=e2e --no-cache +poetry run python ./examples/run_perf_test_reads.py -n=1e3 --destination=e2e --no-cache ``` Testing Python CDK throughput: ```bash -# Test max throughput: -poetry run python ./examples/run_perf_test_reads.py -n=2400000 --source=hardcoded --destination=e2e +# Test max throughput with 2.4 million records: +poetry run python ./examples/run_perf_test_reads.py -n=2.4e6 --source=hardcoded --destination=e2e + # Analyze tracing data: -poetry run viztracer --open -- ./examples/run_perf_test_reads.py -e=3 --source=hardcoded --destination=e2e +poetry run viztracer --open -- ./examples/run_perf_test_reads.py -n=1e3 --source=hardcoded --destination=e2e ``` - - -Note: -- The Faker stream ('purchases') is assumed to be 220 bytes, meaning 4_500 records is - approximately 1 MB. Based on this: 25K records/second is approximately 5.5 MB/s. -- The E2E stream is assumed to be 180 bytes, meaning 5_500 records is - approximately 1 MB. Based on this: 40K records/second is approximately 7.2 MB/s - and 60K records/second is approximately 10.5 MB/s. 
""" from __future__ import annotations import argparse import tempfile +from decimal import Decimal from typing import TYPE_CHECKING import airbyte as ab from airbyte.caches import BigQueryCache, CacheBase, SnowflakeCache +from airbyte.destinations import Destination, get_noop_destination from airbyte.secrets.google_gsm import GoogleGSMSecretManager +from airbyte.sources import get_benchmark_source from typing_extensions import Literal if TYPE_CHECKING: @@ -142,8 +139,11 @@ def get_cache( def get_source( source_alias: str, - num_records: int, + num_records: int | str, ) -> Source: + if isinstance(num_records, str): + num_records = int(Decimal(num_records)) + if source_alias == "faker": return ab.get_source( "source-faker", @@ -152,20 +152,8 @@ def get_source( streams=["purchases"], ) - if source_alias == "e2e": - return ab.get_source( - "source-e2e", - docker_image="airbyte/source-e2e-test:latest", - streams="*", - config={ - "type": "BENCHMARK", - "schema": "FIVE_STRING_COLUMNS", - "terminationCondition": { - "type": "MAX_RECORDS", - "max": num_records, - }, - }, - ) + if source_alias in ["e2e", "benchmark"]: + return get_benchmark_source(num_records=num_records) if source_alias == "hardcoded": return ab.get_source( @@ -180,32 +168,19 @@ def get_source( def get_destination(destination_type: str) -> ab.Destination: - if destination_type == "e2e": - return ab.get_destination( - name="destination-e2e-test", - config={ - "test_destination": { - "test_destination_type": "LOGGING", - "logging_config": { - "logging_type": "FirstN", - "max_entry_count": 100, - }, - } - }, - docker_image="airbyte/destination-e2e-test:latest", - ) + if destination_type in ["e2e", "noop"]: + return get_noop_destination() raise ValueError(f"Unknown destination type: {destination_type}") # noqa: TRY003 def main( - e: int | None = None, - n: int | None = None, + n: int | str = "5e5", cache_type: Literal["duckdb", "bigquery", "snowflake", False] = "duckdb", source_alias: str = "e2e", 
destination_type: str | None = None, ) -> None: - num_records: int = n or 5 * (10 ** (e or 3)) + num_records = int(Decimal(n)) cache_type = "duckdb" if cache_type is None else cache_type cache: CacheBase | Literal[False] = get_cache( @@ -216,34 +191,31 @@ def main( num_records=num_records, ) source.check() + destination: Destination | None = None + if destination_type: destination = get_destination(destination_type=destination_type) + if cache is not False: read_result = source.read(cache) - if destination_type: + if destination: destination.write(read_result) else: + assert ( + destination is not None + ), "Destination is required when caching is disabled." destination.write(source, cache=False) if __name__ == "__main__": parser = argparse.ArgumentParser(description="Run performance test reads.") - parser.add_argument( - "-e", - type=int, - help=( - "The scale, as a power of 10." - "Recommended values: 2-3 (500 or 5_000) for read and overhead costs, " - " 4-6 (50K or 5MM) for write throughput. " - "This is mutually exclusive with -n." - ), - ) parser.add_argument( "-n", - type=int, + type=str, help=( "The number of records to generate in the source. " - "This is mutually exclusive with -e." + "This can be provided in scientific notation, for instance " + "'2.4e6' for 2.4 million and '5e5' for 500K." ), ) parser.add_argument( @@ -266,7 +238,12 @@ def main( "while the `faker` source runs natively in Python. The 'hardcoded' source is " "similar to the 'e2e' source, but written in Python." 
), - choices=["faker", "e2e", "hardcoded"], + choices=[ + "benchmark", + "e2e", + "hardcoded", + "faker", + ], default="hardcoded", ) parser.add_argument( @@ -279,7 +256,6 @@ def main( args = parser.parse_args() main( - e=args.e, n=args.n, cache_type=args.cache if not args.no_cache else False, source_alias=args.source, diff --git a/pyproject.toml b/pyproject.toml index d8de045c..9c3d65f0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -313,7 +313,8 @@ venvPath = "./" # Assuming .venv is at the root of your project venv = ".venv" [tool.poetry.scripts] -airbyte-lib-validate-source = "airbyte.validate:run" +pyairbyte = "airbyte.cli:cli" +pyab = "airbyte.cli:cli" [tool.poe.tasks] test = { shell = "pytest" }