diff --git a/be/src/io/fs/file_writer.h b/be/src/io/fs/file_writer.h index 4feab99c09ff249..de049b9fd54f28d 100644 --- a/be/src/io/fs/file_writer.h +++ b/be/src/io/fs/file_writer.h @@ -31,6 +31,7 @@ class FileSystem; // Only affects remote file writers struct FileWriterOptions { + bool used_by_s3_committer = false; bool write_file_cache = false; bool is_cold_data = false; bool sync_file_data = true; // Whether flush data into storage system diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp index 388aed1ea34eeca..fa7b7fdf1aaa925 100644 --- a/be/src/io/fs/s3_file_writer.cpp +++ b/be/src/io/fs/s3_file_writer.cpp @@ -90,7 +90,8 @@ S3FileWriter::S3FileWriter(std::shared_ptr client, std::strin _client(std::move(client)), _expiration_time(opts ? opts->file_cache_expiration : 0), _is_cold_data(opts ? opts->is_cold_data : true), - _write_file_cache(opts ? opts->write_file_cache : false) { + _write_file_cache(opts ? opts->write_file_cache : false), + _used_by_s3_committer(opts ? opts->used_by_s3_committer : false) { s3_file_writer_total << 1; s3_file_being_written << 1; @@ -196,10 +197,7 @@ Status S3FileWriter::close() { Defer defer {[this] { _closed = true; }}; if (_upload_id.empty() && _pending_buf) { - // It might be one file less than 5MB, and call close without finalize - auto* buf = dynamic_cast(_pending_buf.get()); - DCHECK(buf != nullptr); - buf->set_upload_to_remote([this](UploadFileBuffer& b) { _put_object(b); }); + RETURN_IF_ERROR(_set_upload_to_remote_less_than_buffer_size()); } if (_bytes_appended == 0) { @@ -226,6 +224,13 @@ Status S3FileWriter::close() { RETURN_IF_ERROR(builder.build(&_pending_buf)); auto* buf = dynamic_cast(_pending_buf.get()); DCHECK(buf != nullptr); + if (_used_by_s3_committer) { + buf->set_upload_to_remote([part_num = _cur_part_num, this](UploadFileBuffer& buf) { + _upload_one_part(part_num, buf); + }); + DCHECK(_cur_part_num == 1); + RETURN_IF_ERROR(_create_multi_upload_request()); + } } if (_pending_buf != nullptr) { @@ -400,56 +405,61 @@ Status S3FileWriter::_complete() { _wait_until_finish("PutObject"); return _st; } - CompleteMultipartUploadRequest complete_request; - complete_request.WithBucket(_bucket).WithKey(_key).WithUploadId(_upload_id); - + // Wait multipart load and finish. _wait_until_finish("Complete"); DBUG_EXECUTE_IF("s3_file_writer::_complete:1", { _cur_part_num++; }); - if (_failed || _completed_parts.size() != _cur_part_num) { - _st = Status::InternalError( - "error status {}, complete parts {}, cur part num {}, whole parts {}", _st, - _completed_parts.size(), _cur_part_num, _dump_completed_part()); - LOG(WARNING) << _st; - return _st; - } - // make sure _completed_parts are ascending order - std::sort(_completed_parts.begin(), _completed_parts.end(), - [](auto& p1, auto& p2) { return p1->GetPartNumber() < p2->GetPartNumber(); }); - DBUG_EXECUTE_IF("s3_file_writer::_complete:2", - { _completed_parts.back()->SetPartNumber(10 * _completed_parts.size()); }); - CompletedMultipartUpload completed_upload; - for (size_t i = 0; i < _completed_parts.size(); i++) { - if (_completed_parts[i]->GetPartNumber() != i + 1) [[unlikely]] { - auto st = Status::InternalError( - "error status {}, part num not continous, expected num {}, actual num {}, " - "whole parts {}", - _st, i + 1, _completed_parts[i]->GetPartNumber(), _dump_completed_part()); - LOG(WARNING) << st; - _st = st; - return st; + if (!_used_by_s3_committer) { // S3 committer will complete multipart upload file on FE side. 
+ CompleteMultipartUploadRequest complete_request; + complete_request.WithBucket(_bucket).WithKey(_key).WithUploadId(_upload_id); + + if (_failed || _completed_parts.size() != _cur_part_num) { + _st = Status::InternalError( + "error status {}, complete parts {}, cur part num {}, whole parts {}", _st, + _completed_parts.size(), _cur_part_num, _dump_completed_part()); + LOG(WARNING) << _st; + return _st; + } + // make sure _completed_parts are ascending order + std::sort(_completed_parts.begin(), _completed_parts.end(), + [](auto& p1, auto& p2) { return p1->GetPartNumber() < p2->GetPartNumber(); }); + DBUG_EXECUTE_IF("s3_file_writer::_complete:2", + { _completed_parts.back()->SetPartNumber(10 * _completed_parts.size()); }); + CompletedMultipartUpload completed_upload; + for (size_t i = 0; i < _completed_parts.size(); i++) { + if (_completed_parts[i]->GetPartNumber() != i + 1) [[unlikely]] { + auto st = Status::InternalError( + "error status {}, part num not continous, expected num {}, actual num {}, " + "whole parts {}", + _st, i + 1, _completed_parts[i]->GetPartNumber(), _dump_completed_part()); + LOG(WARNING) << st; + _st = st; + return st; + } + completed_upload.AddParts(*_completed_parts[i]); } - completed_upload.AddParts(*_completed_parts[i]); - } - complete_request.WithMultipartUpload(completed_upload); + complete_request.WithMultipartUpload(completed_upload); - DBUG_EXECUTE_IF("s3_file_writer::_complete:3", { - auto s = Status::IOError( - "failed to create complete multi part upload (bucket={}, key={}): injected error", - _bucket, _path.native()); - LOG_WARNING(s.to_string()); - return s; - }); - SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency); - auto complete_outcome = _client->CompleteMultipartUpload(complete_request); + DBUG_EXECUTE_IF("s3_file_writer::_complete:3", { + auto s = Status::IOError( + "failed to create complete multi part upload (bucket={}, key={}): injected " + "error", + _bucket, _path.native()); + LOG_WARNING(s.to_string()); + return s; + }); + SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency); + auto complete_outcome = _client->CompleteMultipartUpload(complete_request); - if (!complete_outcome.IsSuccess()) { - _st = s3fs_error( - complete_outcome.GetError(), - fmt::format("failed to complete multi part upload {}, upload_id={}, whole parts={}", + if (!complete_outcome.IsSuccess()) { + _st = s3fs_error( + complete_outcome.GetError(), + fmt::format( + "failed to complete multi part upload {}, upload_id={}, whole parts={}", _path.native(), _upload_id, _dump_completed_part())); - LOG(WARNING) << _st; - return _st; + LOG(WARNING) << _st; + return _st; + } } s3_file_created_total << 1; return Status::OK(); @@ -465,13 +475,7 @@ Status S3FileWriter::finalize() { // submit pending buf if it's not nullptr // it's the last buf, we can submit it right now if (_pending_buf != nullptr) { - // if we only need to upload one file less than 5MB, we can just - // call PutObject to reduce the network IO - if (_upload_id.empty()) { - auto* buf = dynamic_cast(_pending_buf.get()); - DCHECK(buf != nullptr); - buf->set_upload_to_remote([this](UploadFileBuffer& b) { _put_object(b); }); - } + RETURN_IF_ERROR(_set_upload_to_remote_less_than_buffer_size()); _countdown_event.add_count(); RETURN_IF_ERROR(_pending_buf->submit(std::move(_pending_buf))); _pending_buf = nullptr; @@ -480,6 +484,24 @@ Status S3FileWriter::finalize() { return _st; } +Status S3FileWriter::_set_upload_to_remote_less_than_buffer_size() { + auto* buf = dynamic_cast(_pending_buf.get()); + DCHECK(buf != 
nullptr); + if (_used_by_s3_committer) { + // If used_by_s3_committer, we always use multi-parts uploading. + buf->set_upload_to_remote([part_num = _cur_part_num, this](UploadFileBuffer& buf) { + _upload_one_part(part_num, buf); + }); + DCHECK(_cur_part_num == 1); + RETURN_IF_ERROR(_create_multi_upload_request()); + } else { + // if we only need to upload one file less than 5MB, we can just + // call PutObject to reduce the network IO + buf->set_upload_to_remote([this](UploadFileBuffer& b) { _put_object(b); }); + } + return Status::OK(); +} + void S3FileWriter::_put_object(UploadFileBuffer& buf) { DCHECK(!closed()); Aws::S3::Model::PutObjectRequest request; diff --git a/be/src/io/fs/s3_file_writer.h b/be/src/io/fs/s3_file_writer.h index a2c2ec0422a4f17..de4aba9a48037f4 100644 --- a/be/src/io/fs/s3_file_writer.h +++ b/be/src/io/fs/s3_file_writer.h @@ -58,12 +58,21 @@ class S3FileWriter final : public FileWriter { size_t bytes_appended() const override { return _bytes_appended; } bool closed() const override { return _closed; } + const std::vector>& completed_parts() const { + return _completed_parts; + } + + const std::string& key() const { return _key; } + const std::string& bucket() const { return _bucket; } + const std::string& upload_id() const { return _upload_id; } + private: Status _abort(); [[nodiscard]] std::string _dump_completed_part() const; void _wait_until_finish(std::string_view task_name); Status _complete(); Status _create_multi_upload_request(); + Status _set_upload_to_remote_less_than_buffer_size(); void _put_object(UploadFileBuffer& buf); void _upload_one_part(int64_t part_num, UploadFileBuffer& buf); @@ -95,6 +104,8 @@ class S3FileWriter final : public FileWriter { uint64_t _expiration_time; bool _is_cold_data; bool _write_file_cache; + + bool _used_by_s3_committer; }; } // namespace io diff --git a/be/src/util/s3_util.h b/be/src/util/s3_util.h index f2800abf0423849..6f44e288d2eb491 100644 --- a/be/src/util/s3_util.h +++ b/be/src/util/s3_util.h @@ -108,6 +108,7 @@ struct S3Conf { S3ClientConf client_conf; bool sse_enabled = false; + bool used_by_s3_committer = false; cloud::ObjectStoreInfoPB::Provider provider; std::string to_string() const { diff --git a/be/src/vec/sink/writer/vhive_partition_writer.cpp b/be/src/vec/sink/writer/vhive_partition_writer.cpp index 47dc957dbe88dc2..4a6bb969e2b2c5e 100644 --- a/be/src/vec/sink/writer/vhive_partition_writer.cpp +++ b/be/src/vec/sink/writer/vhive_partition_writer.cpp @@ -18,6 +18,7 @@ #include "vhive_partition_writer.h" #include "io/file_factory.h" +#include "io/fs/s3_file_writer.h" #include "runtime/runtime_state.h" #include "vec/columns/column_map.h" #include "vec/core/materialize_block.h" @@ -55,7 +56,8 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile) io::FileDescription file_description = { .path = fmt::format("{}/{}", _write_info.write_path, _get_target_file_name())}; _fs = DORIS_TRY(FileFactory::create_fs(fs_properties, file_description)); - RETURN_IF_ERROR(_fs->create_file(file_description.path, &_file_writer)); + io::FileWriterOptions file_writer_options = {.used_by_s3_committer = true}; + RETURN_IF_ERROR(_fs->create_file(file_description.path, &_file_writer, &file_writer_options)); std::vector column_names; column_names.reserve(_columns.size()); @@ -189,12 +191,28 @@ THivePartitionUpdate VHivePartitionWriter::_build_partition_update() { hive_partition_update.__set_name(_partition_name); hive_partition_update.__set_update_mode(_update_mode); THiveLocationParams location; - 
location.__set_write_path(_write_info.write_path); + location.__set_write_path(_write_info.original_write_path); location.__set_target_path(_write_info.target_path); hive_partition_update.__set_location(location); hive_partition_update.__set_file_names({_get_target_file_name()}); hive_partition_update.__set_row_count(_row_count); hive_partition_update.__set_file_size(_input_size_in_bytes); + + if (_write_info.file_type == TFileType::FILE_S3) { + doris::io::S3FileWriter* s3_mpu_file_writer = + dynamic_cast<doris::io::S3FileWriter*>(_file_writer.get()); + TS3MPUPendingUpload s3_mpu_pending_upload; + s3_mpu_pending_upload.__set_bucket(s3_mpu_file_writer->bucket()); + s3_mpu_pending_upload.__set_key(s3_mpu_file_writer->key()); + s3_mpu_pending_upload.__set_upload_id(s3_mpu_file_writer->upload_id()); + + std::map<int, std::string> etags; + for (auto& completed_part : s3_mpu_file_writer->completed_parts()) { + etags.insert({completed_part->GetPartNumber(), completed_part->GetETag()}); + } + s3_mpu_pending_upload.__set_etags(etags); + hive_partition_update.__set_s3_mpu_pending_uploads({s3_mpu_pending_upload}); + } return hive_partition_update; } diff --git a/be/src/vec/sink/writer/vhive_partition_writer.h b/be/src/vec/sink/writer/vhive_partition_writer.h index 117806f7b055ac4..8c63d855a021383 100644 --- a/be/src/vec/sink/writer/vhive_partition_writer.h +++ b/be/src/vec/sink/writer/vhive_partition_writer.h @@ -43,6 +43,7 @@ class VHivePartitionWriter { public: struct WriteInfo { std::string write_path; + std::string original_write_path; std::string target_path; TFileType::type file_type; }; diff --git a/be/src/vec/sink/writer/vhive_table_writer.cpp b/be/src/vec/sink/writer/vhive_table_writer.cpp index d43fc34b4e5204c..7c3a864ebb36779 100644 --- a/be/src/vec/sink/writer/vhive_table_writer.cpp +++ b/be/src/vec/sink/writer/vhive_table_writer.cpp @@ -256,26 +256,30 @@ std::shared_ptr<VHivePartitionWriter> VHiveTableWriter::_create_partition_writer if (existing_table == false) { // new table update_mode = TUpdateMode::NEW; if (_partition_columns_input_index.empty()) { // new unpartitioned table - write_info = {write_location.write_path, write_location.target_path, - write_location.file_type}; + write_info = {write_location.write_path, write_location.original_write_path, + write_location.target_path, write_location.file_type}; } else { // a new partition in a new partitioned table auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name); + auto original_write_path = + fmt::format("{}/{}", write_location.original_write_path, partition_name); auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name); - write_info = {std::move(write_path), std::move(target_path), - write_location.file_type}; + write_info = {std::move(write_path), std::move(original_write_path), + std::move(target_path), write_location.file_type}; } } else { // a new partition in an existing partitioned table, or an existing unpartitioned table if (_partition_columns_input_index.empty()) { // an existing unpartitioned table update_mode = !hive_table_sink.overwrite ?
TUpdateMode::APPEND : TUpdateMode::OVERWRITE; - write_info = {write_location.write_path, write_location.target_path, - write_location.file_type}; + write_info = {write_location.write_path, write_location.original_write_path, + write_location.target_path, write_location.file_type}; } else { // a new partition in an existing partitioned table update_mode = TUpdateMode::NEW; auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name); + auto original_write_path = + fmt::format("{}/{}", write_location.original_write_path, partition_name); auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name); - write_info = {std::move(write_path), std::move(target_path), - write_location.file_type}; + write_info = {std::move(write_path), std::move(original_write_path), + std::move(target_path), write_location.file_type}; } // need to get schema from existing table ? } @@ -285,16 +289,21 @@ std::shared_ptr VHiveTableWriter::_create_partition_writer if (!hive_table_sink.overwrite) { update_mode = TUpdateMode::APPEND; auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name); + auto original_write_path = + fmt::format("{}/{}", write_location.original_write_path, partition_name); auto target_path = fmt::format("{}", existing_partition->location.target_path); - write_info = {std::move(write_path), std::move(target_path), - existing_partition->location.file_type}; + write_info = {std::move(write_path), std::move(original_write_path), + std::move(target_path), existing_partition->location.file_type}; file_format_type = existing_partition->file_format; write_compress_type = hive_table_sink.compression_type; } else { update_mode = TUpdateMode::OVERWRITE; auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name); + auto original_write_path = + fmt::format("{}/{}", write_location.original_write_path, partition_name); auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name); - write_info = {std::move(write_path), std::move(target_path), write_location.file_type}; + write_info = {std::move(write_path), std::move(original_write_path), + std::move(target_path), write_location.file_type}; file_format_type = hive_table_sink.file_format; write_compress_type = hive_table_sink.compression_type; // need to get schema from existing table ? 
diff --git a/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_table.hql b/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_table.hql index dbeeab972f6b0dc..a45b4a225cdf12d 100644 --- a/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_table.hql +++ b/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_table.hql @@ -2444,3 +2444,142 @@ PARTITIONED BY ( `varchar_col` varchar(50)) stored as orc TBLPROPERTIES("orc.compress"="ZLIB"); + +CREATE TABLE `all_types_parquet_snappy_s3`( + `boolean_col` boolean, + `tinyint_col` tinyint, + `smallint_col` smallint, + `int_col` int, + `bigint_col` bigint, + `float_col` float, + `double_col` double, + `decimal_col1` decimal(9,0), + `decimal_col2` decimal(8,4), + `decimal_col3` decimal(18,6), + `decimal_col4` decimal(38,12), + `string_col` string, + `binary_col` binary, + `date_col` date, + `timestamp_col1` timestamp, + `timestamp_col2` timestamp, + `timestamp_col3` timestamp, + `char_col1` char(50), + `char_col2` char(100), + `char_col3` char(255), + `varchar_col1` varchar(50), + `varchar_col2` varchar(100), + `varchar_col3` varchar(255), + `t_map_string` map, + `t_map_varchar` map, + `t_map_char` map, + `t_map_int` map, + `t_map_bigint` map, + `t_map_float` map, + `t_map_double` map, + `t_map_boolean` map, + `t_map_decimal_precision_2` map, + `t_map_decimal_precision_4` map, + `t_map_decimal_precision_8` map, + `t_map_decimal_precision_17` map, + `t_map_decimal_precision_18` map, + `t_map_decimal_precision_38` map, + `t_array_string` array, + `t_array_int` array, + `t_array_bigint` array, + `t_array_float` array, + `t_array_double` array, + `t_array_boolean` array, + `t_array_varchar` array, + `t_array_char` array, + `t_array_decimal_precision_2` array, + `t_array_decimal_precision_4` array, + `t_array_decimal_precision_8` array, + `t_array_decimal_precision_17` array, + `t_array_decimal_precision_18` array, + `t_array_decimal_precision_38` array, + `t_struct_bigint` struct, + `t_complex` map>>, + `t_struct_nested` struct>, + `t_struct_null` struct, + `t_struct_non_nulls_after_nulls` struct, + `t_nested_struct_non_nulls_after_nulls` struct>, + `t_map_null_value` map, + `t_array_string_starting_with_nulls` array, + `t_array_string_with_nulls_in_between` array, + `t_array_string_ending_with_nulls` array, + `t_array_string_all_nulls` array, + `dt` int) +stored as parquet +LOCATION + 'cosn://doris-build-1308700295/regression/write/data/all_types_parquet_snappy_s3' +TBLPROPERTIES('parquet.compression'='SNAPPY'); + +CREATE TABLE `all_types_par_parquet_snappy_s3`( + `boolean_col` boolean, + `tinyint_col` tinyint, + `smallint_col` smallint, + `int_col` int, + `bigint_col` bigint, + `float_col` float, + `double_col` double, + `decimal_col1` decimal(9,0), + `decimal_col2` decimal(8,4), + `decimal_col3` decimal(18,6), + `decimal_col4` decimal(38,12), + `string_col` string, + `binary_col` binary, + `date_col` date, + `timestamp_col1` timestamp, + `timestamp_col2` timestamp, + `timestamp_col3` timestamp, + `char_col1` char(50), + `char_col2` char(100), + `char_col3` char(255), + `varchar_col1` varchar(50), + `varchar_col2` varchar(100), + `varchar_col3` varchar(255), + `t_map_string` map, + `t_map_varchar` map, + `t_map_char` map, + `t_map_int` map, + `t_map_bigint` map, + `t_map_float` map, + `t_map_double` map, + `t_map_boolean` map, + `t_map_decimal_precision_2` map, + `t_map_decimal_precision_4` map, + `t_map_decimal_precision_8` map, + `t_map_decimal_precision_17` map, + 
`t_map_decimal_precision_18` map, + `t_map_decimal_precision_38` map, + `t_array_string` array, + `t_array_int` array, + `t_array_bigint` array, + `t_array_float` array, + `t_array_double` array, + `t_array_boolean` array, + `t_array_varchar` array, + `t_array_char` array, + `t_array_decimal_precision_2` array, + `t_array_decimal_precision_4` array, + `t_array_decimal_precision_8` array, + `t_array_decimal_precision_17` array, + `t_array_decimal_precision_18` array, + `t_array_decimal_precision_38` array, + `t_struct_bigint` struct, + `t_complex` map>>, + `t_struct_nested` struct>, + `t_struct_null` struct, + `t_struct_non_nulls_after_nulls` struct, + `t_nested_struct_non_nulls_after_nulls` struct>, + `t_map_null_value` map, + `t_array_string_starting_with_nulls` array, + `t_array_string_with_nulls_in_between` array, + `t_array_string_ending_with_nulls` array, + `t_array_string_all_nulls` array) +PARTITIONED BY ( + `dt` int) +stored as parquet +LOCATION + 'cosn://doris-build-1308700295/regression/write/data/all_types_par_parquet_snappy_s3' +TBLPROPERTIES('parquet.compression'='SNAPPY'); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java index 005a8f2cb84bf41..a3efac587bd2635 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java @@ -296,6 +296,8 @@ private FileSystemType getFileSystemType() { fsType = FileSystemType.S3; break; case COSN: + fsType = FileSystemType.S3; + break; case OFS: // ofs:// and cosn:// use the same underlying file system: Tencent Cloud HDFS, aka CHDFS)) { fsType = FileSystemType.OFS; @@ -329,7 +331,11 @@ public static TFileType getTFileTypeForBE(String location) { return null; } LocationPath locationPath = new LocationPath(location); - switch (locationPath.getLocationType()) { + return locationPath.getTFileTypeForBE(); + } + + public TFileType getTFileTypeForBE() { + switch (this.getLocationType()) { case S3: case S3A: case S3N: @@ -362,7 +368,7 @@ public static TFileType getTFileTypeForBE(String location) { * * @return BE scan range path */ - public Path toScanRangeLocation() { + public Path toStorageLocation() { switch (locationType) { case S3: case S3A: diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3URI.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3URI.java index faa58905563f34f..29c3f2700c43f5c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3URI.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3URI.java @@ -21,34 +21,70 @@ import com.google.common.base.Strings; import com.google.common.collect.ImmutableSet; -import org.apache.parquet.glob.GlobExpander; +import org.apache.commons.lang3.StringUtils; import java.net.URI; import java.net.URISyntaxException; +import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; /** * This class represents a fully qualified location in S3 for input/output - * operations expressed as as URI. This implementation is provided to - * ensure compatibility with Hadoop Path implementations that may introduce - * encoding issues with native URI implementation. + * operations expressed as as URI. + *
+ * For AWS S3, uri common styles should be: + * 1. AWS Client Style(Hadoop S3 Style): s3://my-bucket/path/to/file?versionId=abc123&partNumber=77&partNumber=88 + * or + * 2. Virtual Host Style: https://my-bucket.s3.us-west-1.amazonaws.com/resources/doc.txt?versionId=abc123&partNumber=77&partNumber=88 + * or + * 3. Path Style: https://s3.us-west-1.amazonaws.com/my-bucket/resources/doc.txt?versionId=abc123&partNumber=77&partNumber=88 + * + * Regarding the above-mentioned common styles, we can use isPathStyle to control whether to use path style + * or virtual host style. + * "Virtual host style" is the currently mainstream and recommended approach to use, so the default value of + * isPathStyle is false. + * + * Other Styles: + * 1. Virtual Host AWS Client (Hadoop S3) Mixed Style: + * s3://my-bucket.s3.us-west-1.amazonaws.com/resources/doc.txt?versionId=abc123&partNumber=77&partNumber=88 + * or + * 2. Path AWS Client (Hadoop S3) Mixed Style: + * s3://s3.us-west-1.amazonaws.com/my-bucket/resources/doc.txt?versionId=abc123&partNumber=77&partNumber=88 + * + * For these two styles, we can use isPathStyle and forceParsingByStandardUri + * to control whether to use. + * Virtual Host AWS Client (Hadoop S3) Mixed Style: isPathStyle = false && forceParsingByStandardUri = true + * Path AWS Client (Hadoop S3) Mixed Style: isPathStyle = true && forceParsingByStandardUri = true + * + * When the incoming location is url encoded, the encoded string will be returned. + * For getKey(), getQueryParams() will return the encoding string */ public class S3URI { public static final String SCHEME_DELIM = "://"; public static final String PATH_DELIM = "/"; - private static final String QUERY_DELIM = "\\?"; - private static final String FRAGMENT_DELIM = "#"; private static final Set VALID_SCHEMES = ImmutableSet.of("http", "https", "s3", "s3a", "s3n", - "bos", "oss", "cos", "obs"); + "bos", "oss", "cos", "obs"); - private String scheme; - private final String location; - private final String virtualBucket; - private final String bucket; - private final String key; - private boolean forceVirtualHosted; + private static final Set OS_SCHEMES = ImmutableSet.of("s3", "s3a", "s3n", + "bos", "oss", "cos", "obs"); + + private URI uri; + + private String bucket; + private String key; + + private String endpoint; + + private String region; + + private boolean isStandardURL; + private boolean isPathStyle; + private Map> queryParams; /** * Creates a new S3URI based on the bucket and key parsed from the location as defined in: @@ -59,92 +95,180 @@ public class S3URI { * * @param location fully qualified URI */ - public static S3URI create(String location) throws UserException { - return create(location, false); + return create(location, false, false); } - public static S3URI create(String location, boolean forceVirtualHosted) throws UserException { - S3URI s3URI = new S3URI(location, forceVirtualHosted); - return s3URI; + public static S3URI create(String location, boolean isPathStyle) throws UserException { + return new S3URI(location, isPathStyle, false); } - private S3URI(String location, boolean forceVirtualHosted) throws UserException { + public static S3URI create(String location, boolean isPathStyle, boolean forceParsingByStandardUri) + throws UserException { + return new S3URI(location, isPathStyle, forceParsingByStandardUri); + } + + private S3URI(String location, boolean isPathStyle, boolean forceParsingByStandardUri) throws UserException { if (Strings.isNullOrEmpty(location)) { throw new UserException("s3 location can 
not be null"); } + this.isPathStyle = isPathStyle; + parseUri(location, forceParsingByStandardUri); + } + + private void parseUri(String location, boolean forceParsingStandardUri) throws UserException { + validateUri(location); + if (!forceParsingStandardUri && OS_SCHEMES.contains(uri.getScheme().toLowerCase())) { + parseAwsCliStyleUri(); + } else { + parseStandardUri(); + } + parseEndpointAndRegion(); + } + + private void validateUri(String location) throws UserException { try { - // the location need to be normalized to eliminate double "/", or the hadoop aws api - // won't handle it correctly. - this.location = new URI(location).normalize().toString(); + uri = new URI(location); } catch (URISyntaxException e) { - throw new UserException("Invalid s3 uri: " + e.getMessage()); + throw new UserException(e); + } + if (uri.getScheme() == null || !VALID_SCHEMES.contains(uri.getScheme().toLowerCase())) { + throw new UserException("Invalid scheme: " + this.uri); + } + } + + private void parseAwsCliStyleUri() throws UserException { + bucket = uri.getAuthority(); + if (bucket == null) { + throw new UserException("missing bucket: " + uri); + } + String path = uri.getRawPath(); + if (path.length() > 1) { + key = path.substring(1); + } else { + throw new UserException("missing key: " + uri); } - this.forceVirtualHosted = forceVirtualHosted; - String[] schemeSplit = this.location.split(SCHEME_DELIM); - if (schemeSplit.length != 2) { - throw new UserException("Invalid s3 uri: " + this.location); + addQueryParamsIfNeeded(); + + isStandardURL = false; + this.isPathStyle = false; + } + + private void parseStandardUri() throws UserException { + if (uri.getHost() == null) { + throw new UserException("Invalid S3 URI: no hostname: " + uri); } - this.scheme = schemeSplit[0]; - if (!VALID_SCHEMES.contains(scheme.toLowerCase())) { - throw new UserException("Invalid scheme: " + this.location); + addQueryParamsIfNeeded(); + + if (isPathStyle) { + parsePathStyleUri(); + } else { + parseVirtualHostedStyleUri(); } + isStandardURL = true; + } - String[] authoritySplit = schemeSplit[1].split(PATH_DELIM, 2); - if (authoritySplit.length != 2) { - throw new UserException("Invalid s3 uri: " + this.location); + private void addQueryParamsIfNeeded() { + if (uri.getQuery() != null) { + queryParams = splitQueryString(uri.getRawQuery()).stream().map((s) -> s.split("=")) + .map((s) -> s.length == 1 ? new String[] {s[0], null} : s).collect( + Collectors.groupingBy((a) -> a[0], + Collectors.mapping((a) -> a[1], Collectors.toList()))); } - if (authoritySplit[1].trim().isEmpty()) { - throw new UserException("Invalid s3 key: " + this.location); + } + + private static List splitQueryString(String queryString) { + List results = new ArrayList<>(); + StringBuilder result = new StringBuilder(); + + for (int i = 0; i < queryString.length(); ++i) { + char character = queryString.charAt(i); + if (character != '&') { + result.append(character); + } else { + String param = result.toString(); + results.add(param); + result.setLength(0); + } } - // Strip query and fragment if they exist - String path = authoritySplit[1]; - path = path.split(QUERY_DELIM)[0]; - path = path.split(FRAGMENT_DELIM)[0]; - if (this.forceVirtualHosted) { - // If forceVirtualHosted is true, the s3 client will NOT automatically convert to virtual-hosted style. - // So we do some convert manually. 
Eg: - // endpoint: http://cos.ap-beijing.myqcloud.com - // bucket/path: my_bucket/file.txt - // `virtualBucket` will be "my_bucket" - // `bucket` will be `file.txt` - // So that when assembling the real endpoint will be: http://my_bucket.cos.ap-beijing.myqcloud.com/file.txt - this.virtualBucket = authoritySplit[0]; - String[] paths = path.split("/", 2); - this.bucket = paths[0]; - if (paths.length > 1) { - key = paths[1]; + String param = result.toString(); + results.add(param); + return results; + } + + private void parsePathStyleUri() throws UserException { + String path = uri.getRawPath(); + + if (!StringUtils.isEmpty(path) && !"/".equals(path)) { + int index = path.indexOf('/', 1); + + if (index == -1) { + // No trailing slash, e.g., "https://s3.amazonaws.com/bucket" + bucket = path.substring(1); + throw new UserException("missing key: " + uri); } else { - key = ""; + bucket = path.substring(1, index); + if (index != path.length() - 1) { + key = path.substring(index + 1); + } else { + throw new UserException("missing key: " + uri); + } } } else { - // If forceVirtualHosted is false, let the s3 client to determine how to covert endpoint, eg: - // For s3 endpoint(start with "s3."), it will convert to virtual-hosted style. - // For others, keep as it is (maybe path-style, maybe virtual-hosted style.) - this.virtualBucket = ""; - this.bucket = authoritySplit[0]; - key = path; + throw new UserException("missing bucket: " + this.uri); } } - public List expand(String path) { - return GlobExpander.expand(path); - } + private void parseVirtualHostedStyleUri() throws UserException { + bucket = uri.getHost().split("\\.")[0]; - public String getScheme() { - return this.scheme; + String path = uri.getRawPath(); + if (!StringUtils.isEmpty(path) && !"/".equals(path)) { + key = path.substring(1); + } else { + throw new UserException("missing key: " + this.uri); + } } - public String getBucketScheme() { - return scheme + "://" + bucket; - } + private void parseEndpointAndRegion() { + // parse endpoint + if (isStandardURL) { + if (isPathStyle) { + endpoint = uri.getAuthority(); + } else { // virtual_host_style + if (uri.getAuthority() == null) { + endpoint = null; + return; + } + String[] splits = uri.getAuthority().split("\\.", 2); + if (splits.length < 2) { + endpoint = null; + return; + } + endpoint = splits[1]; + } + } else { + endpoint = null; + } + if (endpoint == null) { + return; + } - public String getVirtualBucket() { - return virtualBucket; + // parse region + String[] endpointSplits = endpoint.split("\\."); + if (endpointSplits.length < 2) { + return; + } + if (endpointSplits[0].contains("oss-")) { + // compatible with the endpoint: oss-cn-bejing.aliyuncs.com + region = endpointSplits[0]; + return; + } + region = endpointSplits[1]; } /** @@ -161,15 +285,30 @@ public String getKey() { return key; } - /* - * @return original, unmodified location - */ - public String getLocation() { - return location; + public Optional>> getQueryParams() { + return Optional.ofNullable(queryParams); + } + + public Optional getEndpoint() { + return Optional.ofNullable(endpoint); + } + + public Optional getRegion() { + return Optional.ofNullable(region); } @Override public String toString() { - return location; + final StringBuilder sb = new StringBuilder("S3URI{"); + sb.append("uri=").append(uri); + sb.append(", bucket='").append(bucket).append('\''); + sb.append(", key='").append(key).append('\''); + sb.append(", endpoint='").append(endpoint).append('\''); + sb.append(", region='").append(region).append('\''); + 
sb.append(", isStandardURL=").append(isStandardURL); + sb.append(", isPathStyle=").append(isPathStyle); + sb.append(", queryParams=").append(queryParams); + sb.append('}'); + return sb.toString(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java index 0195783191cf75a..57b53627c682801 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java @@ -45,7 +45,8 @@ public class S3Util { - public static S3Client buildS3Client(URI endpoint, String region, CloudCredential credential) { + public static S3Client buildS3Client(URI endpoint, String region, CloudCredential credential, + boolean isUsePathStyle) { AwsCredentialsProvider scp; AwsCredentials awsCredential; if (!credential.isTemporary()) { @@ -89,10 +90,9 @@ public static S3Client buildS3Client(URI endpoint, String region, CloudCredentia .region(Region.of(region)) .overrideConfiguration(clientConf) // disable chunkedEncoding because of bos not supported - // use virtual hosted-style access .serviceConfiguration(S3Configuration.builder() .chunkedEncodingEnabled(false) - .pathStyleAccessEnabled(true) + .pathStyleAccessEnabled(isUsePathStyle) .build()) .build(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java index 299ab6dddfb9aae..a3868d20470bf93 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java @@ -34,6 +34,8 @@ import org.apache.doris.datasource.operations.ExternalMetadataOperations; import org.apache.doris.datasource.property.PropertyConverter; import org.apache.doris.datasource.property.constants.HMSProperties; +import org.apache.doris.fs.FileSystemProvider; +import org.apache.doris.fs.FileSystemProviderImpl; import org.apache.doris.transaction.TransactionManagerFactory; import com.google.common.base.Strings; @@ -147,7 +149,9 @@ protected void initLocalObjectsImpl() { AuthenticationConfig.HADOOP_KERBEROS_KEYTAB)); } HiveMetadataOps hiveOps = ExternalMetadataOperations.newHiveMetadataOps(hiveConf, jdbcClientConfig, this); - transactionManager = TransactionManagerFactory.createHiveTransactionManager(hiveOps); + FileSystemProvider fileSystemProvider = new FileSystemProviderImpl(Env.getCurrentEnv().getExtMetaCacheMgr(), + this.bindBrokerName(), this.catalogProperty.getHadoopProperties()); + transactionManager = TransactionManagerFactory.createHiveTransactionManager(hiveOps, fileSystemProvider); metadataOps = hiveOps; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java index 32dd083c2adaf5c..8cfc7c2aeaa3b75 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java @@ -23,13 +23,18 @@ import org.apache.doris.backup.Status; import org.apache.doris.common.Pair; +import org.apache.doris.common.UserException; import org.apache.doris.common.profile.SummaryProfile; import org.apache.doris.fs.FileSystem; +import org.apache.doris.fs.FileSystemProvider; import org.apache.doris.fs.FileSystemUtil; import org.apache.doris.fs.remote.RemoteFile; +import 
org.apache.doris.fs.remote.S3FileSystem; +import org.apache.doris.fs.remote.SwitchingFileSystem; import org.apache.doris.nereids.trees.plans.commands.insert.HiveInsertCommandContext; import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.THivePartitionUpdate; +import org.apache.doris.thrift.TS3MPUPendingUpload; import org.apache.doris.thrift.TUpdateMode; import org.apache.doris.transaction.Transaction; @@ -48,6 +53,11 @@ import org.apache.hadoop.hive.metastore.api.Table; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload; +import software.amazon.awssdk.services.s3.model.CompletedPart; import java.util.ArrayList; import java.util.HashMap; @@ -62,6 +72,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; @@ -83,10 +94,25 @@ public class HMSTransaction implements Transaction { private List hivePartitionUpdates = Lists.newArrayList(); private String declaredIntentionsToWrite; - public HMSTransaction(HiveMetadataOps hiveOps) { - this.hiveOps = hiveOps; - this.fs = hiveOps.getFs(); + private static class UncompletedMpuPendingUpload { + + private final TS3MPUPendingUpload s3MPUPendingUpload; + private final String path; + public UncompletedMpuPendingUpload(TS3MPUPendingUpload s3MPUPendingUpload, String path) { + this.s3MPUPendingUpload = s3MPUPendingUpload; + this.path = path; + } + } + + private Set uncompletedMpuPendingUploads = new HashSet<>(); + + public HMSTransaction(HiveMetadataOps hiveOps, FileSystemProvider fileSystemProvider) { + this.hiveOps = hiveOps; + this.fs = fileSystemProvider.get(null); + if (!(fs instanceof SwitchingFileSystem)) { + throw new RuntimeException("fs should be SwitchingFileSystem"); + } if (ConnectContext.get().getExecutor() != null) { summaryProfile = Optional.of(ConnectContext.get().getExecutor().getSummaryProfile()); } @@ -112,6 +138,9 @@ public List mergePartitions(List hiv THivePartitionUpdate old = mm.get(pu.getName()); old.setFileSize(old.getFileSize() + pu.getFileSize()); old.setRowCount(old.getRowCount() + pu.getRowCount()); + if (old.getS3MpuPendingUploads() != null && pu.getS3MpuPendingUploads() != null) { + old.getS3MpuPendingUploads().addAll(pu.getS3MpuPendingUploads()); + } old.getFileNames().addAll(pu.getFileNames()); } else { mm.put(pu.getName(), pu); @@ -136,6 +165,12 @@ public void finishInsertTable(String dbName, String tbName) { this.dbName = dbName; this.tbName = tbName; List mergedPUs = mergePartitions(hivePartitionUpdates); + for (THivePartitionUpdate pu : mergedPUs) { + for (TS3MPUPendingUpload s3MPUPendingUpload : pu.getS3MpuPendingUploads()) { + uncompletedMpuPendingUploads.add( + new UncompletedMpuPendingUpload(s3MPUPendingUpload, pu.getLocation().getTargetPath())); + } + } Table table = getTable(dbName, tbName); List> insertExistsPartitions = new ArrayList<>(); for (THivePartitionUpdate pu : mergedPUs) { @@ -156,11 +191,12 @@ public void finishInsertTable(String dbName, String tbName) { tbName, writePath, pu.getFileNames(), - hivePartitionStatistics); 
+ hivePartitionStatistics, + pu); break; case OVERWRITE: dropTable(dbName, tbName); - createTable(table, writePath, pu.getFileNames(), hivePartitionStatistics); + createTable(table, writePath, pu.getFileNames(), hivePartitionStatistics, pu); break; default: throw new RuntimeException("Not support mode:[" + updateMode + "] in unPartitioned table"); @@ -191,7 +227,7 @@ public void finishInsertTable(String dbName, String tbName) { } addPartition( dbName, tbName, hivePartition, writePath, - pu.getName(), pu.getFileNames(), hivePartitionStatistics); + pu.getName(), pu.getFileNames(), hivePartitionStatistics, pu); break; default: throw new RuntimeException("Not support mode:[" + updateMode + "] in partitioned table"); @@ -351,7 +387,8 @@ private void convertToInsertExistingPartitionAction( pu.getLocation().getWritePath(), pu.getName(), pu.getFileNames(), - updateStats + updateStats, + pu )) ); } @@ -676,15 +713,19 @@ private static class TableAndMore { private final List fileNames; private final HivePartitionStatistics statisticsUpdate; + private final THivePartitionUpdate hivePartitionUpdate; + public TableAndMore( Table table, String currentLocation, List fileNames, - HivePartitionStatistics statisticsUpdate) { + HivePartitionStatistics statisticsUpdate, + THivePartitionUpdate hivePartitionUpdate) { this.table = Objects.requireNonNull(table, "table is null"); this.currentLocation = Objects.requireNonNull(currentLocation); this.fileNames = Objects.requireNonNull(fileNames); this.statisticsUpdate = Objects.requireNonNull(statisticsUpdate, "statisticsUpdate is null"); + this.hivePartitionUpdate = Objects.requireNonNull(hivePartitionUpdate, "hivePartitionUpdate is null"); } public Table getTable() { @@ -703,6 +744,10 @@ public HivePartitionStatistics getStatisticsUpdate() { return statisticsUpdate; } + public THivePartitionUpdate getHivePartitionUpdate() { + return hivePartitionUpdate; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) @@ -719,17 +764,22 @@ private static class PartitionAndMore { private final List fileNames; private final HivePartitionStatistics statisticsUpdate; + private final THivePartitionUpdate hivePartitionUpdate; + + public PartitionAndMore( HivePartition partition, String currentLocation, String partitionName, List fileNames, - HivePartitionStatistics statisticsUpdate) { + HivePartitionStatistics statisticsUpdate, + THivePartitionUpdate hivePartitionUpdate) { this.partition = Objects.requireNonNull(partition, "partition is null"); this.currentLocation = Objects.requireNonNull(currentLocation, "currentLocation is null"); this.partitionName = Objects.requireNonNull(partitionName, "partition is null"); this.fileNames = Objects.requireNonNull(fileNames, "fileNames is null"); this.statisticsUpdate = Objects.requireNonNull(statisticsUpdate, "statisticsUpdate is null"); + this.hivePartitionUpdate = Objects.requireNonNull(hivePartitionUpdate, "hivePartitionUpdate is null"); } public HivePartition getPartition() { @@ -752,6 +802,10 @@ public HivePartitionStatistics getStatisticsUpdate() { return statisticsUpdate; } + public THivePartitionUpdate getHivePartitionUpdate() { + return hivePartitionUpdate; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) @@ -835,7 +889,8 @@ public synchronized void finishChangingExistingTable( String tableName, String location, List fileNames, - HivePartitionStatistics statisticsUpdate) { + HivePartitionStatistics statisticsUpdate, + THivePartitionUpdate hivePartitionUpdate) { 
DatabaseTableName databaseTableName = new DatabaseTableName(databaseName, tableName); Action oldTableAction = tableActions.get(databaseTableName); if (oldTableAction == null) { @@ -843,12 +898,13 @@ public synchronized void finishChangingExistingTable( tableActions.put( databaseTableName, new Action<>( - actionType, + actionType, new TableAndMore( - table, - location, - fileNames, - statisticsUpdate))); + table, + location, + fileNames, + statisticsUpdate, + hivePartitionUpdate))); return; } @@ -870,12 +926,13 @@ public synchronized void finishChangingExistingTable( } public synchronized void createTable( - Table table, String location, List fileNames, HivePartitionStatistics statistics) { + Table table, String location, List fileNames, HivePartitionStatistics statistics, + THivePartitionUpdate hivePartitionUpdate) { // When creating a table, it should never have partition actions. This is just a sanity check. checkNoPartitionAction(dbName, tbName); DatabaseTableName databaseTableName = new DatabaseTableName(dbName, tbName); Action oldTableAction = tableActions.get(databaseTableName); - TableAndMore tableAndMore = new TableAndMore(table, location, fileNames, statistics); + TableAndMore tableAndMore = new TableAndMore(table, location, fileNames, statistics, hivePartitionUpdate); if (oldTableAction == null) { tableActions.put(databaseTableName, new Action<>(ActionType.ADD, tableAndMore)); return; @@ -939,7 +996,8 @@ public synchronized void addPartition( String currentLocation, String partitionName, List files, - HivePartitionStatistics statistics) { + HivePartitionStatistics statistics, + THivePartitionUpdate hivePartitionUpdate) { Map, Action> partitionActionsForTable = partitionActions.computeIfAbsent(new DatabaseTableName(databaseName, tableName), k -> new HashMap<>()); Action oldPartitionAction = partitionActionsForTable.get(partition.getPartitionValues()); @@ -948,7 +1006,8 @@ public synchronized void addPartition( partition.getPartitionValues(), new Action<>( ActionType.ADD, - new PartitionAndMore(partition, currentLocation, partitionName, files, statistics)) + new PartitionAndMore(partition, currentLocation, partitionName, files, statistics, + hivePartitionUpdate)) ); return; } @@ -959,7 +1018,8 @@ public synchronized void addPartition( partition.getPartitionValues(), new Action<>( ActionType.ALTER, - new PartitionAndMore(partition, currentLocation, partitionName, files, statistics)) + new PartitionAndMore(partition, currentLocation, partitionName, files, statistics, + hivePartitionUpdate)) ); return; case ADD: @@ -967,8 +1027,9 @@ public synchronized void addPartition( case INSERT_EXISTING: case MERGE: throw new RuntimeException( - "Partition already exists for table: " - + databaseName + "." + tableName + ", partition values: " + partition.getPartitionValues()); + "Partition already exists for table: " + + databaseName + "." 
+ tableName + ", partition values: " + + partition.getPartitionValues()); default: throw new IllegalStateException("Unknown action type: " + oldPartitionAction.getType()); } @@ -1029,7 +1090,7 @@ class HmsCommitter { private final List renameDirectoryTasksForAbort = new ArrayList<>(); // when finished, we need clear some directories private final List clearDirsForFinish = new ArrayList<>(); - Executor fileSystemExecutor = Executors.newFixedThreadPool(16); + private ExecutorService fileSystemExecutor = Executors.newFixedThreadPool(16); public void cancelUnStartedAsyncFileSystemTask() { fileSystemTaskCancelled.set(true); @@ -1053,9 +1114,6 @@ private void undoUpdateStatisticsTasks() { } private void undoAddPartitionsTask() { - if (addPartitionsTask.isEmpty()) { - return; - } HivePartition firstPartition = addPartitionsTask.getPartitions().get(0).getPartition(); String dbName = firstPartition.getDbName(); @@ -1091,15 +1149,20 @@ public void prepareInsertExistingTable(TableAndMore tableAndMore) { writePath, targetPath, tableAndMore.getFileNames()); + } else { + if (!tableAndMore.hivePartitionUpdate.s3_mpu_pending_uploads.isEmpty()) { + s3Commit(fileSystemExecutor, asyncFileSystemTaskFutures, fileSystemTaskCancelled, + tableAndMore.hivePartitionUpdate, targetPath); + } } directoryCleanUpTasksForAbort.add(new DirectoryCleanUpTask(targetPath, false)); updateStatisticsTasks.add( - new UpdateStatisticsTask( - dbName, - tbName, - Optional.empty(), - tableAndMore.getStatisticsUpdate(), - true + new UpdateStatisticsTask( + dbName, + tbName, + Optional.empty(), + tableAndMore.getStatisticsUpdate(), + true )); } @@ -1116,7 +1179,7 @@ public void prepareAlterTable(TableAndMore tableAndMore) { () -> renameDirectoryTasksForAbort.add(new RenameDirectoryTask(oldTablePath, targetPath))); if (!status.ok()) { throw new RuntimeException( - "Error to rename dir from " + targetPath + " to " + oldTablePath + status.getErrMsg()); + "Error to rename dir from " + targetPath + " to " + oldTablePath + status.getErrMsg()); } clearDirsForFinish.add(oldTablePath); @@ -1154,6 +1217,11 @@ public void prepareAddPartition(PartitionAndMore partitionAndMore) { writePath, targetPath, () -> directoryCleanUpTasksForAbort.add(new DirectoryCleanUpTask(targetPath, true))); + } else { + if (!partitionAndMore.hivePartitionUpdate.s3_mpu_pending_uploads.isEmpty()) { + s3Commit(fileSystemExecutor, asyncFileSystemTaskFutures, fileSystemTaskCancelled, + partitionAndMore.hivePartitionUpdate, targetPath); + } } StorageDescriptor sd = getTable(dbName, tbName).getSd(); @@ -1194,6 +1262,11 @@ public void prepareInsertExistPartition(PartitionAndMore partitionAndMore) { writePath, targetPath, partitionAndMore.getFileNames()); + } else { + if (!partitionAndMore.hivePartitionUpdate.s3_mpu_pending_uploads.isEmpty()) { + s3Commit(fileSystemExecutor, asyncFileSystemTaskFutures, fileSystemTaskCancelled, + partitionAndMore.hivePartitionUpdate, targetPath); + } } updateStatisticsTasks.add( @@ -1263,6 +1336,11 @@ public void prepareAlterPartition(PartitionAndMore partitionAndMore) { throw new RuntimeException( "Error to rename dir from " + writePath + " to " + targetPath + ":" + status.getErrMsg()); } + } else { + if (!partitionAndMore.hivePartitionUpdate.s3_mpu_pending_uploads.isEmpty()) { + s3Commit(fileSystemExecutor, asyncFileSystemTaskFutures, fileSystemTaskCancelled, + partitionAndMore.hivePartitionUpdate, targetPath); + } } updateStatisticsTasks.add( @@ -1337,10 +1415,34 @@ private void doUpdateStatisticsTasks() { 
summaryProfile.ifPresent(SummaryProfile::setHmsUpdatePartitionTime); } - public void pruneAndDeleteStagingDirectories() { + private void pruneAndDeleteStagingDirectories() { recursiveDeleteItems(new Path(declaredIntentionsToWrite), true); } + private void abortMultiUploads() { + if (uncompletedMpuPendingUploads.isEmpty()) { + return; + } + for (UncompletedMpuPendingUpload uncompletedMpuPendingUpload : uncompletedMpuPendingUploads) { + S3FileSystem s3FileSystem = (S3FileSystem) ((SwitchingFileSystem) fs) + .fileSystem(uncompletedMpuPendingUpload.path); + + S3Client s3Client; + try { + s3Client = (S3Client) s3FileSystem.getObjStorage().getClient(); + } catch (UserException e) { + throw new RuntimeException(e); + } + asyncFileSystemTaskFutures.add(CompletableFuture.runAsync(() -> { + s3Client.abortMultipartUpload(AbortMultipartUploadRequest.builder() + .bucket(uncompletedMpuPendingUpload.s3MPUPendingUpload.getBucket()) + .key(uncompletedMpuPendingUpload.s3MPUPendingUpload.getKey()) + .uploadId(uncompletedMpuPendingUpload.s3MPUPendingUpload.getUploadId()) + .build()); + }, fileSystemExecutor)); + } + } + public void doNothing() { // do nothing // only for regression test and unit test to throw exception @@ -1365,6 +1467,11 @@ public void abort() { public void rollback() { //delete write path pruneAndDeleteStagingDirectories(); + // abort the in-progress multipart uploads + abortMultiUploads(); + for (CompletableFuture future : asyncFileSystemTaskFutures) { + MoreFutures.getFutureValue(future, RuntimeException.class); + } } } @@ -1415,4 +1522,36 @@ public void wrapperAsyncRenameDirWithProfileSummary(Executor executor, fs, executor, renameFileFutures, cancelled, origFilePath, destFilePath, runWhenPathNotExist); summaryProfile.ifPresent(SummaryProfile::incRenameDirCnt); } + + private void s3Commit(ExecutorService fileSystemExecutor, List> asyncFileSystemTaskFutures, + AtomicBoolean fileSystemTaskCancelled, THivePartitionUpdate hivePartitionUpdate, String path) { + S3FileSystem s3FileSystem = (S3FileSystem) ((SwitchingFileSystem) fs).fileSystem(path); + S3Client s3Client; + try { + s3Client = (S3Client) s3FileSystem.getObjStorage().getClient(); + } catch (UserException e) { + throw new RuntimeException(e); + } + + for (TS3MPUPendingUpload s3MPUPendingUpload : hivePartitionUpdate.getS3MpuPendingUploads()) { + asyncFileSystemTaskFutures.add(CompletableFuture.runAsync(() -> { + if (fileSystemTaskCancelled.get()) { + return; + } + List completedParts = Lists.newArrayList(); + for (Map.Entry entry : s3MPUPendingUpload.getEtags().entrySet()) { + completedParts.add(CompletedPart.builder().eTag(entry.getValue()).partNumber(entry.getKey()) + .build()); + } + + s3Client.completeMultipartUpload(CompleteMultipartUploadRequest.builder() + .bucket(s3MPUPendingUpload.getBucket()) + .key(s3MPUPendingUpload.getKey()) + .uploadId(s3MPUPendingUpload.getUploadId()) + .multipartUpload(CompletedMultipartUpload.builder().parts(completedParts).build()) + .build()); + uncompletedMpuPendingUploads.remove(new UncompletedMpuPendingUpload(s3MPUPendingUpload, path)); + }, fileSystemExecutor)); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java index fcebd67954e84f8..be5ecb163b16290 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java @@ -349,9 +349,11 @@ private 
FileCacheValue getFileCache(String location, String inputFormat, List<String> partitionValues, String bindBrokerName) throws UserException { FileCacheValue result = new FileCacheValue(); + Map<String, String> properties = new HashMap<>(); + jobConf.iterator().forEachRemaining(e -> properties.put(e.getKey(), e.getValue())); RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( new FileSystemCache.FileSystemCacheKey(LocationPath.getFSIdentity( - location, bindBrokerName), jobConf, bindBrokerName)); + location, bindBrokerName), properties, bindBrokerName)); result.setSplittable(HiveUtil.isSplittable(fs, inputFormat, location, jobConf)); // For Tez engine, it may generate subdirectoies for "union" query. // So there may be files and directories in the table directory at the same time. eg: @@ -366,7 +368,7 @@ private FileCacheValue getFileCache(String location, String inputFormat, for (RemoteFile remoteFile : remoteFiles) { String srcPath = remoteFile.getPath().toString(); LocationPath locationPath = new LocationPath(srcPath, catalog.getProperties()); - Path convertedPath = locationPath.toScanRangeLocation(); + Path convertedPath = locationPath.toStorageLocation(); if (!convertedPath.toString().equals(srcPath)) { remoteFile.setPath(convertedPath); } @@ -777,10 +779,12 @@ public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions return Collections.emptyList(); } String acidVersionPath = new Path(baseOrDeltaPath, "_orc_acid_version").toUri().toString(); + Map<String, String> properties = new HashMap<>(); + jobConf.iterator().forEachRemaining(e -> properties.put(e.getKey(), e.getValue())); RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( new FileSystemCache.FileSystemCacheKey( LocationPath.getFSIdentity(baseOrDeltaPath.toUri().toString(), - bindBrokerName), jobConf, bindBrokerName)); + bindBrokerName), properties, bindBrokerName)); Status status = fs.exists(acidVersionPath); if (status != Status.OK) { if (status.getErrCode() == ErrCode.NOT_FOUND) { @@ -800,10 +804,12 @@ public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions List deleteDeltas = new ArrayList<>(); for (AcidUtils.ParsedDelta delta : directory.getCurrentDirectories()) { String location = delta.getPath().toString(); + Map<String, String> properties = new HashMap<>(); + jobConf.iterator().forEachRemaining(e -> properties.put(e.getKey(), e.getValue())); RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( new FileSystemCache.FileSystemCacheKey( LocationPath.getFSIdentity(location, bindBrokerName), - jobConf, bindBrokerName)); + properties, bindBrokerName)); List<RemoteFile> remoteFiles = new ArrayList<>(); Status status = fs.listFiles(location, false, remoteFiles); if (status.ok()) { @@ -825,10 +831,12 @@ public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions // base if (directory.getBaseDirectory() != null) { String location = directory.getBaseDirectory().toString(); + Map<String, String> properties = new HashMap<>(); + jobConf.iterator().forEachRemaining(e -> properties.put(e.getKey(), e.getValue())); RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( new FileSystemCache.FileSystemCacheKey( LocationPath.getFSIdentity(location, bindBrokerName), - jobConf, bindBrokerName)); + properties, bindBrokerName)); List<RemoteFile> remoteFiles = new ArrayList<>(); Status status = fs.listFiles(location, false, remoteFiles); if (status.ok()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java index a4566cd0b7a03cd..3f454a7895f46bf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java @@ -36,8 +36,6 @@ import org.apache.doris.datasource.jdbc.client.JdbcClient; import org.apache.doris.datasource.jdbc.client.JdbcClientConfig; import org.apache.doris.datasource.operations.ExternalMetadataOps; -import org.apache.doris.fs.FileSystem; -import org.apache.doris.fs.remote.dfs.DFSFileSystem; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -61,7 +59,6 @@ public class HiveMetadataOps implements ExternalMetadataOps { private static final Logger LOG = LogManager.getLogger(HiveMetadataOps.class); private static final int MIN_CLIENT_POOL_SIZE = 8; private final HMSCachedClient client; - private final FileSystem fs; private final HMSExternalCatalog catalog; public HiveMetadataOps(HiveConf hiveConf, JdbcClientConfig jdbcClientConfig, HMSExternalCatalog catalog) { @@ -75,23 +72,22 @@ public HiveMetadataOps(HMSExternalCatalog catalog, HMSCachedClient client) { this.catalog = catalog; this.client = client; // TODO Currently only supports DFSFileSystem, more types will be supported in the future - this.fs = new DFSFileSystem(catalog.getProperties()); + // this.fs = new DFSFileSystem(catalog.getProperties()); } - @VisibleForTesting - public HiveMetadataOps(HMSExternalCatalog catalog, HMSCachedClient client, FileSystem fs) { - this.catalog = catalog; - this.client = client; - this.fs = fs; - } + // for test + // public HiveMetadataOps(HMSExternalCatalog catalog, HMSCachedClient client) { + // this.catalog = catalog; + // this.client = client; + // } public HMSCachedClient getClient() { return client; } - public FileSystem getFs() { - return fs; + public HMSExternalCatalog getCatalog() { + return catalog; } public static HMSCachedClient createCachedClient(HiveConf hiveConf, int thriftClientPoolSize, diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java index eb1d77a322dfc24..94748e7e4273fac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java @@ -354,7 +354,7 @@ public List getSplits() throws UserException { long fileSize = baseFile.getFileSize(); // Need add hdfs host to location LocationPath locationPath = new LocationPath(filePath, hmsTable.getCatalogProperties()); - Path splitFilePath = locationPath.toScanRangeLocation(); + Path splitFilePath = locationPath.toStorageLocation(); splits.add(new FileSplit(splitFilePath, 0, fileSize, fileSize, new String[0], partition.getPartitionValues())); }); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/DLFCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/DLFCatalog.java index 24f2df5acdc227f..ca5ccd5f359befe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/DLFCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/DLFCatalog.java @@ -21,6 +21,7 @@ import org.apache.doris.datasource.credentials.CloudCredential; import org.apache.doris.datasource.iceberg.HiveCompatibleCatalog; import 
org.apache.doris.datasource.iceberg.dlf.client.DLFCachedClientPool; +import org.apache.doris.datasource.property.PropertyConverter; import org.apache.doris.datasource.property.constants.OssProperties; import org.apache.doris.datasource.property.constants.S3Properties; @@ -53,19 +54,21 @@ protected FileIO initializeFileIO(Map properties, Configuration String endpoint = properties.getOrDefault(Constants.ENDPOINT_KEY, properties.get(S3Properties.Env.ENDPOINT)); CloudCredential credential = new CloudCredential(); credential.setAccessKey(properties.getOrDefault(OssProperties.ACCESS_KEY, - properties.get(S3Properties.Env.ACCESS_KEY))); + properties.get(S3Properties.Env.ACCESS_KEY))); credential.setSecretKey(properties.getOrDefault(OssProperties.SECRET_KEY, - properties.get(S3Properties.Env.SECRET_KEY))); + properties.get(S3Properties.Env.SECRET_KEY))); if (properties.containsKey(OssProperties.SESSION_TOKEN) || properties.containsKey(S3Properties.Env.TOKEN)) { credential.setSessionToken(properties.getOrDefault(OssProperties.SESSION_TOKEN, properties.get(S3Properties.Env.TOKEN))); } String region = properties.getOrDefault(OssProperties.REGION, properties.get(S3Properties.Env.REGION)); + boolean isUsePathStyle = properties.getOrDefault(PropertyConverter.USE_PATH_STYLE, "false") + .equalsIgnoreCase("true"); // s3 file io just supports s3-like endpoint String s3Endpoint = endpoint.replace(region, "s3." + region); URI endpointUri = URI.create(s3Endpoint); - FileIO io = new S3FileIO(() -> S3Util.buildS3Client(endpointUri, region, credential)); + FileIO io = new S3FileIO(() -> S3Util.buildS3Client(endpointUri, region, credential, isUsePathStyle)); io.initialize(properties); return io; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java index 961fb8ae1d66798..21826dfd8d53eba 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java @@ -150,7 +150,7 @@ public void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit icebergSplit TIcebergDeleteFileDesc deleteFileDesc = new TIcebergDeleteFileDesc(); String deleteFilePath = filter.getDeleteFilePath(); LocationPath locationPath = new LocationPath(deleteFilePath, icebergSplit.getConfig()); - Path splitDeletePath = locationPath.toScanRangeLocation(); + Path splitDeletePath = locationPath.toStorageLocation(); deleteFileDesc.setPath(splitDeletePath.toString()); if (filter instanceof IcebergDeleteFileFilter.PositionDelete) { fileDesc.setContent(FileContent.POSITION_DELETES.id()); @@ -244,7 +244,7 @@ private List doGetSplits() throws UserException { partitionPathSet.add(structLike.toString()); } LocationPath locationPath = new LocationPath(dataFilePath, source.getCatalog().getProperties()); - Path finalDataFilePath = locationPath.toScanRangeLocation(); + Path finalDataFilePath = locationPath.toStorageLocation(); IcebergSplit split = new IcebergSplit( finalDataFilePath, splitTask.start(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java index 733565e7063a672..681136f24b3e9c1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java @@ -161,7 +161,7 @@ public List getSplits() throws UserException { List rawFiles = optRawFiles.get(); for (RawFile file : rawFiles) { LocationPath locationPath = new LocationPath(file.path(), source.getCatalog().getProperties()); - Path finalDataFilePath = locationPath.toScanRangeLocation(); + Path finalDataFilePath = locationPath.toStorageLocation(); try { splits.addAll( splitFile( diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyConverter.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyConverter.java index b385fb838b3689c..bccd3147a9b5eaa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyConverter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyConverter.java @@ -58,6 +58,9 @@ public class PropertyConverter { private static final Logger LOG = LogManager.getLogger(PropertyConverter.class); public static final String USE_PATH_STYLE = "use_path_style"; + public static final String USE_PATH_STYLE_DEFAULT_VALUE = "false"; + public static final String FORCE_PARSING_BY_STANDARD_URI = "force_parsing_by_standard_uri"; + public static final String FORCE_PARSING_BY_STANDARD_URI_DEFAULT_VALUE = "false"; /** * Convert properties defined at doris to metadata properties on Cloud diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java index 149bbe2d378817d..dd66c359b9d410c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java @@ -23,8 +23,8 @@ import org.apache.doris.fs.remote.RemoteFileSystem; import com.github.benmanes.caffeine.cache.LoadingCache; -import org.apache.hadoop.mapred.JobConf; +import java.util.Map; import java.util.Objects; import java.util.OptionalLong; @@ -44,7 +44,7 @@ public FileSystemCache() { } private RemoteFileSystem loadFileSystem(FileSystemCacheKey key) { - return FileSystemFactory.getRemoteFileSystem(key.type, key.conf, key.bindBrokerName); + return FileSystemFactory.getRemoteFileSystem(key.type, key.properties, key.bindBrokerName); } public RemoteFileSystem getRemoteFileSystem(FileSystemCacheKey key) { @@ -55,13 +55,14 @@ public static class FileSystemCacheKey { private final FileSystemType type; // eg: hdfs://nameservices1 private final String fsIdent; - private final JobConf conf; + private final Map properties; private final String bindBrokerName; - public FileSystemCacheKey(Pair fs, JobConf conf, String bindBrokerName) { + public FileSystemCacheKey(Pair fs, + Map properties, String bindBrokerName) { this.type = fs.first; this.fsIdent = fs.second; - this.conf = conf; + this.properties = properties; this.bindBrokerName = bindBrokerName; } @@ -75,7 +76,7 @@ public boolean equals(Object obj) { } boolean equalsWithoutBroker = type.equals(((FileSystemCacheKey) obj).type) && fsIdent.equals(((FileSystemCacheKey) obj).fsIdent) - && conf == ((FileSystemCacheKey) obj).conf; + && properties == ((FileSystemCacheKey) obj).properties; if (bindBrokerName == null) { return equalsWithoutBroker; } @@ -85,9 +86,9 @@ public boolean equals(Object obj) { @Override public int hashCode() { if (bindBrokerName == null) { - return Objects.hash(conf, fsIdent, type); + return Objects.hash(properties, fsIdent, type); } - return Objects.hash(conf, fsIdent, type, bindBrokerName); + return 
Objects.hash(properties, fsIdent, type, bindBrokerName); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java index 63f552a8ab82b8a..cd7212c8e391fbe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java @@ -29,7 +29,6 @@ import org.apache.hadoop.fs.Path; import java.io.IOException; -import java.util.HashMap; import java.util.Map; public class FileSystemFactory { @@ -51,10 +50,8 @@ public static RemoteFileSystem get(String name, StorageBackend.StorageType type, } } - public static RemoteFileSystem getRemoteFileSystem(FileSystemType type, Configuration conf, + public static RemoteFileSystem getRemoteFileSystem(FileSystemType type, Map properties, String bindBrokerName) { - Map properties = new HashMap<>(); - conf.iterator().forEachRemaining(e -> properties.put(e.getKey(), e.getValue())); switch (type) { case S3: return new S3FileSystem(properties); diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemProvider.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemProvider.java new file mode 100644 index 000000000000000..aab7471fd99c0aa --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemProvider.java @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.fs; + +import org.apache.doris.datasource.SessionContext; + +public interface FileSystemProvider { + FileSystem get(SessionContext ctx); +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemProviderImpl.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemProviderImpl.java new file mode 100644 index 000000000000000..680592ab4a87192 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemProviderImpl.java @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
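FileSystemProvider is a single-method interface, so a provider can be either the FileSystemProviderImpl defined below or a plain lambda (HmsCommitTest later in this patch uses `ctx -> fs`). A minimal wiring sketch follows; the catalog-side call site is not part of this excerpt, so hiveOps, bindBrokerName and hadoopProps are assumed names, not the actual code:

    // Assumed wiring, for illustration only.
    FileSystemProvider provider = new FileSystemProviderImpl(
            Env.getCurrentEnv().getExtMetaCacheMgr(), bindBrokerName, hadoopProps);
    TransactionManager hiveTxnMgr =
            TransactionManagerFactory.createHiveTransactionManager(hiveOps, provider);

    // In tests, a lambda over a local file system is enough:
    FileSystemProvider testProvider = ctx -> localFs;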
+ +package org.apache.doris.fs; + +import org.apache.doris.datasource.ExternalMetaCacheMgr; +import org.apache.doris.datasource.SessionContext; +import org.apache.doris.fs.remote.SwitchingFileSystem; + +import java.util.Map; + +public class FileSystemProviderImpl implements FileSystemProvider { + private ExternalMetaCacheMgr extMetaCacheMgr; + private String bindBrokerName; + + private Map properties; + + public FileSystemProviderImpl(ExternalMetaCacheMgr extMetaCacheMgr, String bindBrokerName, + Map properties) { + this.extMetaCacheMgr = extMetaCacheMgr; + this.bindBrokerName = bindBrokerName; + this.properties = properties; + } + + @Override + public FileSystem get(SessionContext ctx) { + return new SwitchingFileSystem(extMetaCacheMgr, bindBrokerName, properties); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/ObjStorage.java b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/ObjStorage.java index b964e3022ac1f40..3c4246d0fe78102 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/ObjStorage.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/ObjStorage.java @@ -32,7 +32,7 @@ * @param cloud SDK Client */ public interface ObjStorage { - C getClient(String bucket) throws UserException; + C getClient() throws UserException; Triple getStsToken() throws DdlException; diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java index d1e8e74b49a3cf2..11ec72923d01c6c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java @@ -29,7 +29,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Triple; import org.apache.http.HttpStatus; -import org.apache.http.client.utils.URIBuilder; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.Nullable; @@ -68,15 +67,9 @@ public class S3ObjStorage implements ObjStorage { protected Map properties; - // false: the s3 client will automatically convert endpoint to virtual-hosted style, eg: - // endpoint: http://s3.us-east-2.amazonaws.com - // bucket/path: my_bucket/file.txt - // auto convert: http://my_bucket.s3.us-east-2.amazonaws.com/file.txt - // true: the s3 client will NOT automatically convert endpoint to virtual-hosted style, we need to do some tricks: - // endpoint: http://cos.ap-beijing.myqcloud.com - // bucket/path: my_bucket/file.txt - // convert manually: See S3URI() - private boolean forceHostedStyle = false; + private boolean isUsePathStyle = false; + + private boolean forceParsingByStandardUri = false; public S3ObjStorage(Map properties) { this.properties = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); @@ -104,38 +97,27 @@ protected void setProperties(Map properties) { // Some of them, such as aliyun's oss, only support virtual hosted-style, // and some of them(ceph) may only support // path-style, so we need to do some additional conversion. - // - // use_path_style | !use_path_style - // S3 forceHostedStyle=false | forceHostedStyle=false - // !S3 forceHostedStyle=false | forceHostedStyle=true - // - // That is, for S3 endpoint, ignore the `use_path_style` property, and the s3 client will automatically use - // virtual hosted-sytle. - // And for other endpoint, if `use_path_style` is true, use path style. Otherwise, use virtual hosted-sytle. - // 'forceHostedStyle==false' means that use path style. 
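The replacement below drops the manual virtual-hosted-style handling: `use_path_style` and the new `force_parsing_by_standard_uri` property are read once in setProperties() and passed straight through to S3URI.create(...) and S3Util.buildS3Client(...), so the SDK picks the addressing style instead of S3ObjStorage rewriting hosts. S3Util.buildS3Client itself is not shown in this excerpt; the following is only a sketch of what a path-style-aware builder typically looks like with AWS SDK v2 (endpoint, region and credential values are assumptions):

    S3Client client = S3Client.builder()
            // the patch prepends "http://" when the configured endpoint has no scheme
            .endpointOverride(URI.create("http://cos.ap-beijing.myqcloud.com"))
            .region(Region.of("ap-beijing"))
            .credentialsProvider(StaticCredentialsProvider.create(
                    AwsBasicCredentials.create(accessKey, secretKey)))
            // pathStyleAccessEnabled(true)  -> http://endpoint/my_bucket/file.txt
            // pathStyleAccessEnabled(false) -> http://my_bucket.endpoint/file.txt (virtual-hosted)
            .serviceConfiguration(S3Configuration.builder()
                    .pathStyleAccessEnabled(isUsePathStyle)
                    .build())
            .build();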
- if (!this.properties.get(S3Properties.ENDPOINT).toLowerCase().contains(S3Properties.S3_PREFIX)) { - String usePathStyle = this.properties.getOrDefault(PropertyConverter.USE_PATH_STYLE, "false"); - boolean isUsePathStyle = usePathStyle.equalsIgnoreCase("true"); - // when it's path style, we will not use virtual hosted-style - forceHostedStyle = !isUsePathStyle; - } else { - forceHostedStyle = false; - } + isUsePathStyle = this.properties.getOrDefault(PropertyConverter.USE_PATH_STYLE, "false") + .equalsIgnoreCase("true"); + forceParsingByStandardUri = this.properties.getOrDefault(PropertyConverter.FORCE_PARSING_BY_STANDARD_URI, + "false").equalsIgnoreCase("true"); } @Override - public S3Client getClient(String bucket) throws UserException { + public S3Client getClient() throws UserException { if (client == null) { - URI tmpEndpoint = URI.create(properties.get(S3Properties.ENDPOINT)); - URI endpoint = StringUtils.isEmpty(bucket) ? tmpEndpoint : - URI.create(new URIBuilder(tmpEndpoint).setHost(bucket + "." + tmpEndpoint.getHost()).toString()); + String endpointStr = properties.get(S3Properties.ENDPOINT); + if (!endpointStr.contains("://")) { + endpointStr = "http://" + endpointStr; + } + URI endpoint = URI.create(endpointStr); CloudCredential credential = new CloudCredential(); credential.setAccessKey(properties.get(S3Properties.ACCESS_KEY)); credential.setSecretKey(properties.get(S3Properties.SECRET_KEY)); if (properties.containsKey(S3Properties.SESSION_TOKEN)) { credential.setSessionToken(properties.get(S3Properties.SESSION_TOKEN)); } - client = S3Util.buildS3Client(endpoint, properties.get(S3Properties.REGION), credential); + client = S3Util.buildS3Client(endpoint, properties.get(S3Properties.REGION), credential, isUsePathStyle); } return client; } @@ -148,8 +130,8 @@ public Triple getStsToken() throws DdlException { @Override public Status headObject(String remotePath) { try { - S3URI uri = S3URI.create(remotePath, forceHostedStyle); - HeadObjectResponse response = getClient(uri.getVirtualBucket()) + S3URI uri = S3URI.create(remotePath, isUsePathStyle, forceParsingByStandardUri); + HeadObjectResponse response = getClient() .headObject(HeadObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build()); LOG.info("head file " + remotePath + " success: " + response.toString()); return Status.OK; @@ -169,8 +151,8 @@ public Status headObject(String remotePath) { @Override public Status getObject(String remoteFilePath, File localFile) { try { - S3URI uri = S3URI.create(remoteFilePath, forceHostedStyle); - GetObjectResponse response = getClient(uri.getVirtualBucket()).getObject( + S3URI uri = S3URI.create(remoteFilePath, isUsePathStyle, forceParsingByStandardUri); + GetObjectResponse response = getClient().getObject( GetObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build(), localFile.toPath()); LOG.info("get file " + remoteFilePath + " success: " + response.toString()); return Status.OK; @@ -189,9 +171,9 @@ public Status getObject(String remoteFilePath, File localFile) { @Override public Status putObject(String remotePath, @Nullable RequestBody requestBody) { try { - S3URI uri = S3URI.create(remotePath, forceHostedStyle); + S3URI uri = S3URI.create(remotePath, isUsePathStyle, forceParsingByStandardUri); PutObjectResponse response = - getClient(uri.getVirtualBucket()) + getClient() .putObject( PutObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build(), requestBody); @@ -209,9 +191,9 @@ public Status putObject(String remotePath, @Nullable 
RequestBody requestBody) { @Override public Status deleteObject(String remotePath) { try { - S3URI uri = S3URI.create(remotePath, forceHostedStyle); + S3URI uri = S3URI.create(remotePath, isUsePathStyle, forceParsingByStandardUri); DeleteObjectResponse response = - getClient(uri.getVirtualBucket()) + getClient() .deleteObject( DeleteObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build()); LOG.info("delete file " + remotePath + " success: " + response.toString()); @@ -231,7 +213,7 @@ public Status deleteObject(String remotePath) { @Override public Status deleteObjects(String absolutePath) { try { - S3URI baseUri = S3URI.create(absolutePath, forceHostedStyle); + S3URI baseUri = S3URI.create(absolutePath, isUsePathStyle, forceParsingByStandardUri); String continuationToken = ""; boolean isTruncated = false; long totalObjects = 0; @@ -250,7 +232,7 @@ public Status deleteObjects(String absolutePath) { .delete(delete) .build(); - DeleteObjectsResponse resp = getClient(baseUri.getVirtualBucket()).deleteObjects(req); + DeleteObjectsResponse resp = getClient().deleteObjects(req); if (resp.errors().size() > 0) { LOG.warn("{} errors returned while deleting {} objects for dir {}", resp.errors().size(), objectList.size(), absolutePath); @@ -268,7 +250,7 @@ public Status deleteObjects(String absolutePath) { } catch (DdlException e) { return new Status(Status.ErrCode.COMMON_ERROR, "list objects for delete objects failed: " + e.getMessage()); } catch (Exception e) { - LOG.warn("delete objects {} failed, force visual host style {}", absolutePath, e, forceHostedStyle); + LOG.warn(String.format("delete objects %s failed", absolutePath), e); return new Status(Status.ErrCode.COMMON_ERROR, "delete objects failed: " + e.getMessage()); } } @@ -276,9 +258,9 @@ public Status deleteObjects(String absolutePath) { @Override public Status copyObject(String origFilePath, String destFilePath) { try { - S3URI origUri = S3URI.create(origFilePath); - S3URI descUri = S3URI.create(destFilePath, forceHostedStyle); - CopyObjectResponse response = getClient(descUri.getVirtualBucket()) + S3URI origUri = S3URI.create(origFilePath, isUsePathStyle, forceParsingByStandardUri); + S3URI descUri = S3URI.create(destFilePath, isUsePathStyle, forceParsingByStandardUri); + CopyObjectResponse response = getClient() .copyObject( CopyObjectRequest.builder() .copySource(origUri.getBucket() + "/" + origUri.getKey()) @@ -299,31 +281,16 @@ public Status copyObject(String origFilePath, String destFilePath) { @Override public RemoteObjects listObjects(String absolutePath, String continuationToken) throws DdlException { try { - S3URI uri = S3URI.create(absolutePath, forceHostedStyle); + S3URI uri = S3URI.create(absolutePath, isUsePathStyle, forceParsingByStandardUri); String bucket = uri.getBucket(); String prefix = uri.getKey(); - if (!StringUtils.isEmpty(uri.getVirtualBucket())) { - // Support s3 compatible service. The generated HTTP request for list objects likes: - // - // GET /?list-type=2&prefix= - prefix = bucket + "/" + prefix; - String endpoint = properties.get(S3Properties.ENDPOINT); - if (endpoint.contains("cos.")) { - bucket = "/"; - } else if (endpoint.contains("oss-")) { - bucket = uri.getVirtualBucket(); - } else if (endpoint.contains("obs.")) { - // FIXME: unlike cos and oss, the obs will report 'The specified key does not exist'. - throw new DdlException("obs does not support list objects via s3 sdk. 
path: " + absolutePath); - } - } ListObjectsV2Request.Builder requestBuilder = ListObjectsV2Request.builder() .bucket(bucket) .prefix(normalizePrefix(prefix)); if (!StringUtils.isEmpty(continuationToken)) { requestBuilder.continuationToken(continuationToken); } - ListObjectsV2Response response = getClient(uri.getVirtualBucket()).listObjectsV2(requestBuilder.build()); + ListObjectsV2Response response = getClient().listObjectsV2(requestBuilder.build()); List remoteObjects = new ArrayList<>(); for (S3Object c : response.contents()) { String relativePath = getRelativePath(prefix, c.key()); @@ -331,7 +298,7 @@ public RemoteObjects listObjects(String absolutePath, String continuationToken) } return new RemoteObjects(remoteObjects, response.isTruncated(), response.nextContinuationToken()); } catch (Exception e) { - LOG.warn("Failed to list objects for S3: {}", absolutePath, e); + LOG.warn(String.format("Failed to list objects for S3: %s", absolutePath), e); throw new DdlException("Failed to list objects for S3, Error message: " + e.getMessage(), e); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/ObjFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/ObjFileSystem.java index 72b75350140eebf..63e535361853c06 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/ObjFileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/ObjFileSystem.java @@ -43,6 +43,10 @@ public ObjFileSystem(String name, StorageBackend.StorageType type, ObjStorage this.objStorage = objStorage; } + public ObjStorage getObjStorage() { + return objStorage; + } + @Override public Status exists(String remotePath) { return objStorage.headObject(remotePath); diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/SwitchingFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/SwitchingFileSystem.java new file mode 100644 index 000000000000000..36aad8818bf4084 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/SwitchingFileSystem.java @@ -0,0 +1,111 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.fs.remote; + +import org.apache.doris.backup.Status; +import org.apache.doris.common.util.LocationPath; +import org.apache.doris.datasource.ExternalMetaCacheMgr; +import org.apache.doris.fs.FileSystem; +import org.apache.doris.fs.FileSystemCache; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class SwitchingFileSystem implements FileSystem { + + private final ExternalMetaCacheMgr extMetaCacheMgr; + + private final String bindBrokerName; + + private final Map properties; + + public SwitchingFileSystem(ExternalMetaCacheMgr extMetaCacheMgr, String bindBrokerName, + Map properties) { + this.extMetaCacheMgr = extMetaCacheMgr; + this.bindBrokerName = bindBrokerName; + this.properties = properties; + } + + @Override + public Map getProperties() { + return properties; + } + + @Override + public Status exists(String remotePath) { + return fileSystem(remotePath).exists(remotePath); + } + + @Override + public Status downloadWithFileSize(String remoteFilePath, String localFilePath, long fileSize) { + return fileSystem(remoteFilePath).downloadWithFileSize(remoteFilePath, localFilePath, fileSize); + } + + @Override + public Status upload(String localPath, String remotePath) { + return fileSystem(localPath).upload(localPath, remotePath); + } + + @Override + public Status directUpload(String content, String remoteFile) { + return fileSystem(remoteFile).directUpload(content, remoteFile); + } + + @Override + public Status rename(String origFilePath, String destFilePath) { + return fileSystem(origFilePath).rename(origFilePath, destFilePath); + } + + @Override + public Status delete(String remotePath) { + return fileSystem(remotePath).delete(remotePath); + } + + @Override + public Status makeDir(String remotePath) { + return fileSystem(remotePath).makeDir(remotePath); + } + + @Override + public Status listFiles(String remotePath, boolean recursive, List result) { + return fileSystem(remotePath).listFiles(remotePath, recursive, result); + } + + @Override + public Status globList(String remotePath, List result) { + return fileSystem(remotePath).globList(remotePath, result); + } + + @Override + public Status globList(String remotePath, List result, boolean fileNameOnly) { + return fileSystem(remotePath).globList(remotePath, result, fileNameOnly); + } + + @Override + public Status listDirectories(String remotePath, Set result) { + return fileSystem(remotePath).listDirectories(remotePath, result); + } + + public FileSystem fileSystem(String location) { + return extMetaCacheMgr.getFsCache().getRemoteFileSystem( + new FileSystemCache.FileSystemCacheKey( + LocationPath.getFSIdentity(location, + bindBrokerName), properties, bindBrokerName)); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java index 1944647a90dd0fe..f8dca0a196afe78 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java @@ -33,6 +33,7 @@ import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TFileCompressType; import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.THiveBucket; import org.apache.doris.thrift.THiveColumn; import org.apache.doris.thrift.THiveColumnType; @@ -128,21 +129,30 @@ public void bindDataSink(List insertCols, Optional setCompressType(tSink, formatType); THiveLocationParams 
locationParams = new THiveLocationParams(); - String location = sd.getLocation(); - - String writeTempPath = createTempPath(location); - locationParams.setWritePath(writeTempPath); - locationParams.setTargetPath(location); - locationParams.setFileType(LocationPath.getTFileTypeForBE(location)); + LocationPath locationPath = new LocationPath(sd.getLocation(), targetTable.getHadoopProperties()); + String location = locationPath.toString(); + String storageLocation = locationPath.toStorageLocation().toString(); + TFileType fileType = locationPath.getTFileTypeForBE(); + if (fileType == TFileType.FILE_S3) { + locationParams.setWritePath(storageLocation); + locationParams.setOriginalWritePath(location); + locationParams.setTargetPath(location); + } else { + String writeTempPath = createTempPath(location); + locationParams.setWritePath(writeTempPath); + locationParams.setOriginalWritePath(writeTempPath); + locationParams.setTargetPath(location); + if (insertCtx.isPresent()) { + HiveInsertCommandContext context = (HiveInsertCommandContext) insertCtx.get(); + tSink.setOverwrite(context.isOverwrite()); + context.setWritePath(writeTempPath); + } + } + locationParams.setFileType(fileType); tSink.setLocation(locationParams); tSink.setHadoopConfig(targetTable.getHadoopProperties()); - if (insertCtx.isPresent()) { - HiveInsertCommandContext context = (HiveInsertCommandContext) insertCtx.get(); - tSink.setOverwrite(context.isOverwrite()); - context.setWritePath(writeTempPath); - } tDataSink = new TDataSink(getDataSinkType()); tDataSink.setHiveTableSink(tSink); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java index 6141222246c6db3..44cbd482263d050 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java @@ -54,8 +54,6 @@ public class S3TableValuedFunction extends ExternalFileTableValuedFunction { ImmutableSet.of("access_key", "secret_key", "session_token", "region", "ACCESS_KEY", "SECRET_KEY", "SESSION_TOKEN", "REGION"); - private String virtualBucket = ""; - public S3TableValuedFunction(Map properties) throws AnalysisException { // 1. analyze common properties Map otherProps = super.parseCommonProperties(properties); @@ -67,13 +65,20 @@ public S3TableValuedFunction(Map properties) throws AnalysisExce } forwardCompatibleDeprecatedKeys(otherProps); - String usePathStyle = getOrDefaultAndRemove(otherProps, PropertyConverter.USE_PATH_STYLE, "false"); - boolean forceVirtualHosted = isVirtualHosted(uriStr, Boolean.parseBoolean(usePathStyle)); - S3URI s3uri = getS3Uri(uriStr, forceVirtualHosted); - String endpoint = forceVirtualHosted - ? getEndpointAndSetVirtualBucket(s3uri, otherProps) : s3uri.getBucketScheme(); + String usePathStyle = getOrDefaultAndRemove(otherProps, PropertyConverter.USE_PATH_STYLE, + PropertyConverter.USE_PATH_STYLE_DEFAULT_VALUE); + String forceParsingByStandardUri = getOrDefaultAndRemove(otherProps, + PropertyConverter.FORCE_PARSING_BY_STANDARD_URI, + PropertyConverter.FORCE_PARSING_BY_STANDARD_URI_DEFAULT_VALUE); + + S3URI s3uri = getS3Uri(uriStr, Boolean.parseBoolean(usePathStyle.toLowerCase()), + Boolean.parseBoolean(forceParsingByStandardUri.toLowerCase())); + String endpoint = otherProps.containsKey(S3Properties.ENDPOINT) ? 
otherProps.get(S3Properties.ENDPOINT) : + s3uri.getEndpoint().orElseThrow(() -> + new AnalysisException(String.format("Properties '%s' is required.", S3Properties.ENDPOINT))); if (!otherProps.containsKey(S3Properties.REGION)) { - String region = S3Properties.getRegionOfEndpoint(endpoint); + String region = s3uri.getRegion().orElseThrow(() -> + new AnalysisException(String.format("Properties '%s' is required.", S3Properties.REGION))); otherProps.put(S3Properties.REGION, region); } checkNecessaryS3Properties(otherProps); @@ -89,12 +94,7 @@ public S3TableValuedFunction(Map properties) throws AnalysisExce locationProperties.put(PropertyConverter.USE_PATH_STYLE, usePathStyle); locationProperties.putAll(S3ClientBEProperties.getBeFSProperties(locationProperties)); - if (forceVirtualHosted) { - filePath = NAME + S3URI.SCHEME_DELIM + virtualBucket + S3URI.PATH_DELIM - + s3uri.getBucket() + S3URI.PATH_DELIM + s3uri.getKey(); - } else { - filePath = NAME + S3URI.SCHEME_DELIM + s3uri.getKey(); - } + filePath = NAME + S3URI.SCHEME_DELIM + s3uri.getBucket() + S3URI.PATH_DELIM + s3uri.getKey(); if (FeConstants.runningUnitTest) { // Just check @@ -120,36 +120,9 @@ private void checkNecessaryS3Properties(Map props) throws Analys // do not check ak and sk, because we can read them from system environment. } - private String getEndpointAndSetVirtualBucket(S3URI s3uri, Map props) - throws AnalysisException { - String[] fields = s3uri.getVirtualBucket().split("\\.", 2); - virtualBucket = fields[0]; - if (fields.length > 1) { - // At this point, s3uri.getVirtualBucket() is: virtualBucket.endpoint, Eg: - // uri: http://my_bucket.cos.ap-beijing.myqcloud.com/file.txt - // s3uri.getVirtualBucket() = my_bucket.cos.ap-beijing.myqcloud.com, - // so we need separate virtualBucket and endpoint. - return fields[1]; - } else if (props.containsKey(S3Properties.ENDPOINT)) { - return props.get(S3Properties.ENDPOINT); - } else { - throw new AnalysisException("can not parse endpoint, please check uri."); - } - } - - private boolean isVirtualHosted(String uri, boolean usePathStyle) { - if (uri.toLowerCase().startsWith("s3")) { - // s3 protocol, default virtual-hosted style - return true; - } else { - // not s3 protocol, forceVirtualHosted is determined by USE_PATH_STYLE. 
- return !usePathStyle; - } - } - - private S3URI getS3Uri(String uri, boolean forceVirtualHosted) throws AnalysisException { + private S3URI getS3Uri(String uri, boolean isPathStyle, boolean forceParsingStandardUri) throws AnalysisException { try { - return S3URI.create(uri, forceVirtualHosted); + return S3URI.create(uri, isPathStyle, forceParsingStandardUri); } catch (UserException e) { throw new AnalysisException("parse s3 uri failed, uri = " + uri, e); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/HiveTransactionManager.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/HiveTransactionManager.java index 2499cc6eba44724..3a7722e9c6081b7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/HiveTransactionManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/HiveTransactionManager.java @@ -21,6 +21,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.datasource.hive.HMSTransaction; import org.apache.doris.datasource.hive.HiveMetadataOps; +import org.apache.doris.fs.FileSystemProvider; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -30,14 +31,17 @@ public class HiveTransactionManager implements TransactionManager { private final Map transactions = new ConcurrentHashMap<>(); private final HiveMetadataOps ops; - public HiveTransactionManager(HiveMetadataOps ops) { + private final FileSystemProvider fileSystemProvider; + + public HiveTransactionManager(HiveMetadataOps ops, FileSystemProvider fileSystemProvider) { this.ops = ops; + this.fileSystemProvider = fileSystemProvider; } @Override public long begin() { long id = Env.getCurrentEnv().getNextId(); - HMSTransaction hiveTransaction = new HMSTransaction(ops); + HMSTransaction hiveTransaction = new HMSTransaction(ops, fileSystemProvider); transactions.put(id, hiveTransaction); return id; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java index 334258a3f12d602..c4607460dc45972 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java @@ -18,10 +18,12 @@ package org.apache.doris.transaction; import org.apache.doris.datasource.hive.HiveMetadataOps; +import org.apache.doris.fs.FileSystemProvider; public class TransactionManagerFactory { - public static TransactionManager createHiveTransactionManager(HiveMetadataOps ops) { - return new HiveTransactionManager(ops); + public static TransactionManager createHiveTransactionManager(HiveMetadataOps ops, + FileSystemProvider fileSystemProvider) { + return new HiveTransactionManager(ops, fileSystemProvider); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java index 571826aa9c8ac1b..69130f57fff4b3c 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java @@ -34,7 +34,7 @@ public void testHdfsLocationConvert() { LocationPath locationPath = new LocationPath("hdfs://dir/file.path", rangeProps); Assertions.assertTrue(locationPath.get().startsWith("hdfs://")); - String beLocation = locationPath.toScanRangeLocation().toString(); + String beLocation = locationPath.toStorageLocation().toString(); 
Assertions.assertTrue(beLocation.startsWith("hdfs://")); Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.DFS); @@ -45,21 +45,21 @@ public void testHdfsLocationConvert() { Assertions.assertTrue(locationPath.get().startsWith("hdfs://") && !locationPath.get().startsWith("hdfs:///")); - beLocation = locationPath.toScanRangeLocation().toString(); + beLocation = locationPath.toStorageLocation().toString(); Assertions.assertTrue(beLocation.startsWith("hdfs://") && !beLocation.startsWith("hdfs:///")); // nonstandard '/' for hdfs path locationPath = new LocationPath("hdfs:/dir/file.path", props); Assertions.assertTrue(locationPath.get().startsWith("hdfs://")); - beLocation = locationPath.toScanRangeLocation().toString(); + beLocation = locationPath.toStorageLocation().toString(); Assertions.assertTrue(beLocation.startsWith("hdfs://")); // empty ha nameservices props.put("dfs.nameservices", ""); locationPath = new LocationPath("hdfs:/dir/file.path", props); - beLocation = locationPath.toScanRangeLocation().toString(); + beLocation = locationPath.toStorageLocation().toString(); Assertions.assertTrue(locationPath.get().startsWith("/dir") && !locationPath.get().startsWith("hdfs://")); Assertions.assertTrue(beLocation.startsWith("/dir") && !beLocation.startsWith("hdfs://")); @@ -75,7 +75,7 @@ public void testJFSLocationConvert() { // FE Assertions.assertTrue(locationPath.get().startsWith("jfs://")); // BE - loc = locationPath.toScanRangeLocation().toString(); + loc = locationPath.toStorageLocation().toString(); Assertions.assertTrue(loc.startsWith("jfs://")); Assertions.assertEquals(LocationPath.getFSIdentity(loc, null).first, FileSystemType.JFS); } @@ -89,7 +89,7 @@ public void testGSLocationConvert() { // FE Assertions.assertTrue(locationPath.get().startsWith("s3://")); // BE - String beLoc = locationPath.toScanRangeLocation().toString(); + String beLoc = locationPath.toStorageLocation().toString(); Assertions.assertTrue(beLoc.startsWith("s3://")); Assertions.assertEquals(LocationPath.getFSIdentity(beLoc, null).first, FileSystemType.S3); } @@ -101,7 +101,7 @@ public void testOSSLocationConvert() { // FE Assertions.assertTrue(locationPath.get().startsWith("oss://")); // BE - String beLocation = locationPath.toScanRangeLocation().toString(); + String beLocation = locationPath.toStorageLocation().toString(); Assertions.assertTrue(beLocation.startsWith("s3://")); Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.S3); @@ -109,7 +109,7 @@ public void testOSSLocationConvert() { // FE Assertions.assertTrue(locationPath.get().startsWith("oss://test.oss-dls.aliyuncs")); // BE - beLocation = locationPath.toScanRangeLocation().toString(); + beLocation = locationPath.toStorageLocation().toString(); Assertions.assertTrue(beLocation.startsWith("oss://test.oss-dls.aliyuncs")); Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.DFS); @@ -121,7 +121,7 @@ public void testCOSLocationConvert() { LocationPath locationPath = new LocationPath("cos://test.com", rangeProps); // FE Assertions.assertTrue(locationPath.get().startsWith("cos://")); - String beLocation = locationPath.toScanRangeLocation().toString(); + String beLocation = locationPath.toStorageLocation().toString(); // BE Assertions.assertTrue(beLocation.startsWith("s3://")); Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.S3); @@ -130,7 +130,7 @@ public void testCOSLocationConvert() { // FE 
Assertions.assertTrue(locationPath.get().startsWith("cosn://")); // BE - beLocation = locationPath.toScanRangeLocation().toString(); + beLocation = locationPath.toStorageLocation().toString(); Assertions.assertTrue(beLocation.startsWith("s3://")); Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.S3); @@ -138,7 +138,7 @@ public void testCOSLocationConvert() { // FE Assertions.assertTrue(locationPath.get().startsWith("ofs://")); // BE - beLocation = locationPath.toScanRangeLocation().toString(); + beLocation = locationPath.toStorageLocation().toString(); Assertions.assertTrue(beLocation.startsWith("ofs://")); Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.OFS); @@ -147,7 +147,7 @@ public void testCOSLocationConvert() { // FE Assertions.assertTrue(locationPath.get().startsWith("gfs://")); // BE - beLocation = locationPath.toScanRangeLocation().toString(); + beLocation = locationPath.toStorageLocation().toString(); Assertions.assertTrue(beLocation.startsWith("gfs://")); Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.DFS); } @@ -159,7 +159,7 @@ public void testOBSLocationConvert() { // FE Assertions.assertTrue(locationPath.get().startsWith("obs://")); // BE - String beLocation = locationPath.toScanRangeLocation().toString(); + String beLocation = locationPath.toStorageLocation().toString(); Assertions.assertTrue(beLocation.startsWith("s3://")); Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.S3); } @@ -173,7 +173,7 @@ public void testUnsupportedLocationConvert() { Assertions.assertTrue(locationPath.get().startsWith("unknown://")); Assertions.assertTrue(locationPath.getLocationType() == LocationPath.LocationType.UNKNOWN); // BE - String beLocation = locationPath.toScanRangeLocation().toString(); + String beLocation = locationPath.toStorageLocation().toString(); Assertions.assertTrue(beLocation.startsWith("unknown://")); } @@ -186,7 +186,7 @@ public void testNoSchemeLocation() { Assertions.assertTrue(locationPath.get().equalsIgnoreCase("/path/to/local")); Assertions.assertTrue(locationPath.getLocationType() == LocationPath.LocationType.NOSCHEME); // BE - String beLocation = locationPath.toScanRangeLocation().toString(); + String beLocation = locationPath.toStorageLocation().toString(); Assertions.assertTrue(beLocation.equalsIgnoreCase("/path/to/local")); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/S3URITest.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/S3URITest.java index 383926d8844d553..1d92158c9cf0cfd 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/util/S3URITest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/S3URITest.java @@ -22,35 +22,157 @@ import org.junit.Assert; import org.junit.Test; +import java.util.Optional; + public class S3URITest { @Test public void testLocationParsing() throws UserException { - String p1 = "s3://bucket/path/to/file"; - S3URI uri1 = S3URI.create(p1); + String p1 = "s3://my-bucket/path/to/file"; + boolean isPathStyle = false; + boolean forceParsingStandardUri = false; + S3URI uri1 = S3URI.create(p1, isPathStyle, forceParsingStandardUri); - Assert.assertEquals("bucket", uri1.getBucket()); + Assert.assertEquals("my-bucket", uri1.getBucket()); Assert.assertEquals("path/to/file", uri1.getKey()); - Assert.assertEquals(p1, uri1.toString()); + Assert.assertEquals(Optional.empty(), uri1.getRegion()); + 
Assert.assertEquals(Optional.empty(), uri1.getEndpoint()); + Assert.assertEquals(Optional.empty(), uri1.getQueryParams()); } @Test - public void testPathLocationParsing() throws UserException { - String p1 = "s3://bucket/path/"; - S3URI uri1 = S3URI.create(p1); + public void testVirtualHostStyleParsing() throws UserException { + String p1 = "https://my-bucket.s3.us-west-1.amazonaws.com/resources/doc.txt?versionId=abc123&partNumber=77&partNumber=88"; + boolean isPathStyle = false; + boolean forceParsingStandardUri = false; + S3URI uri1 = S3URI.create(p1, isPathStyle, forceParsingStandardUri); - Assert.assertEquals("bucket", uri1.getBucket()); - Assert.assertEquals("path/", uri1.getKey()); - Assert.assertEquals(p1, uri1.toString()); + Assert.assertEquals("my-bucket", uri1.getBucket()); + Assert.assertEquals("resources/doc.txt", uri1.getKey()); + Assert.assertEquals("s3.us-west-1.amazonaws.com", uri1.getEndpoint().get()); + Assert.assertEquals("us-west-1", uri1.getRegion().get()); + Assert.assertEquals("abc123", uri1.getQueryParams().get().get("versionId").get(0)); + Assert.assertEquals(2, uri1.getQueryParams().get().get("partNumber").size()); + Assert.assertTrue(uri1.getQueryParams().get().get("partNumber").contains("77")); + Assert.assertTrue(uri1.getQueryParams().get().get("partNumber").contains("88")); + } + + @Test + public void testPathStyleParsing() throws UserException { + String p1 = "https://s3.us-west-1.amazonaws.com/my-bucket/resources/doc.txt?versionId=abc123&partNumber=77&partNumber=88"; + boolean isPathStyle = true; + boolean forceParsingStandardUri = false; + S3URI uri1 = S3URI.create(p1, isPathStyle, forceParsingStandardUri); + + Assert.assertEquals("my-bucket", uri1.getBucket()); + Assert.assertEquals("resources/doc.txt", uri1.getKey()); + Assert.assertEquals("s3.us-west-1.amazonaws.com", uri1.getEndpoint().get()); + Assert.assertEquals("us-west-1", uri1.getRegion().get()); + Assert.assertEquals("abc123", uri1.getQueryParams().get().get("versionId").get(0)); + Assert.assertEquals(2, uri1.getQueryParams().get().get("partNumber").size()); + Assert.assertTrue(uri1.getQueryParams().get().get("partNumber").contains("77")); + Assert.assertTrue(uri1.getQueryParams().get().get("partNumber").contains("88")); + } + + @Test + public void testForceParsingStandardUri() throws UserException { + String p1 = "s3://my-bucket.s3.us-west-1.amazonaws.com/path/to/file"; + S3URI uri1 = S3URI.create(p1, false, true); + + Assert.assertEquals("my-bucket", uri1.getBucket()); + Assert.assertEquals("path/to/file", uri1.getKey()); + Assert.assertEquals("s3.us-west-1.amazonaws.com", uri1.getEndpoint().get()); + Assert.assertEquals("us-west-1", uri1.getRegion().get()); + Assert.assertEquals(Optional.empty(), uri1.getQueryParams()); + + String p2 = "s3://s3.us-west-1.amazonaws.com/my-bucket/path/to/file"; + S3URI uri2 = S3URI.create(p2, true, true); + + Assert.assertEquals("my-bucket", uri2.getBucket()); + Assert.assertEquals("path/to/file", uri2.getKey()); + Assert.assertEquals("s3.us-west-1.amazonaws.com", uri2.getEndpoint().get()); + Assert.assertEquals(Optional.empty(), uri1.getQueryParams()); + } + + @Test + public void testOSSVirtualHostStyle() throws UserException { + String p1 = "https://my-bucket.oss-cn-bejing.aliyuncs.com/resources/doc.txt?versionId=abc123&partNumber=77&partNumber=88"; + boolean isPathStyle = false; + boolean forceParsingStandardUri = false; + S3URI uri1 = S3URI.create(p1, isPathStyle, forceParsingStandardUri); + + Assert.assertEquals("my-bucket", uri1.getBucket()); + 
Assert.assertEquals("resources/doc.txt", uri1.getKey()); + Assert.assertEquals("oss-cn-bejing.aliyuncs.com", uri1.getEndpoint().get()); + Assert.assertEquals("oss-cn-bejing", uri1.getRegion().get()); + Assert.assertEquals("abc123", uri1.getQueryParams().get().get("versionId").get(0)); + Assert.assertEquals(2, uri1.getQueryParams().get().get("partNumber").size()); + Assert.assertTrue(uri1.getQueryParams().get().get("partNumber").contains("77")); + Assert.assertTrue(uri1.getQueryParams().get().get("partNumber").contains("88")); + } + + @Test + public void testOSSPathStyle() throws UserException { + String p1 = "https://oss-cn-bejing.aliyuncs.com/my-bucket/resources/doc.txt?versionId=abc123&partNumber=77&partNumber=88"; + boolean isPathStyle = true; + boolean forceParsingStandardUri = false; + S3URI uri1 = S3URI.create(p1, isPathStyle, forceParsingStandardUri); + + Assert.assertEquals("my-bucket", uri1.getBucket()); + Assert.assertEquals("resources/doc.txt", uri1.getKey()); + Assert.assertEquals("oss-cn-bejing.aliyuncs.com", uri1.getEndpoint().get()); + Assert.assertEquals("oss-cn-bejing", uri1.getRegion().get()); + Assert.assertEquals("abc123", uri1.getQueryParams().get().get("versionId").get(0)); + Assert.assertEquals(2, uri1.getQueryParams().get().get("partNumber").size()); + Assert.assertTrue(uri1.getQueryParams().get().get("partNumber").contains("77")); + Assert.assertTrue(uri1.getQueryParams().get().get("partNumber").contains("88")); + } + + @Test + public void testCOSVirtualHostStyle() throws UserException { + String p1 = "https://my-bucket.cos.ap-beijing.myqcloud.com/resources/doc.txt"; + boolean isPathStyle = false; + boolean forceParsingStandardUri = false; + S3URI uri1 = S3URI.create(p1, isPathStyle, forceParsingStandardUri); + + Assert.assertEquals("my-bucket", uri1.getBucket()); + Assert.assertEquals("resources/doc.txt", uri1.getKey()); + Assert.assertEquals("cos.ap-beijing.myqcloud.com", uri1.getEndpoint().get()); + Assert.assertEquals("ap-beijing", uri1.getRegion().get()); + } + + @Test + public void testOBSVirtualHostStyle() throws UserException { + String p1 = "https://my-bucket.obs.cn-north-4.myhuaweicloud.com/test_obs/000000_0"; + boolean isPathStyle = false; + boolean forceParsingStandardUri = false; + S3URI uri1 = S3URI.create(p1, isPathStyle, forceParsingStandardUri); + + Assert.assertEquals("my-bucket", uri1.getBucket()); + Assert.assertEquals("test_obs/000000_0", uri1.getKey()); + Assert.assertEquals("obs.cn-north-4.myhuaweicloud.com", uri1.getEndpoint().get()); + Assert.assertEquals("cn-north-4", uri1.getRegion().get()); } @Test public void testEncodedString() throws UserException { - String p1 = "s3://bucket/path%20to%20file"; - S3URI uri1 = S3URI.create(p1); + String p1 = "s3://bucket/path%20to%20file?txt=hello%20world&partNumber=77&partNumber=88"; + boolean isPathStyle = false; + boolean forceParsingStandardUri = false; + S3URI uri1 = S3URI.create(p1, isPathStyle, forceParsingStandardUri); Assert.assertEquals("bucket", uri1.getBucket()); Assert.assertEquals("path%20to%20file", uri1.getKey()); - Assert.assertEquals(p1, uri1.toString()); + Assert.assertEquals(Optional.empty(), uri1.getEndpoint()); + Assert.assertEquals(Optional.empty(), uri1.getRegion()); + Assert.assertEquals("hello%20world", uri1.getQueryParams().get().get("txt").get(0)); + Assert.assertEquals(2, uri1.getQueryParams().get().get("partNumber").size()); + Assert.assertTrue(uri1.getQueryParams().get().get("partNumber").contains("77")); + 
Assert.assertTrue(uri1.getQueryParams().get().get("partNumber").contains("88")); + } + + @Test(expected = UserException.class) + public void missingBucket() throws UserException { + S3URI.create("https:///"); } @Test(expected = UserException.class) @@ -75,6 +197,9 @@ public void testQueryAndFragment() throws UserException { Assert.assertEquals("bucket", uri1.getBucket()); Assert.assertEquals("path/to/file", uri1.getKey()); - Assert.assertEquals(p1, uri1.toString()); + Assert.assertEquals(Optional.empty(), uri1.getEndpoint()); + Assert.assertEquals(Optional.empty(), uri1.getRegion()); + Assert.assertEquals("foo", uri1.getQueryParams().get().get("query").get(0)); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java index ba87dd8f48eaed0..428bd4822108baa 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java @@ -21,6 +21,7 @@ import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.datasource.TestHMSCachedClient; +import org.apache.doris.fs.FileSystemProvider; import org.apache.doris.fs.LocalDfsFileSystem; import org.apache.doris.nereids.trees.plans.commands.insert.HiveInsertCommandContext; import org.apache.doris.qe.ConnectContext; @@ -60,6 +61,8 @@ public class HmsCommitTest { private static HiveMetadataOps hmsOps; private static HMSCachedClient hmsClient; + + private static FileSystemProvider fileSystemProvider; private static final String dbName = "test_db"; private static final String tbWithPartition = "test_tb_with_partition"; private static final String tbWithoutPartition = "test_tb_without_partition"; @@ -96,7 +99,8 @@ public static void createTestHiveCatalog() throws IOException { } else { hmsClient = new TestHMSCachedClient(); } - hmsOps = new HiveMetadataOps(null, hmsClient, fs); + hmsOps = new HiveMetadataOps(null, hmsClient); + fileSystemProvider = ctx -> fs; } public static void createTestHiveDatabase() { @@ -363,7 +367,7 @@ public THivePartitionUpdate createRandomOverwrite(String partition) throws IOExc public void commit(String dbName, String tableName, List hivePUs) { - HMSTransaction hmsTransaction = new HMSTransaction(hmsOps); + HMSTransaction hmsTransaction = new HMSTransaction(hmsOps, fileSystemProvider); hmsTransaction.setHivePartitionUpdates(hivePUs); HiveInsertCommandContext ctx = new HiveInsertCommandContext(); String queryId = DebugUtil.printId(ConnectContext.get().queryId()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/PropertyConverterTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/PropertyConverterTest.java index 4c4f52462104c07..9050035f7c8f31b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/PropertyConverterTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/PropertyConverterTest.java @@ -225,7 +225,7 @@ public void testBosBrokerRepositoryPropertiesConverter() throws Exception { public void testS3TVFPropertiesConverter() throws Exception { FeConstants.runningUnitTest = true; String queryOld = "select * from s3(\n" - + " 'uri' = 'http://s3.us-east-1.amazonaws.com/test.parquet',\n" + + " 'uri' = 'http://s3.us-east-1.amazonaws.com/my-bucket/test.parquet',\n" + " 'access_key' = 'akk',\n" + " 'secret_key' = 'skk',\n" + " 'region' = 'us-east-1',\n" @@ -239,7 +239,7 
@@ public void testS3TVFPropertiesConverter() throws Exception { Assertions.assertEquals(s3Tvf.getBrokerDesc().getProperties().size(), 9); String queryNew = "select * from s3(\n" - + " 'uri' = 'http://s3.us-east-1.amazonaws.com/test.parquet',\n" + + " 'uri' = 'http://s3.us-east-1.amazonaws.com/my-bucket/test.parquet',\n" + " 's3.access_key' = 'akk',\n" + " 's3.secret_key' = 'skk',\n" + " 'format' = 'parquet',\n" diff --git a/fe/fe-core/src/test/java/org/apache/doris/fs/obj/S3FileSystemTest.java b/fe/fe-core/src/test/java/org/apache/doris/fs/obj/S3FileSystemTest.java index d5983bb3ab52459..442883573ce49a2 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/fs/obj/S3FileSystemTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/fs/obj/S3FileSystemTest.java @@ -98,12 +98,12 @@ public void setUp() throws Exception { mockedClient.setMockedData(content.getBytes()); new MockUp(S3ObjStorage.class) { @Mock - public S3Client getClient(String bucket) throws UserException { + public S3Client getClient() throws UserException { return mockedClient; } }; S3ObjStorage mockedStorage = new S3ObjStorage(properties); - Assertions.assertTrue(mockedStorage.getClient("mocked") instanceof MockedS3Client); + Assertions.assertTrue(mockedStorage.getClient() instanceof MockedS3Client); // inject storage to file system. fileSystem = new S3FileSystem(mockedStorage); new MockUp(S3FileSystem.class) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/fs/obj/S3ObjStorageTest.java b/fe/fe-core/src/test/java/org/apache/doris/fs/obj/S3ObjStorageTest.java index c4dce56c5782d07..f5995cc9b27423e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/fs/obj/S3ObjStorageTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/fs/obj/S3ObjStorageTest.java @@ -19,7 +19,6 @@ import org.apache.doris.backup.Status; import org.apache.doris.common.UserException; -import org.apache.doris.common.util.S3URI; import org.apache.commons.lang3.StringUtils; import org.junit.jupiter.api.Assertions; @@ -113,14 +112,7 @@ public void testBaseOp() throws Exception { client.setAccessible(true); MockedS3Client mockedClient = new MockedS3Client(); client.set(storage, mockedClient); - Assertions.assertTrue(storage.getClient("mocked") instanceof MockedS3Client); - - S3URI vUri = S3URI.create("s3://bucket/key", true); - S3URI uri = S3URI.create("s3://bucket/key", false); - Assertions.assertEquals(vUri.getVirtualBucket(), "bucket"); - Assertions.assertEquals(vUri.getBucket(), "key"); - Assertions.assertEquals(uri.getVirtualBucket(), ""); - Assertions.assertEquals(uri.getBucket(), "bucket"); + Assertions.assertTrue(storage.getClient() instanceof MockedS3Client); Status st = storage.headObject("s3://bucket/key"); Assertions.assertEquals(Status.OK, st); diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift index d04f0d34a0b3372..b9cf79b0aec9eca 100644 --- a/gensrc/thrift/DataSinks.thrift +++ b/gensrc/thrift/DataSinks.thrift @@ -284,6 +284,8 @@ struct THiveLocationParams { 1: optional string write_path 2: optional string target_path 3: optional Types.TFileType file_type + // Other object stores will convert write_path to an s3 scheme path for the BE; this field keeps the original write path.
+ 4: optional string original_write_path } struct TSortedColumn { @@ -338,6 +340,13 @@ enum TUpdateMode { OVERWRITE = 2 // insert overwrite } +struct TS3MPUPendingUpload { + 1: optional string bucket + 2: optional string key + 3: optional string upload_id + 4: optional map etags +} + struct THivePartitionUpdate { 1: optional string name 2: optional TUpdateMode update_mode @@ -345,6 +354,7 @@ struct THivePartitionUpdate { 4: optional list file_names 5: optional i64 row_count 6: optional i64 file_size + 7: optional list s3_mpu_pending_uploads } enum TFileContent { diff --git a/regression-test/suites/external_table_p0/hive/write/test_hive_write_insert_s3.groovy b/regression-test/suites/external_table_p0/hive/write/test_hive_write_insert_s3.groovy new file mode 100644 index 000000000000000..6d2750c81eb9f4a --- /dev/null +++ b/regression-test/suites/external_table_p0/hive/write/test_hive_write_insert_s3.groovy @@ -0,0 +1,161 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_hive_write_insert_s3", "p0,external,hive,external_docker,external_docker_hive") { + def format_compressions = ["parquet_snappy"] + + def q01 = { String format_compression, String catalog_name -> + logger.info("hive sql: " + """ truncate table all_types_${format_compression}_s3; """) + hive_docker """ truncate table all_types_${format_compression}_s3; """ + sql """refresh catalog ${catalog_name};""" + + sql """ + INSERT INTO all_types_${format_compression}_s3 + SELECT * FROM all_types_parquet_snappy_src; + """ + order_qt_q01 """ select * from all_types_${format_compression}_s3; + """ + + sql """ + INSERT INTO all_types_${format_compression}_s3 + SELECT boolean_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, decimal_col1, decimal_col2, + decimal_col3, decimal_col4, string_col, binary_col, date_col, timestamp_col1, timestamp_col2, timestamp_col3, char_col1, + char_col2, char_col3, varchar_col1, varchar_col2, varchar_col3, t_map_string, t_map_varchar, t_map_char, t_map_int, + t_map_bigint, t_map_float, t_map_double, t_map_boolean, t_map_decimal_precision_2, t_map_decimal_precision_4, + t_map_decimal_precision_8, t_map_decimal_precision_17, t_map_decimal_precision_18, t_map_decimal_precision_38, + t_array_string, t_array_int, t_array_bigint, t_array_float, t_array_double, t_array_boolean, t_array_varchar, + t_array_char, t_array_decimal_precision_2, t_array_decimal_precision_4, t_array_decimal_precision_8, + t_array_decimal_precision_17, t_array_decimal_precision_18, t_array_decimal_precision_38, t_struct_bigint, t_complex, + t_struct_nested, t_struct_null, t_struct_non_nulls_after_nulls, t_nested_struct_non_nulls_after_nulls, + t_map_null_value, t_array_string_starting_with_nulls, t_array_string_with_nulls_in_between, + 
t_array_string_ending_with_nulls, t_array_string_all_nulls, dt FROM all_types_parquet_snappy_src; + """ + order_qt_q02 """ select * from all_types_${format_compression}_s3; + """ + + sql """ + INSERT INTO all_types_${format_compression}_s3(float_col, t_map_int, t_array_decimal_precision_8, t_array_string_starting_with_nulls) + SELECT float_col, t_map_int, t_array_decimal_precision_8, t_array_string_starting_with_nulls FROM all_types_parquet_snappy_src; + """ + order_qt_q03 """ + select * from all_types_${format_compression}_s3; + """ + + sql """ + INSERT OVERWRITE TABLE all_types_${format_compression}_s3(float_col, t_map_int, t_array_decimal_precision_8, t_array_string_starting_with_nulls) + SELECT float_col, t_map_int, t_array_decimal_precision_8, t_array_string_starting_with_nulls FROM all_types_parquet_snappy_src; + """ + order_qt_q04 """ + select * from all_types_${format_compression}_s3; + """ + + logger.info("hive sql: " + """ truncate table all_types_${format_compression}_s3; """) + hive_docker """ truncate table all_types_${format_compression}_s3; """ + order_qt_q05 """ + select * from all_types_${format_compression}_s3; + """ + } + + def q02 = { String format_compression, String catalog_name -> + logger.info("hive sql: " + """ DROP TABLE IF EXISTS all_types_par_${format_compression}_s3_${catalog_name}_q02; """) + hive_docker """ DROP TABLE IF EXISTS all_types_par_${format_compression}_s3_${catalog_name}_q02; """ + logger.info("hive sql: " + """ CREATE TABLE IF NOT EXISTS all_types_par_${format_compression}_s3_${catalog_name}_q02 like all_types_par_${format_compression}; """) + hive_docker """ CREATE TABLE IF NOT EXISTS all_types_par_${format_compression}_s3_${catalog_name}_q02 like all_types_par_${format_compression}; """ + sql """refresh catalog ${catalog_name};""" + + sql """ + INSERT INTO all_types_par_${format_compression}_s3_${catalog_name}_q02 + SELECT * FROM all_types_par_parquet_snappy_src; + """ + order_qt_q01 """ select * from all_types_par_${format_compression}_s3_${catalog_name}_q02; + """ + + sql """ + INSERT INTO all_types_par_${format_compression}_s3_${catalog_name}_q02 + SELECT boolean_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, decimal_col1, decimal_col2, + decimal_col3, decimal_col4, string_col, binary_col, date_col, timestamp_col1, timestamp_col2, timestamp_col3, char_col1, + char_col2, char_col3, varchar_col1, varchar_col2, varchar_col3, t_map_string, t_map_varchar, t_map_char, t_map_int, + t_map_bigint, t_map_float, t_map_double, t_map_boolean, t_map_decimal_precision_2, t_map_decimal_precision_4, + t_map_decimal_precision_8, t_map_decimal_precision_17, t_map_decimal_precision_18, t_map_decimal_precision_38, + t_array_string, t_array_int, t_array_bigint, t_array_float, t_array_double, t_array_boolean, t_array_varchar, + t_array_char, t_array_decimal_precision_2, t_array_decimal_precision_4, t_array_decimal_precision_8, + t_array_decimal_precision_17, t_array_decimal_precision_18, t_array_decimal_precision_38, t_struct_bigint, t_complex, + t_struct_nested, t_struct_null, t_struct_non_nulls_after_nulls, t_nested_struct_non_nulls_after_nulls, + t_map_null_value, t_array_string_starting_with_nulls, t_array_string_with_nulls_in_between, + t_array_string_ending_with_nulls, t_array_string_all_nulls, dt FROM all_types_parquet_snappy_src; + """ + order_qt_q02 """ select * from all_types_par_${format_compression}_s3_${catalog_name}_q02; + """ + + sql """ + INSERT INTO all_types_par_${format_compression}_s3_${catalog_name}_q02(float_col, 
t_map_int, t_array_decimal_precision_8, t_array_string_starting_with_nulls, dt) + SELECT float_col, t_map_int, t_array_decimal_precision_8, t_array_string_starting_with_nulls, dt FROM all_types_parquet_snappy_src; + """ + order_qt_q03 """ select * from all_types_par_${format_compression}_s3_${catalog_name}_q02; + """ + + sql """ + INSERT OVERWRITE TABLE all_types_par_${format_compression}_s3_${catalog_name}_q02(float_col, t_map_int, t_array_decimal_precision_8, t_array_string_starting_with_nulls, dt) + SELECT float_col, t_map_int, t_array_decimal_precision_8, t_array_string_starting_with_nulls, dt FROM all_types_parquet_snappy_src; + """ + order_qt_q04 """ + select * from all_types_par_${format_compression}_s3_${catalog_name}_q02; + """ + + logger.info("hive sql: " + """ DROP TABLE IF EXISTS all_types_par_${format_compression}_s3_${catalog_name}_q02; """) + hive_docker """ DROP TABLE IF EXISTS all_types_par_${format_compression}_s3_${catalog_name}_q02; """ + } + + String enabled = context.config.otherConfigs.get("enableHiveTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + try { + String hms_port = context.config.otherConfigs.get("hms_port") + String hdfs_port = context.config.otherConfigs.get("hdfs_port") + String catalog_name = "test_hive_write_insert_s3" + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String ak = getS3AK() + String sk = getS3SK() + String endpoint = getS3Endpoint() + String region = getS3Region() + + sql """drop catalog if exists ${catalog_name}""" + sql """create catalog if not exists ${catalog_name} properties ( + 'type'='hms', + 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}', + 'fs.defaultFS' = 'hdfs://${externalEnvIp}:${hdfs_port}', + 's3.endpoint' = '${endpoint}', + 's3.access_key' = '${ak}', + 's3.secret_key' = '${sk}', + 's3.region' = '${region}' + );""" + sql """use `${catalog_name}`.`write_test`""" + logger.info("hive sql: " + """ use `write_test` """) + hive_docker """use `write_test`""" + + sql """set enable_fallback_to_original_planner=false;""" + + for (String format_compression in format_compressions) { + logger.info("Process format_compression" + format_compression) + q01(format_compression, catalog_name) + q02(format_compression, catalog_name) + } + + sql """drop catalog if exists ${catalog_name}""" + } finally { + } + } +} diff --git a/regression-test/suites/load_p0/tvf/test_s3_tvf.groovy b/regression-test/suites/load_p0/tvf/test_s3_tvf.groovy index 391b76d37ebdd30..83a93fd5d8223f3 100644 --- a/regression-test/suites/load_p0/tvf/test_s3_tvf.groovy +++ b/regression-test/suites/load_p0/tvf/test_s3_tvf.groovy @@ -56,40 +56,46 @@ suite("test_s3_tvf", "load_p0") { attributeList.add(new TvfAttribute(table, ["K00", "K01", "K02", "K03", "K04", "K05", "K06", "K07", "K08", "K09", "K10", "K11", "K12", "K13", "K14", "K15", "K16", "K17", "K18"], "", "") .addProperty("uri", "s3://doris-build-1308700295.cos.ap-beijing.myqcloud.com/regression/load/data/basic_data.csv") .addProperty("format", "csv") - .addProperty("column_separator", "|")) + .addProperty("column_separator", "|") + .addProperty("force_parsing_by_standard_uri", "true")) } attributeList.add(new TvfAttribute("agg_tbl_basic_tvf", "c1 as k00,c2 as k01,c3 as k02,c4 as k03,c5 as k04,c6 as k05,c7 as k06,c8 as k07,c9 as k08,c10 as k09,c11 as k10,c12 as k11,c13 as k12,c14 as k13,c15 as k14,c16 as k15,c17 as k16,c18 as k17,c19 as k18, to_bitmap(c6) as k19, HLL_HASH(c6) as k20, TO_QUANTILE_STATE(c5, 1.0) as k21, to_bitmap(c6) as kd19, HLL_HASH(c6) as 
kd20, TO_QUANTILE_STATE(c5, 1.0) as kd21", "k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18,k19,k20,k21,kd19,kd20,kd21" ,"", "") .addProperty("uri", "s3://doris-build-1308700295.cos.ap-beijing.myqcloud.com/regression/load/data/basic_data.csv") .addProperty("format", "csv") - .addProperty("column_separator", "|")) + .addProperty("column_separator", "|") + .addProperty("force_parsing_by_standard_uri", "true")) for(String table : arrayTables) { attributeList.add(new TvfAttribute(table, ["K00", "K01", "K02", "K03", "K04", "K05", "K06", "K07", "K08", "K09", "K10", "K11", "K12", "K13", "K14", "K15", "K16", "K17"], "", "").addProperty("uri", "s3://doris-build-1308700295.cos.ap-beijing.myqcloud.com/regression/load/data/basic_array_data.csv") .addProperty("uri", "s3://doris-build-1308700295.cos.ap-beijing.myqcloud.com/regression/load/data/basic_array_data.csv") .addProperty("format", "csv") - .addProperty("column_separator", "|")) + .addProperty("column_separator", "|") + .addProperty("force_parsing_by_standard_uri", "true")) } for(String table : basicTables) { attributeList.add(new TvfAttribute(table, ["k00", "k01", "k02", "k03", "k04", "k05", "k06", "k07", "k08", "k09", "k10", "k11", "k12", "k13", "k14", "k15", "k16", "k17", "k18"], "", "") .addProperty("uri", "s3://doris-build-1308700295.cos.ap-beijing.myqcloud.com/regression/load/data/basic_data.csv") .addProperty("format", "csv") - .addProperty("column_separator", "|")) + .addProperty("column_separator", "|") + .addProperty("force_parsing_by_standard_uri", "true")) } attributeList.add(new TvfAttribute("agg_tbl_basic_tvf", "c1 as k00,c2 as k01,c3 as k02,c4 as k03,c5 as k04,c6 as k05,c7 as k06,c8 as k07,c9 as k08,c10 as k09,c11 as k10,c12 as k11,c13 as k12,c14 as k13,c15 as k14,c16 as k15,c17 as k16,c18 as k17,c19 as k18, to_bitmap(c6) as k19, HLL_HASH(c6) as k20, TO_QUANTILE_STATE(c5, 1.0) as k21, to_bitmap(c6) as kd19, HLL_HASH(c6) as kd20, TO_QUANTILE_STATE(c5, 1.0) as kd21", "k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18,k19,k20,k21,kd19,kd20,kd21" ,"", "") .addProperty("uri", "s3://doris-build-1308700295.cos.ap-beijing.myqcloud.com/regression/load/data/basic_data.csv") .addProperty("format", "csv") - .addProperty("column_separator", "|")) + .addProperty("column_separator", "|") + .addProperty("force_parsing_by_standard_uri", "true")) for(String table : arrayTables) { attributeList.add(new TvfAttribute(table, ["k00", "k01", "k02", "k03", "k04", "k05", "k06", "k07", "k08", "k09", "k10", "k11", "k12", "k13", "k14", "k15", "k16", "k17"], "", "") .addProperty("uri", "s3://doris-build-1308700295.cos.ap-beijing.myqcloud.com/regression/load/data/basic_array_data.csv") .addProperty("format", "csv") - .addProperty("column_separator", "|")) + .addProperty("column_separator", "|") + .addProperty("force_parsing_by_standard_uri", "true")) } for(String table : basicTables) { @@ -140,20 +146,23 @@ suite("test_s3_tvf", "load_p0") { attributeList.add(new TvfAttribute(table, ["k00", "k01", "k02", "k03", "k04", "k05", "k06", "k07", "k08", "k09", "k10", "k11", "k12", "k13", "k14", "k15", "k16", "k17", "k18"], "", "", true) .addProperty("uri", "s3://doris-build-1308700295.cos.ap-beijing.myqcloud.com/regression/load/data/basic_data_with_errors.csv") .addProperty("format", "csv") - .addProperty("column_separator", "|")) + .addProperty("column_separator", "|") + .addProperty("force_parsing_by_standard_uri", "true")) } attributeList.add(new TvfAttribute("agg_tbl_basic_tvf", "c1 as k00,c2 as k01,c3 as k02,c4 
as k03,c5 as k04,c6 as k05,c7 as k06,c8 as k07,c9 as k08,c10 as k09,c11 as k10,c12 as k11,c13 as k12,c14 as k13,c15 as k14,c16 as k15,c17 as k16,c18 as k17,c19 as k18, to_bitmap(c6) as k19, HLL_HASH(c6) as k20, TO_QUANTILE_STATE(c5, 1.0) as k21, to_bitmap(c6) as kd19, HLL_HASH(c6) as kd20, TO_QUANTILE_STATE(c5, 1.0) as kd21", "k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18,k19,k20,k21,kd19,kd20,kd21" ,"", "", true) .addProperty("uri", "s3://doris-build-1308700295.cos.ap-beijing.myqcloud.com/regression/load/data/basic_data_with_errors.csv") .addProperty("format", "csv") - .addProperty("column_separator", "|")) + .addProperty("column_separator", "|") + .addProperty("force_parsing_by_standard_uri", "true")) for(String table : arrayTables) { attributeList.add(new TvfAttribute(table, ["k00", "k01", "k02", "k03", "k04", "k05", "k06", "k07", "k08", "k09", "k10", "k11", "k12", "k13", "k14", "k15", "k16", "k17", "kd01", "kd02", "kd03", "kd04", "kd05", "kd06", "kd07", "kd08", "kd09", "kd10", "kd11", "kd12", "kd13", "kd14", "kd15", "kd16"], "", "", true) .addProperty("uri", "s3://doris-build-1308700295.cos.ap-beijing.myqcloud.com/regression/load/data/basic_array_data_with_errors.csv") .addProperty("format", "csv") - .addProperty("column_separator", "|")) + .addProperty("column_separator", "|") + .addProperty("force_parsing_by_standard_uri", "true")) } /* skip lines */ @@ -162,7 +171,8 @@ suite("test_s3_tvf", "load_p0") { .addProperty("uri", "s3://doris-build-1308700295.cos.ap-beijing.myqcloud.com/regression/load/data/basic_data_with_errors.csv") .addProperty("format", "csv") .addProperty("column_separator", "|") - .addProperty("skip_lines", "10")) + .addProperty("skip_lines", "10") + .addProperty("force_parsing_by_standard_uri", "true")) } attributeList.add(new TvfAttribute("agg_tbl_basic_tvf", "c1 as k00,c2 as k01,c3 as k02,c4 as k03,c5 as k04,c6 as k05,c7 as k06,c8 as k07,c9 as k08,c10 as k09,c11 as k10,c12 as k11,c13 as k12,c14 as k13,c15 as k14,c16 as k15,c17 as k16,c18 as k17,c19 as k18, to_bitmap(c6) as k19, HLL_HASH(c6) as k20, TO_QUANTILE_STATE(c5, 1.0) as k21, to_bitmap(c6) as kd19, HLL_HASH(c6) as kd20, TO_QUANTILE_STATE(c5, 1.0) as kd21", @@ -170,14 +180,16 @@ suite("test_s3_tvf", "load_p0") { .addProperty("uri", "s3://doris-build-1308700295.cos.ap-beijing.myqcloud.com/regression/load/data/basic_data_with_errors.csv") .addProperty("format", "csv") .addProperty("column_separator", "|") - .addProperty("skip_lines", "10")) + .addProperty("skip_lines", "10") + .addProperty("force_parsing_by_standard_uri", "true")) for(String table : arrayTables) { attributeList.add(new TvfAttribute(table, ["k00", "k01", "k02", "k03", "k04", "k05", "k06", "k07", "k08", "k09", "k10", "k11", "k12", "k13", "k14", "k15", "k16", "k17"], "", "") .addProperty("uri", "s3://doris-build-1308700295.cos.ap-beijing.myqcloud.com/regression/load/data/basic_array_data_with_errors.csv") .addProperty("format", "csv") .addProperty("column_separator", "|") - .addProperty("skip_lines", "10")) + .addProperty("skip_lines", "10") + .addProperty("force_parsing_by_standard_uri", "true")) } /* compress type */ @@ -251,67 +263,77 @@ suite("test_s3_tvf", "load_p0") { attributeList.add(new TvfAttribute(table, ["k00", "k01", "k02", "k03", "k04", "k05", "k06", "k07", "k08", "k09", "k10", "k11", "k12", "k13", "k14", "k15", "k16", "k17", "k18"], "WHERE c1 > 50", "") .addProperty("uri", "s3://doris-build-1308700295.cos.ap-beijing.myqcloud.com/regression/load/data/basic_data.csv") .addProperty("format", "csv") - 
.addProperty("column_separator", "|")) + .addProperty("column_separator", "|") + .addProperty("force_parsing_by_standard_uri", "true")) } attributeList.add(new TvfAttribute("agg_tbl_basic_tvf", "c1 as k00,c2 as k01,c3 as k02,c4 as k03,c5 as k04,c6 as k05,c7 as k06,c8 as k07,c9 as k08,c10 as k09,c11 as k10,c12 as k11,c13 as k12,c14 as k13,c15 as k14,c16 as k15,c17 as k16,c18 as k17,c19 as k18, to_bitmap(c6) as k19, HLL_HASH(c6) as k20, TO_QUANTILE_STATE(c5, 1.0) as k21, to_bitmap(c6) as kd19, HLL_HASH(c6) as kd20, TO_QUANTILE_STATE(c5, 1.0) as kd21", "k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18,k19,k20,k21,kd19,kd20,kd21" ,"WHERE c1 > 50", "") .addProperty("uri", "s3://doris-build-1308700295.cos.ap-beijing.myqcloud.com/regression/load/data/basic_data.csv") .addProperty("format", "csv") - .addProperty("column_separator", "|")) + .addProperty("column_separator", "|") + .addProperty("force_parsing_by_standard_uri", "true")) for(String table : arrayTables) { attributeList.add(new TvfAttribute(table, ["k00", "k01", "k02", "k03", "k04", "k05", "k06", "k07", "k08", "k09", "k10", "k11", "k12", "k13", "k14", "k15", "k16", "k17"], "WHERE c1 > 50", "") .addProperty("uri", "s3://doris-build-1308700295.cos.ap-beijing.myqcloud.com/regression/load/data/basic_array_data.csv") .addProperty("format", "csv") - .addProperty("column_separator", "|")) + .addProperty("column_separator", "|") + .addProperty("force_parsing_by_standard_uri", "true")) } for(String table : uniqTable) { attributeList.add(new TvfAttribute(table, ["k00", "k01", "k02", "k03", "k04", "k05", "k06", "k07", "k08", "k09", "k10", "k11", "k12", "k13", "k14", "k15", "k16", "k17", "k18"], "", "ORDER BY c1") .addProperty("uri", "s3://doris-build-1308700295.cos.ap-beijing.myqcloud.com/regression/load/data/basic_data.csv") .addProperty("format", "csv") - .addProperty("column_separator", "|")) + .addProperty("column_separator", "|") + .addProperty("force_parsing_by_standard_uri", "true")) } for(String table : basicTables) { attributeList.add(new TvfAttribute(table, "k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18","k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18", "", "") .addProperty("uri", "s3://doris-build-1308700295.cos.ap-beijing.myqcloud.com/regression/load/data/basic_data.parq") .addProperty("format", "parquet") - .addProperty("column_separator", "|")) + .addProperty("column_separator", "|") + .addProperty("force_parsing_by_standard_uri", "true")) } attributeList.add(new TvfAttribute("agg_tbl_basic_tvf", "k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18, to_bitmap(k05) as k19, HLL_HASH(k05) as k20, TO_QUANTILE_STATE(k04, 1.0) as k21, to_bitmap(k05) as kd19, HLL_HASH(k05) as kd20, TO_QUANTILE_STATE(k04, 1.0) as kd21", "k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18,k19,k20,k21,kd19,kd20,kd21" ,"", "") .addProperty("uri", "s3://doris-build-1308700295.cos.ap-beijing.myqcloud.com/regression/load/data/basic_data.parq") .addProperty("format", "parquet") - .addProperty("column_separator", "|")) + .addProperty("column_separator", "|") + .addProperty("force_parsing_by_standard_uri", "true")) for(String table : arrayTables) { attributeList.add(new TvfAttribute(table, "k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17", "k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17", "", "") .addProperty("uri", 
"s3://doris-build-1308700295.cos.ap-beijing.myqcloud.com/regression/load/data/basic_array_data.parq") .addProperty("format", "parquet") - .addProperty("column_separator", "|")) + .addProperty("column_separator", "|") + .addProperty("force_parsing_by_standard_uri", "true")) } for(String table : basicTables) { attributeList.add(new TvfAttribute(table, "k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18","k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18", "", "") .addProperty("uri", "s3://doris-build-1308700295.cos.ap-beijing.myqcloud.com/regression/load/data/basic_data.orc") .addProperty("format", "orc") - .addProperty("column_separator", "|")) + .addProperty("column_separator", "|") + .addProperty("force_parsing_by_standard_uri", "true")) } attributeList.add(new TvfAttribute("agg_tbl_basic_tvf", "k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18, to_bitmap(k05) as k19, HLL_HASH(k05) as k20, TO_QUANTILE_STATE(k04, 1.0) as k21, to_bitmap(k05) as kd19, HLL_HASH(k05) as kd20, TO_QUANTILE_STATE(k04, 1.0) as kd21", "k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18,k19,k20,k21,kd19,kd20,kd21" ,"", "") .addProperty("uri", "s3://doris-build-1308700295.cos.ap-beijing.myqcloud.com/regression/load/data/basic_data.orc") .addProperty("format", "orc") - .addProperty("column_separator", "|")) + .addProperty("column_separator", "|") + .addProperty("force_parsing_by_standard_uri", "true")) for(String table : arrayTables) { attributeList.add(new TvfAttribute(table, "k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17", "k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17", "", "") .addProperty("uri", "s3://doris-build-1308700295.cos.ap-beijing.myqcloud.com/regression/load/data/basic_array_data.orc") .addProperty("format", "orc") - .addProperty("column_separator", "|")) + .addProperty("column_separator", "|") + .addProperty("force_parsing_by_standard_uri", "true")) } for(String table : basicTables) { @@ -320,7 +342,8 @@ suite("test_s3_tvf", "load_p0") { .addProperty("format", "json") .addProperty("read_json_by_line", "false") .addProperty("strip_outer_array", "true") - .addProperty("column_separator", "|")) + .addProperty("column_separator", "|") + .addProperty("force_parsing_by_standard_uri", "true")) } attributeList.add(new TvfAttribute("agg_tbl_basic_tvf", "k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18, to_bitmap(k05) as k19, HLL_HASH(k05) as k20, TO_QUANTILE_STATE(k04, 1.0) as k21, to_bitmap(k05) as kd19, HLL_HASH(k05) as kd20, TO_QUANTILE_STATE(k04, 1.0) as kd21", @@ -329,7 +352,8 @@ suite("test_s3_tvf", "load_p0") { .addProperty("format", "json") .addProperty("read_json_by_line", "false") .addProperty("strip_outer_array", "true") - .addProperty("column_separator", "|")) + .addProperty("column_separator", "|") + .addProperty("force_parsing_by_standard_uri", "true")) for(String table : arrayTables) { attributeList.add(new TvfAttribute(table, "k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17", "k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17", "", "") @@ -337,7 +361,8 @@ suite("test_s3_tvf", "load_p0") { .addProperty("format", "json") .addProperty("read_json_by_line", "false") .addProperty("strip_outer_array", "true") - .addProperty("column_separator", "|")) + .addProperty("column_separator", "|") + .addProperty("force_parsing_by_standard_uri", "true")) } for(String table : basicTables) { 
@@ -346,7 +371,8 @@ suite("test_s3_tvf", "load_p0") { .addProperty("format", "json") .addProperty("read_json_by_line", "true") .addProperty("strip_outer_array", "false") - .addProperty("column_separator", "|")) + .addProperty("column_separator", "|") + .addProperty("force_parsing_by_standard_uri", "true")) } attributeList.add(new TvfAttribute("agg_tbl_basic_tvf", "k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18, to_bitmap(k05) as k19, HLL_HASH(k05) as k20, TO_QUANTILE_STATE(k04, 1.0) as k21, to_bitmap(k05) as kd19, HLL_HASH(k05) as kd20, TO_QUANTILE_STATE(k04, 1.0) as kd21", @@ -355,7 +381,8 @@ suite("test_s3_tvf", "load_p0") { .addProperty("format", "json") .addProperty("read_json_by_line", "true") .addProperty("strip_outer_array", "false") - .addProperty("column_separator", "|")) + .addProperty("column_separator", "|") + .addProperty("force_parsing_by_standard_uri", "true")) for(String table : arrayTables) { attributeList.add(new TvfAttribute(table, "k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17", "k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17", "", "") @@ -363,7 +390,8 @@ suite("test_s3_tvf", "load_p0") { .addProperty("format", "json") .addProperty("read_json_by_line", "true") .addProperty("strip_outer_array", "false") - .addProperty("column_separator", "|")) + .addProperty("column_separator", "|") + .addProperty("force_parsing_by_standard_uri", "true")) } def ak = getS3AK() @@ -483,4 +511,4 @@ class TvfAttribute { properties.put(k, v) return this } -} \ No newline at end of file +}
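Note on how the new TS3MPUPendingUpload records are intended to be consumed: the BE leaves each multipart upload open and reports its bucket, key, upload_id and per-part etags through THivePartitionUpdate, and the S3 committer completes the upload on the FE side when the transaction commits. The snippet below is a minimal, hypothetical sketch of that completion step against the AWS SDK v2 S3Client the FE already uses; the helper name and the assumption that the etag map is keyed by part number are illustrative only, not the actual HMSTransaction code.

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
import software.amazon.awssdk.services.s3.model.CompletedPart;

public class S3CommitterSketch {
    // Hypothetical helper: complete one pending multipart upload reported by a BE.
    // bucket/key/uploadId/etags mirror the fields of TS3MPUPendingUpload; the etag
    // map is assumed (not guaranteed by the thrift definition) to be keyed by part number.
    static void completePendingUpload(S3Client s3, String bucket, String key,
                                      String uploadId, Map<Integer, String> etags) {
        // S3 requires the completed parts to be listed in ascending part-number order.
        List<CompletedPart> parts = etags.entrySet().stream()
                .sorted(Map.Entry.comparingByKey())
                .map(e -> CompletedPart.builder()
                        .partNumber(e.getKey())
                        .eTag(e.getValue())
                        .build())
                .collect(Collectors.toList());
        s3.completeMultipartUpload(CompleteMultipartUploadRequest.builder()
                .bucket(bucket)
                .key(key)
                .uploadId(uploadId)
                .multipartUpload(CompletedMultipartUpload.builder().parts(parts).build())
                .build());
    }
}

On rollback, the same upload_id would presumably be used to abort the multipart upload instead, which is why it is carried in the thrift message alongside the etags.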