Skip to content

Commit

Permalink
refine code according to review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Hang Zheng committed Oct 10, 2024
1 parent b17c26a commit 8b50ce2
Show file tree
Hide file tree
Showing 9 changed files with 110 additions and 74 deletions.
3 changes: 0 additions & 3 deletions cpp/src/arrow/filesystem/filesystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -710,9 +710,6 @@ struct FileSystemGlobalOptions {
///
/// If empty, the underlying TLS library's defaults will be used.
std::string tls_ca_dir_path;

/// Controls whether to verify TLS certificates. Defaults to true.
bool tls_verify_certificates = true;
};

/// EXPERIMENTAL: optional global initialization routine
Expand Down
31 changes: 26 additions & 5 deletions cpp/src/arrow/filesystem/s3_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,27 @@
#include "arrow/util/print.h"
#include "arrow/util/string.h"

// Compile-time helper for gating code on the version of the AWS SDK in use.
//
// ARROW_AWS_SDK_VERSION_CHECK(major, minor, patch) expands to an expression that
// is non-zero when the SDK version is at least major.minor.patch. When the SDK
// does not provide the AWS_SDK_VERSION_* macros, the check always expands to 0.
// The surrounding #ifndef leaves any earlier definition of the macro untouched.
#ifndef ARROW_AWS_SDK_VERSION_CHECK
// AWS_SDK_VERSION_{MAJOR,MINOR,PATCH} are available since 1.9.7.
# if defined(AWS_SDK_VERSION_MAJOR) && defined(AWS_SDK_VERSION_MINOR) && \
defined(AWS_SDK_VERSION_PATCH)
// Redundant "(...)" are for suppressing "Weird number of spaces at
// line-start. Are you using a 2-space indent? [whitespace/indent]
// [3]" errors...
# define ARROW_AWS_SDK_VERSION_CHECK(major, minor, patch) \
((AWS_SDK_VERSION_MAJOR > (major) || \
(AWS_SDK_VERSION_MAJOR == (major) && AWS_SDK_VERSION_MINOR > (minor)) || \
((AWS_SDK_VERSION_MAJOR == (major) && AWS_SDK_VERSION_MINOR == (minor) && \
AWS_SDK_VERSION_PATCH >= (patch)))))
# else
// No version macros available: report every version check as unsatisfied.
# define ARROW_AWS_SDK_VERSION_CHECK(major, minor, patch) 0
# endif
#endif // !ARROW_AWS_SDK_VERSION_CHECK

// ARROW_S3_HAS_SSE_C is defined only when building against AWS SDK 1.9.201 or
// newer; code that sets SSE-C request fields is compiled only when it is defined.
#if ARROW_AWS_SDK_VERSION_CHECK(1, 9, 201)
# define ARROW_S3_HAS_SSE_C
#endif

namespace arrow {
namespace fs {
namespace internal {
Expand Down Expand Up @@ -319,15 +340,15 @@ inline Result<std::string> CalculateSSECustomerKeyMD5(
}

template <typename S3RequestType>
Status SetSSECustomerKey(S3RequestType& request, const std::string& sse_customer_key) {
Status SetSSECustomerKey(S3RequestType* request, const std::string& sse_customer_key) {
if (sse_customer_key.empty()) {
return Status::OK(); // do nothing if the sse_customer_key is not configured
}
#ifdef ARROW_S3_SUPPORT_SSEC
#ifdef ARROW_S3_HAS_SSE_C
ARROW_ASSIGN_OR_RAISE(auto md5, internal::CalculateSSECustomerKeyMD5(sse_customer_key));
request.SetSSECustomerKeyMD5(md5);
request.SetSSECustomerKey(arrow::util::base64_encode(sse_customer_key));
request.SetSSECustomerAlgorithm("AES256");
request->SetSSECustomerKeyMD5(md5);
request->SetSSECustomerKey(arrow::util::base64_encode(sse_customer_key));
request->SetSSECustomerAlgorithm("AES256");
return Status::OK();
#else
return Status::NotImplemented("SSE-C is not supported");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@

#pragma once

namespace arrow {
namespace fs {

namespace arrow::fs {
// The below two static strings are generated according to
// https://github.com/minio/minio/tree/RELEASE.2024-09-22T00-33-43Z/docs/tls#323-generate-a-self-signed-certificate
// `openssl req -new -x509 -nodes -days 36500 -keyout private.key -out public.crt -config
// openssl.conf`
const char* kMinioPrivateKey = R"(-----BEGIN PRIVATE KEY-----
static constexpr const char* kMinioPrivateKey = R"(-----BEGIN PRIVATE KEY-----
MIIEvAIBADANBgkqhkiG9w0BAQEFAASCBKYwggSiAgEAAoIBAQCqwKYHsTSciGqP
uU3qkTWpnXIi3iC0eeW7JSzJHGFs880WdR5JdK4WufPK+1xzgiYjMEPfAcuSWz3b
qYyCI61q+a9Iu2nj7cFTW9bfZrmWlnI0YOLJc+q0AAdAjF1lvRKenH8tbjz/2jyl
Expand Down Expand Up @@ -54,7 +52,7 @@ t9pJcv2E5xY7/nFNIorpKg==
-----END PRIVATE KEY-----
)";

const char* kMinioCert = R"(-----BEGIN CERTIFICATE-----
static constexpr const char* kMinioCert = R"(-----BEGIN CERTIFICATE-----
MIIDiTCCAnGgAwIBAgIUXbHZ6FAhKSXg4WSGUQySlSyE4U0wDQYJKoZIhvcNAQEL
BQAwXzELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAlZBMQ4wDAYDVQQHDAVBcnJvdzEO
MAwGA1UECgwFQXJyb3cxDjAMBgNVBAsMBUFycm93MRMwEQYDVQQDDApBcnJyb3dU
Expand All @@ -76,5 +74,4 @@ TFeMNKROmrEPCWaYr6MJ+ItHtb5Cawapea4THz9GCjR9eLq2CbMqLezZ8xBHPzc4
ixI2l0uCfg7ZUSA+90yaScc7bhEQ8CMiPtJgNKaKIqB58DpY7028xJpW7Ma2
-----END CERTIFICATE-----
)";
} // namespace fs
} // namespace arrow
} // namespace arrow::fs
26 changes: 16 additions & 10 deletions cpp/src/arrow/filesystem/s3_test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
# include <sys/wait.h>
#endif

#include "arrow/filesystem/s3_test_cert.h"
#include "arrow/filesystem/s3_test_cert_internal.h"
#include "arrow/filesystem/s3_test_util.h"
#include "arrow/filesystem/s3fs.h"
#include "arrow/testing/process.h"
Expand Down Expand Up @@ -77,10 +77,14 @@ std::string MinioTestServer::access_key() const { return impl_->access_key_; }

std::string MinioTestServer::secret_key() const { return impl_->secret_key_; }

std::string MinioTestServer::ca_path() const {
std::string MinioTestServer::ca_dir_path() const {
return impl_->temp_dir_ca_->path().ToString();
}

// Path of the "public.crt" certificate file located inside the temporary CA
// directory held by the server implementation.
std::string MinioTestServer::ca_file_path() const {
  std::string cert_path = impl_->temp_dir_ca_->path().ToString();
  cert_path += "/public.crt";
  return cert_path;
}

std::string MinioTestServer::scheme() const { return impl_->scheme_; }

Status MinioTestServer::GenerateCertificateFile() {
Expand All @@ -89,29 +93,29 @@ Status MinioTestServer::GenerateCertificateFile() {
ARROW_ASSIGN_OR_RAISE(impl_->temp_dir_ca_, TemporaryDir::Make("s3fs-test-ca-"));

ARROW_ASSIGN_OR_RAISE(auto public_crt_file,
PlatformFilename::FromString(ca_path() + "/public.crt"));
PlatformFilename::FromString(ca_dir_path() + "/public.crt"));
ARROW_ASSIGN_OR_RAISE(auto public_cert_fd, FileOpenWritable(public_crt_file));
ARROW_RETURN_NOT_OK(FileWrite(public_cert_fd.fd(),
reinterpret_cast<const uint8_t*>(kMinioCert),
strlen(kMinioCert)));
ARROW_RETURN_NOT_OK(public_cert_fd.Close());

ARROW_ASSIGN_OR_RAISE(auto private_key_file,
PlatformFilename::FromString(ca_path() + "/private.key"));
PlatformFilename::FromString(ca_dir_path() + "/private.key"));
ARROW_ASSIGN_OR_RAISE(auto private_key_fd, FileOpenWritable(private_key_file));
ARROW_RETURN_NOT_OK(FileWrite(private_key_fd.fd(),
reinterpret_cast<const uint8_t*>(kMinioPrivateKey),
strlen(kMinioPrivateKey)));
ARROW_RETURN_NOT_OK(private_key_fd.Close());

arrow::fs::FileSystemGlobalOptions global_options;
global_options.tls_verify_certificates = false;
global_options.tls_ca_file_path = ca_file_path();
ARROW_RETURN_NOT_OK(arrow::fs::Initialize(global_options));

return Status::OK();
}

Status MinioTestServer::Start() {
Status MinioTestServer::Start(bool enable_tls_if_supported) {
const char* connect_str = std::getenv(kEnvConnectString);
const char* access_key = std::getenv(kEnvAccessKey);
const char* secret_key = std::getenv(kEnvSecretKey);
Expand All @@ -134,12 +138,14 @@ Status MinioTestServer::Start() {
std::vector<std::string> minio_args({"server", "--quiet", "--compat", "--address",
impl_->connect_string_,
impl_->temp_dir_->path().ToString()});
if (enable_tls_if_supported) {
#ifdef MINIO_SERVER_WITH_TLS
ARROW_RETURN_NOT_OK(GenerateCertificateFile());
minio_args.push_back("--certs-dir");
minio_args.push_back(ca_path());
impl_->scheme_ = "https";
ARROW_RETURN_NOT_OK(GenerateCertificateFile());
minio_args.push_back("--certs-dir");
minio_args.push_back(ca_dir_path());
impl_->scheme_ = "https";
#endif // MINIO_SERVER_WITH_TLS
}

ARROW_RETURN_NOT_OK(impl_->server_process_->SetExecutable(kMinioExecutableName));
// NOTE: --quiet makes startup faster by suppressing remote version check
Expand Down
9 changes: 7 additions & 2 deletions cpp/src/arrow/filesystem/s3_test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ class MinioTestServer {
MinioTestServer();
~MinioTestServer();

Status Start();
// enable_tls_if_supported = true: start Minio with TLS if MINIO_SERVER_WITH_TLS is
// defined (currently only enabled on the Linux platform).
// enable_tls_if_supported = false: start Minio without TLS on all platforms.
Status Start(bool enable_tls_if_supported = true);

Status Stop();

Expand All @@ -54,7 +57,9 @@ class MinioTestServer {

std::string secret_key() const;

std::string ca_path() const;
std::string ca_dir_path() const;

std::string ca_file_path() const;

std::string scheme() const;

Expand Down
22 changes: 7 additions & 15 deletions cpp/src/arrow/filesystem/s3fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,6 @@
# define ARROW_S3_HAS_S3CLIENT_CONFIGURATION
#endif

#if ARROW_AWS_SDK_VERSION_CHECK(1, 9, 201)
# define ARROW_S3_SUPPORT_SSEC
#endif

#ifdef ARROW_S3_HAS_CRT
# include <aws/crt/io/Bootstrap.h>
# include <aws/crt/io/EventLoopGroup.h>
Expand Down Expand Up @@ -1151,11 +1147,7 @@ class ClientBuilder {
} else if (!internal::global_options.tls_ca_dir_path.empty()) {
client_config_.caPath = ToAwsString(internal::global_options.tls_ca_dir_path);
}
if (!options_.tls_verify_certificates) {
client_config_.verifySSL = options_.tls_verify_certificates;
} else if (!internal::global_options.tls_verify_certificates) {
client_config_.verifySSL = internal::global_options.tls_verify_certificates;
}
client_config_.verifySSL = options_.tls_verify_certificates;

// Set proxy options if provided
if (!options_.proxy_options.scheme.empty()) {
Expand Down Expand Up @@ -1324,7 +1316,7 @@ Result<S3Model::GetObjectResult> GetObjectRange(Aws::S3::S3Client* client,
S3Model::GetObjectRequest req;
req.SetBucket(ToAwsString(path.bucket));
req.SetKey(ToAwsString(path.key));
RETURN_NOT_OK(SetSSECustomerKey(req, sse_customer_key));
RETURN_NOT_OK(SetSSECustomerKey(&req, sse_customer_key));
req.SetRange(ToAwsString(FormatRange(start, length)));
req.SetResponseStreamFactory(AwsWriteableStreamFactory(out, length));
return OutcomeToResult("GetObject", client->GetObject(req));
Expand Down Expand Up @@ -1480,7 +1472,7 @@ class ObjectInputFile final : public io::RandomAccessFile {
S3Model::HeadObjectRequest req;
req.SetBucket(ToAwsString(path_.bucket));
req.SetKey(ToAwsString(path_.key));
RETURN_NOT_OK(SetSSECustomerKey(req, sse_customer_key_));
RETURN_NOT_OK(SetSSECustomerKey(&req, sse_customer_key_));

ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
auto outcome = client_lock.Move()->HeadObject(req);
Expand Down Expand Up @@ -1701,7 +1693,7 @@ class ObjectOutputStream final : public io::OutputStream {
S3Model::CreateMultipartUploadRequest req;
req.SetBucket(ToAwsString(path_.bucket));
req.SetKey(ToAwsString(path_.key));
RETURN_NOT_OK(SetSSECustomerKey(req, sse_customer_key_));
RETURN_NOT_OK(SetSSECustomerKey(&req, sse_customer_key_));
RETURN_NOT_OK(SetMetadataInRequest(&req));

auto outcome = client_lock.Move()->CreateMultipartUpload(req);
Expand Down Expand Up @@ -1803,9 +1795,9 @@ class ObjectOutputStream final : public io::OutputStream {
S3Model::CompleteMultipartUploadRequest req;
req.SetBucket(ToAwsString(path_.bucket));
req.SetKey(ToAwsString(path_.key));
RETURN_NOT_OK(SetSSECustomerKey(req, sse_customer_key_));
req.SetUploadId(multipart_upload_id_);
req.SetMultipartUpload(std::move(completed_upload));
RETURN_NOT_OK(SetSSECustomerKey(&req, sse_customer_key_));

auto outcome =
client_lock.Move()->CompleteMultipartUploadWithErrorFixup(std::move(req));
Expand Down Expand Up @@ -1985,7 +1977,7 @@ class ObjectOutputStream final : public io::OutputStream {
req.SetKey(ToAwsString(path_.key));
req.SetBody(std::make_shared<StringViewStream>(data, nbytes));
req.SetContentLength(nbytes);
RETURN_NOT_OK(SetSSECustomerKey(req, sse_customer_key_));
RETURN_NOT_OK(SetSSECustomerKey(&req, sse_customer_key_));

if (!background_writes_) {
req.SetBody(std::make_shared<StringViewStream>(data, nbytes));
Expand Down Expand Up @@ -2375,7 +2367,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp

S3Model::CopyObjectRequest req;
req.SetBucket(ToAwsString(dest_path.bucket));
RETURN_NOT_OK(SetSSECustomerKey(req, options().sse_customer_key));
RETURN_NOT_OK(SetSSECustomerKey(&req, options().sse_customer_key));
req.SetKey(ToAwsString(dest_path.key));
// ARROW-13048: Copy source "Must be URL-encoded" according to AWS SDK docs.
// However at least in 1.8 and 1.9 the SDK URL-encodes the path for you
Expand Down
8 changes: 5 additions & 3 deletions cpp/src/arrow/filesystem/s3fs.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,16 +201,18 @@ struct ARROW_EXPORT S3Options {

/// Path to a single PEM file holding all TLS CA certificates
///
/// If empty, the underlying TLS library's defaults will be used.
/// If empty, the corresponding global filesystem option will be used; if that is
/// also empty, the underlying TLS library's defaults will be used.
std::string tls_ca_file_path;

/// Path to a directory holding TLS CA certificates in individual PEM files
/// named along the OpenSSL "hashed" format.
///
/// If empty, the underlying TLS library's defaults will be used.
/// If empty, the corresponding global filesystem option will be used; if that is
/// also empty, the underlying TLS library's defaults will be used.
std::string tls_ca_dir_path;

/// Controls whether to verify TLS certificates. Defaults to true.
/// Whether to verify the S3 endpoint's TLS certificate, if the scheme is "https".
bool tls_verify_certificates = true;

S3Options();
Expand Down
9 changes: 2 additions & 7 deletions cpp/src/arrow/filesystem/s3fs_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class MinioFixture : public benchmark::Fixture {
public:
void SetUp(const ::benchmark::State& state) override {
minio_.reset(new MinioTestServer());
ASSERT_OK(minio_->Start());
ASSERT_OK(minio_->Start(false));

const char* region_str = std::getenv(kEnvAwsRegion);
if (region_str) {
Expand All @@ -81,12 +81,7 @@ class MinioFixture : public benchmark::Fixture {
}

client_config_.endpointOverride = ToAwsString(minio_->connect_string());
if (minio_->scheme() == "https") {
client_config_.scheme = Aws::Http::Scheme::HTTPS;
client_config_.verifySSL = false;
} else {
client_config_.scheme = Aws::Http::Scheme::HTTP;
}
client_config_.scheme = Aws::Http::Scheme::HTTP;
if (!region_.empty()) {
client_config_.region = ToAwsString(region_);
}
Expand Down
Loading

0 comments on commit 8b50ce2

Please sign in to comment.