diff --git a/airbyte/cli.py b/airbyte/cli.py index 97b39c92..2ae46033 100644 --- a/airbyte/cli.py +++ b/airbyte/cli.py @@ -76,6 +76,7 @@ """For example, --config='{password: "SECRET:MY_PASSWORD"}'.""" ) + def _resolve_config( config: str, ) -> dict[str, Any]: diff --git a/examples/run_perf_test_reads.py b/examples/run_perf_test_reads.py index 4d5ce6cf..f4a07893 100644 --- a/examples/run_perf_test_reads.py +++ b/examples/run_perf_test_reads.py @@ -2,80 +2,77 @@ """ Simple script to get performance profile of read throughput. -This script accepts a single argument `-e=SCALE` as a power of 10. +This script accepts a single argument `-n=NUM_RECORDS` with record count +provided as a regular number or in scientific notation. --e=2 is equivalent to 500 records. --e=3 is equivalent to 5_000 records. --e=4 is equivalent to 50_000 records. --e=5 is equivalent to 500_000 records. --e=6 is equivalent to 5_000_000 records. +When providing in scientific notation: -Use smaller values of `e` (2-3) to understand read and overhead costs. -Use larger values of `e` (4-5) to understand write throughput at scale. +-n=5e2 is equivalent to 500 records. +-n=5e3 is equivalent to 5_000 records. +-n=5e4 is equivalent to 50_000 records. +-n=5e5 is equivalent to 500_000 records. +-n=5e6 is equivalent to 5_000_000 records. For performance profiling, use `viztracer` to generate a flamegraph: ``` -poetry run viztracer --open -- ./examples/run_perf_test_reads.py -e=3 -poetry run viztracer --open -- ./examples/run_perf_test_reads.py -e=5 +poetry run viztracer --open -- ./examples/run_perf_test_reads.py -n=1e3 +poetry run viztracer --open -- ./examples/run_perf_test_reads.py -n=1e5 ``` To run without profiling, prefix script name with `poetry run python`: + ``` # Run with 5_000 records -poetry run python ./examples/run_perf_test_reads.py -e=3 +poetry run python ./examples/run_perf_test_reads.py -n=1e3 # Run with 500_000 records -poetry run python ./examples/run_perf_test_reads.py -e=5 +poetry run python ./examples/run_perf_test_reads.py -n=1e5 # Load 5_000 records to Snowflake -poetry run python ./examples/run_perf_test_reads.py -e=3 --cache=snowflake +poetry run python ./examples/run_perf_test_reads.py -n=1e3 --cache=snowflake # Load 5_000 records to BigQuery -poetry run python ./examples/run_perf_test_reads.py -e=3 --cache=bigquery +poetry run python ./examples/run_perf_test_reads.py -n=1e3 --cache=bigquery ``` You can also use this script to test destination load performance: ```bash # Load 5_000 records to BigQuery -poetry run python ./examples/run_perf_test_reads.py -e=5 --destination=e2e +poetry run python ./examples/run_perf_test_reads.py -n=1e3 --destination=e2e ``` Testing raw PyAirbyte throughput with and without caching: ```bash # Test raw PyAirbyte throughput with caching (Source->Cache): -poetry run python ./examples/run_perf_test_reads.py -e=5 +poetry run python ./examples/run_perf_test_reads.py -n=1e3 # Test raw PyAirbyte throughput without caching (Source->Destination): -poetry run python ./examples/run_perf_test_reads.py -e=5 --destination=e2e --no-cache +poetry run python ./examples/run_perf_test_reads.py -n=1e3 --destination=e2e --no-cache ``` Testing Python CDK throughput: ```bash -# Test max throughput: -poetry run python ./examples/run_perf_test_reads.py -n=2400000 --source=hardcoded --destination=e2e +# Test max throughput with 2.4 million records: +poetry run python ./examples/run_perf_test_reads.py -n=2.4e6 --source=hardcoded --destination=e2e + # Analyze tracing data: -poetry run viztracer --open -- ./examples/run_perf_test_reads.py -e=3 --source=hardcoded --destination=e2e +poetry run viztracer --open -- ./examples/run_perf_test_reads.py -n=1e3 --source=hardcoded --destination=e2e ``` - - -Note: -- The Faker stream ('purchases') is assumed to be 220 bytes, meaning 4_500 records is - approximately 1 MB. Based on this: 25K records/second is approximately 5.5 MB/s. -- The E2E stream is assumed to be 180 bytes, meaning 5_500 records is - approximately 1 MB. Based on this: 40K records/second is approximately 7.2 MB/s - and 60K records/second is approximately 10.5 MB/s. """ from __future__ import annotations import argparse import tempfile +from decimal import Decimal from typing import TYPE_CHECKING import airbyte as ab from airbyte.caches import BigQueryCache, CacheBase, SnowflakeCache +from airbyte.destinations import Destination, get_noop_destination from airbyte.secrets.google_gsm import GoogleGSMSecretManager +from airbyte.sources import get_benchmark_source from typing_extensions import Literal if TYPE_CHECKING: @@ -142,8 +139,11 @@ def get_cache( def get_source( source_alias: str, - num_records: int, + num_records: int | str, ) -> Source: + if isinstance(num_records, str): + num_records = int(Decimal(num_records)) + if source_alias == "faker": return ab.get_source( "source-faker", @@ -152,20 +152,8 @@ def get_source( streams=["purchases"], ) - if source_alias == "e2e": - return ab.get_source( - "source-e2e", - docker_image="airbyte/source-e2e-test:latest", - streams="*", - config={ - "type": "BENCHMARK", - "schema": "FIVE_STRING_COLUMNS", - "terminationCondition": { - "type": "MAX_RECORDS", - "max": num_records, - }, - }, - ) + if source_alias in ["e2e", "benchmark"]: + return get_benchmark_source(num_records=num_records) if source_alias == "hardcoded": return ab.get_source( @@ -180,32 +168,19 @@ def get_source( def get_destination(destination_type: str) -> ab.Destination: - if destination_type == "e2e": - return ab.get_destination( - name="destination-e2e-test", - config={ - "test_destination": { - "test_destination_type": "LOGGING", - "logging_config": { - "logging_type": "FirstN", - "max_entry_count": 100, - }, - } - }, - docker_image="airbyte/destination-e2e-test:latest", - ) + if destination_type in ["e2e", "noop"]: + return get_noop_destination() raise ValueError(f"Unknown destination type: {destination_type}") # noqa: TRY003 def main( - e: int | None = None, - n: int | None = None, + n: int | str = "5e5", cache_type: Literal["duckdb", "bigquery", "snowflake", False] = "duckdb", source_alias: str = "e2e", destination_type: str | None = None, ) -> None: - num_records: int = n or 5 * (10 ** (e or 3)) + num_records = int(Decimal(n)) cache_type = "duckdb" if cache_type is None else cache_type cache: CacheBase | Literal[False] = get_cache( @@ -216,34 +191,31 @@ def main( num_records=num_records, ) source.check() + destination: Destination | None = None + if destination_type: destination = get_destination(destination_type=destination_type) + if cache is not False: read_result = source.read(cache) - if destination_type: + if destination: destination.write(read_result) else: + assert ( + destination is not None + ), "Destination is required when caching is disabled." destination.write(source, cache=False) if __name__ == "__main__": parser = argparse.ArgumentParser(description="Run performance test reads.") - parser.add_argument( - "-e", - type=int, - help=( - "The scale, as a power of 10." - "Recommended values: 2-3 (500 or 5_000) for read and overhead costs, " - " 4-6 (50K or 5MM) for write throughput. " - "This is mutually exclusive with -n." - ), - ) parser.add_argument( "-n", - type=int, + type=str, help=( "The number of records to generate in the source. " - "This is mutually exclusive with -e." + "This can be provided in scientific notation, for instance " + "'2.4e6' for 2.4 million and '5e5' for 500K." ), ) parser.add_argument( @@ -266,7 +238,12 @@ def main( "while the `faker` source runs natively in Python. The 'hardcoded' source is " "similar to the 'e2e' source, but written in Python." ), - choices=["faker", "e2e", "hardcoded"], + choices=[ + "benchmark", + "e2e", + "hardcoded", + "faker", + ], default="hardcoded", ) parser.add_argument( @@ -279,7 +256,6 @@ def main( args = parser.parse_args() main( - e=args.e, n=args.n, cache_type=args.cache if not args.no_cache else False, source_alias=args.source,