diff --git a/cpp/cmake_modules/DefineOptions.cmake b/cpp/cmake_modules/DefineOptions.cmake index dc0e5da63adb7..5e22cd0c23ec4 100644 --- a/cpp/cmake_modules/DefineOptions.cmake +++ b/cpp/cmake_modules/DefineOptions.cmake @@ -402,6 +402,12 @@ takes precedence over ccache if a storage backend is configured" ON) define_option(ARROW_S3 "Build Arrow with S3 support (requires the AWS SDK for C++)" OFF) + define_option(ARROW_S3_MODULE + "Build Arrow with S3 support as a dynamic module" + OFF + DEPENDS + ARROW_S3) + define_option(ARROW_SKYHOOK "Build the Skyhook libraries" OFF diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 5d61112518f5e..2e6c02941f56d 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -811,7 +811,7 @@ if(ARROW_FILESYSTEM) if(ARROW_HDFS) list(APPEND ARROW_FILESYSTEM_SRCS filesystem/hdfs.cc) endif() - if(ARROW_S3) + if(ARROW_S3 AND NOT ARROW_S3_MODULE) list(APPEND ARROW_FILESYSTEM_SRCS filesystem/s3fs.cc) set_source_files_properties(filesystem/s3fs.cc PROPERTIES SKIP_PRECOMPILE_HEADERS ON @@ -836,9 +836,20 @@ if(ARROW_FILESYSTEM) endforeach() endif() if(ARROW_S3) - foreach(ARROW_FILESYSTEM_TARGET ${ARROW_FILESYSTEM_TARGETS}) - target_link_libraries(${ARROW_FILESYSTEM_TARGET} PRIVATE ${AWSSDK_LINK_LIBRARIES}) - endforeach() + if(ARROW_S3_MODULE) + if(NOT ARROW_BUILD_SHARED) + message(FATAL_ERROR "ARROW_S3_MODULE with static linkage is not supported") + endif() + add_library(arrow_s3fs MODULE filesystem/s3fs_module.cc filesystem/s3fs.cc) + target_link_libraries(arrow_s3fs PRIVATE ${AWSSDK_LINK_LIBRARIES} arrow_shared) + set_source_files_properties(filesystem/s3fs.cc filesystem/s3fs_module.cc + PROPERTIES SKIP_PRECOMPILE_HEADERS ON + SKIP_UNITY_BUILD_INCLUSION ON) + else() + foreach(ARROW_FILESYSTEM_TARGET ${ARROW_FILESYSTEM_TARGETS}) + target_link_libraries(${ARROW_FILESYSTEM_TARGET} PRIVATE ${AWSSDK_LINK_LIBRARIES}) + endforeach() + endif() endif() list(APPEND ARROW_TESTING_SHARED_LINK_LIBS ${ARROW_GTEST_GMOCK}) diff --git a/cpp/src/arrow/adapters/orc/util.cc b/cpp/src/arrow/adapters/orc/util.cc index 2a74bec1aa6fd..57df1b330283a 100644 --- a/cpp/src/arrow/adapters/orc/util.cc +++ b/cpp/src/arrow/adapters/orc/util.cc @@ -18,6 +18,7 @@ #include "arrow/adapters/orc/util.h" #include +#include #include #include #include diff --git a/cpp/src/arrow/filesystem/CMakeLists.txt b/cpp/src/arrow/filesystem/CMakeLists.txt index 0a31a64b7a3a4..db8a8940f65dd 100644 --- a/cpp/src/arrow/filesystem/CMakeLists.txt +++ b/cpp/src/arrow/filesystem/CMakeLists.txt @@ -62,7 +62,7 @@ if(ARROW_AZURE) Boost::system) endif() -if(ARROW_S3) +if(ARROW_S3 AND NOT ARROW_S3_MODULE) add_arrow_test(s3fs_test SOURCES s3fs_test.cc @@ -115,6 +115,18 @@ if(ARROW_S3) target_link_libraries(arrow-filesystem-s3fs-benchmark PRIVATE parquet_shared) endif() endif() +elseif(ARROW_S3 AND ARROW_S3_MODULE) + add_arrow_test(s3fs_test + SOURCES + s3fs_module_test.cc + s3_test_util.cc + EXTRA_LABELS + filesystem + DEFINITIONS + ARROW_S3_LIBPATH="$" + EXTRA_LINK_LIBS + Boost::filesystem + Boost::system) endif() if(ARROW_HDFS) diff --git a/cpp/src/arrow/filesystem/filesystem.cc b/cpp/src/arrow/filesystem/filesystem.cc index 284be685fa800..60cdd63e05290 100644 --- a/cpp/src/arrow/filesystem/filesystem.cc +++ b/cpp/src/arrow/filesystem/filesystem.cc @@ -34,9 +34,6 @@ #ifdef ARROW_HDFS #include "arrow/filesystem/hdfs.h" #endif -#ifdef ARROW_S3 -#include "arrow/filesystem/s3fs.h" -#endif #include "arrow/filesystem/localfs.h" #include "arrow/filesystem/mockfs.h" #include "arrow/filesystem/path_util.h" @@ -749,7 +746,7 @@ class FileSystemFactoryRegistry { if (finalized_) return; for (const auto& [_, registered_or_error] : scheme_to_factory_) { - if (!registered_or_error.ok()) continue; + if (!registered_or_error.ok() || !registered_or_error->finalizer) continue; registered_or_error->finalizer(); } finalized_ = true; @@ -896,18 +893,6 @@ Result> FileSystemFromUriReal(const Uri& uri, "without HDFS support"); #endif } - if (scheme == "s3") { -#ifdef ARROW_S3 - RETURN_NOT_OK(EnsureS3Initialized()); - ARROW_ASSIGN_OR_RAISE(auto options, S3Options::FromUri(uri, out_path)); - ARROW_ASSIGN_OR_RAISE(auto s3fs, S3FileSystem::Make(options, io_context)); - return s3fs; -#else - return Status::NotImplemented( - "Got S3 URI but Arrow compiled " - "without S3 support"); -#endif - } if (scheme == "mock") { // MockFileSystem does not have an diff --git a/cpp/src/arrow/filesystem/filesystem.h b/cpp/src/arrow/filesystem/filesystem.h index d4f62f86a7482..3a47eb62f5245 100644 --- a/cpp/src/arrow/filesystem/filesystem.h +++ b/cpp/src/arrow/filesystem/filesystem.h @@ -198,7 +198,7 @@ class ARROW_EXPORT FileSystem virtual Result PathFromUri(const std::string& uri_string) const; /// \brief Make a URI from which FileSystemFromUri produces an equivalent filesystem - /// \param path The path component to use in the resulting URI + /// \param path The path component to use in the resulting URI. Must be absolute. /// \return A URI string, or an error if an equivalent URI cannot be produced virtual Result MakeUri(std::string path) const; diff --git a/cpp/src/arrow/filesystem/localfs.cc b/cpp/src/arrow/filesystem/localfs.cc index 25ac04b758f9b..bca2ad77bb144 100644 --- a/cpp/src/arrow/filesystem/localfs.cc +++ b/cpp/src/arrow/filesystem/localfs.cc @@ -288,7 +288,14 @@ Result LocalFileSystem::PathFromUri(const std::string& uri_string) Result LocalFileSystem::MakeUri(std::string path) const { ARROW_ASSIGN_OR_RAISE(path, DoNormalizePath(std::move(path))); - return "file://" + path + (options_.use_mmap ? "?use_mmap" : ""); + if (!internal::DetectAbsolutePath(path)) { + return Status::Invalid("MakeUri requires an absolute path, got ", path); + } + ARROW_ASSIGN_OR_RAISE(auto uri, util::UriFromAbsolutePath(path)); + if (uri[0] == '/') { + uri = "file://" + uri; + } + return uri + (options_.use_mmap ? "?use_mmap" : ""); } bool LocalFileSystem::Equals(const FileSystem& other) const { diff --git a/cpp/src/arrow/filesystem/localfs_test.cc b/cpp/src/arrow/filesystem/localfs_test.cc index d68c992dff863..a0e8e515af107 100644 --- a/cpp/src/arrow/filesystem/localfs_test.cc +++ b/cpp/src/arrow/filesystem/localfs_test.cc @@ -21,6 +21,7 @@ #include #include +#include #include #include "arrow/filesystem/filesystem.h" @@ -428,9 +429,15 @@ TYPED_TEST(TestLocalFS, FileSystemFromUriFile) { this->TestLocalUri("file:///_?use_mmap", "/_"); if (this->path_formatter_.supports_uri()) { + EXPECT_THAT(this->fs_->MakeUri(""), Raises(StatusCode::Invalid)); + EXPECT_THAT(this->fs_->MakeUri("a/b"), Raises(StatusCode::Invalid)); + ASSERT_TRUE(this->local_fs_->options().use_mmap); ASSERT_OK_AND_ASSIGN(auto uri, this->fs_->MakeUri("/_")); EXPECT_EQ(uri, "file:///_?use_mmap"); + + ASSERT_OK_AND_ASSIGN(uri, this->fs_->MakeUri("/hello world/b/c")); + EXPECT_EQ(uri, "file:///hello%20world/b/c?use_mmap"); } #ifdef _WIN32 diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc index 640888e1c4fa5..00978c2731f9d 100644 --- a/cpp/src/arrow/filesystem/s3fs.cc +++ b/cpp/src/arrow/filesystem/s3fs.cc @@ -136,6 +136,7 @@ #include "arrow/util/string.h" #include "arrow/util/task_group.h" #include "arrow/util/thread_pool.h" +#include "arrow/util/value_parsing.h" namespace arrow::fs { @@ -166,6 +167,8 @@ static constexpr const char kAwsEndpointUrlEnvVar[] = "AWS_ENDPOINT_URL"; static constexpr const char kAwsEndpointUrlS3EnvVar[] = "AWS_ENDPOINT_URL_S3"; static constexpr const char kAwsDirectoryContentType[] = "application/x-directory"; +using std::operator""s; + // ----------------------------------------------------------------------- // S3ProxyOptions implementation @@ -2766,6 +2769,30 @@ Result S3FileSystem::PathFromUri(const std::string& uri_string) con internal::AuthorityHandlingBehavior::kPrepend); } +Result S3FileSystem::MakeUri(std::string path) const { + if (path.length() <= 1 || path[0] != '/') { + return Status::Invalid("MakeUri requires an absolute, non-root path, got ", path); + } + ARROW_ASSIGN_OR_RAISE(auto uri, util::UriFromAbsolutePath(path)); + if (!options().GetAccessKey().empty()) { + uri = "s3://" + options().GetAccessKey() + ":" + options().GetSecretKey() + "@" + + uri.substr("file:///"s.size()); + } else { + uri = "s3" + uri.substr("file"s.size()); + } + uri += "?"; + uri += "region=" + util::UriEscape(options().region); + uri += "&"; + uri += "scheme=" + options().scheme; + uri += "&"; + uri += "endpoint_override=" + util::UriEscape(options().endpoint_override); + uri += "&"; + uri += "allow_bucket_creation="s + (options().allow_bucket_creation ? "1" : "0"); + uri += "&"; + uri += "allow_bucket_deletion="s + (options().allow_bucket_deletion ? "1" : "0"); + return uri; +} + S3Options S3FileSystem::options() const { return impl_->options(); } std::string S3FileSystem::region() const { return impl_->region(); } @@ -3196,32 +3223,33 @@ bool IsS3Finalized() { return GetAwsInstance()->IsFinalized(); } S3GlobalOptions S3GlobalOptions::Defaults() { auto log_level = S3LogLevel::Fatal; - - auto result = arrow::internal::GetEnvVar("ARROW_S3_LOG_LEVEL"); - - if (result.ok()) { - // Extract, trim, and downcase the value of the environment variable - auto value = - arrow::internal::AsciiToLower(arrow::internal::TrimString(result.ValueUnsafe())); - - if (value == "fatal") { - log_level = S3LogLevel::Fatal; - } else if (value == "error") { - log_level = S3LogLevel::Error; - } else if (value == "warn") { - log_level = S3LogLevel::Warn; - } else if (value == "info") { - log_level = S3LogLevel::Info; - } else if (value == "debug") { - log_level = S3LogLevel::Debug; - } else if (value == "trace") { - log_level = S3LogLevel::Trace; - } else if (value == "off") { - log_level = S3LogLevel::Off; - } - } - - return S3GlobalOptions{log_level}; + int num_event_loop_threads = 1; + // Extract, trim, and downcase the value of the environment variable + auto value = arrow::internal::GetEnvVar("ARROW_S3_LOG_LEVEL") + .Map(arrow::internal::AsciiToLower) + .Map(arrow::internal::TrimString) + .ValueOr("fatal"); + if (value == "fatal") { + log_level = S3LogLevel::Fatal; + } else if (value == "error") { + log_level = S3LogLevel::Error; + } else if (value == "warn") { + log_level = S3LogLevel::Warn; + } else if (value == "info") { + log_level = S3LogLevel::Info; + } else if (value == "debug") { + log_level = S3LogLevel::Debug; + } else if (value == "trace") { + log_level = S3LogLevel::Trace; + } else if (value == "off") { + log_level = S3LogLevel::Off; + } + + value = arrow::internal::GetEnvVar("ARROW_S3_THREADS").ValueOr("1"); + if (uint32_t u; ::arrow::internal::ParseUnsigned(value.data(), value.size(), &u)) { + num_event_loop_threads = u; + } + return S3GlobalOptions{log_level, num_event_loop_threads}; } // ----------------------------------------------------------------------- @@ -3239,4 +3267,14 @@ Result ResolveS3BucketRegion(const std::string& bucket) { return resolver->ResolveRegion(bucket); } +auto kS3FileSystemModule = ARROW_REGISTER_FILESYSTEM( + "s3", + [](const arrow::util::Uri& uri, const io::IOContext& io_context, + std::string* out_path) -> Result> { + RETURN_NOT_OK(EnsureS3Initialized()); + ARROW_ASSIGN_OR_RAISE(auto options, S3Options::FromUri(uri, out_path)); + return S3FileSystem::Make(options, io_context); + }, + [] { DCHECK_OK(EnsureS3Finalized()); }); + } // namespace arrow::fs diff --git a/cpp/src/arrow/filesystem/s3fs.h b/cpp/src/arrow/filesystem/s3fs.h index 82d08bc5ea89a..b23c93b8af1c3 100644 --- a/cpp/src/arrow/filesystem/s3fs.h +++ b/cpp/src/arrow/filesystem/s3fs.h @@ -25,20 +25,16 @@ #include "arrow/util/macros.h" #include "arrow/util/uri.h" -namespace Aws { -namespace Auth { - +namespace Aws::Auth { class AWSCredentialsProvider; class STSAssumeRoleCredentialsProvider; +} // namespace Aws::Auth -} // namespace Auth -namespace STS { +namespace Aws::STS { class STSClient; -} -} // namespace Aws +} // namespace Aws::STS -namespace arrow { -namespace fs { +namespace arrow::fs { /// Options for using a proxy for S3 struct ARROW_EXPORT S3ProxyOptions { @@ -256,6 +252,7 @@ class ARROW_EXPORT S3FileSystem : public FileSystem { bool Equals(const FileSystem& other) const override; Result PathFromUri(const std::string& uri_string) const override; + Result MakeUri(std::string path) const override; /// \cond FALSE using FileSystem::CreateDir; @@ -397,5 +394,4 @@ Status EnsureS3Finalized(); ARROW_EXPORT Result ResolveS3BucketRegion(const std::string& bucket); -} // namespace fs -} // namespace arrow +} // namespace arrow::fs diff --git a/cpp/src/arrow/filesystem/s3fs_module.cc b/cpp/src/arrow/filesystem/s3fs_module.cc new file mode 100644 index 0000000000000..d91413138b454 --- /dev/null +++ b/cpp/src/arrow/filesystem/s3fs_module.cc @@ -0,0 +1,18 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/filesystem/filesystem_library.h" diff --git a/cpp/src/arrow/filesystem/s3fs_module_test.cc b/cpp/src/arrow/filesystem/s3fs_module_test.cc new file mode 100644 index 0000000000000..7c777cc2557ea --- /dev/null +++ b/cpp/src/arrow/filesystem/s3fs_module_test.cc @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "arrow/filesystem/filesystem.h" +#include "arrow/filesystem/s3_test_util.h" +#include "arrow/filesystem/s3fs.h" +#include "arrow/filesystem/test_util.h" +#include "arrow/result.h" +#include "arrow/status.h" +#include "arrow/testing/future_util.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/testing/matchers.h" +#include "arrow/util/async_generator.h" +#include "arrow/util/checked_cast.h" +#include "arrow/util/future.h" +#include "arrow/util/io_util.h" +#include "arrow/util/key_value_metadata.h" +#include "arrow/util/logging.h" +#include "arrow/util/macros.h" +#include "arrow/util/range.h" +#include "arrow/util/string.h" + +namespace arrow::fs { + +auto* minio_env = ::testing::AddGlobalTestEnvironment(new MinioTestEnvironment); + +MinioTestEnvironment* GetMinioEnv() { + return ::arrow::internal::checked_cast(minio_env); +} + +class RegistrationTestEnvironment : public ::testing::Environment { + public: + void SetUp() override { ASSERT_OK(LoadFileSystemFactories(ARROW_S3_LIBPATH)); }; + void TearDown() override { EnsureFinalized(); }; +}; + +auto* lib_env = ::testing::AddGlobalTestEnvironment(new RegistrationTestEnvironment); + +TEST(S3Test, FromUri) { + // XXX s3fs_test uses Aws::S3::S3Client to ensure that the server is initialized. + ASSERT_OK_AND_ASSIGN(auto minio, GetMinioEnv()->GetOneServer()); + + std::string path; + ASSERT_OK_AND_ASSIGN(auto fs, FileSystemFromUri("s3://" + minio->access_key() + ":" + + minio->secret_key() + + "@bucket/somedir/subdir/subfile", + &path)); + + EXPECT_EQ(fs->MakeUri("/" + path), + "s3://minio:miniopass@bucket/somedir/subdir/subfile" + "?region=us-east-1&scheme=https&endpoint_override=" + "&allow_bucket_creation=0&allow_bucket_deletion=0"); +} + +} // namespace arrow::fs diff --git a/cpp/src/arrow/util/io_util.cc b/cpp/src/arrow/util/io_util.cc index d48f9eb97d562..e75bb46e7fd09 100644 --- a/cpp/src/arrow/util/io_util.cc +++ b/cpp/src/arrow/util/io_util.cc @@ -2232,7 +2232,7 @@ Result LoadDynamicLibrary(const char* path) { constexpr int kFlags = // All undefined symbols in the shared object are resolved before dlopen() returns. RTLD_NOW - // Symbols defined in this shared object are not made available to + // Symbols defined in this shared object are not made available to // resolve references in subsequently loaded shared objects. | RTLD_LOCAL; if (void* handle = dlopen(path, kFlags)) return handle; diff --git a/docs/source/cpp/env_vars.rst b/docs/source/cpp/env_vars.rst index 116c151824c75..5963fc4b5ff36 100644 --- a/docs/source/cpp/env_vars.rst +++ b/docs/source/cpp/env_vars.rst @@ -108,6 +108,12 @@ that changing their value later will have an effect. `Logging - AWS SDK For C++ `__ +.. envvar:: ARROW_S3_THREADS + + The number of threads to configure when creating AWS' I/O event loop. + + Defaults to 1 as recommended by AWS' doc when the # of connections is + expected to be, at most, in the hundreds. .. envvar:: ARROW_TRACING_BACKEND diff --git a/testing b/testing index 735ae7128d571..25d16511e8d42 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit 735ae7128d571398dd798d7ff004adebeb342883 +Subproject commit 25d16511e8d42c2744a1d94d90169e3a36e92631