diff --git a/java/beam/pom.xml b/java/beam/pom.xml
index aea546d239..199edb96fc 100644
--- a/java/beam/pom.xml
+++ b/java/beam/pom.xml
@@ -5,7 +5,7 @@
<parent>
<artifactId>hsfs-parent</artifactId>
<groupId>com.logicalclocks</groupId>
- <version>3.8.0-RC1</version>
+ <version>3.8.0-RC2</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/java/flink/pom.xml b/java/flink/pom.xml
index e51122de22..3b94ab0487 100644
--- a/java/flink/pom.xml
+++ b/java/flink/pom.xml
@@ -5,7 +5,7 @@
<parent>
<artifactId>hsfs-parent</artifactId>
<groupId>com.logicalclocks</groupId>
- <version>3.8.0-RC1</version>
+ <version>3.8.0-RC2</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/java/hsfs/pom.xml b/java/hsfs/pom.xml
index c8e99e2a6c..a761fc095e 100644
--- a/java/hsfs/pom.xml
+++ b/java/hsfs/pom.xml
@@ -5,7 +5,7 @@
<parent>
<artifactId>hsfs-parent</artifactId>
<groupId>com.logicalclocks</groupId>
- <version>3.8.0-RC1</version>
+ <version>3.8.0-RC2</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/java/pom.xml b/java/pom.xml
index 0cc46465f8..595accc6fd 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -7,7 +7,7 @@
<groupId>com.logicalclocks</groupId>
<artifactId>hsfs-parent</artifactId>
<packaging>pom</packaging>
- <version>3.8.0-RC1</version>
+ <version>3.8.0-RC2</version>
<modules>
<module>hsfs</module>
<module>spark</module>
diff --git a/java/spark/pom.xml b/java/spark/pom.xml
index 4c4dea00e1..834a58c749 100644
--- a/java/spark/pom.xml
+++ b/java/spark/pom.xml
@@ -22,7 +22,7 @@
<parent>
<artifactId>hsfs-parent</artifactId>
<groupId>com.logicalclocks</groupId>
- <version>3.8.0-RC1</version>
+ <version>3.8.0-RC2</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py
index b9f8621cfc..852034d38a 100644
--- a/python/hsfs/engine/spark.py
+++ b/python/hsfs/engine/spark.py
@@ -1086,41 +1086,54 @@ def setup_storage_connector(self, storage_connector, path=None):
return path
def _setup_s3_hadoop_conf(self, storage_connector, path):
- FS_S3_ENDPOINT = "fs.s3a.endpoint"
+ # For legacy behaviour, set the S3 values at the global level.
+ self._set_s3_hadoop_conf(storage_connector, "fs.s3a")
+
+ # Also set the credentials at the bucket level so that users can use multiple
+ # storage connectors in the same application.
+ self._set_s3_hadoop_conf(
+ storage_connector, f"fs.s3a.bucket.{storage_connector.bucket}"
+ )
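+ # e.g. with two connectors for "bucket-a" and "bucket-b", both
+ # fs.s3a.bucket.bucket-a.* and fs.s3a.bucket.bucket-b.* end up set,
+ # so each bucket resolves its own credentials.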
+ return path.replace("s3", "s3a", 1) if path is not None else None
+
+ def _set_s3_hadoop_conf(self, storage_connector, prefix):
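+ """Write the connector's S3 options into the Hadoop configuration under
+ the given property prefix, e.g. "fs.s3a" (global scope) or
+ "fs.s3a.bucket.<bucket>" (scoped to a single bucket)."""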
if storage_connector.access_key:
self._spark_context._jsc.hadoopConfiguration().set(
- "fs.s3a.access.key", storage_connector.access_key
+ f"{prefix}.access.key", storage_connector.access_key
)
if storage_connector.secret_key:
self._spark_context._jsc.hadoopConfiguration().set(
- "fs.s3a.secret.key", storage_connector.secret_key
+ f"{prefix}.secret.key", storage_connector.secret_key
)
if storage_connector.server_encryption_algorithm:
self._spark_context._jsc.hadoopConfiguration().set(
- "fs.s3a.server-side-encryption-algorithm",
+ f"{prefix}.server-side-encryption-algorithm",
storage_connector.server_encryption_algorithm,
)
if storage_connector.server_encryption_key:
self._spark_context._jsc.hadoopConfiguration().set(
- "fs.s3a.server-side-encryption-key",
+ f"{prefix}.server-side-encryption-key",
storage_connector.server_encryption_key,
)
if storage_connector.session_token:
self._spark_context._jsc.hadoopConfiguration().set(
- "fs.s3a.aws.credentials.provider",
+ f"{prefix}.aws.credentials.provider",
"org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider",
)
self._spark_context._jsc.hadoopConfiguration().set(
- "fs.s3a.session.token",
+ f"{prefix}.session.token",
storage_connector.session_token,
)
+
+ # This is the property name as supplied by the user, without the bucket scope.
+ FS_S3_ENDPOINT = "fs.s3a.endpoint"
if FS_S3_ENDPOINT in storage_connector.arguments:
self._spark_context._jsc.hadoopConfiguration().set(
- FS_S3_ENDPOINT, storage_connector.spark_options().get(FS_S3_ENDPOINT)
+ f"{prefix}.endpoint",
+ storage_connector.spark_options().get(FS_S3_ENDPOINT),
)
- return path.replace("s3", "s3a", 1) if path is not None else None
-
def _setup_adls_hadoop_conf(self, storage_connector, path):
for k, v in storage_connector.spark_options().items():
self._spark_context._jsc.hadoopConfiguration().set(k, v)
diff --git a/python/hsfs/version.py b/python/hsfs/version.py
index 0a80f3f6ba..79b8fb46ac 100644
--- a/python/hsfs/version.py
+++ b/python/hsfs/version.py
@@ -14,4 +14,4 @@
# limitations under the License.
#
-__version__ = "3.8.0rc1"
+__version__ = "3.8.0rc2"
diff --git a/python/tests/engine/test_spark.py b/python/tests/engine/test_spark.py
index 5c7d76add0..3bc80cd991 100644
--- a/python/tests/engine/test_spark.py
+++ b/python/tests/engine/test_spark.py
@@ -4085,7 +4085,7 @@ def test_setup_storage_connector_jdbc(self, mocker):
assert mock_spark_engine_setup_adls_hadoop_conf.call_count == 0
assert mock_spark_engine_setup_gcp_hadoop_conf.call_count == 0
- def test_setup_s3_hadoop_conf(self, mocker):
+ def test_setup_s3_hadoop_conf_legacy(self, mocker):
# Arrange
mock_pyspark_getOrCreate = mocker.patch(
"pyspark.sql.session.SparkSession.builder.getOrCreate"
@@ -4097,6 +4097,7 @@ def test_setup_s3_hadoop_conf(self, mocker):
id=1,
name="test_connector",
featurestore_id=99,
+ bucket="bucket-name",
access_key="1",
secret_key="2",
server_encryption_algorithm="3",
@@ -4115,7 +4116,7 @@ def test_setup_s3_hadoop_conf(self, mocker):
assert result == "s3a_test_path"
assert (
mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.call_count
- == 7
+ == 14
)
mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.assert_any_call(
"fs.s3a.access.key", s3_connector.access_key
@@ -4141,6 +4142,65 @@ def test_setup_s3_hadoop_conf(self, mocker):
"fs.s3a.endpoint", s3_connector.arguments.get("fs.s3a.endpoint")
)
+ def test_setup_s3_hadoop_conf_bucket_scope(self, mocker):
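+ # Checks that the same options are also written under the bucket-scoped
+ # fs.s3a.bucket.<bucket> prefix, in addition to the global fs.s3a ones.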
+ # Arrange
+ mock_pyspark_getOrCreate = mocker.patch(
+ "pyspark.sql.session.SparkSession.builder.getOrCreate"
+ )
+
+ spark_engine = spark.Engine()
+
+ s3_connector = storage_connector.S3Connector(
+ id=1,
+ name="test_connector",
+ featurestore_id=99,
+ bucket="bucket-name",
+ access_key="1",
+ secret_key="2",
+ server_encryption_algorithm="3",
+ server_encryption_key="4",
+ session_token="5",
+ arguments=[{"name": "fs.s3a.endpoint", "value": "testEndpoint"}],
+ )
+
+ # Act
+ result = spark_engine._setup_s3_hadoop_conf(
+ storage_connector=s3_connector,
+ path="s3_test_path",
+ )
+
+ # Assert
+ assert result == "s3a_test_path"
+ assert (
+ mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.call_count
+ == 14
+ )
+ mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.assert_any_call(
+ "fs.s3a.bucket.bucket-name.access.key", s3_connector.access_key
+ )
+ mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.assert_any_call(
+ "fs.s3a.bucket.bucket-name.secret.key", s3_connector.secret_key
+ )
+ mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.assert_any_call(
+ "fs.s3a.bucket.bucket-name.server-side-encryption-algorithm",
+ s3_connector.server_encryption_algorithm,
+ )
+ mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.assert_any_call(
+ "fs.s3a.bucket.bucket-name.server-side-encryption-key",
+ s3_connector.server_encryption_key,
+ )
+ mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.assert_any_call(
+ "fs.s3a.bucket.bucket-name.aws.credentials.provider",
+ "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider",
+ )
+ mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.assert_any_call(
+ "fs.s3a.bucket.bucket-name.session.token", s3_connector.session_token
+ )
+ mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.assert_any_call(
+ "fs.s3a.bucket.bucket-name.endpoint",
+ s3_connector.arguments.get("fs.s3a.endpoint"),
+ )
+
def test_setup_adls_hadoop_conf(self, mocker):
# Arrange
mock_pyspark_getOrCreate = mocker.patch(
diff --git a/utils/java/pom.xml b/utils/java/pom.xml
index 6366083f53..77b6a1d69d 100644
--- a/utils/java/pom.xml
+++ b/utils/java/pom.xml
@@ -5,7 +5,7 @@
<groupId>com.logicalclocks</groupId>
<artifactId>hsfs-utils</artifactId>
- <version>3.8.0-RC1</version>
+ <version>3.8.0-RC2</version>
3.2.0.0-SNAPSHOT