From 1f33b88012dfdc8a8180010d7c177aa4b5c96f0c Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Mon, 19 Feb 2024 16:22:35 -0800 Subject: [PATCH] allow specifying streams in read() --- airbyte/source.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/airbyte/source.py b/airbyte/source.py index 67370d61..e9f5fbd4 100644 --- a/airbyte/source.py +++ b/airbyte/source.py @@ -524,6 +524,7 @@ def read( self, cache: SQLCacheBase | None = None, *, + streams: str | list[str] | None = None, write_strategy: str | WriteStrategy = WriteStrategy.AUTO, force_full_refresh: bool = False, ) -> ReadResult: @@ -535,6 +536,8 @@ def read( one of "append", "upsert", "replace", or "auto". If a WriteStrategy, it must be one of WriteStrategy.APPEND, WriteStrategy.UPSERT, WriteStrategy.REPLACE, or WriteStrategy.AUTO. + streams: Optional if already set. A list of stream names to select for reading. If set + to "*", all streams will be selected. force_full_refresh: If True, the source will operate in full refresh mode. Otherwise, streams will be read in incremental mode if supported by the connector. This option must be True when using the "replace" strategy. @@ -562,6 +565,9 @@ def read( }, ) from None + if streams: + self.select_streams(streams) + if not self._selected_stream_names: raise exc.AirbyteLibNoStreamsSelectedError( connector_name=self.name,