
Commit

Init commit.
kaka11chen committed Apr 21, 2024
1 parent 06c9fb6 commit a93a483
Showing 41 changed files with 1,371 additions and 415 deletions.
1 change: 1 addition & 0 deletions be/src/io/fs/file_writer.h
@@ -31,6 +31,7 @@ class FileSystem;

// Only affects remote file writers
struct FileWriterOptions {
bool used_by_s3_committer = false;
bool write_file_cache = false;
bool is_cold_data = false;
bool sync_file_data = true; // Whether flush data into storage system
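The new flag defaults to false, so existing remote writers keep their current behavior; a caller opts in per file through FileWriterOptions when the writer is created. A minimal sketch of the opt-in follows (the FileSystem reference, io::Path, and the io::FileWriterPtr alias are assumptions here; the real call site is the vhive_partition_writer.cpp hunk further down):

// Sketch only: opting a single remote writer into the S3-committer path.
#include "common/status.h"
#include "io/fs/file_system.h"
#include "io/fs/file_writer.h"

namespace doris {

Status open_committer_writer(io::FileSystem& fs, const io::Path& path,
                             io::FileWriterPtr* writer) {
    io::FileWriterOptions opts;
    opts.used_by_s3_committer = true;  // BE leaves the multipart upload open for the FE committer
    // write_file_cache, is_cold_data and sync_file_data keep their defaults from file_writer.h.
    return fs.create_file(path, writer, &opts);
}

} // namespace doris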
132 changes: 77 additions & 55 deletions be/src/io/fs/s3_file_writer.cpp
@@ -90,7 +90,8 @@ S3FileWriter::S3FileWriter(std::shared_ptr<Aws::S3::S3Client> client, std::strin
_client(std::move(client)),
_expiration_time(opts ? opts->file_cache_expiration : 0),
_is_cold_data(opts ? opts->is_cold_data : true),
_write_file_cache(opts ? opts->write_file_cache : false) {
_write_file_cache(opts ? opts->write_file_cache : false),
_used_by_s3_committer(opts ? opts->used_by_s3_committer : false) {
s3_file_writer_total << 1;
s3_file_being_written << 1;

@@ -196,10 +197,7 @@ Status S3FileWriter::close() {
Defer defer {[this] { _closed = true; }};

if (_upload_id.empty() && _pending_buf) {
// It might be one file less than 5MB, and call close without finalize
auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
DCHECK(buf != nullptr);
buf->set_upload_to_remote([this](UploadFileBuffer& b) { _put_object(b); });
RETURN_IF_ERROR(_set_upload_to_remote_less_than_buffer_size());
}

if (_bytes_appended == 0) {
@@ -226,6 +224,13 @@ Status S3FileWriter::close() {
RETURN_IF_ERROR(builder.build(&_pending_buf));
auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
DCHECK(buf != nullptr);
if (_used_by_s3_committer) {
buf->set_upload_to_remote([part_num = _cur_part_num, this](UploadFileBuffer& buf) {
_upload_one_part(part_num, buf);
});
DCHECK(_cur_part_num == 1);
RETURN_IF_ERROR(_create_multi_upload_request());
}
}

if (_pending_buf != nullptr) {
@@ -400,56 +405,61 @@ Status S3FileWriter::_complete() {
_wait_until_finish("PutObject");
return _st;
}
CompleteMultipartUploadRequest complete_request;
complete_request.WithBucket(_bucket).WithKey(_key).WithUploadId(_upload_id);

// Wait multipart load and finish.
_wait_until_finish("Complete");
DBUG_EXECUTE_IF("s3_file_writer::_complete:1", { _cur_part_num++; });
if (_failed || _completed_parts.size() != _cur_part_num) {
_st = Status::InternalError(
"error status {}, complete parts {}, cur part num {}, whole parts {}", _st,
_completed_parts.size(), _cur_part_num, _dump_completed_part());
LOG(WARNING) << _st;
return _st;
}
// make sure _completed_parts are ascending order
std::sort(_completed_parts.begin(), _completed_parts.end(),
[](auto& p1, auto& p2) { return p1->GetPartNumber() < p2->GetPartNumber(); });
DBUG_EXECUTE_IF("s3_file_writer::_complete:2",
{ _completed_parts.back()->SetPartNumber(10 * _completed_parts.size()); });
CompletedMultipartUpload completed_upload;
for (size_t i = 0; i < _completed_parts.size(); i++) {
if (_completed_parts[i]->GetPartNumber() != i + 1) [[unlikely]] {
auto st = Status::InternalError(
"error status {}, part num not continous, expected num {}, actual num {}, "
"whole parts {}",
_st, i + 1, _completed_parts[i]->GetPartNumber(), _dump_completed_part());
LOG(WARNING) << st;
_st = st;
return st;
if (!_used_by_s3_committer) { // S3 committer will complete multipart upload file on FE side.
CompleteMultipartUploadRequest complete_request;
complete_request.WithBucket(_bucket).WithKey(_key).WithUploadId(_upload_id);

if (_failed || _completed_parts.size() != _cur_part_num) {
_st = Status::InternalError(
"error status {}, complete parts {}, cur part num {}, whole parts {}", _st,
_completed_parts.size(), _cur_part_num, _dump_completed_part());
LOG(WARNING) << _st;
return _st;
}
// make sure _completed_parts are ascending order
std::sort(_completed_parts.begin(), _completed_parts.end(),
[](auto& p1, auto& p2) { return p1->GetPartNumber() < p2->GetPartNumber(); });
DBUG_EXECUTE_IF("s3_file_writer::_complete:2",
{ _completed_parts.back()->SetPartNumber(10 * _completed_parts.size()); });
CompletedMultipartUpload completed_upload;
for (size_t i = 0; i < _completed_parts.size(); i++) {
if (_completed_parts[i]->GetPartNumber() != i + 1) [[unlikely]] {
auto st = Status::InternalError(
"error status {}, part num not continous, expected num {}, actual num {}, "
"whole parts {}",
_st, i + 1, _completed_parts[i]->GetPartNumber(), _dump_completed_part());
LOG(WARNING) << st;
_st = st;
return st;
}
completed_upload.AddParts(*_completed_parts[i]);
}
completed_upload.AddParts(*_completed_parts[i]);
}

complete_request.WithMultipartUpload(completed_upload);
complete_request.WithMultipartUpload(completed_upload);

DBUG_EXECUTE_IF("s3_file_writer::_complete:3", {
auto s = Status::IOError(
"failed to create complete multi part upload (bucket={}, key={}): injected error",
_bucket, _path.native());
LOG_WARNING(s.to_string());
return s;
});
SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency);
auto complete_outcome = _client->CompleteMultipartUpload(complete_request);
DBUG_EXECUTE_IF("s3_file_writer::_complete:3", {
auto s = Status::IOError(
"failed to create complete multi part upload (bucket={}, key={}): injected "
"error",
_bucket, _path.native());
LOG_WARNING(s.to_string());
return s;
});
SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency);
auto complete_outcome = _client->CompleteMultipartUpload(complete_request);

if (!complete_outcome.IsSuccess()) {
_st = s3fs_error(
complete_outcome.GetError(),
fmt::format("failed to complete multi part upload {}, upload_id={}, whole parts={}",
if (!complete_outcome.IsSuccess()) {
_st = s3fs_error(
complete_outcome.GetError(),
fmt::format(
"failed to complete multi part upload {}, upload_id={}, whole parts={}",
_path.native(), _upload_id, _dump_completed_part()));
LOG(WARNING) << _st;
return _st;
LOG(WARNING) << _st;
return _st;
}
}
s3_file_created_total << 1;
return Status::OK();
@@ -465,13 +475,7 @@ Status S3FileWriter::finalize() {
// submit pending buf if it's not nullptr
// it's the last buf, we can submit it right now
if (_pending_buf != nullptr) {
// if we only need to upload one file less than 5MB, we can just
// call PutObject to reduce the network IO
if (_upload_id.empty()) {
auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
DCHECK(buf != nullptr);
buf->set_upload_to_remote([this](UploadFileBuffer& b) { _put_object(b); });
}
RETURN_IF_ERROR(_set_upload_to_remote_less_than_buffer_size());
_countdown_event.add_count();
RETURN_IF_ERROR(_pending_buf->submit(std::move(_pending_buf)));
_pending_buf = nullptr;
@@ -480,6 +484,24 @@
return _st;
}

Status S3FileWriter::_set_upload_to_remote_less_than_buffer_size() {
auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
DCHECK(buf != nullptr);
if (_used_by_s3_committer) {
// If used_by_s3_committer, we always use multi-parts uploading.
buf->set_upload_to_remote([part_num = _cur_part_num, this](UploadFileBuffer& buf) {
_upload_one_part(part_num, buf);
});
DCHECK(_cur_part_num == 1);
RETURN_IF_ERROR(_create_multi_upload_request());
} else {
// if we only need to upload one file less than 5MB, we can just
// call PutObject to reduce the network IO
buf->set_upload_to_remote([this](UploadFileBuffer& b) { _put_object(b); });
}
return Status::OK();
}

void S3FileWriter::_put_object(UploadFileBuffer& buf) {
DCHECK(!closed());
Aws::S3::Model::PutObjectRequest request;
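For orientation, the writer builds on the standard S3 multipart-upload handshake: CreateMultipartUpload returns an upload_id, every UploadPart returns an ETag, and the data only becomes a visible object once CompleteMultipartUpload is called with all {part number, ETag} pairs. With used_by_s3_committer set, the BE performs the first two steps and deliberately skips the third, which the FE-side committer issues later. The following is a hedged sketch of that BE-side portion against the raw AWS SDK, not the Doris code path itself; the bucket, key, and data variables are hypothetical:

// Sketch: the part of the multipart handshake the BE performs when
// used_by_s3_committer == true. Completion is intentionally left to the committer.
#include <string>
#include <aws/core/utils/memory/stl/AWSStringStream.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/CreateMultipartUploadRequest.h>
#include <aws/s3/model/UploadPartRequest.h>

// Returns the upload_id and the ETag of part 1; both must reach the committer,
// which later completes or aborts the upload.
bool start_pending_upload(Aws::S3::S3Client& client, const std::string& bucket,
                          const std::string& key, const std::string& part1_data,
                          std::string* upload_id, std::string* part1_etag) {
    Aws::S3::Model::CreateMultipartUploadRequest create_req;
    create_req.WithBucket(bucket).WithKey(key);
    auto create_outcome = client.CreateMultipartUpload(create_req);
    if (!create_outcome.IsSuccess()) return false;
    *upload_id = create_outcome.GetResult().GetUploadId();

    Aws::S3::Model::UploadPartRequest part_req;
    part_req.WithBucket(bucket).WithKey(key).WithUploadId(*upload_id).WithPartNumber(1);
    auto body = Aws::MakeShared<Aws::StringStream>("upload_part_body");
    *body << part1_data;
    part_req.SetBody(body);
    auto part_outcome = client.UploadPart(part_req);
    if (!part_outcome.IsSuccess()) return false;
    *part1_etag = part_outcome.GetResult().GetETag();

    // No CompleteMultipartUpload here: the object stays invisible to readers until
    // the FE-side committer completes the upload with these values.
    return true;
}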
11 changes: 11 additions & 0 deletions be/src/io/fs/s3_file_writer.h
@@ -58,12 +58,21 @@ class S3FileWriter final : public FileWriter {
size_t bytes_appended() const override { return _bytes_appended; }
bool closed() const override { return _closed; }

const std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>>& completed_parts() const {
return _completed_parts;
}

const std::string& key() const { return _key; }
const std::string& bucket() const { return _bucket; }
const std::string& upload_id() const { return _upload_id; }

private:
Status _abort();
[[nodiscard]] std::string _dump_completed_part() const;
void _wait_until_finish(std::string_view task_name);
Status _complete();
Status _create_multi_upload_request();
Status _set_upload_to_remote_less_than_buffer_size();
void _put_object(UploadFileBuffer& buf);
void _upload_one_part(int64_t part_num, UploadFileBuffer& buf);

@@ -95,6 +104,8 @@ class S3FileWriter final : public FileWriter {
uint64_t _expiration_time;
bool _is_cold_data;
bool _write_file_cache;

bool _used_by_s3_committer;
};

} // namespace io
1 change: 1 addition & 0 deletions be/src/util/s3_util.h
@@ -108,6 +108,7 @@ struct S3Conf {
S3ClientConf client_conf;

bool sse_enabled = false;
bool used_by_s3_committer = false;
cloud::ObjectStoreInfoPB::Provider provider;

std::string to_string() const {
22 changes: 20 additions & 2 deletions be/src/vec/sink/writer/vhive_partition_writer.cpp
@@ -18,6 +18,7 @@
#include "vhive_partition_writer.h"

#include "io/file_factory.h"
#include "io/fs/s3_file_writer.h"
#include "runtime/runtime_state.h"
#include "vec/columns/column_map.h"
#include "vec/core/materialize_block.h"
@@ -55,7 +56,8 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile)
io::FileDescription file_description = {
.path = fmt::format("{}/{}", _write_info.write_path, _get_target_file_name())};
_fs = DORIS_TRY(FileFactory::create_fs(fs_properties, file_description));
RETURN_IF_ERROR(_fs->create_file(file_description.path, &_file_writer));
io::FileWriterOptions file_writer_options = {.used_by_s3_committer = true};
RETURN_IF_ERROR(_fs->create_file(file_description.path, &_file_writer, &file_writer_options));

std::vector<std::string> column_names;
column_names.reserve(_columns.size());
@@ -189,12 +191,28 @@ THivePartitionUpdate VHivePartitionWriter::_build_partition_update() {
hive_partition_update.__set_name(_partition_name);
hive_partition_update.__set_update_mode(_update_mode);
THiveLocationParams location;
location.__set_write_path(_write_info.write_path);
location.__set_write_path(_write_info.original_write_path);
location.__set_target_path(_write_info.target_path);
hive_partition_update.__set_location(location);
hive_partition_update.__set_file_names({_get_target_file_name()});
hive_partition_update.__set_row_count(_row_count);
hive_partition_update.__set_file_size(_input_size_in_bytes);

if (_write_info.file_type == TFileType::FILE_S3) {
doris::io::S3FileWriter* s3_mpu_file_writer =
dynamic_cast<doris::io::S3FileWriter*>(_file_writer.get());
TS3MPUPendingUpload s3_mpu_pending_upload;
s3_mpu_pending_upload.__set_bucket(s3_mpu_file_writer->bucket());
s3_mpu_pending_upload.__set_key(s3_mpu_file_writer->key());
s3_mpu_pending_upload.__set_upload_id(s3_mpu_file_writer->upload_id());

std::map<int, std::string> etags;
for (auto& completed_part : s3_mpu_file_writer->completed_parts()) {
etags.insert({completed_part->GetPartNumber(), completed_part->GetETag()});
}
s3_mpu_pending_upload.__set_etags(etags);
hive_partition_update.__set_s3_mpu_pending_uploads({s3_mpu_pending_upload});
}
return hive_partition_update;
}

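The committer on the FE later finishes (or aborts) each pending upload using exactly what TS3MPUPendingUpload carries: bucket, key, upload_id, and the part-number-to-ETag map built above. The real committer is FE (Java) code; the sketch below only illustrates the completing call, reusing the AWS C++ SDK types already seen in s3_file_writer.cpp, with hypothetical names:

// Illustration only: completing a pending multipart upload from the fields of a
// TS3MPUPendingUpload (bucket, key, upload_id, etags). The real committer lives in the FE.
#include <map>
#include <string>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/CompleteMultipartUploadRequest.h>
#include <aws/s3/model/CompletedMultipartUpload.h>
#include <aws/s3/model/CompletedPart.h>

bool complete_pending_upload(Aws::S3::S3Client& client, const std::string& bucket,
                             const std::string& key, const std::string& upload_id,
                             const std::map<int, std::string>& etags) {
    Aws::S3::Model::CompletedMultipartUpload completed_upload;
    for (const auto& [part_number, etag] : etags) { // std::map iterates part numbers in ascending order
        completed_upload.AddParts(
                Aws::S3::Model::CompletedPart().WithPartNumber(part_number).WithETag(etag));
    }
    Aws::S3::Model::CompleteMultipartUploadRequest request;
    request.WithBucket(bucket).WithKey(key).WithUploadId(upload_id).WithMultipartUpload(
            completed_upload);
    // Only after this call succeeds does the written file appear as an S3 object.
    return client.CompleteMultipartUpload(request).IsSuccess();
}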
1 change: 1 addition & 0 deletions be/src/vec/sink/writer/vhive_partition_writer.h
@@ -43,6 +43,7 @@ class VHivePartitionWriter {
public:
struct WriteInfo {
std::string write_path;
std::string original_write_path;
std::string target_path;
TFileType::type file_type;
};
31 changes: 20 additions & 11 deletions be/src/vec/sink/writer/vhive_table_writer.cpp
@@ -256,26 +256,30 @@ std::shared_ptr<VHivePartitionWriter> VHiveTableWriter::_create_partition_writer
if (existing_table == false) { // new table
update_mode = TUpdateMode::NEW;
if (_partition_columns_input_index.empty()) { // new unpartitioned table
write_info = {write_location.write_path, write_location.target_path,
write_location.file_type};
write_info = {write_location.write_path, write_location.original_write_path,
write_location.target_path, write_location.file_type};
} else { // a new partition in a new partitioned table
auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name);
auto original_write_path =
fmt::format("{}/{}", write_location.original_write_path, partition_name);
auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name);
write_info = {std::move(write_path), std::move(target_path),
write_location.file_type};
write_info = {std::move(write_path), std::move(original_write_path),
std::move(target_path), write_location.file_type};
}
} else { // a new partition in an existing partitioned table, or an existing unpartitioned table
if (_partition_columns_input_index.empty()) { // an existing unpartitioned table
update_mode =
!hive_table_sink.overwrite ? TUpdateMode::APPEND : TUpdateMode::OVERWRITE;
write_info = {write_location.write_path, write_location.target_path,
write_location.file_type};
write_info = {write_location.write_path, write_location.original_write_path,
write_location.target_path, write_location.file_type};
} else { // a new partition in an existing partitioned table
update_mode = TUpdateMode::NEW;
auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name);
auto original_write_path =
fmt::format("{}/{}", write_location.original_write_path, partition_name);
auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name);
write_info = {std::move(write_path), std::move(target_path),
write_location.file_type};
write_info = {std::move(write_path), std::move(original_write_path),
std::move(target_path), write_location.file_type};
}
// need to get schema from existing table ?
}
@@ -285,16 +289,21 @@ std::shared_ptr<VHivePartitionWriter> VHiveTableWriter::_create_partition_writer
if (!hive_table_sink.overwrite) {
update_mode = TUpdateMode::APPEND;
auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name);
auto original_write_path =
fmt::format("{}/{}", write_location.original_write_path, partition_name);
auto target_path = fmt::format("{}", existing_partition->location.target_path);
write_info = {std::move(write_path), std::move(target_path),
existing_partition->location.file_type};
write_info = {std::move(write_path), std::move(original_write_path),
std::move(target_path), existing_partition->location.file_type};
file_format_type = existing_partition->file_format;
write_compress_type = hive_table_sink.compression_type;
} else {
update_mode = TUpdateMode::OVERWRITE;
auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name);
auto original_write_path =
fmt::format("{}/{}", write_location.original_write_path, partition_name);
auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name);
write_info = {std::move(write_path), std::move(target_path), write_location.file_type};
write_info = {std::move(write_path), std::move(original_write_path),
std::move(target_path), write_location.file_type};
file_format_type = hive_table_sink.file_format;
write_compress_type = hive_table_sink.compression_type;
// need to get schema from existing table ?
(Diffs for the remaining changed files were not loaded.)
