From 510786b3cf8b8020509a9fc75b34b18334538fc2 Mon Sep 17 00:00:00 2001
From: Jay Chia <17691182+jaychia@users.noreply.github.com>
Date: Mon, 8 Jul 2024 16:06:16 -0700
Subject: [PATCH] [BUG] Use Daft s3 credentials chain for deltalake reads
 (#2486)

**Before:** Without user intervention (by passing in explicit credentials), Daft would pass in `None` for credentials when instantiating the `DeltaTable` objects, causing auth issues with S3.

**After:** We detect and special-case when there are no credentials being passed in, and try to use Daft's credentials chain to override the empty credentials before instantiating the `DeltaTable`.

---------

Co-authored-by: Jay Chia
---
 daft/delta_lake/delta_lake_scan.py | 42 ++++++++++++++++++++++++++++--
 1 file changed, 40 insertions(+), 2 deletions(-)

diff --git a/daft/delta_lake/delta_lake_scan.py b/daft/delta_lake/delta_lake_scan.py
index b45a91eee7..27cfef2562 100644
--- a/daft/delta_lake/delta_lake_scan.py
+++ b/daft/delta_lake/delta_lake_scan.py
@@ -7,10 +7,12 @@
 from deltalake.table import DeltaTable
 
 import daft
+import daft.exceptions
 from daft.daft import (
     FileFormatConfig,
     ParquetSourceConfig,
     Pushdowns,
+    S3Config,
     ScanTask,
     StorageConfig,
 )
@@ -24,8 +26,44 @@ class DeltaLakeScanOperator(ScanOperator):
     def __init__(self, table_uri: str, storage_config: StorageConfig) -> None:
         super().__init__()
-        storage_options = io_config_to_storage_options(storage_config.config.io_config, table_uri)
-        self._table = DeltaTable(table_uri, storage_options=storage_options)
+
+        # Unfortunately delta-rs doesn't do very good inference of credentials for S3. Thus the current Daft behavior of passing
+        # in `None` for credentials will cause issues when instantiating the DeltaTable without credentials.
+        #
+        # Thus, if we don't detect any credentials being available, we attempt to detect it from the environment using our Daft credentials chain.
+        #
+        # See: https://github.com/delta-io/delta-rs/issues/2117
+        deltalake_sdk_io_config = storage_config.config.io_config
+        if any([deltalake_sdk_io_config.s3.key_id is None, deltalake_sdk_io_config.s3.region_name is None]):
+            try:
+                s3_config_from_env = S3Config.from_env()
+            # Sometimes S3Config.from_env throws an error, for example on CI machines with weird metadata servers.
+            except daft.exceptions.DaftCoreException:
+                pass
+            else:
+                if (
+                    deltalake_sdk_io_config.s3.key_id is None
+                    and deltalake_sdk_io_config.s3.access_key is None
+                    and deltalake_sdk_io_config.s3.session_token is None
+                ):
+                    deltalake_sdk_io_config = deltalake_sdk_io_config.replace(
+                        s3=deltalake_sdk_io_config.s3.replace(
+                            key_id=s3_config_from_env.key_id,
+                            access_key=s3_config_from_env.access_key,
+                            session_token=s3_config_from_env.session_token,
+                        )
+                    )
+                if deltalake_sdk_io_config.s3.region_name is None:
+                    deltalake_sdk_io_config = deltalake_sdk_io_config.replace(
+                        s3=deltalake_sdk_io_config.s3.replace(
+                            region_name=s3_config_from_env.region_name,
+                        )
+                    )
+
+        self._table = DeltaTable(
+            table_uri, storage_options=io_config_to_storage_options(deltalake_sdk_io_config, table_uri)
+        )
+
         self._storage_config = storage_config
         self._schema = Schema.from_pyarrow_schema(self._table.schema().to_pyarrow())
         partition_columns = set(self._table.metadata().partition_columns)
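
Usage sketch of the user-facing behavior this change enables, assuming the `daft.read_deltalake` reader and `daft.io.IOConfig`/`S3Config`; the `s3://my-bucket/my-table` URI is hypothetical:

import daft
from daft.io import IOConfig, S3Config

# With this patch, reading an S3-backed Delta table with no explicit
# credentials should no longer fail: when key_id/access_key/session_token
# are all unset, Daft resolves them via its own credentials chain
# (S3Config.from_env) before constructing the underlying DeltaTable.
df = daft.read_deltalake("s3://my-bucket/my-table")  # hypothetical URI

# Explicitly provided credentials are left untouched; per the diff, the
# environment chain only fills in fields that are None (the key trio is
# only filled when all three are unset, and region_name independently).
io_config = IOConfig(s3=S3Config(region_name="us-west-2"))
df = daft.read_deltalake("s3://my-bucket/my-table", io_config=io_config)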