diff --git a/cpp/src/arrow/adapters/orc/util.cc b/cpp/src/arrow/adapters/orc/util.cc index 2a74bec1aa6fd..513001f7d67c4 100644 --- a/cpp/src/arrow/adapters/orc/util.cc +++ b/cpp/src/arrow/adapters/orc/util.cc @@ -19,6 +19,7 @@ #include #include +#include #include #include diff --git a/cpp/src/arrow/filesystem/filesystem.cc b/cpp/src/arrow/filesystem/filesystem.cc index 284be685fa800..10b12460520ca 100644 --- a/cpp/src/arrow/filesystem/filesystem.cc +++ b/cpp/src/arrow/filesystem/filesystem.cc @@ -34,9 +34,6 @@ #ifdef ARROW_HDFS #include "arrow/filesystem/hdfs.h" #endif -#ifdef ARROW_S3 -#include "arrow/filesystem/s3fs.h" -#endif #include "arrow/filesystem/localfs.h" #include "arrow/filesystem/mockfs.h" #include "arrow/filesystem/path_util.h" @@ -896,18 +893,6 @@ Result> FileSystemFromUriReal(const Uri& uri, "without HDFS support"); #endif } - if (scheme == "s3") { -#ifdef ARROW_S3 - RETURN_NOT_OK(EnsureS3Initialized()); - ARROW_ASSIGN_OR_RAISE(auto options, S3Options::FromUri(uri, out_path)); - ARROW_ASSIGN_OR_RAISE(auto s3fs, S3FileSystem::Make(options, io_context)); - return s3fs; -#else - return Status::NotImplemented( - "Got S3 URI but Arrow compiled " - "without S3 support"); -#endif - } if (scheme == "mock") { // MockFileSystem does not have an diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc index 640888e1c4fa5..697b4ed6c7f5c 100644 --- a/cpp/src/arrow/filesystem/s3fs.cc +++ b/cpp/src/arrow/filesystem/s3fs.cc @@ -136,6 +136,7 @@ #include "arrow/util/string.h" #include "arrow/util/task_group.h" #include "arrow/util/thread_pool.h" +#include "arrow/util/value_parsing.h" namespace arrow::fs { @@ -2766,6 +2767,10 @@ Result S3FileSystem::PathFromUri(const std::string& uri_string) con internal::AuthorityHandlingBehavior::kPrepend); } +Result S3FileSystem::MakeUri(std::string path) const { + return "s3://" + path; +} + S3Options S3FileSystem::options() const { return impl_->options(); } std::string 
S3FileSystem::region() const { return impl_->region(); } @@ -3196,32 +3202,37 @@ bool IsS3Finalized() { return GetAwsInstance()->IsFinalized(); } S3GlobalOptions S3GlobalOptions::Defaults() { auto log_level = S3LogLevel::Fatal; + int num_event_loop_threads = 1; - auto result = arrow::internal::GetEnvVar("ARROW_S3_LOG_LEVEL"); + auto value = arrow::internal::GetEnvVar("ARROW_S3_LOG_LEVEL").ValueOr("fatal"); - if (result.ok()) { - // Extract, trim, and downcase the value of the environment variable - auto value = - arrow::internal::AsciiToLower(arrow::internal::TrimString(result.ValueUnsafe())); + // Extract, trim, and downcase the value of the environment variable + value = arrow::internal::AsciiToLower(arrow::internal::TrimString(std::move(value))); - if (value == "fatal") { - log_level = S3LogLevel::Fatal; - } else if (value == "error") { - log_level = S3LogLevel::Error; - } else if (value == "warn") { - log_level = S3LogLevel::Warn; - } else if (value == "info") { - log_level = S3LogLevel::Info; - } else if (value == "debug") { - log_level = S3LogLevel::Debug; - } else if (value == "trace") { - log_level = S3LogLevel::Trace; - } else if (value == "off") { - log_level = S3LogLevel::Off; - } + if (value == "fatal") { + log_level = S3LogLevel::Fatal; + } else if (value == "error") { + log_level = S3LogLevel::Error; + } else if (value == "warn") { + log_level = S3LogLevel::Warn; + } else if (value == "info") { + log_level = S3LogLevel::Info; + } else if (value == "debug") { + log_level = S3LogLevel::Debug; + } else if (value == "trace") { + log_level = S3LogLevel::Trace; + } else if (value == "off") { + log_level = S3LogLevel::Off; } - return S3GlobalOptions{log_level}; + value = arrow::internal::GetEnvVar("ARROW_S3_THREADS").ValueOr("1"); + // Parse the requested number of event loop threads; keep the default of 1 on failure + int32_t i; + if (::arrow::internal::ParseValue(value.data(), value.size(), &i)) { + num_event_loop_threads = i; + } + + return S3GlobalOptions{log_level, 
num_event_loop_threads}; } // ----------------------------------------------------------------------- @@ -3239,4 +3250,14 @@ Result ResolveS3BucketRegion(const std::string& bucket) { return resolver->ResolveRegion(bucket); } +auto kS3FileSystemModule = ARROW_REGISTER_FILESYSTEM( + "s3", + [](const arrow::util::Uri& uri, const io::IOContext& io_context, + std::string* out_path) -> Result> { + RETURN_NOT_OK(EnsureS3Initialized()); + ARROW_ASSIGN_OR_RAISE(auto options, S3Options::FromUri(uri, out_path)); + return S3FileSystem::Make(options, io_context); + }, + [] { DCHECK_OK(EnsureS3Finalized()); }); + } // namespace arrow::fs diff --git a/cpp/src/arrow/filesystem/s3fs.h b/cpp/src/arrow/filesystem/s3fs.h index 82d08bc5ea89a..b23c93b8af1c3 100644 --- a/cpp/src/arrow/filesystem/s3fs.h +++ b/cpp/src/arrow/filesystem/s3fs.h @@ -25,20 +25,16 @@ #include "arrow/util/macros.h" #include "arrow/util/uri.h" -namespace Aws { -namespace Auth { - +namespace Aws::Auth { class AWSCredentialsProvider; class STSAssumeRoleCredentialsProvider; +} // namespace Aws::Auth -} // namespace Auth -namespace STS { +namespace Aws::STS { class STSClient; -} -} // namespace Aws +} // namespace Aws::STS -namespace arrow { -namespace fs { +namespace arrow::fs { /// Options for using a proxy for S3 struct ARROW_EXPORT S3ProxyOptions { @@ -256,6 +252,7 @@ class ARROW_EXPORT S3FileSystem : public FileSystem { bool Equals(const FileSystem& other) const override; Result PathFromUri(const std::string& uri_string) const override; + Result MakeUri(std::string path) const override; /// \cond FALSE using FileSystem::CreateDir; @@ -397,5 +394,4 @@ Status EnsureS3Finalized(); ARROW_EXPORT Result ResolveS3BucketRegion(const std::string& bucket); -} // namespace fs -} // namespace arrow +} // namespace arrow::fs diff --git a/docs/source/cpp/env_vars.rst b/docs/source/cpp/env_vars.rst index 116c151824c75..5963fc4b5ff36 100644 --- a/docs/source/cpp/env_vars.rst +++ b/docs/source/cpp/env_vars.rst @@ -108,6 +108,12 
@@ that changing their value later will have an effect. `Logging - AWS SDK For C++ `__ +.. envvar:: ARROW_S3_THREADS + + The number of threads to configure when creating AWS' I/O event loop. + + Defaults to 1, as recommended by the AWS documentation for cases where the + number of connections is expected to be, at most, in the hundreds. .. envvar:: ARROW_TRACING_BACKEND