Skip to content

Commit

Permalink
move streams to config class
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronsteers committed Feb 20, 2024
1 parent e550ac5 commit 3039498
Showing 1 changed file with 32 additions and 21 deletions.
53 changes: 32 additions & 21 deletions airbyte/caches/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,17 @@ class SQLCacheBase(CacheConfigBase):
table_suffix: str = ""
"""A suffix to add to all table names."""

_sql_processor: SQLCacheInstanceBase | None = None

def processor(self) -> SQLCacheInstanceBase:
"""Return the SQL processor instance."""
if self._sql_processor is None:
self._sql_processor = self._create_processor()
return self._sql_processor

def _create_processor(self) -> SQLCacheInstanceBase:
return SQLCacheInstanceBase(self)

@abc.abstractmethod
def get_sql_alchemy_url(self) -> str:
"""Returns a SQL Alchemy URL."""
Expand All @@ -91,6 +102,27 @@ def get_database_name(self) -> str:
"""Return the name of the database."""
...

@final
@property
def streams(
self,
) -> dict[str, CachedDataset]:
"""Return a temporary table name."""
result = {}
for stream_name in self._streams_with_data:
result[stream_name] = CachedDataset(self, stream_name)

return result

def __getitem__(self, stream: str) -> DatasetBase:
return self.streams[stream]

def __contains__(self, stream: str) -> bool:
return stream in self._streams_with_data

def __iter__(self) -> Iterator[str]:
return iter(self._streams_with_data)


class SQLCacheInstanceBase(RecordProcessor):
"""A base class to be used for SQL Caches.
Expand Down Expand Up @@ -128,15 +160,6 @@ def __init__(
self.type_converter = self.type_converter_class()
self._cached_table_definitions: dict[str, sqlalchemy.Table] = {}

def __getitem__(self, stream: str) -> DatasetBase:
return self.streams[stream]

def __contains__(self, stream: str) -> bool:
return stream in self._streams_with_data

def __iter__(self) -> Iterator[str]:
return iter(self._streams_with_data)

# Public interface:

def get_sql_alchemy_url(self) -> str:
Expand Down Expand Up @@ -249,18 +272,6 @@ def get_sql_table(
"""Return the main table object for the stream."""
return self._get_table_by_name(self.get_sql_table_name(stream_name))

@final
@property
def streams(
self,
) -> dict[str, CachedDataset]:
"""Return a temporary table name."""
result = {}
for stream_name in self._streams_with_data:
result[stream_name] = CachedDataset(self, stream_name)

return result

# Read methods:

def get_records(
Expand Down

0 comments on commit 3039498

Please sign in to comment.