Skip to content

Commit

Permalink
Rename DuckDB classes
Browse files: browse the repository at this point in the history
  • Loading branch information
aaronsteers committed Feb 20, 2024
1 parent 7339862 commit c042fa2
Show file tree
Hide file tree
Showing 10 changed files with 50 additions and 47 deletions.
4 changes: 2 additions & 2 deletions airbyte/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from airbyte._factories.cache_factories import get_default_cache, new_local_cache
from airbyte._factories.connector_factories import get_source
from airbyte.caches import DuckDBCache, DuckDBCacheConfig
from airbyte.caches import DuckDBCacheInstance, DuckDBCache
from airbyte.datasets import CachedDataset
from airbyte.registry import get_available_connectors
from airbyte.results import ReadResult
Expand All @@ -17,8 +17,8 @@

__all__ = [
"CachedDataset",
"DuckDBCacheInstance",
"DuckDBCache",
"DuckDBCacheConfig",
"get_available_connectors",
"get_source",
"get_default_cache",
Expand Down
14 changes: 7 additions & 7 deletions airbyte/_factories/cache_factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,27 @@
import ulid

from airbyte import exceptions as exc
from airbyte.caches.duckdb import DuckDBCache, DuckDBCacheConfig
from airbyte.caches.duckdb import DuckDBCacheInstance, DuckDBCache


def get_default_cache() -> DuckDBCache:
def get_default_cache() -> DuckDBCacheInstance:
"""Get a local cache for storing data, using the default database path.
Cache files are stored in the `.cache` directory, relative to the current
working directory.
"""
config = DuckDBCacheConfig(
config = DuckDBCache(
db_path="./.cache/default_cache_db.duckdb",
)
return DuckDBCache(config=config)
return DuckDBCacheInstance(config=config)


def new_local_cache(
cache_name: str | None = None,
cache_dir: str | Path | None = None,
*,
cleanup: bool = True,
) -> DuckDBCache:
) -> DuckDBCacheInstance:
"""Get a local cache for storing data, using a name string to seed the path.
Args:
Expand Down Expand Up @@ -55,9 +55,9 @@ def new_local_cache(
if not isinstance(cache_dir, Path):
cache_dir = Path(cache_dir)

config = DuckDBCacheConfig(
config = DuckDBCache(
db_path=cache_dir / f"db_{cache_name}.duckdb",
cache_dir=cache_dir,
cleanup=cleanup,
)
return DuckDBCache(config=config)
return DuckDBCacheInstance(config=config)
4 changes: 2 additions & 2 deletions airbyte/caches/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@
from __future__ import annotations

from airbyte.caches.base import SQLCacheBase
from airbyte.caches.duckdb import DuckDBCache, DuckDBCacheConfig
from airbyte.caches.duckdb import DuckDBCacheInstance, DuckDBCache
from airbyte.caches.postgres import PostgresCache, PostgresCacheConfig
from airbyte.caches.snowflake import SnowflakeCacheConfig, SnowflakeSQLCache


# We export these classes for easy access: `airbyte.caches...`
__all__ = [
"DuckDBCacheInstance",
"DuckDBCache",
"DuckDBCacheConfig",
"PostgresCache",
"PostgresCacheConfig",
"SQLCacheBase",
Expand Down
9 changes: 5 additions & 4 deletions airbyte/caches/duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
)


class DuckDBCacheConfig(SQLCacheConfigBase, ParquetWriterConfig):
class DuckDBCache(SQLCacheConfigBase, ParquetWriterConfig):
"""Configuration for the DuckDB cache.
Also inherits config from the ParquetWriter, which is responsible for writing files to disk.
Expand All @@ -45,6 +45,7 @@ def get_sql_alchemy_url(self) -> str:
# return f"duckdb:///{self.db_path}?schema={self.schema_name}"
return f"duckdb:///{self.db_path!s}"

@overrides
def get_database_name(self) -> str:
"""Return the name of the database."""
if self.db_path == ":memory:":
Expand All @@ -62,7 +63,7 @@ class DuckDBCacheBase(SQLCacheBase):
so we insert as values instead.
"""

config_class = DuckDBCacheConfig
config_class = DuckDBCache
supports_merge_insert = False

@overrides
Expand All @@ -72,15 +73,15 @@ def get_telemetry_info(self) -> CacheTelemetryInfo:
@overrides
def _setup(self) -> None:
"""Create the database parent folder if it doesn't yet exist."""
config = cast(DuckDBCacheConfig, self.config)
config = cast(DuckDBCache, self.config)

if config.db_path == ":memory:":
return

Path(config.db_path).parent.mkdir(parents=True, exist_ok=True)


class DuckDBCache(DuckDBCacheBase):
class DuckDBCacheInstance(DuckDBCacheBase):
"""A DuckDB implementation of the cache.
Parquet is used for local file storage before bulk loading.
Expand Down
1 change: 1 addition & 0 deletions airbyte/caches/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def get_sql_alchemy_url(self) -> str:
"""Return the SQLAlchemy URL to use."""
return f"postgresql+psycopg2://{self.username}:{self.password}@{self.host}:{self.port}/{self.database}"

@overrides
def get_database_name(self) -> str:
"""Return the name of the database."""
return self.database
Expand Down
1 change: 1 addition & 0 deletions airbyte/caches/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def get_sql_alchemy_url(self) -> str:
)
)

@overrides
def get_database_name(self) -> str:
"""Return the name of the database."""
return self.database
Expand Down
2 changes: 1 addition & 1 deletion tests/integration_tests/test_snowflake_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def test_replace_strategy(
def test_merge_strategy(
source_faker_seed_a: ab.Source,
source_faker_seed_b: ab.Source,
snowflake_cache: ab.DuckDBCache,
snowflake_cache: ab.DuckDBCacheInstance,
) -> None:
"""Test that the merge strategy works as expected.
Expand Down
20 changes: 10 additions & 10 deletions tests/integration_tests/test_source_faker_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,9 @@ def source_faker_seed_b() -> ab.Source:


@pytest.fixture(scope="function")
def duckdb_cache() -> Generator[caches.DuckDBCache, None, None]:
def duckdb_cache() -> Generator[caches.DuckDBCacheInstance, None, None]:
"""Fixture to return a fresh cache."""
cache: caches.DuckDBCache = ab.new_local_cache()
cache: caches.DuckDBCacheInstance = ab.new_local_cache()
yield cache
# TODO: Delete cache DB file after test is complete.
return
Expand All @@ -115,7 +115,7 @@ def postgres_cache(new_pg_cache_config) -> Generator[caches.PostgresCache, None,

@pytest.fixture
def all_cache_types(
duckdb_cache: ab.DuckDBCache,
duckdb_cache: ab.DuckDBCacheInstance,
postgres_cache: ab.PostgresCache,
):
_ = postgres_cache
Expand All @@ -126,7 +126,7 @@ def all_cache_types(

def test_faker_pks(
source_faker_seed_a: ab.Source,
duckdb_cache: ab.DuckDBCache,
duckdb_cache: ab.DuckDBCacheInstance,
) -> None:
"""Test that the append strategy works as expected."""

Expand All @@ -143,7 +143,7 @@ def test_faker_pks(
@pytest.mark.slow
def test_replace_strategy(
source_faker_seed_a: ab.Source,
all_cache_types: ab.DuckDBCache,
all_cache_types: ab.DuckDBCacheInstance,
) -> None:
"""Test that the replace strategy works as expected."""
for cache in all_cache_types: # Function-scoped fixtures can't be used in parametrized().
Expand All @@ -158,7 +158,7 @@ def test_replace_strategy(
@pytest.mark.slow
def test_append_strategy(
source_faker_seed_a: ab.Source,
all_cache_types: ab.DuckDBCache,
all_cache_types: ab.DuckDBCacheInstance,
) -> None:
"""Test that the append strategy works as expected."""
for cache in all_cache_types: # Function-scoped fixtures can't be used in parametrized().
Expand All @@ -174,7 +174,7 @@ def test_merge_strategy(
strategy: str,
source_faker_seed_a: ab.Source,
source_faker_seed_b: ab.Source,
all_cache_types: ab.DuckDBCache,
all_cache_types: ab.DuckDBCacheInstance,
) -> None:
"""Test that the merge strategy works as expected.
Expand Down Expand Up @@ -208,7 +208,7 @@ def test_merge_strategy(
def test_incremental_sync(
source_faker_seed_a: ab.Source,
source_faker_seed_b: ab.Source,
duckdb_cache: ab.DuckDBCache,
duckdb_cache: ab.DuckDBCacheInstance,
) -> None:
config_a = source_faker_seed_a.get_config()
config_b = source_faker_seed_b.get_config()
Expand Down Expand Up @@ -268,8 +268,8 @@ def test_incremental_state_prefix_isolation(
source_faker_seed_a.set_config(config_a)
cache_name = str(ulid.ULID())
db_path = Path(f"./.cache/{cache_name}.duckdb")
cache = ab.DuckDBCache(config=ab.DuckDBCacheConfig(db_path=db_path, table_prefix="prefix_"))
different_prefix_cache = ab.DuckDBCache(config=ab.DuckDBCacheConfig(db_path=db_path, table_prefix="different_prefix_"))
cache = ab.DuckDBCacheInstance(config=ab.DuckDBCache(db_path=db_path, table_prefix="prefix_"))
different_prefix_cache = ab.DuckDBCacheInstance(config=ab.DuckDBCache(db_path=db_path, table_prefix="different_prefix_"))

result = source_faker_seed_a.read(cache)
assert result.processed_records == NUM_PRODUCTS + FAKER_SCALE_A * 2
Expand Down
14 changes: 7 additions & 7 deletions tests/integration_tests/test_source_test_fixture.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,13 +289,13 @@ def test_read_isolated_by_prefix(expected_test_stream_data: dict[str, list[dict[
db_path = Path(f"./.cache/{cache_name}.duckdb")
source = ab.get_source("source-test", config={"apiKey": "test"})
source.select_all_streams()
cache = ab.DuckDBCache(config=ab.DuckDBCacheConfig(db_path=db_path, table_prefix="prefix_"))
cache = ab.DuckDBCacheInstance(config=ab.DuckDBCache(db_path=db_path, table_prefix="prefix_"))

source.read(cache)

same_prefix_cache = ab.DuckDBCache(config=ab.DuckDBCacheConfig(db_path=db_path, table_prefix="prefix_"))
different_prefix_cache = ab.DuckDBCache(config=ab.DuckDBCacheConfig(db_path=db_path, table_prefix="different_prefix_"))
no_prefix_cache = ab.DuckDBCache(config=ab.DuckDBCacheConfig(db_path=db_path, table_prefix=None))
same_prefix_cache = ab.DuckDBCacheInstance(config=ab.DuckDBCache(db_path=db_path, table_prefix="prefix_"))
different_prefix_cache = ab.DuckDBCacheInstance(config=ab.DuckDBCache(db_path=db_path, table_prefix="different_prefix_"))
no_prefix_cache = ab.DuckDBCacheInstance(config=ab.DuckDBCache(db_path=db_path, table_prefix=None))

# validate that the cache with the same prefix has the data as expected, while the other two are empty
assert_cache_data(expected_test_stream_data, same_prefix_cache)
Expand All @@ -307,9 +307,9 @@ def test_read_isolated_by_prefix(expected_test_stream_data: dict[str, list[dict[
source.read(different_prefix_cache)
source.read(no_prefix_cache)

second_same_prefix_cache = ab.DuckDBCache(config=ab.DuckDBCacheConfig(db_path=db_path, table_prefix="prefix_"))
second_different_prefix_cache = ab.DuckDBCache(config=ab.DuckDBCacheConfig(db_path=db_path, table_prefix="different_prefix_"))
second_no_prefix_cache = ab.DuckDBCache(config=ab.DuckDBCacheConfig(db_path=db_path, table_prefix=None))
second_same_prefix_cache = ab.DuckDBCacheInstance(config=ab.DuckDBCache(db_path=db_path, table_prefix="prefix_"))
second_different_prefix_cache = ab.DuckDBCacheInstance(config=ab.DuckDBCache(db_path=db_path, table_prefix="different_prefix_"))
second_no_prefix_cache = ab.DuckDBCacheInstance(config=ab.DuckDBCache(db_path=db_path, table_prefix=None))

# validate that the first cache still has full data, while the other two have partial data
assert_cache_data(expected_test_stream_data, second_same_prefix_cache)
Expand Down
28 changes: 14 additions & 14 deletions tests/unit_tests/test_caches.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,55 +6,55 @@

from airbyte._file_writers import ParquetWriterConfig
from airbyte.caches.base import SQLCacheBase, SQLCacheConfigBase
from airbyte.caches.duckdb import DuckDBCacheBase, DuckDBCacheConfig
from airbyte.caches.duckdb import DuckDBCacheBase, DuckDBCache


def test_duck_db_cache_config_initialization():
config = DuckDBCacheConfig(db_path='test_path', schema_name='test_schema')
config = DuckDBCache(db_path='test_path', schema_name='test_schema')
assert config.db_path == Path('test_path')
assert config.schema_name == 'test_schema'

def test_duck_db_cache_config_default_schema_name():
config = DuckDBCacheConfig(db_path='test_path')
config = DuckDBCache(db_path='test_path')
assert config.schema_name == 'main'

def test_get_sql_alchemy_url():
config = DuckDBCacheConfig(db_path='test_path', schema_name='test_schema')
config = DuckDBCache(db_path='test_path', schema_name='test_schema')
assert config.get_sql_alchemy_url() == 'duckdb:///test_path'

def test_get_sql_alchemy_url_with_default_schema_name():
config = DuckDBCacheConfig(db_path='test_path')
config = DuckDBCache(db_path='test_path')
assert config.get_sql_alchemy_url() == 'duckdb:///test_path'

def test_duck_db_cache_config_inheritance():
assert issubclass(DuckDBCacheConfig, SQLCacheConfigBase)
assert issubclass(DuckDBCacheConfig, ParquetWriterConfig)
assert issubclass(DuckDBCache, SQLCacheConfigBase)
assert issubclass(DuckDBCache, ParquetWriterConfig)

def test_duck_db_cache_config_get_sql_alchemy_url():
config = DuckDBCacheConfig(db_path='test_path', schema_name='test_schema')
config = DuckDBCache(db_path='test_path', schema_name='test_schema')
assert config.get_sql_alchemy_url() == 'duckdb:///test_path'

def test_duck_db_cache_config_get_database_name():
config = DuckDBCacheConfig(db_path='test_path/test_db.duckdb', schema_name='test_schema')
config = DuckDBCache(db_path='test_path/test_db.duckdb', schema_name='test_schema')
assert config.get_database_name() == 'test_db'

def test_duck_db_cache_base_inheritance():
assert issubclass(DuckDBCacheBase, SQLCacheBase)

def test_duck_db_cache_config_default_schema_name():
config = DuckDBCacheConfig(db_path='test_path')
config = DuckDBCache(db_path='test_path')
assert config.schema_name == 'main'

def test_duck_db_cache_config_get_sql_alchemy_url_with_default_schema_name():
config = DuckDBCacheConfig(db_path='test_path')
config = DuckDBCache(db_path='test_path')
assert config.get_sql_alchemy_url() == 'duckdb:///test_path'

def test_duck_db_cache_config_get_database_name_with_default_schema_name():
config = DuckDBCacheConfig(db_path='test_path/test_db.duckdb')
config = DuckDBCache(db_path='test_path/test_db.duckdb')
assert config.get_database_name() == 'test_db'

def test_duck_db_cache_config_inheritance_from_sql_cache_config_base():
assert issubclass(DuckDBCacheConfig, SQLCacheConfigBase)
assert issubclass(DuckDBCache, SQLCacheConfigBase)

def test_duck_db_cache_config_inheritance_from_parquet_writer_config():
assert issubclass(DuckDBCacheConfig, ParquetWriterConfig)
assert issubclass(DuckDBCache, ParquetWriterConfig)

0 comments on commit c042fa2

Please sign in to comment.