diff --git a/CMakeLists.txt b/CMakeLists.txt index a9612b0173916..5206f27cab52e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -98,6 +98,7 @@ option(VELOX_ENABLE_S3 "Build S3 Connector" OFF) option(VELOX_ENABLE_GCS "Build GCS Connector" OFF) option(VELOX_ENABLE_ABFS "Build Abfs Connector" OFF) option(VELOX_ENABLE_HDFS "Build Hdfs Connector" OFF) +option(VELOX_ENABLE_HDFS3 "Build Hdfs3 Connector" OFF) option(VELOX_ENABLE_PARQUET "Enable Parquet support" OFF) option(VELOX_ENABLE_ARROW "Enable Arrow support" OFF) option(VELOX_ENABLE_REMOTE_FUNCTIONS "Enable remote function support" OFF) @@ -237,6 +238,31 @@ if(VELOX_ENABLE_ABFS) endif() if(VELOX_ENABLE_HDFS) + if(DEFINED ENV{HADOOP_HOME} AND DEFINED ENV{JAVA_HOME}) + message(STATUS "HADOOP_HOME is set to $ENV{HADOOP_HOME}") + message(STATUS "JAVA_HOME is set to $ENV{JAVA_HOME}") + + set(HADOOP_HOME $ENV{HADOOP_HOME}) + set(JAVA_HOME $ENV{JAVA_HOME}) + else() + message(FATAL_ERROR "Both HADOOP_HOME and JAVA_HOME need to be set when enabling libhdfs. 
Please set the HADOOP_HOME and JAVA_HOME environment variables.") + # You can stop the configuration process if both variables are required + endif() + + find_library( + LIBHDFS + NAMES libhdfs.so + HINTS "${HADOOP_HOME}/lib/native" REQUIRED) + + find_library( + LIBJVM + NAMES libjvm.so + HINTS "${JAVA_HOME}/jre/lib/amd64/server/" REQUIRED) + + add_definitions(-DVELOX_ENABLE_HDFS) +endif() + +if(VELOX_ENABLE_HDFS3) find_library( LIBHDFS3 NAMES libhdfs3.so libhdfs3.dylib diff --git a/velox/connectors/hive/HiveConnector.cpp b/velox/connectors/hive/HiveConnector.cpp index 84372f6bb6e01..decaed42bb0be 100644 --- a/velox/connectors/hive/HiveConnector.cpp +++ b/velox/connectors/hive/HiveConnector.cpp @@ -25,7 +25,7 @@ #ifdef VELOX_ENABLE_GCS #include "velox/connectors/hive/storage_adapters/gcs/RegisterGCSFileSystem.h" // @manual #endif -#ifdef VELOX_ENABLE_HDFS3 +#if defined(VELOX_ENABLE_HDFS) || defined(VELOX_ENABLE_HDFS3) #include "velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.h" // @manual #endif #ifdef VELOX_ENABLE_S3 diff --git a/velox/connectors/hive/storage_adapters/hdfs/CMakeLists.txt b/velox/connectors/hive/storage_adapters/hdfs/CMakeLists.txt index d6363d9e71c55..fb3375ea6b84e 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/CMakeLists.txt +++ b/velox/connectors/hive/storage_adapters/hdfs/CMakeLists.txt @@ -17,9 +17,22 @@ add_library(velox_hdfs RegisterHdfsFileSystem.cpp) if(VELOX_ENABLE_HDFS) + target_sources(velox_hdfs PRIVATE HdfsFileSystem.cpp HdfsReadFile.cpp + HdfsWriteFile.cpp HdfsInternal.cpp) + target_link_libraries(velox_hdfs Folly::folly ${LIBHDFS} ${LIBJVM} xsimd + arrow) + + target_include_directories(velox_hdfs PRIVATE ${HADOOP_HOME}/include/) + + if(${VELOX_BUILD_TESTING}) + add_subdirectory(tests) + endif() +endif() + +if(VELOX_ENABLE_HDFS3) target_sources(velox_hdfs PRIVATE HdfsFileSystem.cpp HdfsReadFile.cpp HdfsWriteFile.cpp) - target_link_libraries(velox_hdfs Folly::folly ${LIBHDFS3} xsimd) + 
target_link_libraries(velox_hdfs Folly::folly ${LIBHDFS3} xsimd arrow) if(${VELOX_BUILD_TESTING}) add_subdirectory(tests) diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp b/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp index af18aa32e5e95..d57affff2f8d8 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp @@ -14,7 +14,11 @@ * limitations under the License. */ #include "velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h" +#ifdef VELOX_ENABLE_HDFS3 #include +#elif VELOX_ENABLE_HDFS +#include "velox/connectors/hive/storage_adapters/hdfs/HdfsInternal.h" +#endif #include #include "velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h" #include "velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h" @@ -32,11 +36,13 @@ class HdfsFileSystem::Impl { hdfsBuilderSetNameNodePort(builder, atoi(endpoint.port.data())); hdfsClient_ = hdfsBuilderConnect(builder); hdfsFreeBuilder(builder); +#ifdef VELOX_ENABLE_HDFS3 VELOX_CHECK_NOT_NULL( hdfsClient_, "Unable to connect to HDFS: {}, got error: {}.", endpoint.identity(), hdfsGetLastError()) +#endif } ~Impl() { diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsInternal.cpp b/velox/connectors/hive/storage_adapters/hdfs/HdfsInternal.cpp new file mode 100644 index 0000000000000..bb69d1e40e774 --- /dev/null +++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsInternal.cpp @@ -0,0 +1,621 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// This shim interface to libhdfs (for runtime shared library loading) has been +// adapted from the SFrame project, released under the ASF-compatible 3-clause +// BSD license +// +// Using this required having the $JAVA_HOME and $HADOOP_HOME environment +// variables set, so that libjvm and libhdfs can be located easily + +// Copyright (C) 2015 Dato, Inc. +// All rights reserved. +// +// This software may be modified and distributed under the terms +// of the BSD license. See the LICENSE file for details. + +#include "velox/connectors/hive/storage_adapters/hdfs/HdfsInternal.h" + +#include +#include +#include +#include // IWYU pragma: keep +#include +#include +#include + +#ifndef _WIN32 +#include +#endif + +#include "arrow/result.h" +#include "arrow/util/io_util.h" +#include "arrow/util/logging.h" + +using arrow::internal::GetEnvVarNative; +using arrow::internal::PlatformFilename; +#ifdef _WIN32 +using internal::WinErrorMessage; +#endif + +namespace facebook::velox::filesystems { + +namespace { + +void* GetLibrarySymbol(LibraryHandle handle, const char* symbol) { + if (handle == NULL) + return NULL; +#ifndef _WIN32 + return dlsym(handle, symbol); +#else + + void* ret = reinterpret_cast(GetProcAddress(handle, symbol)); + if (ret == NULL) { + // logstream(LOG_INFO) << "GetProcAddress error: " + // << get_last_err_str(GetLastError()) << std::endl; + } + return ret; +#endif +} + +#define GET_SYMBOL_REQUIRED(SHIM, SYMBOL_NAME) \ + do { \ + if (!SHIM->SYMBOL_NAME) { \ + *reinterpret_cast(&SHIM->SYMBOL_NAME) = \ + GetLibrarySymbol(SHIM->handle, "" 
#SYMBOL_NAME); \ + } \ + if (!SHIM->SYMBOL_NAME) \ + return arrow::Status::IOError("Getting symbol " #SYMBOL_NAME "failed"); \ + } while (0) + +#define GET_SYMBOL(SHIM, SYMBOL_NAME) \ + if (!SHIM->SYMBOL_NAME) { \ + *reinterpret_cast(&SHIM->SYMBOL_NAME) = \ + GetLibrarySymbol(SHIM->handle, "" #SYMBOL_NAME); \ + } + +LibraryHandle libjvm_handle = nullptr; + +// Helper functions for dlopens +arrow::Result> get_potential_libjvm_paths(); +arrow::Result> get_potential_libhdfs_paths(); +arrow::Result try_dlopen( + const std::vector& potential_paths, + const char* name); + +arrow::Result> MakeFilenameVector( + const std::vector& names) { + std::vector filenames(names.size()); + for (size_t i = 0; i < names.size(); ++i) { + ARROW_ASSIGN_OR_RAISE(filenames[i], PlatformFilename::FromString(names[i])); + } + return filenames; +} + +void AppendEnvVarFilename( + const char* var_name, + std::vector* filenames) { + auto maybe_env_var = GetEnvVarNative(var_name); + if (maybe_env_var.ok()) { + filenames->emplace_back(std::move(*maybe_env_var)); + } +} + +void AppendEnvVarFilename( + const char* var_name, + const char* suffix, + std::vector* filenames) { + auto maybe_env_var = GetEnvVarNative(var_name); + if (maybe_env_var.ok()) { + auto maybe_env_var_with_suffix = + PlatformFilename(std::move(*maybe_env_var)).Join(suffix); + if (maybe_env_var_with_suffix.ok()) { + filenames->emplace_back(std::move(*maybe_env_var_with_suffix)); + } + } +} + +void InsertEnvVarFilename( + const char* var_name, + std::vector* filenames) { + auto maybe_env_var = GetEnvVarNative(var_name); + if (maybe_env_var.ok()) { + filenames->emplace( + filenames->begin(), PlatformFilename(std::move(*maybe_env_var))); + } +} + +arrow::Result> get_potential_libhdfs_paths() { + std::vector potential_paths; + std::string file_name; + +// OS-specific file name +#ifdef _WIN32 + file_name = "hdfs.dll"; +#elif __APPLE__ + file_name = "libhdfs.dylib"; +#else + file_name = "libhdfs.so"; +#endif + + // Common paths + 
ARROW_ASSIGN_OR_RAISE(auto search_paths, MakeFilenameVector({"", "."})); + + // Path from environment variable + AppendEnvVarFilename("HADOOP_HOME", "lib/native", &search_paths); + AppendEnvVarFilename("ARROW_LIBHDFS_DIR", &search_paths); + + // All paths with file name + for (const auto& path : search_paths) { + ARROW_ASSIGN_OR_RAISE(auto full_path, path.Join(file_name)); + potential_paths.push_back(std::move(full_path)); + } + + return potential_paths; +} + +arrow::Result> get_potential_libjvm_paths() { + std::vector potential_paths; + + std::vector search_prefixes; + std::vector search_suffixes; + std::string file_name; + +// From heuristics +#ifdef _WIN32 + ARROW_ASSIGN_OR_RAISE(search_prefixes, MakeFilenameVector({""})); + ARROW_ASSIGN_OR_RAISE( + search_suffixes, MakeFilenameVector({"/jre/bin/server", "/bin/server"})); + file_name = "jvm.dll"; +#elif __APPLE__ + ARROW_ASSIGN_OR_RAISE(search_prefixes, MakeFilenameVector({""})); + ARROW_ASSIGN_OR_RAISE( + search_suffixes, MakeFilenameVector({"/jre/lib/server", "/lib/server"})); + file_name = "libjvm.dylib"; + +// SFrame uses /usr/libexec/java_home to find JAVA_HOME; for now we are +// expecting users to set an environment variable +#else +#if defined(__aarch64__) + const std::string prefix_arch{"arm64"}; + const std::string suffix_arch{"aarch64"}; +#else + const std::string prefix_arch{"amd64"}; + const std::string suffix_arch{"amd64"}; +#endif + ARROW_ASSIGN_OR_RAISE( + search_prefixes, + MakeFilenameVector({ + "/usr/lib/jvm/default-java", // ubuntu / debian distros + "/usr/lib/jvm/java", // rhel6 + "/usr/lib/jvm", // centos6 + "/usr/lib64/jvm", // opensuse 13 + "/usr/local/lib/jvm/default-java", // alt ubuntu / debian distros + "/usr/local/lib/jvm/java", // alt rhel6 + "/usr/local/lib/jvm", // alt centos6 + "/usr/local/lib64/jvm", // alt opensuse 13 + "/usr/local/lib/jvm/java-8-openjdk-" + + prefix_arch, // alt ubuntu / debian distros + "/usr/lib/jvm/java-8-openjdk-" + + prefix_arch, // alt ubuntu / debian 
distros + "/usr/local/lib/jvm/java-7-openjdk-" + + prefix_arch, // alt ubuntu / debian distros + "/usr/lib/jvm/java-7-openjdk-" + + prefix_arch, // alt ubuntu / debian distros + "/usr/local/lib/jvm/java-6-openjdk-" + + prefix_arch, // alt ubuntu / debian distros + "/usr/lib/jvm/java-6-openjdk-" + + prefix_arch, // alt ubuntu / debian distros + "/usr/lib/jvm/java-7-oracle", // alt ubuntu + "/usr/lib/jvm/java-8-oracle", // alt ubuntu + "/usr/lib/jvm/java-6-oracle", // alt ubuntu + "/usr/local/lib/jvm/java-7-oracle", // alt ubuntu + "/usr/local/lib/jvm/java-8-oracle", // alt ubuntu + "/usr/local/lib/jvm/java-6-oracle", // alt ubuntu + "/usr/lib/jvm/default", // alt centos + "/usr/java/latest" // alt centos + })); + ARROW_ASSIGN_OR_RAISE( + search_suffixes, + MakeFilenameVector( + {"", + "/lib/server", + "/jre/lib/" + suffix_arch + "/server", + "/lib/" + suffix_arch + "/server"})); + file_name = "libjvm.so"; +#endif + + // From direct environment variable + InsertEnvVarFilename("JAVA_HOME", &search_prefixes); + + // Generate cross product between search_prefixes, search_suffixes, and + // file_name + for (auto& prefix : search_prefixes) { + for (auto& suffix : search_suffixes) { + ARROW_ASSIGN_OR_RAISE(auto path, prefix.Join(suffix).Join(file_name)); + potential_paths.push_back(std::move(path)); + } + } + + return potential_paths; +} + +#ifndef _WIN32 +arrow::Result try_dlopen( + const std::vector& potential_paths, + const char* name) { + std::string error_message = "unknown error"; + LibraryHandle handle; + + for (const auto& p : potential_paths) { + handle = dlopen(p.ToNative().c_str(), RTLD_NOW | RTLD_LOCAL); + + if (handle != NULL) { + return handle; + } else { + const char* err_msg = dlerror(); + if (err_msg != NULL) { + error_message = err_msg; + } + } + } + + return arrow::Status::IOError("Unable to load ", name, ": ", error_message); +} + +#else +arrow::Result try_dlopen( + const std::vector& potential_paths, + const char* name) { + std::string error_message; + 
LibraryHandle handle; + + for (const auto& p : potential_paths) { + handle = LoadLibraryW(p.ToNative().c_str()); + if (handle != NULL) { + return handle; + } else { + error_message = WinErrorMessage(GetLastError()); + } + } + + return arrow::Status::IOError("Unable to load ", name, ": ", error_message); +} +#endif // _WIN32 + +LibHdfsShim libhdfs_shim; + +} // namespace + +arrow::Status LibHdfsShim::GetRequiredSymbols() { + GET_SYMBOL_REQUIRED(this, hdfsNewBuilder); + GET_SYMBOL_REQUIRED(this, hdfsBuilderSetNameNode); + GET_SYMBOL_REQUIRED(this, hdfsBuilderSetNameNodePort); + GET_SYMBOL_REQUIRED(this, hdfsBuilderSetUserName); + GET_SYMBOL_REQUIRED(this, hdfsBuilderSetKerbTicketCachePath); + GET_SYMBOL_REQUIRED(this, hdfsBuilderSetForceNewInstance); + GET_SYMBOL_REQUIRED(this, hdfsBuilderConfSetStr); + GET_SYMBOL_REQUIRED(this, hdfsBuilderConnect); + GET_SYMBOL_REQUIRED(this, hdfsCreateDirectory); + GET_SYMBOL_REQUIRED(this, hdfsDelete); + GET_SYMBOL_REQUIRED(this, hdfsDisconnect); + GET_SYMBOL_REQUIRED(this, hdfsExists); + GET_SYMBOL_REQUIRED(this, hdfsFreeFileInfo); + GET_SYMBOL_REQUIRED(this, hdfsGetCapacity); + GET_SYMBOL_REQUIRED(this, hdfsGetUsed); + GET_SYMBOL_REQUIRED(this, hdfsGetPathInfo); + GET_SYMBOL_REQUIRED(this, hdfsListDirectory); + GET_SYMBOL_REQUIRED(this, hdfsChown); + GET_SYMBOL_REQUIRED(this, hdfsChmod); + + // File methods + GET_SYMBOL_REQUIRED(this, hdfsCloseFile); + GET_SYMBOL_REQUIRED(this, hdfsFlush); + GET_SYMBOL_REQUIRED(this, hdfsOpenFile); + GET_SYMBOL_REQUIRED(this, hdfsRead); + GET_SYMBOL_REQUIRED(this, hdfsSeek); + GET_SYMBOL_REQUIRED(this, hdfsTell); + GET_SYMBOL_REQUIRED(this, hdfsWrite); + + return arrow::Status::OK(); +} + +arrow::Status ConnectLibHdfs(LibHdfsShim** driver) { + static std::mutex lock; + std::lock_guard guard(lock); + + LibHdfsShim* shim = &libhdfs_shim; + + static bool shim_attempted = false; + if (!shim_attempted) { + shim_attempted = true; + + shim->Initialize(); + + ARROW_ASSIGN_OR_RAISE( + auto 
libjvm_potential_paths, get_potential_libjvm_paths()); + ARROW_ASSIGN_OR_RAISE( + libjvm_handle, try_dlopen(libjvm_potential_paths, "libjvm")); + + ARROW_ASSIGN_OR_RAISE( + auto libhdfs_potential_paths, get_potential_libhdfs_paths()); + ARROW_ASSIGN_OR_RAISE( + shim->handle, try_dlopen(libhdfs_potential_paths, "libhdfs")); + } else if (shim->handle == nullptr) { + return arrow::Status::IOError("Prior attempt to load libhdfs failed"); + } + + *driver = shim; + return shim->GetRequiredSymbols(); +} + +/////////////////////////////////////////////////////////////////////////// +// HDFS thin wrapper methods + +hdfsBuilder* LibHdfsShim::NewBuilder(void) { + return this->hdfsNewBuilder(); +} + +void LibHdfsShim::BuilderSetNameNode(hdfsBuilder* bld, const char* nn) { + this->hdfsBuilderSetNameNode(bld, nn); +} + +void LibHdfsShim::BuilderSetNameNodePort(hdfsBuilder* bld, tPort port) { + this->hdfsBuilderSetNameNodePort(bld, port); +} + +void LibHdfsShim::BuilderSetUserName(hdfsBuilder* bld, const char* userName) { + this->hdfsBuilderSetUserName(bld, userName); +} + +void LibHdfsShim::BuilderSetKerbTicketCachePath( + hdfsBuilder* bld, + const char* kerbTicketCachePath) { + this->hdfsBuilderSetKerbTicketCachePath(bld, kerbTicketCachePath); +} + +void LibHdfsShim::BuilderSetForceNewInstance(hdfsBuilder* bld) { + this->hdfsBuilderSetForceNewInstance(bld); +} + +hdfsFS LibHdfsShim::BuilderConnect(hdfsBuilder* bld) { + return this->hdfsBuilderConnect(bld); +} + +int LibHdfsShim::BuilderConfSetStr( + hdfsBuilder* bld, + const char* key, + const char* val) { + return this->hdfsBuilderConfSetStr(bld, key, val); +} + +int LibHdfsShim::Disconnect(hdfsFS fs) { + return this->hdfsDisconnect(fs); +} + +hdfsFile LibHdfsShim::OpenFile( + hdfsFS fs, + const char* path, + int flags, + int bufferSize, + short replication, + tSize blocksize) { // NOLINT + return this->hdfsOpenFile( + fs, path, flags, bufferSize, replication, blocksize); +} + +int LibHdfsShim::CloseFile(hdfsFS fs, hdfsFile 
file) { + return this->hdfsCloseFile(fs, file); +} + +int LibHdfsShim::Exists(hdfsFS fs, const char* path) { + return this->hdfsExists(fs, path); +} + +int LibHdfsShim::Seek(hdfsFS fs, hdfsFile file, tOffset desiredPos) { + return this->hdfsSeek(fs, file, desiredPos); +} + +tOffset LibHdfsShim::Tell(hdfsFS fs, hdfsFile file) { + return this->hdfsTell(fs, file); +} + +tSize LibHdfsShim::Read(hdfsFS fs, hdfsFile file, void* buffer, tSize length) { + return this->hdfsRead(fs, file, buffer, length); +} + +bool LibHdfsShim::HasPread() { + GET_SYMBOL(this, hdfsPread); + return this->hdfsPread != nullptr; +} + +tSize LibHdfsShim::Pread( + hdfsFS fs, + hdfsFile file, + tOffset position, + void* buffer, + tSize length) { + GET_SYMBOL(this, hdfsPread); + DCHECK(this->hdfsPread); + return this->hdfsPread(fs, file, position, buffer, length); +} + +tSize LibHdfsShim::Write( + hdfsFS fs, + hdfsFile file, + const void* buffer, + tSize length) { + return this->hdfsWrite(fs, file, buffer, length); +} + +int LibHdfsShim::Flush(hdfsFS fs, hdfsFile file) { + return this->hdfsFlush(fs, file); +} + +int LibHdfsShim::Available(hdfsFS fs, hdfsFile file) { + GET_SYMBOL(this, hdfsAvailable); + if (this->hdfsAvailable) + return this->hdfsAvailable(fs, file); + else + return 0; +} + +int LibHdfsShim::Copy( + hdfsFS srcFS, + const char* src, + hdfsFS dstFS, + const char* dst) { + GET_SYMBOL(this, hdfsCopy); + if (this->hdfsCopy) + return this->hdfsCopy(srcFS, src, dstFS, dst); + else + return 0; +} + +int LibHdfsShim::Move( + hdfsFS srcFS, + const char* src, + hdfsFS dstFS, + const char* dst) { + GET_SYMBOL(this, hdfsMove); + if (this->hdfsMove) + return this->hdfsMove(srcFS, src, dstFS, dst); + else + return 0; +} + +int LibHdfsShim::Delete(hdfsFS fs, const char* path, int recursive) { + return this->hdfsDelete(fs, path, recursive); +} + +int LibHdfsShim::Rename(hdfsFS fs, const char* oldPath, const char* newPath) { + GET_SYMBOL(this, hdfsRename); + if (this->hdfsRename) + return 
this->hdfsRename(fs, oldPath, newPath); + else + return 0; +} + +char* LibHdfsShim::GetWorkingDirectory( + hdfsFS fs, + char* buffer, + size_t bufferSize) { + GET_SYMBOL(this, hdfsGetWorkingDirectory); + if (this->hdfsGetWorkingDirectory) { + return this->hdfsGetWorkingDirectory(fs, buffer, bufferSize); + } else { + return NULL; + } +} + +int LibHdfsShim::SetWorkingDirectory(hdfsFS fs, const char* path) { + GET_SYMBOL(this, hdfsSetWorkingDirectory); + if (this->hdfsSetWorkingDirectory) { + return this->hdfsSetWorkingDirectory(fs, path); + } else { + return 0; + } +} + +int LibHdfsShim::MakeDirectory(hdfsFS fs, const char* path) { + return this->hdfsCreateDirectory(fs, path); +} + +int LibHdfsShim::SetReplication( + hdfsFS fs, + const char* path, + int16_t replication) { + GET_SYMBOL(this, hdfsSetReplication); + if (this->hdfsSetReplication) { + return this->hdfsSetReplication(fs, path, replication); + } else { + return 0; + } +} + +hdfsFileInfo* +LibHdfsShim::ListDirectory(hdfsFS fs, const char* path, int* numEntries) { + return this->hdfsListDirectory(fs, path, numEntries); +} + +hdfsFileInfo* LibHdfsShim::GetPathInfo(hdfsFS fs, const char* path) { + return this->hdfsGetPathInfo(fs, path); +} + +void LibHdfsShim::FreeFileInfo(hdfsFileInfo* hdfsFileInfo, int numEntries) { + this->hdfsFreeFileInfo(hdfsFileInfo, numEntries); +} + +char*** LibHdfsShim::GetHosts( + hdfsFS fs, + const char* path, + tOffset start, + tOffset length) { + GET_SYMBOL(this, hdfsGetHosts); + if (this->hdfsGetHosts) { + return this->hdfsGetHosts(fs, path, start, length); + } else { + return NULL; + } +} + +void LibHdfsShim::FreeHosts(char*** blockHosts) { + GET_SYMBOL(this, hdfsFreeHosts); + if (this->hdfsFreeHosts) { + this->hdfsFreeHosts(blockHosts); + } +} + +tOffset LibHdfsShim::GetDefaultBlockSize(hdfsFS fs) { + GET_SYMBOL(this, hdfsGetDefaultBlockSize); + if (this->hdfsGetDefaultBlockSize) { + return this->hdfsGetDefaultBlockSize(fs); + } else { + return 0; + } +} + +tOffset 
LibHdfsShim::GetCapacity(hdfsFS fs) { + return this->hdfsGetCapacity(fs); +} + +tOffset LibHdfsShim::GetUsed(hdfsFS fs) { + return this->hdfsGetUsed(fs); +} + +int LibHdfsShim::Chown( + hdfsFS fs, + const char* path, + const char* owner, + const char* group) { + return this->hdfsChown(fs, path, owner, group); +} + +int LibHdfsShim::Chmod(hdfsFS fs, const char* path, short mode) { // NOLINT + return this->hdfsChmod(fs, path, mode); +} + +int LibHdfsShim::Utime(hdfsFS fs, const char* path, tTime mtime, tTime atime) { + GET_SYMBOL(this, hdfsUtime); + if (this->hdfsUtime) { + return this->hdfsUtime(fs, path, mtime, atime); + } else { + return 0; + } +} + +} // namespace facebook::velox::filesystems diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsInternal.h b/velox/connectors/hive/storage_adapters/hdfs/HdfsInternal.h new file mode 100644 index 0000000000000..128192a1523e3 --- /dev/null +++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsInternal.h @@ -0,0 +1,247 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include +#include + +#include + +#include "arrow/status.h" +#include "arrow/util/visibility.h" +#include "arrow/util/windows_compatibility.h" // IWYU pragma: keep + +using std::size_t; + +struct hdfsBuilder; + +namespace facebook::velox::filesystems { + +#ifndef _WIN32 +typedef void* LibraryHandle; +#else +typedef HINSTANCE LibraryHandle; +#endif + +// NOTE(wesm): cpplint does not like use of short and other imprecise C types +struct LibHdfsShim { + LibraryHandle handle; + + hdfsBuilder* (*hdfsNewBuilder)(void); + void (*hdfsBuilderSetNameNode)(hdfsBuilder* bld, const char* nn); + void (*hdfsBuilderSetNameNodePort)(hdfsBuilder* bld, tPort port); + void (*hdfsBuilderSetUserName)(hdfsBuilder* bld, const char* userName); + void (*hdfsBuilderSetKerbTicketCachePath)( + hdfsBuilder* bld, + const char* kerbTicketCachePath); + void (*hdfsBuilderSetForceNewInstance)(hdfsBuilder* bld); + hdfsFS (*hdfsBuilderConnect)(hdfsBuilder* bld); + int (*hdfsBuilderConfSetStr)( + hdfsBuilder* bld, + const char* key, + const char* val); + + int (*hdfsDisconnect)(hdfsFS fs); + + hdfsFile (*hdfsOpenFile)( + hdfsFS fs, + const char* path, + int flags, + int bufferSize, + short replication, + tSize blocksize); // NOLINT + + int (*hdfsCloseFile)(hdfsFS fs, hdfsFile file); + int (*hdfsExists)(hdfsFS fs, const char* path); + int (*hdfsSeek)(hdfsFS fs, hdfsFile file, tOffset desiredPos); + tOffset (*hdfsTell)(hdfsFS fs, hdfsFile file); + tSize (*hdfsRead)(hdfsFS fs, hdfsFile file, void* buffer, tSize length); + tSize (*hdfsPread)( + hdfsFS fs, + hdfsFile file, + tOffset position, + void* buffer, + tSize length); + tSize ( + *hdfsWrite)(hdfsFS fs, hdfsFile file, const void* buffer, tSize length); + int (*hdfsFlush)(hdfsFS fs, hdfsFile file); + int (*hdfsAvailable)(hdfsFS fs, hdfsFile file); + int (*hdfsCopy)(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst); + int (*hdfsMove)(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst); + int (*hdfsDelete)(hdfsFS 
fs, const char* path, int recursive); + int (*hdfsRename)(hdfsFS fs, const char* oldPath, const char* newPath); + char* (*hdfsGetWorkingDirectory)(hdfsFS fs, char* buffer, size_t bufferSize); + int (*hdfsSetWorkingDirectory)(hdfsFS fs, const char* path); + int (*hdfsCreateDirectory)(hdfsFS fs, const char* path); + int (*hdfsSetReplication)(hdfsFS fs, const char* path, int16_t replication); + hdfsFileInfo* ( + *hdfsListDirectory)(hdfsFS fs, const char* path, int* numEntries); + hdfsFileInfo* (*hdfsGetPathInfo)(hdfsFS fs, const char* path); + void (*hdfsFreeFileInfo)(hdfsFileInfo* hdfsFileInfo, int numEntries); + char*** (*hdfsGetHosts)( + hdfsFS fs, + const char* path, + tOffset start, + tOffset length); + void (*hdfsFreeHosts)(char*** blockHosts); + tOffset (*hdfsGetDefaultBlockSize)(hdfsFS fs); + tOffset (*hdfsGetCapacity)(hdfsFS fs); + tOffset (*hdfsGetUsed)(hdfsFS fs); + int (*hdfsChown)( + hdfsFS fs, + const char* path, + const char* owner, + const char* group); + int (*hdfsChmod)(hdfsFS fs, const char* path, short mode); // NOLINT + int (*hdfsUtime)(hdfsFS fs, const char* path, tTime mtime, tTime atime); + + void Initialize() { + this->handle = nullptr; + this->hdfsNewBuilder = nullptr; + this->hdfsBuilderSetNameNode = nullptr; + this->hdfsBuilderSetNameNodePort = nullptr; + this->hdfsBuilderSetUserName = nullptr; + this->hdfsBuilderSetKerbTicketCachePath = nullptr; + this->hdfsBuilderSetForceNewInstance = nullptr; + this->hdfsBuilderConfSetStr = nullptr; + this->hdfsBuilderConnect = nullptr; + this->hdfsDisconnect = nullptr; + this->hdfsOpenFile = nullptr; + this->hdfsCloseFile = nullptr; + this->hdfsExists = nullptr; + this->hdfsSeek = nullptr; + this->hdfsTell = nullptr; + this->hdfsRead = nullptr; + this->hdfsPread = nullptr; + this->hdfsWrite = nullptr; + this->hdfsFlush = nullptr; + this->hdfsAvailable = nullptr; + this->hdfsCopy = nullptr; + this->hdfsMove = nullptr; + this->hdfsDelete = nullptr; + this->hdfsRename = nullptr; + 
this->hdfsGetWorkingDirectory = nullptr; + this->hdfsSetWorkingDirectory = nullptr; + this->hdfsCreateDirectory = nullptr; + this->hdfsSetReplication = nullptr; + this->hdfsListDirectory = nullptr; + this->hdfsGetPathInfo = nullptr; + this->hdfsFreeFileInfo = nullptr; + this->hdfsGetHosts = nullptr; + this->hdfsFreeHosts = nullptr; + this->hdfsGetDefaultBlockSize = nullptr; + this->hdfsGetCapacity = nullptr; + this->hdfsGetUsed = nullptr; + this->hdfsChown = nullptr; + this->hdfsChmod = nullptr; + this->hdfsUtime = nullptr; + } + + hdfsBuilder* NewBuilder(void); + + void BuilderSetNameNode(hdfsBuilder* bld, const char* nn); + + void BuilderSetNameNodePort(hdfsBuilder* bld, tPort port); + + void BuilderSetUserName(hdfsBuilder* bld, const char* userName); + + void BuilderSetKerbTicketCachePath( + hdfsBuilder* bld, + const char* kerbTicketCachePath); + + void BuilderSetForceNewInstance(hdfsBuilder* bld); + + int BuilderConfSetStr(hdfsBuilder* bld, const char* key, const char* val); + + hdfsFS BuilderConnect(hdfsBuilder* bld); + + int Disconnect(hdfsFS fs); + + hdfsFile OpenFile( + hdfsFS fs, + const char* path, + int flags, + int bufferSize, + short replication, + tSize blocksize); // NOLINT + + int CloseFile(hdfsFS fs, hdfsFile file); + + int Exists(hdfsFS fs, const char* path); + + int Seek(hdfsFS fs, hdfsFile file, tOffset desiredPos); + + tOffset Tell(hdfsFS fs, hdfsFile file); + + tSize Read(hdfsFS fs, hdfsFile file, void* buffer, tSize length); + + bool HasPread(); + + tSize + Pread(hdfsFS fs, hdfsFile file, tOffset position, void* buffer, tSize length); + + tSize Write(hdfsFS fs, hdfsFile file, const void* buffer, tSize length); + + int Flush(hdfsFS fs, hdfsFile file); + + int Available(hdfsFS fs, hdfsFile file); + + int Copy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst); + + int Move(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst); + + int Delete(hdfsFS fs, const char* path, int recursive); + + int Rename(hdfsFS fs, const char* 
oldPath, const char* newPath); + + char* GetWorkingDirectory(hdfsFS fs, char* buffer, size_t bufferSize); + + int SetWorkingDirectory(hdfsFS fs, const char* path); + + int MakeDirectory(hdfsFS fs, const char* path); + + int SetReplication(hdfsFS fs, const char* path, int16_t replication); + + hdfsFileInfo* ListDirectory(hdfsFS fs, const char* path, int* numEntries); + + hdfsFileInfo* GetPathInfo(hdfsFS fs, const char* path); + + void FreeFileInfo(hdfsFileInfo* hdfsFileInfo, int numEntries); + + char*** GetHosts(hdfsFS fs, const char* path, tOffset start, tOffset length); + + void FreeHosts(char*** blockHosts); + + tOffset GetDefaultBlockSize(hdfsFS fs); + tOffset GetCapacity(hdfsFS fs); + + tOffset GetUsed(hdfsFS fs); + + int Chown(hdfsFS fs, const char* path, const char* owner, const char* group); + + int Chmod(hdfsFS fs, const char* path, short mode); // NOLINT + + int Utime(hdfsFS fs, const char* path, tTime mtime, tTime atime); + + arrow::Status GetRequiredSymbols(); +}; + +// TODO(wesm): Remove these exports when we are linking statically +ARROW_EXPORT arrow::Status ConnectLibHdfs(LibHdfsShim** driver); + +} // namespace facebook::velox::filesystems diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp b/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp index 9d99420c9d7ed..67b1e574369e1 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp +++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp @@ -16,7 +16,11 @@ #include "HdfsReadFile.h" #include +#ifdef VELOX_ENABLE_HDFS3 #include +#elif VELOX_ENABLE_HDFS +#include "velox/connectors/hive/storage_adapters/hdfs/HdfsInternal.h" +#endif namespace facebook::velox { @@ -24,6 +28,7 @@ HdfsReadFile::HdfsReadFile(hdfsFS hdfs, const std::string_view path) : hdfsClient_(hdfs), filePath_(path) { fileInfo_ = hdfsGetPathInfo(hdfsClient_, filePath_.data()); if (fileInfo_ == nullptr) { +#ifdef VELOX_ENABLE_HDFS3 auto error = hdfsGetLastError(); auto errMsg = 
fmt::format( "Unable to get file path info for file: {}. got error: {}", @@ -33,6 +38,7 @@ HdfsReadFile::HdfsReadFile(hdfsFS hdfs, const std::string_view path) VELOX_FILE_NOT_FOUND_ERROR(errMsg); } VELOX_FAIL(errMsg); +#endif } } diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h b/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h index 2bd94bf9c8aa5..ee867f47f4baa 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h +++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h @@ -14,7 +14,11 @@ * limitations under the License. */ +#ifdef VELOX_ENABLE_HDFS3 #include +#elif VELOX_ENABLE_HDFS +#include "velox/connectors/hive/storage_adapters/hdfs/HdfsInternal.h" +#endif #include "velox/common/file/File.h" namespace facebook::velox { @@ -33,19 +37,24 @@ struct HdfsFile { void open(hdfsFS client, const std::string& path) { client_ = client; handle_ = hdfsOpenFile(client, path.data(), O_RDONLY, 0, 0, 0); +#ifdef VELOX_ENABLE_HDFS3 VELOX_CHECK_NOT_NULL( handle_, "Unable to open file {}. 
got error: {}", path, hdfsGetLastError()); +#endif } void seek(uint64_t offset) const { + auto result = hdfsSeek(client_, handle_, offset); +#ifdef VELOX_ENABLE_HDFS3 VELOX_CHECK_EQ( - hdfsSeek(client_, handle_, offset), + result, 0, "Cannot seek through HDFS file, error is : {}", std::string(hdfsGetLastError())); +#endif } int32_t read(char* pos, uint64_t length) const { diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.cpp b/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.cpp index 60f98a88c972b..8b62e2f217f04 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.cpp +++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.cpp @@ -15,7 +15,11 @@ */ #include "velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h" +#ifdef VELOX_ENABLE_HDFS3 #include +#elif VELOX_ENABLE_HDFS +#include "velox/connectors/hive/storage_adapters/hdfs/HdfsInternal.h" +#endif namespace facebook::velox { HdfsWriteFile::HdfsWriteFile( @@ -38,11 +42,13 @@ HdfsWriteFile::HdfsWriteFile( bufferSize, replication, blockSize); +#ifdef VELOX_ENABLE_HDFS3 VELOX_CHECK_NOT_NULL( hdfsFile_, "Failed to open hdfs file: {}, with error: {}", filePath_, std::string(hdfsGetLastError())); +#endif } HdfsWriteFile::~HdfsWriteFile() { @@ -53,11 +59,13 @@ HdfsWriteFile::~HdfsWriteFile() { void HdfsWriteFile::close() { int success = hdfsCloseFile(hdfsClient_, hdfsFile_); +#ifdef VELOX_ENABLE_HDFS3 VELOX_CHECK_EQ( success, 0, "Failed to close hdfs file: {}", std::string(hdfsGetLastError())); +#endif hdfsFile_ = nullptr; } @@ -67,8 +75,10 @@ void HdfsWriteFile::flush() { "Cannot flush HDFS file because file handle is null, file path: {}", filePath_); int success = hdfsFlush(hdfsClient_, hdfsFile_); +#ifdef VELOX_ENABLE_HDFS3 VELOX_CHECK_EQ( success, 0, "Hdfs flush error: {}", std::string(hdfsGetLastError())); +#endif } void HdfsWriteFile::append(std::string_view data) { @@ -81,11 +91,13 @@ void HdfsWriteFile::append(std::string_view data) { filePath_); 
int64_t totalWrittenBytes = hdfsWrite(hdfsClient_, hdfsFile_, std::string(data).c_str(), data.size()); +#ifdef VELOX_ENABLE_HDFS3 VELOX_CHECK_EQ( totalWrittenBytes, data.size(), "Write failure in HDFSWriteFile::append {}", std::string(hdfsGetLastError())); +#endif } uint64_t HdfsWriteFile::size() const { diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h b/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h index 7ed1819cd61f8..b3812c7afb1c9 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h +++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h @@ -15,7 +15,12 @@ */ #pragma once +#ifdef VELOX_ENABLE_HDFS3 #include +#elif VELOX_ENABLE_HDFS +#include "velox/connectors/hive/storage_adapters/hdfs/HdfsInternal.h" +#endif + #include "velox/common/file/File.h" namespace facebook::velox { diff --git a/velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.cpp b/velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.cpp index 47734838838f8..4df266000b11c 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.cpp @@ -14,7 +14,7 @@ * limitations under the License. 
*/ -#ifdef VELOX_ENABLE_HDFS3 +#if defined(VELOX_ENABLE_HDFS) || defined(VELOX_ENABLE_HDFS3) #include "folly/concurrency/ConcurrentHashMap.h" #include "velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h" @@ -25,7 +25,7 @@ namespace facebook::velox::filesystems { -#ifdef VELOX_ENABLE_HDFS3 +#if defined(VELOX_ENABLE_HDFS) || defined(VELOX_ENABLE_HDFS3) std::mutex mtx; std::function #include +#ifdef VELOX_ENABLE_HDFS3 #include +#elif VELOX_ENABLE_HDFS +#include "velox/connectors/hive/storage_adapters/hdfs/HdfsInternal.h" +#endif #include #include #include "HdfsMiniCluster.h" diff --git a/velox/connectors/hive/storage_adapters/hdfs/tests/LoadLibHdfsTest.cpp b/velox/connectors/hive/storage_adapters/hdfs/tests/LoadLibHdfsTest.cpp new file mode 100644 index 0000000000000..88215e1113819 --- /dev/null +++ b/velox/connectors/hive/storage_adapters/hdfs/tests/LoadLibHdfsTest.cpp @@ -0,0 +1,64 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include <cstdlib> +#include <iostream> +#include "gtest/gtest.h" +#include "velox/connectors/hive/storage_adapters/hdfs/HdfsInternal.h" + +using namespace facebook::velox; + +class LoadLibHdfsTest : public testing::Test { + void SetUp() override {} +}; + +// Before running this test you need to export CLASSPATH=`$HADOOP_HOME/bin/hdfs +// classpath --glob` +TEST_F(LoadLibHdfsTest, loadLibHdfs) { + filesystems::LibHdfsShim* driver_shim; + arrow::Status msg = filesystems::ConnectLibHdfs(&driver_shim); + if (!msg.ok()) { + if (std::getenv("ARROW_HDFS_TEST_LIBHDFS_REQUIRE")) { + FAIL() << "Loading libhdfs failed: " << msg.ToString(); + } else { + std::cout << "Loading libhdfs failed, skipping tests gracefully: " + << msg.ToString() << std::endl; + } + return; + } else { + std::cout << "Load libhdfs success" << std::endl; + } + + // connect to HDFS with the builder object + hdfsBuilder* builder = driver_shim->NewBuilder(); + driver_shim->BuilderSetNameNode(builder, "sr246"); + driver_shim->BuilderSetNameNodePort(builder, 9000); + + driver_shim->BuilderSetForceNewInstance(builder); + auto fs = driver_shim->BuilderConnect(builder); + + auto testPath = "/tmp/hdfstest1"; + if (driver_shim->MakeDirectory(fs, testPath) == 0) { + std::cout << "create hdfs path " << testPath << "\n"; + } + + if (driver_shim->Exists(fs, testPath) == 0) { + std::cout << "the" << testPath << " path is existing" + << "\n"; + } else { + std::cout << "the" << testPath << " path is not existing" + << "\n"; + }