Skip to content

Commit

Permalink
Merge branch 'branch-2.0' into branch-2.0_join
Browse files Browse the repository at this point in the history
  • Loading branch information
jackwener authored Mar 20, 2024
2 parents 356fdfb + be1948a commit cf6a72d
Show file tree
Hide file tree
Showing 41 changed files with 799 additions and 115 deletions.
10 changes: 10 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1136,6 +1136,16 @@ DEFINE_mInt32(query_statistics_reserve_timeout_ms, "30000");

DEFINE_mBool(check_segment_when_build_rowset_meta, "false");

DEFINE_mInt32(max_s3_client_retry, "10");

DEFINE_String(trino_connector_plugin_dir, "${DORIS_HOME}/connectors");

// ca_cert_file is in this path by default, Normally no modification is required
// ca cert default path is different from different OS
DEFINE_mString(ca_cert_file_paths,
"/etc/pki/tls/certs/ca-bundle.crt;/etc/ssl/certs/ca-certificates.crt;"
"/etc/ssl/ca-bundle.pem");

// clang-format off
#ifdef BE_TEST
// test s3
Expand Down
9 changes: 9 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1189,6 +1189,15 @@ DECLARE_mInt32(query_statistics_reserve_timeout_ms);

DECLARE_mBool(check_segment_when_build_rowset_meta);

// max s3 client retry times
DECLARE_mInt32(max_s3_client_retry);

// the directory for storing the trino-connector plugins.
DECLARE_String(trino_connector_plugin_dir);

// the file paths(one or more) of CA cert, splite using ";" aws s3 lib use it to init s3client
DECLARE_mString(ca_cert_file_paths);

#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
Expand Down
1 change: 1 addition & 0 deletions be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ E(COPY_FILE_ERROR, -121);
E(FILE_ALREADY_EXIST, -122);
E(BAD_CAST, -123);
E(ARITHMETIC_OVERFLOW_ERRROR, -124);
E(PERMISSION_DENIED, -125);
E(CALL_SEQUENCE_ERROR, -202);
E(BUFFER_OVERFLOW, -204);
E(CONFIG_ERROR, -205);
Expand Down
36 changes: 36 additions & 0 deletions be/src/io/fs/err_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
#include "io/fs/hdfs.h"

namespace doris {
using namespace ErrorCode;

namespace io {

std::string errno_to_str() {
Expand Down Expand Up @@ -73,5 +75,39 @@ std::string glob_err_to_str(int code) {
return fmt::format("({}), {}", code, msg);
}

Status localfs_error(const std::error_code& ec, std::string_view msg) {
if (ec == std::errc::io_error) {
return Status::Error<IO_ERROR, false>(msg);
} else if (ec == std::errc::no_such_file_or_directory) {
return Status::Error<NOT_FOUND, false>(msg);
} else if (ec == std::errc::file_exists) {
return Status::Error<ALREADY_EXIST, false>(msg);
} else if (ec == std::errc::no_space_on_device) {
return Status::Error<DISK_REACH_CAPACITY_LIMIT, false>(msg);
} else if (ec == std::errc::permission_denied) {
return Status::Error<PERMISSION_DENIED, false>(msg);
} else {
return Status::Error<ErrorCode::INTERNAL_ERROR, false>("{}: {}", msg, ec.message());
}
}

Status localfs_error(int posix_errno, std::string_view msg) {
switch (posix_errno) {
case EIO:
return Status::Error<IO_ERROR, false>(msg);
case ENOENT:
return Status::Error<NOT_FOUND, false>(msg);
case EEXIST:
return Status::Error<ALREADY_EXIST, false>(msg);
case ENOSPC:
return Status::Error<DISK_REACH_CAPACITY_LIMIT, false>(msg);
case EACCES:
return Status::Error<PERMISSION_DENIED, false>(msg);
default:
return Status::Error<ErrorCode::INTERNAL_ERROR, false>("{}: {}", msg,
std::strerror(posix_errno));
}
}

} // namespace io
} // namespace doris
5 changes: 5 additions & 0 deletions be/src/io/fs/err_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include <string>
#include <system_error>

#include "common/status.h"

namespace doris {
namespace io {

Expand All @@ -28,5 +30,8 @@ std::string errcode_to_str(const std::error_code& ec);
std::string hdfs_error();
std::string glob_err_to_str(int code);

Status localfs_error(const std::error_code& ec, std::string_view msg);
Status localfs_error(int posix_errno, std::string_view msg);

} // namespace io
} // namespace doris
13 changes: 6 additions & 7 deletions be/src/io/fs/local_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,7 @@ Status LocalFileReader::close() {
}
#endif
if (-1 == res) {
std::string err = errno_to_str();
LOG(WARNING) << fmt::format("failed to close {}: {}", _path.native(), err);
return Status::IOError("failed to close {}: {}", _path.native(), err);
return localfs_error(errno, fmt::format("failed to close {}", _path.native()));
}
_fd = -1;
}
Expand All @@ -81,8 +79,9 @@ Status LocalFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_
const IOContext* /*io_ctx*/) {
DCHECK(!closed());
if (offset > _file_size) {
return Status::IOError("offset exceeds file size(offset: {}, file size: {}, path: {})",
offset, _file_size, _path.native());
return Status::InternalError(
"offset exceeds file size(offset: {}, file size: {}, path: {})", offset, _file_size,
_path.native());
}
size_t bytes_req = result.size;
char* to = result.data;
Expand All @@ -92,10 +91,10 @@ Status LocalFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_
while (bytes_req != 0) {
auto res = ::pread(_fd, to, bytes_req, offset);
if (UNLIKELY(-1 == res && errno != EINTR)) {
return Status::IOError("cannot read from {}: {}", _path.native(), std::strerror(errno));
return localfs_error(errno, fmt::format("failed to read {}", _path.native()));
}
if (UNLIKELY(res == 0)) {
return Status::IOError("cannot read from {}: unexpected EOF", _path.native());
return Status::InternalError("cannot read from {}: unexpected EOF", _path.native());
}
if (res > 0) {
to += res;
Expand Down
75 changes: 32 additions & 43 deletions be/src/io/fs/local_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ Status LocalFileSystem::create_file_impl(const Path& file, FileWriterPtr* writer
const FileWriterOptions* opts) {
int fd = ::open(file.c_str(), O_TRUNC | O_WRONLY | O_CREAT | O_CLOEXEC, 0666);
if (-1 == fd) {
return Status::IOError("failed to open {}: {}", file.native(), errno_to_str());
return localfs_error(errno, fmt::format("failed to create file {}", file.native()));
}
bool sync_data = opts != nullptr ? opts->sync_file_data : true;
*writer = std::make_unique<LocalFileWriter>(
Expand All @@ -81,7 +81,7 @@ Status LocalFileSystem::open_file_impl(const FileDescription& file_desc, const P
int fd = -1;
RETRY_ON_EINTR(fd, open(abs_path.c_str(), O_RDONLY));
if (fd < 0) {
return Status::IOError("failed to open {}: {}", abs_path.native(), errno_to_str());
return localfs_error(errno, fmt::format("failed to open {}", abs_path.native()));
}
*reader = std::make_shared<LocalFileReader>(
std::move(abs_path), fsize, fd,
Expand All @@ -94,13 +94,13 @@ Status LocalFileSystem::create_directory_impl(const Path& dir, bool failed_if_ex
bool exists = true;
RETURN_IF_ERROR(exists_impl(dir, &exists));
if (exists) {
return Status::IOError("failed to create {}, already exists", dir.native());
return Status::AlreadyExist("failed to create {}, already exists", dir.native());
}
}
std::error_code ec;
std::filesystem::create_directories(dir, ec);
if (ec) {
return Status::IOError("failed to create {}: {}", dir.native(), errcode_to_str(ec));
return localfs_error(ec, fmt::format("failed to create {}", dir.native()));
}
return Status::OK();
}
Expand All @@ -112,12 +112,12 @@ Status LocalFileSystem::delete_file_impl(const Path& file) {
return Status::OK();
}
if (!std::filesystem::is_regular_file(file)) {
return Status::IOError("failed to delete {}, not a file", file.native());
return Status::InternalError("failed to delete {}, not a file", file.native());
}
std::error_code ec;
std::filesystem::remove(file, ec);
if (ec) {
return Status::IOError("failed to delete {}: {}", file.native(), errcode_to_str(ec));
return localfs_error(ec, fmt::format("failed to delete {}", file.native()));
}
return Status::OK();
}
Expand All @@ -129,12 +129,12 @@ Status LocalFileSystem::delete_directory_impl(const Path& dir) {
return Status::OK();
}
if (!std::filesystem::is_directory(dir)) {
return Status::IOError("failed to delete {}, not a directory", dir.native());
return Status::InternalError("failed to delete {}, not a dir", dir.native());
}
std::error_code ec;
std::filesystem::remove_all(dir, ec);
if (ec) {
return Status::IOError("failed to delete {}: {}", dir.native(), errcode_to_str(ec));
return localfs_error(ec, fmt::format("failed to delete {}", dir.native()));
}
return Status::OK();
}
Expand Down Expand Up @@ -174,7 +174,7 @@ Status LocalFileSystem::file_size_impl(const Path& file, int64_t* file_size) con
std::error_code ec;
*file_size = std::filesystem::file_size(file, ec);
if (ec) {
return Status::IOError("failed to get file size {}: {}", file.native(), errcode_to_str(ec));
return localfs_error(ec, fmt::format("failed to check exists {}", file.native()));
}
return Status::OK();
}
Expand Down Expand Up @@ -208,10 +208,10 @@ Status LocalFileSystem::list_impl(const Path& dir, bool only_file, std::vector<F
} catch (const std::filesystem::filesystem_error& e) {
// although `directory_iterator(dir, ec)` does not throw an exception,
// it may throw an exception during iterator++, so we need to catch the exception here
return Status::IOError("failed to list {}: {}", dir.native(), e.what());
return localfs_error(e.code(), fmt::format("failed to list {}", dir.native()));
}
if (ec) {
return Status::IOError("failed to list {}: {}", dir.native(), errcode_to_str(ec));
return localfs_error(ec, fmt::format("failed to list {}", dir.native()));
}
return Status::OK();
}
Expand All @@ -220,8 +220,8 @@ Status LocalFileSystem::rename_impl(const Path& orig_name, const Path& new_name)
std::error_code ec;
std::filesystem::rename(orig_name, new_name, ec);
if (ec) {
return Status::IOError("failed to rename {} to {}: {}", orig_name.native(),
new_name.native(), errcode_to_str(ec));
return localfs_error(ec, fmt::format("failed to rename {} to {}", orig_name.native(),
new_name.native()));
}
return Status::OK();
}
Expand All @@ -238,8 +238,8 @@ Status LocalFileSystem::link_file(const Path& src, const Path& dest) {

Status LocalFileSystem::link_file_impl(const Path& src, const Path& dest) {
if (::link(src.c_str(), dest.c_str()) != 0) {
return Status::IOError("failed to create hard link from {} to {}: {}", src.native(),
dest.native(), errno_to_str());
return localfs_error(errno, fmt::format("failed to create hard link from {} to {}",
src.native(), dest.native()));
}
return Status::OK();
}
Expand All @@ -248,8 +248,7 @@ Status LocalFileSystem::canonicalize(const Path& path, std::string* real_path) {
std::error_code ec;
Path res = std::filesystem::canonical(path, ec);
if (ec) {
return Status::IOError("failed to canonicalize path {}: {}", path.native(),
errcode_to_str(ec));
return localfs_error(ec, fmt::format("failed to canonicalize {}", path.native()));
}
*real_path = res.string();
return Status::OK();
Expand All @@ -260,10 +259,7 @@ Status LocalFileSystem::is_directory(const Path& path, bool* res) {
std::error_code ec;
*res = std::filesystem::is_directory(tmp_path, ec);
if (ec) {
LOG(WARNING) << fmt::format("failed to check is dir {}: {}", tmp_path.native(),
errcode_to_str(ec));
return Status::IOError("failed to check is dir {}: {}", tmp_path.native(),
errcode_to_str(ec));
return localfs_error(ec, fmt::format("failed to check is dir {}", tmp_path.native()));
}
return Status::OK();
}
Expand All @@ -276,15 +272,15 @@ Status LocalFileSystem::md5sum(const Path& file, std::string* md5sum) {
Status LocalFileSystem::md5sum_impl(const Path& file, std::string* md5sum) {
int fd = open(file.c_str(), O_RDONLY);
if (fd < 0) {
return Status::IOError("failed to open file for md5sum {}: {}", file.native(),
errno_to_str());
return localfs_error(errno,
fmt::format("failed to open file for md5sum {}", file.native()));
}

struct stat statbuf;
if (fstat(fd, &statbuf) < 0) {
std::string err = errno_to_str();
int err = errno;
close(fd);
return Status::InternalError("failed to stat file {}: {}", file.native(), err);
return localfs_error(err, fmt::format("failed to stat file {}", file.native()));
}
size_t file_len = statbuf.st_size;
CONSUME_THREAD_MEM_TRACKER(file_len);
Expand Down Expand Up @@ -330,16 +326,9 @@ Status LocalFileSystem::mtime(const Path& file, time_t* m_time) {
}

Status LocalFileSystem::mtime_impl(const Path& file, time_t* m_time) {
int fd = open(file.c_str(), O_RDONLY);
if (fd < 0) {
return Status::IOError("failed to get mtime for file {}: {}", file.native(),
errno_to_str());
}

Defer defer {[&]() { close(fd); }};
struct stat statbuf;
if (fstat(fd, &statbuf) < 0) {
return Status::IOError("failed to stat file {}: {}", file.native(), errno_to_str());
if (stat(file.c_str(), &statbuf) < 0) {
return localfs_error(errno, fmt::format("failed to stat file {}", file.native()));
}
*m_time = statbuf.st_mtime;
return Status::OK();
Expand All @@ -364,8 +353,8 @@ Status LocalFileSystem::get_space_info_impl(const Path& path, size_t* capacity,
std::error_code ec;
std::filesystem::space_info info = std::filesystem::space(path, ec);
if (ec) {
return Status::IOError("failed to get available space for path {}: {}", path.native(),
errcode_to_str(ec));
return localfs_error(
ec, fmt::format("failed to get available space for path {}", path.native()));
}
*capacity = info.capacity;
*available = info.available;
Expand All @@ -382,8 +371,8 @@ Status LocalFileSystem::copy_path_impl(const Path& src, const Path& dest) {
std::error_code ec;
std::filesystem::copy(src, dest, std::filesystem::copy_options::recursive, ec);
if (ec) {
return Status::IOError("failed to copy from {} to {}: {}", src.native(), dest.native(),
errcode_to_str(ec));
return localfs_error(
ec, fmt::format("failed to copy from {} to {}", src.native(), dest.native()));
}
return Status::OK();
}
Expand Down Expand Up @@ -429,8 +418,9 @@ Status LocalFileSystem::read_file_to_string(const Path& file, std::string* conte
size_t bytes_read = 0;
RETURN_IF_ERROR(file_reader->read_at(0, {*content}, &bytes_read));
if (bytes_read != file_size) {
return Status::IOError("failed to read file {} to string. bytes read: {}, file size: {}",
file.native(), bytes_read, file_size);
return Status::InternalError(
"failed to read file {} to string. bytes read: {}, file size: {}", file.native(),
bytes_read, file_size);
}
return file_reader->close();
}
Expand Down Expand Up @@ -479,7 +469,7 @@ Status LocalFileSystem::_glob(const std::string& pattern, std::vector<std::strin
int rc = glob(pattern.c_str(), GLOB_TILDE, NULL, &glob_result);
if (rc != 0) {
globfree(&glob_result);
return Status::IOError("failed to glob {}: {}", pattern, glob_err_to_str(rc));
return Status::InternalError("failed to glob {}: {}", pattern, glob_err_to_str(rc));
}

for (size_t i = 0; i < glob_result.gl_pathc; ++i) {
Expand All @@ -499,8 +489,7 @@ Status LocalFileSystem::permission_impl(const Path& file, std::filesystem::perms
std::error_code ec;
std::filesystem::permissions(file, prms, ec);
if (ec) {
return Status::IOError("failed to change file permission {}: {}", file.native(),
errcode_to_str(ec));
return localfs_error(ec, fmt::format("failed to change file permission {}", file.native()));
}
return Status::OK();
}
Expand Down
Loading

0 comments on commit cf6a72d

Please sign in to comment.