[FSTORE-1604] Add option to avoid setting s3a global options (#1402)
SirOibaf authored Nov 14, 2024
1 parent 73e6001 commit 09ddb23
Showing 2 changed files with 111 additions and 2 deletions.
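In short, S3 storage connectors gain a fs.s3a.global-conf argument: when it is set to "False", the connector's S3 options are applied only at bucket level rather than as global fs.s3a.* Hadoop options, so several connectors can coexist in one application. A minimal sketch of the argument list, mirroring the new test fixture below (the endpoint value is just the test's placeholder):

arguments = [
    {"name": "fs.s3a.endpoint", "value": "testEndpoint"},
    # Connector arguments are strings; "False" disables the legacy global
    # fs.s3a.* configuration (the default "True" keeps it).
    {"name": "fs.s3a.global-conf", "value": "False"},
]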
8 changes: 6 additions & 2 deletions python/hsfs/engine/spark.py
@@ -1152,8 +1152,12 @@ def setup_storage_connector(self, storage_connector, path=None):
         return path
 
     def _setup_s3_hadoop_conf(self, storage_connector, path):
-        # For legacy behaviour set the S3 values at global level
-        self._set_s3_hadoop_conf(storage_connector, "fs.s3a")
+        FS_S3_GLOBAL_CONF = "fs.s3a.global-conf"
+
+        # The arguments arrive here as strings
+        if storage_connector.arguments.get(FS_S3_GLOBAL_CONF, "True").lower() == "true":
+            # For legacy behaviour set the S3 values at global level
+            self._set_s3_hadoop_conf(storage_connector, "fs.s3a")
 
         # Set credentials at bucket level as well to allow users to use multiple
         # storage connectors in the same application.
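To make the new control flow concrete, here is a small illustrative helper (not part of the commit; the function name and the option list are invented for the example) showing which Hadoop keys end up set for a connector on bucket "bucket-name":

def s3a_keys(bucket: str, set_global: bool) -> list:
    # Bucket-scoped keys are always set, so multiple connectors can coexist;
    # the global fs.s3a.* keys are added only in the legacy (default) mode.
    options = ["access.key", "secret.key", "session.token", "endpoint"]
    keys = [f"fs.s3a.bucket.{bucket}.{opt}" for opt in options]
    if set_global:
        keys += [f"fs.s3a.{opt}" for opt in options]
    return keys

print(s3a_keys("bucket-name", set_global=False))  # only fs.s3a.bucket.bucket-name.* keys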
105 changes: 105 additions & 0 deletions python/tests/engine/test_spark.py
@@ -15,6 +15,8 @@
 #
 from __future__ import annotations
 
+from unittest.mock import call
+
 import numpy
 import pandas as pd
 import pytest
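The newly imported unittest.mock.call is what lets the test below assert on recorded invocations by equality; a tiny standalone illustration of the pattern (the key and value are invented for the example):

from unittest.mock import MagicMock, call

conf = MagicMock()
conf.set("fs.s3a.access.key", "some-key")
# call(...) builds a comparable record of an invocation, so membership
# tests against mock_calls behave like plain equality checks.
assert call("fs.s3a.access.key", "some-key") in conf.set.mock_calls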
@@ -4162,6 +4164,109 @@ def test_setup_s3_hadoop_conf_legacy(self, mocker):
"fs.s3a.endpoint", s3_connector.arguments.get("fs.s3a.endpoint")
)

def test_setup_s3_hadoop_conf_disable_legacy(self, mocker):
# Arrange
mock_pyspark_getOrCreate = mocker.patch(
"pyspark.sql.session.SparkSession.builder.getOrCreate"
)

spark_engine = spark.Engine()

s3_connector = storage_connector.S3Connector(
id=1,
name="test_connector",
featurestore_id=99,
bucket="bucket-name",
access_key="1",
secret_key="2",
server_encryption_algorithm="3",
server_encryption_key="4",
session_token="5",
arguments=[
{"name": "fs.s3a.endpoint", "value": "testEndpoint"},
{"name": "fs.s3a.global-conf", "value": "False"},
],
)

# Act
result = spark_engine._setup_s3_hadoop_conf(
storage_connector=s3_connector,
path="s3://_test_path",
)

# Assert
assert result == "s3a://_test_path"
assert (
mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.call_count
== 7 # Options should only be set at bucket level
)
assert (
call("fs.s3a.access.key", s3_connector.access_key)
not in mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.mock_calls
)
assert (
call("fs.s3a.secret.key", s3_connector.secret_key)
not in mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.mock_calls
)
assert (
call(
"fs.s3a.server-side-encryption-algorithm",
s3_connector.server_encryption_algorithm,
)
not in mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.mock_calls
)

assert (
call(
"fs.s3a.server-side-encryption-key", s3_connector.server_encryption_key
)
not in mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.mock_calls
)

assert (
call(
"fs.s3a.aws.credentials.provider",
"org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider",
)
not in mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.mock_calls
)

assert (
call("fs.s3a.session.token", s3_connector.session_token)
not in mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.mock_calls
)

assert (
call("fs.s3a.endpoint", s3_connector.arguments.get("fs.s3a.endpoint"))
not in mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.mock_calls
)

mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.assert_any_call(
"fs.s3a.bucket.bucket-name.access.key", s3_connector.access_key
)
mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.assert_any_call(
"fs.s3a.bucket.bucket-name.secret.key", s3_connector.secret_key
)
mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.assert_any_call(
"fs.s3a.bucket.bucket-name.server-side-encryption-algorithm",
s3_connector.server_encryption_algorithm,
)
mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.assert_any_call(
"fs.s3a.bucket.bucket-name.server-side-encryption-key",
s3_connector.server_encryption_key,
)
mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.assert_any_call(
"fs.s3a.bucket.bucket-name.aws.credentials.provider",
"org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider",
)
mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.assert_any_call(
"fs.s3a.bucket.bucket-name.session.token", s3_connector.session_token
)
mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.assert_any_call(
"fs.s3a.bucket.bucket-name.endpoint",
s3_connector.arguments.get("fs.s3a.endpoint"),
)

def test_setup_s3_hadoop_conf_bucket_scope(self, mocker):
# Arrange
mock_pyspark_getOrCreate = mocker.patch(
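Beyond the mocked assertions, the same behaviour can be sanity-checked against a live session; this sketch assumes a running SparkSession named spark and that a connector like the one above (bucket "bucket-name", fs.s3a.global-conf set to "False") has already been applied:

hconf = spark.sparkContext._jsc.hadoopConfiguration()

# Bucket-scoped credentials are present ...
assert hconf.get("fs.s3a.bucket.bucket-name.access.key") is not None
# ... while the legacy global key was never set (Hadoop's Configuration.get
# returns null, i.e. None through py4j, for unset keys).
assert hconf.get("fs.s3a.access.key") is None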
