From 303949800d4d93b655674dd7c88b3e7da3821f7e Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Mon, 19 Feb 2024 17:27:54 -0800 Subject: [PATCH] move streams to config class --- airbyte/caches/base.py | 53 +++++++++++++++++++++++++----------------- 1 file changed, 32 insertions(+), 21 deletions(-) diff --git a/airbyte/caches/base.py b/airbyte/caches/base.py index 172109a8..6b5857c0 100644 --- a/airbyte/caches/base.py +++ b/airbyte/caches/base.py @@ -81,6 +81,17 @@ class SQLCacheBase(CacheConfigBase): table_suffix: str = "" """A suffix to add to all table names.""" + _sql_processor: SQLCacheInstanceBase | None = None + + def processor(self) -> SQLCacheInstanceBase: + """Return the SQL processor instance.""" + if self._sql_processor is None: + self._sql_processor = self._create_processor() + return self._sql_processor + + def _create_processor(self) -> SQLCacheInstanceBase: + return SQLCacheInstanceBase(self) + @abc.abstractmethod def get_sql_alchemy_url(self) -> str: """Returns a SQL Alchemy URL.""" @@ -91,6 +102,27 @@ def get_database_name(self) -> str: """Return the name of the database.""" ... + @final + @property + def streams( + self, + ) -> dict[str, CachedDataset]: + """Return a temporary table name.""" + result = {} + for stream_name in self._streams_with_data: + result[stream_name] = CachedDataset(self, stream_name) + + return result + + def __getitem__(self, stream: str) -> DatasetBase: + return self.streams[stream] + + def __contains__(self, stream: str) -> bool: + return stream in self._streams_with_data + + def __iter__(self) -> Iterator[str]: + return iter(self._streams_with_data) + class SQLCacheInstanceBase(RecordProcessor): """A base class to be used for SQL Caches. @@ -128,15 +160,6 @@ def __init__( self.type_converter = self.type_converter_class() self._cached_table_definitions: dict[str, sqlalchemy.Table] = {} - def __getitem__(self, stream: str) -> DatasetBase: - return self.streams[stream] - - def __contains__(self, stream: str) -> bool: - return stream in self._streams_with_data - - def __iter__(self) -> Iterator[str]: - return iter(self._streams_with_data) - # Public interface: def get_sql_alchemy_url(self) -> str: @@ -249,18 +272,6 @@ def get_sql_table( """Return the main table object for the stream.""" return self._get_table_by_name(self.get_sql_table_name(stream_name)) - @final - @property - def streams( - self, - ) -> dict[str, CachedDataset]: - """Return a temporary table name.""" - result = {} - for stream_name in self._streams_with_data: - result[stream_name] = CachedDataset(self, stream_name) - - return result - # Read methods: def get_records(