Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-40343: [C++] Move S3FileSystem to the registry #41559

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cpp/cmake_modules/DefineOptions.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,12 @@ takes precedence over ccache if a storage backend is configured" ON)
DEPENDS
ARROW_FILESYSTEM)

define_option(ARROW_S3_MODULE
"Build the Arrow S3 filesystem as a dynamic module"
OFF
DEPENDS
ARROW_S3)

define_option(ARROW_SKYHOOK
"Build the Skyhook libraries"
OFF
Expand Down
12 changes: 12 additions & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,18 @@ if(ARROW_FILESYSTEM)
foreach(ARROW_FILESYSTEM_TARGET ${ARROW_FILESYSTEM_TARGETS})
target_link_libraries(${ARROW_FILESYSTEM_TARGET} PRIVATE ${AWSSDK_LINK_LIBRARIES})
endforeach()

if(ARROW_S3_MODULE)
if(NOT ARROW_BUILD_SHARED)
message(FATAL_ERROR "ARROW_S3_MODULE without shared libarrow is not supported")
bkietz marked this conversation as resolved.
Show resolved Hide resolved
endif()

add_library(arrow_s3fs MODULE filesystem/s3fs_module.cc filesystem/s3fs.cc)
target_link_libraries(arrow_s3fs PRIVATE ${AWSSDK_LINK_LIBRARIES} arrow_shared)
set_source_files_properties(filesystem/s3fs.cc filesystem/s3fs_module.cc
PROPERTIES SKIP_PRECOMPILE_HEADERS ON
SKIP_UNITY_BUILD_INCLUSION ON)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we remove the linking of the main arrow library against AWSSDK_LINK_LIBRARIES when ARROW_S3_MODULE is ON?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't want to require the main arrow library to load the module. After the module can reproduce the full functionality of the S3FileSystem (S3ProxyOptions, for example) we can deprecate building s3 in the main library. That's not in scope for now, though

endif()
endif()

list(APPEND ARROW_TESTING_SHARED_LINK_LIBS ${ARROW_GTEST_GMOCK})
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/adapters/orc/util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "arrow/adapters/orc/util.h"

#include <cmath>
#include <sstream>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is used in GetOrcMajorVersion(), so it should be #included here by IWYU. I found it because compilation failed at one point (compilation can spuriously succeed when standard headers include more definitions than the standard guarantees; for example, inclusion of only <string> or <iostream> might fully define std::stringstream rather than only forward declaring it)

#include <string>
#include <string_view>
#include <vector>
Expand Down
18 changes: 18 additions & 0 deletions cpp/src/arrow/filesystem/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,24 @@ if(ARROW_S3)
target_link_libraries(arrow-filesystem-s3fs-benchmark PRIVATE parquet_shared)
endif()
endif()

if(ARROW_S3_MODULE)
add_arrow_test(s3fs_module_test
SOURCES
s3fs_module_test.cc
s3_test_util.cc
EXTRA_LABELS
filesystem
DEFINITIONS
ARROW_S3_LIBPATH="$<TARGET_FILE:arrow_s3fs>"
EXTRA_LINK_LIBS
Boost::filesystem
Boost::system)
Comment on lines +134 to +136
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@assignUser does the auto-formatter for CMake has the ability to indent these?

                   SOURCES
                     s3fs_module_test.cc
                     s3_test_util.cc
                   EXTRA_LABELS
                     filesystem
                   DEFINITIONS
                     ARROW_S3_LIBPATH="$<TARGET_FILE:arrow_s3fs>"
                   EXTRA_LINK_LIBS
                     Boost::filesystem
                     Boost::system)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like we could, but it'd involve declaring each function like add_arrow_library to additional_commands https://cmake-format.readthedocs.io/en/latest/configuration.html#configuration

Copy link
Member

@assignUser assignUser Sep 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, missed that ping. But yes, that is possible as Ben says, I have done something similar for velox custom functions. But currently I am a little concerned that cmake-format seems to be unmaintained https://github.com/cheshirekow/cmake_format/issues/340

target_compile_definitions(arrow-filesystem-test
PUBLIC ARROW_S3_LIBPATH="$<TARGET_FILE:arrow_s3fs>")
target_sources(arrow-filesystem-test PUBLIC s3fs_module_test.cc s3_test_util.cc)
target_link_libraries(arrow-filesystem-test PUBLIC Boost::filesystem Boost::system)
endif()
endif()

if(ARROW_HDFS)
Expand Down
44 changes: 28 additions & 16 deletions cpp/src/arrow/filesystem/filesystem.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@
#ifdef ARROW_HDFS
# include "arrow/filesystem/hdfs.h"
#endif
#ifdef ARROW_S3
# include "arrow/filesystem/s3fs.h"
#endif
#include "arrow/filesystem/localfs.h"
#include "arrow/filesystem/mockfs.h"
#include "arrow/filesystem/path_util.h"
Expand Down Expand Up @@ -700,6 +697,29 @@ class FileSystemFactoryRegistry {
return &registry;
}

Status Unregister(const std::string& scheme) {
std::shared_lock lock{mutex_};
RETURN_NOT_OK(CheckValid());

auto it = scheme_to_factory_.find(scheme);
if (it == scheme_to_factory_.end()) {
return Status::KeyError("No factories found for scheme ", scheme,
", can't unregister");
}

std::function<void()> finalizer;
if (it->second.ok()) {
finalizer = it->second.ValueOrDie().finalizer;
}
scheme_to_factory_.erase(it);
lock.unlock();

if (finalizer) {
finalizer();
}
return Status::OK();
}

Result<const FileSystemFactory*> FactoryForScheme(const std::string& scheme) {
std::shared_lock lock{mutex_};
RETURN_NOT_OK(CheckValid());
Expand Down Expand Up @@ -749,7 +769,7 @@ class FileSystemFactoryRegistry {
if (finalized_) return;

for (const auto& [_, registered_or_error] : scheme_to_factory_) {
if (!registered_or_error.ok()) continue;
if (!registered_or_error.ok() || !registered_or_error->finalizer) continue;
registered_or_error->finalizer();
}
finalized_ = true;
Expand Down Expand Up @@ -819,6 +839,10 @@ FileSystemRegistrar::FileSystemRegistrar(std::string scheme, FileSystemFactory f

namespace internal {
void* GetFileSystemRegistry() { return FileSystemFactoryRegistry::GetInstance(); }

Status UnregisterFileSystemFactory(const std::string& scheme) {
return FileSystemFactoryRegistry::GetInstance()->Unregister(scheme);
}
} // namespace internal

Status LoadFileSystemFactories(const char* libpath) {
Expand Down Expand Up @@ -896,18 +920,6 @@ Result<std::shared_ptr<FileSystem>> FileSystemFromUriReal(const Uri& uri,
"without HDFS support");
#endif
}
if (scheme == "s3") {
#ifdef ARROW_S3
RETURN_NOT_OK(EnsureS3Initialized());
ARROW_ASSIGN_OR_RAISE(auto options, S3Options::FromUri(uri, out_path));
ARROW_ASSIGN_OR_RAISE(auto s3fs, S3FileSystem::Make(options, io_context));
return s3fs;
#else
return Status::NotImplemented(
"Got S3 URI but Arrow compiled "
"without S3 support");
#endif
}

if (scheme == "mock") {
// MockFileSystem does not have an
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/filesystem/filesystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ class ARROW_EXPORT FileSystem
virtual Result<std::string> PathFromUri(const std::string& uri_string) const;

/// \brief Make a URI from which FileSystemFromUri produces an equivalent filesystem
/// \param path The path component to use in the resulting URI
/// \param path The path component to use in the resulting URI. Must be absolute.
/// \return A URI string, or an error if an equivalent URI cannot be produced
virtual Result<std::string> MakeUri(std::string path) const;

Expand Down
9 changes: 8 additions & 1 deletion cpp/src/arrow/filesystem/localfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,14 @@ Result<std::string> LocalFileSystem::PathFromUri(const std::string& uri_string)

Result<std::string> LocalFileSystem::MakeUri(std::string path) const {
ARROW_ASSIGN_OR_RAISE(path, DoNormalizePath(std::move(path)));
return "file://" + path + (options_.use_mmap ? "?use_mmap" : "");
if (!internal::DetectAbsolutePath(path)) {
return Status::Invalid("MakeUri requires an absolute path, got ", path);
}
ARROW_ASSIGN_OR_RAISE(auto uri, util::UriFromAbsolutePath(path));
if (uri[0] == '/') {
uri = "file://" + uri;
}
return uri + (options_.use_mmap ? "?use_mmap" : "");
}

bool LocalFileSystem::Equals(const FileSystem& other) const {
Expand Down
7 changes: 7 additions & 0 deletions cpp/src/arrow/filesystem/localfs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <utility>
#include <vector>

#include <gmock/gmock-matchers.h>
#include <gtest/gtest.h>

#include "arrow/filesystem/filesystem.h"
Expand Down Expand Up @@ -428,9 +429,15 @@ TYPED_TEST(TestLocalFS, FileSystemFromUriFile) {

this->TestLocalUri("file:///_?use_mmap", "/_");
if (this->path_formatter_.supports_uri()) {
EXPECT_THAT(this->fs_->MakeUri(""), Raises(StatusCode::Invalid));
EXPECT_THAT(this->fs_->MakeUri("a/b"), Raises(StatusCode::Invalid));

ASSERT_TRUE(this->local_fs_->options().use_mmap);
ASSERT_OK_AND_ASSIGN(auto uri, this->fs_->MakeUri("/_"));
EXPECT_EQ(uri, "file:///_?use_mmap");

ASSERT_OK_AND_ASSIGN(uri, this->fs_->MakeUri("/hello world/b/c"));
EXPECT_EQ(uri, "file:///hello%20world/b/c?use_mmap");
}

#ifdef _WIN32
Expand Down
90 changes: 64 additions & 26 deletions cpp/src/arrow/filesystem/s3fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@
#include "arrow/util/string.h"
#include "arrow/util/task_group.h"
#include "arrow/util/thread_pool.h"
#include "arrow/util/value_parsing.h"

namespace arrow::fs {

Expand Down Expand Up @@ -168,6 +169,8 @@ static constexpr const char kAwsEndpointUrlEnvVar[] = "AWS_ENDPOINT_URL";
static constexpr const char kAwsEndpointUrlS3EnvVar[] = "AWS_ENDPOINT_URL_S3";
static constexpr const char kAwsDirectoryContentType[] = "application/x-directory";

using namespace std::string_literals; // NOLINT(build/namespaces)

// -----------------------------------------------------------------------
// S3ProxyOptions implementation

Expand Down Expand Up @@ -3031,6 +3034,30 @@ Result<std::string> S3FileSystem::PathFromUri(const std::string& uri_string) con
internal::AuthorityHandlingBehavior::kPrepend);
}

Result<std::string> S3FileSystem::MakeUri(std::string path) const {
if (path.length() <= 1 || path[0] != '/') {
return Status::Invalid("MakeUri requires an absolute, non-root path, got ", path);
}
ARROW_ASSIGN_OR_RAISE(auto uri, util::UriFromAbsolutePath(path));
if (!options().GetAccessKey().empty()) {
uri = "s3://" + options().GetAccessKey() + ":" + options().GetSecretKey() + "@" +
uri.substr("file:///"s.size());
Comment on lines +3043 to +3044
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should these be in URIs? Has this always been the case?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has been in place since S3 URIs were supported, #6403

The URI is intended to be a fully self contained initializer for a filesystem, so if the filesystem requires secrets for initialization then the URI must contain them

Copy link
Member

@assignUser assignUser Sep 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this still recommended practice? It seems Azure recommends not doing this to prevent accidental exposure of credentials in logs etc. https://lists.apache.org/thread/mhwr2lvtxvjcqos12k7hr4cqkdofrxxo

I don't know anything about S3 auth though so 🤷 (in any case probably something for a follow up)

} else {
uri = "s3" + uri.substr("file"s.size());
}
uri += "?";
uri += "region=" + util::UriEscape(options().region);
uri += "&";
uri += "scheme=" + options().scheme;
uri += "&";
uri += "endpoint_override=" + util::UriEscape(options().endpoint_override);
uri += "&";
uri += "allow_bucket_creation="s + (options().allow_bucket_creation ? "1" : "0");
uri += "&";
uri += "allow_bucket_deletion="s + (options().allow_bucket_deletion ? "1" : "0");
return uri;
}
felipecrv marked this conversation as resolved.
Show resolved Hide resolved

S3Options S3FileSystem::options() const { return impl_->options(); }

std::string S3FileSystem::region() const { return impl_->region(); }
Expand Down Expand Up @@ -3492,32 +3519,33 @@ bool IsS3Finalized() { return GetAwsInstance()->IsFinalized(); }

S3GlobalOptions S3GlobalOptions::Defaults() {
auto log_level = S3LogLevel::Fatal;

auto result = arrow::internal::GetEnvVar("ARROW_S3_LOG_LEVEL");

if (result.ok()) {
// Extract, trim, and downcase the value of the environment variable
auto value =
arrow::internal::AsciiToLower(arrow::internal::TrimString(result.ValueUnsafe()));

if (value == "fatal") {
log_level = S3LogLevel::Fatal;
} else if (value == "error") {
log_level = S3LogLevel::Error;
} else if (value == "warn") {
log_level = S3LogLevel::Warn;
} else if (value == "info") {
log_level = S3LogLevel::Info;
} else if (value == "debug") {
log_level = S3LogLevel::Debug;
} else if (value == "trace") {
log_level = S3LogLevel::Trace;
} else if (value == "off") {
log_level = S3LogLevel::Off;
}
}

return S3GlobalOptions{log_level};
int num_event_loop_threads = 1;
// Extract, trim, and downcase the value of the environment variable
auto value = arrow::internal::GetEnvVar("ARROW_S3_LOG_LEVEL")
.Map(arrow::internal::AsciiToLower)
.Map(arrow::internal::TrimString)
.ValueOr("fatal");
if (value == "fatal") {
log_level = S3LogLevel::Fatal;
} else if (value == "error") {
log_level = S3LogLevel::Error;
} else if (value == "warn") {
log_level = S3LogLevel::Warn;
} else if (value == "info") {
log_level = S3LogLevel::Info;
} else if (value == "debug") {
log_level = S3LogLevel::Debug;
} else if (value == "trace") {
log_level = S3LogLevel::Trace;
} else if (value == "off") {
log_level = S3LogLevel::Off;
}

value = arrow::internal::GetEnvVar("ARROW_S3_THREADS").ValueOr("1");
if (uint32_t u; ::arrow::internal::ParseUnsigned(value.data(), value.size(), &u)) {
num_event_loop_threads = u;
}
return S3GlobalOptions{log_level, num_event_loop_threads};
}

// -----------------------------------------------------------------------
Expand All @@ -3535,4 +3563,14 @@ Result<std::string> ResolveS3BucketRegion(const std::string& bucket) {
return resolver->ResolveRegion(bucket);
}

auto kS3FileSystemModule = ARROW_REGISTER_FILESYSTEM(
"s3",
[](const arrow::util::Uri& uri, const io::IOContext& io_context,
std::string* out_path) -> Result<std::shared_ptr<fs::FileSystem>> {
RETURN_NOT_OK(EnsureS3Initialized());
ARROW_ASSIGN_OR_RAISE(auto options, S3Options::FromUri(uri, out_path));
return S3FileSystem::Make(options, io_context);
},
[] { DCHECK_OK(EnsureS3Finalized()); });

} // namespace arrow::fs
18 changes: 7 additions & 11 deletions cpp/src/arrow/filesystem/s3fs.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,16 @@
#include "arrow/util/macros.h"
#include "arrow/util/uri.h"

namespace Aws {
namespace Auth {

namespace Aws::Auth {
class AWSCredentialsProvider;
class STSAssumeRoleCredentialsProvider;
} // namespace Aws::Auth

} // namespace Auth
namespace STS {
namespace Aws::STS {
class STSClient;
}
} // namespace Aws
} // namespace Aws::STS

namespace arrow {
namespace fs {
namespace arrow::fs {

/// Options for using a proxy for S3
struct ARROW_EXPORT S3ProxyOptions {
Expand Down Expand Up @@ -277,6 +273,7 @@ class ARROW_EXPORT S3FileSystem : public FileSystem {

bool Equals(const FileSystem& other) const override;
Result<std::string> PathFromUri(const std::string& uri_string) const override;
Result<std::string> MakeUri(std::string path) const override;

/// \cond FALSE
using FileSystem::CreateDir;
Expand Down Expand Up @@ -418,5 +415,4 @@ Status EnsureS3Finalized();
ARROW_EXPORT
Result<std::string> ResolveS3BucketRegion(const std::string& bucket);

} // namespace fs
} // namespace arrow
} // namespace arrow::fs
18 changes: 18 additions & 0 deletions cpp/src/arrow/filesystem/s3fs_module.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/filesystem/filesystem_library.h"
Loading
Loading