Commit c24786c

Merge branch 'branch-2.0' into backoff_timeout_2.0
sollhui authored Mar 21, 2024
2 parents 66d7d5e + 11f616f commit c24786c
Showing 55 changed files with 1,397 additions and 154 deletions.
be/src/exec/tablet_info.cpp (6 changes: 4 additions & 2 deletions)
@@ -417,9 +417,11 @@ Status VOlapTablePartitionParam::_create_partition_key(const TExprNode& t_expr,
         }
         column->insert_data(reinterpret_cast<const char*>(&dt), 0);
     } else if (TypeDescriptor::from_thrift(t_expr.type).is_datetime_v2_type()) {
-        vectorized::DateV2Value<doris::vectorized::DateTimeV2ValueType> dt;
+        vectorized::DateV2Value<vectorized::DateTimeV2ValueType> dt;
+        const int32_t scale =
+                t_expr.type.types.empty() ? -1 : t_expr.type.types.front().scalar_type.scale;
         if (!dt.from_date_str(t_expr.date_literal.value.c_str(),
-                              t_expr.date_literal.value.size())) {
+                              t_expr.date_literal.value.size(), scale)) {
             std::stringstream ss;
             ss << "invalid date literal in partition column, date=" << t_expr.date_literal;
             return Status::InternalError(ss.str());
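Note: the scale argument introduced here is the thread running through the datetimev2 changes in this commit (this file, data_type_time_v2.h, data_type_time_v2.cpp, and the serde below). A datetimev2 literal carries a fractional-second precision of 0 to 6 digits, and the parser needs it to interpret the string. A minimal sketch of the extraction pattern shared by these call sites, assuming the flattened Thrift TTypeDesc layout visible above, with -1 meaning "scale unspecified":

    // Sketch only: mirrors the inline expression at each call site. TTypeDesc::types
    // is a flattened tree of TTypeNode; for a scalar type the first node carries the
    // scalar_type payload, including its fractional-second scale.
    int32_t datetime_v2_scale(const TTypeDesc& type) {
        return type.types.empty() ? -1 : type.types.front().scalar_type.scale;
    }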
be/src/io/fs/s3_file_reader.cpp (9 changes: 5 additions & 4 deletions)
@@ -96,10 +96,11 @@ Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_rea
     }
     auto outcome = client->GetObject(request);
     if (!outcome.IsSuccess()) {
-        return Status::IOError("failed to read from {}: {}, exception {}, error code {}",
-                               _path.native(), outcome.GetError().GetMessage(),
-                               outcome.GetError().GetExceptionName(),
-                               outcome.GetError().GetResponseCode());
+        return Status::IOError(
+                "failed to read from {}: {}, exception {}, error code {}, request id {}",
+                _path.native(), outcome.GetError().GetMessage(),
+                outcome.GetError().GetExceptionName(), outcome.GetError().GetResponseCode(),
+                outcome.GetError().GetRequestId());
     }
     *bytes_read = outcome.GetResult().GetContentLength();
     if (*bytes_read != bytes_req) {
be/src/io/fs/s3_file_system.cpp (4 changes: 2 additions & 2 deletions)
@@ -526,10 +526,10 @@ Status S3FileSystem::get_key(const Path& path, std::string* key) const {
 
 template <typename AwsOutcome>
 std::string S3FileSystem::error_msg(const std::string& key, const AwsOutcome& outcome) const {
-    return fmt::format("(endpoint: {}, bucket: {}, key:{}, {}), {}, error code {}",
+    return fmt::format("(endpoint: {}, bucket: {}, key:{}, {}), {}, error code {}, request id {}",
                        _s3_conf.endpoint, _s3_conf.bucket, key,
                        outcome.GetError().GetExceptionName(), outcome.GetError().GetMessage(),
-                       outcome.GetError().GetResponseCode());
+                       outcome.GetError().GetResponseCode(), outcome.GetError().GetRequestId());
 }
 
 std::string S3FileSystem::error_msg(const std::string& key, const std::string& err) const {
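The request id added to these S3 messages is the identifier AWS attaches to every response; it is what AWS support asks for when tracing a failed call server-side. All four fields come straight off the SDK's AWSError object, so they could also be rendered by one shared helper. A sketch of such a formatter, illustrative only (format_aws_error is not part of the commit; the accessors are the same ones the diffs use):

    #include <aws/core/client/AWSError.h>
    #include <fmt/format.h>

    // Illustrative helper: renders the four error fields this commit threads
    // through every S3 message. ERROR_TYPE is the SDK's per-service error enum,
    // e.g. Aws::S3::S3Errors.
    template <typename ERROR_TYPE>
    std::string format_aws_error(const Aws::Client::AWSError<ERROR_TYPE>& err) {
        return fmt::format("{}: {}, error code {}, request id {}",
                           err.GetExceptionName(), err.GetMessage(),
                           static_cast<int>(err.GetResponseCode()), err.GetRequestId());
    }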
be/src/io/fs/s3_file_writer.cpp (26 changes: 16 additions & 10 deletions)
@@ -120,9 +120,10 @@ Status S3FileWriter::_create_multi_upload_request() {
     }
     return Status::IOError(
             "failed to create multipart upload(bucket={}, key={}, upload_id={}): {}, exception {}, "
-            "error code {}",
+            "error code {}, request id {}",
             _bucket, _path.native(), _upload_id, outcome.GetError().GetMessage(),
-            outcome.GetError().GetExceptionName(), outcome.GetError().GetResponseCode());
+            outcome.GetError().GetExceptionName(), outcome.GetError().GetResponseCode(),
+            outcome.GetError().GetRequestId());
 }
 
 void S3FileWriter::_wait_until_finish(std::string_view task_name) {
@@ -174,9 +175,10 @@ Status S3FileWriter::abort() {
     }
     return Status::IOError(
             "failed to abort multipart upload(bucket={}, key={}, upload_id={}): {}, exception {}, "
-            "error code {}",
+            "error code {}, request id {}",
             _bucket, _path.native(), _upload_id, outcome.GetError().GetMessage(),
-            outcome.GetError().GetExceptionName(), outcome.GetError().GetResponseCode());
+            outcome.GetError().GetExceptionName(), outcome.GetError().GetResponseCode(),
+            outcome.GetError().GetRequestId());
 }
 
 Status S3FileWriter::close() {
@@ -306,11 +308,12 @@ void S3FileWriter::_upload_one_part(int64_t part_num, S3FileBuffer& buf) {
     if (!upload_part_outcome.IsSuccess()) {
         auto s = Status::IOError(
                 "failed to upload part (bucket={}, key={}, part_num={}, up_load_id={}): {}, "
-                "exception {}, error code {}",
+                "exception {}, error code {}, request id {}",
                 _bucket, _path.native(), part_num, _upload_id,
                 upload_part_outcome.GetError().GetMessage(),
                 upload_part_outcome.GetError().GetExceptionName(),
-                upload_part_outcome.GetError().GetResponseCode());
+                upload_part_outcome.GetError().GetResponseCode(),
+                upload_part_outcome.GetError().GetRequestId());
         LOG_WARNING(s.to_string());
         buf._on_failed(s);
         return;
@@ -357,10 +360,11 @@ Status S3FileWriter::_complete() {
     if (!compute_outcome.IsSuccess()) {
         auto s = Status::IOError(
                 "failed to create complete multi part upload (bucket={}, key={}): {}, exception "
-                "{}, error code {}",
+                "{}, error code {}, request id {}",
                 _bucket, _path.native(), compute_outcome.GetError().GetMessage(),
                 compute_outcome.GetError().GetExceptionName(),
-                compute_outcome.GetError().GetResponseCode());
+                compute_outcome.GetError().GetResponseCode(),
+                compute_outcome.GetError().GetRequestId());
         LOG_WARNING(s.to_string());
         return s;
     }
@@ -399,9 +403,11 @@ void S3FileWriter::_put_object(S3FileBuffer& buf) {
     auto response = _client->PutObject(request);
     if (!response.IsSuccess()) {
         _st = Status::InternalError(
-                "failed to put object (bucket={}, key={}), Error: [{}:{}, responseCode:{}]",
+                "failed to put object (bucket={}, key={}), Error: [{}:{}, responseCode:{}, request "
+                "id:{}]",
                 _bucket, _path.native(), response.GetError().GetExceptionName(),
-                response.GetError().GetMessage(), response.GetError().GetResponseCode());
+                response.GetError().GetMessage(), response.GetError().GetResponseCode(),
+                response.GetError().GetRequestId());
         LOG(WARNING) << _st;
         buf._on_failed(_st);
         return;
be/src/olap/iterators.h (1 change: 1 addition & 0 deletions)
@@ -92,6 +92,7 @@ class StorageReadOptions {
     int block_row_max = 4096 - 32; // see https://github.com/apache/doris/pull/11816
 
     TabletSchemaSPtr tablet_schema = nullptr;
+    bool enable_unique_key_merge_on_write = false;
     bool record_rowids = false;
     // flag for enable topn opt
     bool use_topn_opt = false;
be/src/olap/rowset/beta_rowset_reader.cpp (2 changes: 2 additions & 0 deletions)
@@ -215,6 +215,8 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
     }
     _read_options.use_page_cache = _read_context->use_page_cache;
     _read_options.tablet_schema = _read_context->tablet_schema;
+    _read_options.enable_unique_key_merge_on_write =
+            _read_context->enable_unique_key_merge_on_write;
     _read_options.record_rowids = _read_context->record_rowids;
     _read_options.use_topn_opt = _read_context->use_topn_opt;
     _read_options.read_orderby_key_reverse = _read_context->read_orderby_key_reverse;
be/src/olap/rowset/segment_v2/segment_iterator.cpp (6 changes: 6 additions & 0 deletions)
@@ -1044,6 +1044,12 @@ Status SegmentIterator::_apply_inverted_index_on_block_column_predicate(
 }
 
 bool SegmentIterator::_need_read_data(ColumnId cid) {
+    // only support DUP_KEYS and UNIQUE_KEYS with MOW
+    if (!((_opts.tablet_schema->keys_type() == KeysType::DUP_KEYS ||
+           (_opts.tablet_schema->keys_type() == KeysType::UNIQUE_KEYS &&
+            _opts.enable_unique_key_merge_on_write)))) {
+        return true;
+    }
     // if there is delete predicate, we always need to read data
     if (_opts.delete_condition_predicates->num_of_column_predicate() > 0) {
         return true;
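The double-negated guard above is easier to audit when unfolded: the skip-reading-this-column optimization is attempted only for duplicate-key tables and for unique-key tables with merge-on-write, since rows in those layouts need no read-time merge; aggregate-key and non-MOW unique-key tables must still fetch column values so they can be merged at read time. An equivalent formulation of the same predicate, as a readability sketch only:

    // Sketch: equivalent to the new gate in _need_read_data.
    const KeysType kt = _opts.tablet_schema->keys_type();
    const bool skip_read_supported =
            kt == KeysType::DUP_KEYS ||
            (kt == KeysType::UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write);
    if (!skip_read_supported) {
        return true; // every other keys type must always read the data
    }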
be/src/service/internal_service.cpp (59 changes: 59 additions & 0 deletions)
@@ -37,6 +37,7 @@
 #include <stddef.h>
 #include <stdint.h>
 #include <sys/stat.h>
+#include <vec/exec/vjdbc_connector.h>
 
 #include <algorithm>
 #include <exception>
@@ -683,6 +684,64 @@ void PInternalServiceImpl::tablet_fetch_data(google::protobuf::RpcController* co
     }
 }
 
+void PInternalServiceImpl::test_jdbc_connection(google::protobuf::RpcController* controller,
+                                                const PJdbcTestConnectionRequest* request,
+                                                PJdbcTestConnectionResult* result,
+                                                google::protobuf::Closure* done) {
+    bool ret = _heavy_work_pool.try_offer([request, result, done]() {
+        VLOG_RPC << "test jdbc connection";
+        brpc::ClosureGuard closure_guard(done);
+        TTableDescriptor table_desc;
+        vectorized::JdbcConnectorParam jdbc_param;
+        Status st = Status::OK();
+        {
+            const uint8_t* buf = (const uint8_t*)request->jdbc_table().data();
+            uint32_t len = request->jdbc_table().size();
+            st = deserialize_thrift_msg(buf, &len, false, &table_desc);
+            if (!st.ok()) {
+                LOG(WARNING) << "test jdbc connection failed, errmsg=" << st;
+                st.to_protobuf(result->mutable_status());
+                return;
+            }
+        }
+        TJdbcTable jdbc_table = (table_desc.jdbcTable);
+        jdbc_param.catalog_id = jdbc_table.catalog_id;
+        jdbc_param.driver_class = jdbc_table.jdbc_driver_class;
+        jdbc_param.driver_path = jdbc_table.jdbc_driver_url;
+        jdbc_param.driver_checksum = jdbc_table.jdbc_driver_checksum;
+        jdbc_param.jdbc_url = jdbc_table.jdbc_url;
+        jdbc_param.user = jdbc_table.jdbc_user;
+        jdbc_param.passwd = jdbc_table.jdbc_password;
+        jdbc_param.query_string = request->query_str();
+        jdbc_param.table_type = static_cast<TOdbcTableType::type>(request->jdbc_table_type());
+        jdbc_param.connection_pool_min_size = jdbc_table.connection_pool_min_size;
+        jdbc_param.connection_pool_max_size = jdbc_table.connection_pool_max_size;
+        jdbc_param.connection_pool_max_life_time = jdbc_table.connection_pool_max_life_time;
+        jdbc_param.connection_pool_max_wait_time = jdbc_table.connection_pool_max_wait_time;
+        jdbc_param.connection_pool_keep_alive = jdbc_table.connection_pool_keep_alive;
+
+        std::unique_ptr<vectorized::JdbcConnector> jdbc_connector;
+        jdbc_connector.reset(new (std::nothrow) vectorized::JdbcConnector(jdbc_param));
+
+        st = jdbc_connector->test_connection();
+        st.to_protobuf(result->mutable_status());
+
+        Status clean_st = jdbc_connector->clean_datasource();
+        if (!clean_st.ok()) {
+            LOG(WARNING) << "Failed to clean JDBC datasource: " << clean_st.msg();
+        }
+        Status close_st = jdbc_connector->close();
+        if (!close_st.ok()) {
+            LOG(WARNING) << "Failed to close JDBC connector: " << close_st.msg();
+        }
+    });
+
+    if (!ret) {
+        offer_failed(result, done, _heavy_work_pool);
+        return;
+    }
+}
+
 void PInternalServiceImpl::get_column_ids_by_tablet_ids(google::protobuf::RpcController* controller,
                                                         const PFetchColIdsRequest* request,
                                                         PFetchColIdsResponse* response,
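The new handler follows the same skeleton as the other RPCs in this file: offload the work to _heavy_work_pool, let brpc::ClosureGuard guarantee done->Run() fires exactly once on every exit path of the lambda, and fall back to offer_failed when the pool rejects the task. A stripped-down sketch of that skeleton (SomeRequest, SomeResponse, do_work, and work_pool are placeholder names):

    // Skeleton of the async-handler pattern used above; placeholder names noted.
    void handle_async(google::protobuf::RpcController* /*controller*/,
                      const SomeRequest* request, SomeResponse* response,
                      google::protobuf::Closure* done) {
        bool offered = work_pool.try_offer([request, response, done]() {
            brpc::ClosureGuard guard(done); // calls done->Run() when destroyed
            Status st = do_work(*request, response);
            st.to_protobuf(response->mutable_status());
        });
        if (!offered) {
            offer_failed(response, done, work_pool); // pool saturated: reply with an error
        }
    }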
be/src/service/internal_service.h (5 changes: 5 additions & 0 deletions)
@@ -184,6 +184,11 @@ class PInternalServiceImpl : public PBackendService {
     void glob(google::protobuf::RpcController* controller, const PGlobRequest* request,
               PGlobResponse* response, google::protobuf::Closure* done) override;
 
+    void test_jdbc_connection(google::protobuf::RpcController* controller,
+                              const PJdbcTestConnectionRequest* request,
+                              PJdbcTestConnectionResult* result,
+                              google::protobuf::Closure* done) override;
+
 private:
     void _exec_plan_fragment_in_pthread(google::protobuf::RpcController* controller,
                                         const PExecPlanFragmentRequest* request,
be/src/vec/data_types/data_type_time_v2.cpp (4 changes: 2 additions & 2 deletions)
@@ -151,9 +151,9 @@ void DataTypeDateTimeV2::to_string(const IColumn& column, size_t row_num,
 }
 
 Status DataTypeDateTimeV2::from_string(ReadBuffer& rb, IColumn* column) const {
-    auto* column_data = static_cast<ColumnUInt64*>(column);
+    auto* column_data = assert_cast<ColumnUInt64*>(column);
     UInt64 val = 0;
-    if (!read_datetime_v2_text_impl<UInt64>(val, rb)) {
+    if (!read_datetime_v2_text_impl<UInt64>(val, rb, _scale)) {
         return Status::InvalidArgument("parse date fail, string: '{}'",
                                        std::string(rb.position(), rb.count()).c_str());
     }
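The static_cast to assert_cast switch is a hardening fix: assert_cast is Doris's checked downcast, verifying the dynamic type in debug builds while compiling down to a plain static_cast in release builds. A simplified sketch of the idea, not Doris's actual implementation:

    #include <cassert>

    // Simplified sketch in the spirit of assert_cast: checked in debug builds,
    // a plain static_cast in release builds. To is a pointer type, e.g.
    // debug_checked_cast<ColumnUInt64*>(column).
    template <typename To, typename From>
    To debug_checked_cast(From* from) {
    #ifndef NDEBUG
        To casted = dynamic_cast<To>(from);
        assert(casted != nullptr && "checked cast failed: wrong runtime type");
        return casted;
    #else
        return static_cast<To>(from);
    #endif
    }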
be/src/vec/data_types/data_type_time_v2.h (5 changes: 4 additions & 1 deletion)
@@ -135,7 +135,10 @@ class DataTypeDateTimeV2 final : public DataTypeNumberBase<UInt64> {
 
     Field get_field(const TExprNode& node) const override {
         DateV2Value<DateTimeV2ValueType> value;
-        if (value.from_date_str(node.date_literal.value.c_str(), node.date_literal.value.size())) {
+        const int32_t scale =
+                node.type.types.empty() ? -1 : node.type.types.front().scalar_type.scale;
+        if (value.from_date_str(node.date_literal.value.c_str(), node.date_literal.value.size(),
+                                scale)) {
             return value.to_date_int_val();
         } else {
             throw doris::Exception(doris::ErrorCode::INVALID_ARGUMENT,
be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp (6 changes: 3 additions & 3 deletions)
@@ -79,9 +79,9 @@ Status DataTypeDateTimeV2SerDe::deserialize_one_cell_from_json(IColumn& column,
         }
 
     } else if (ReadBuffer rb(slice.data, slice.size);
-               !read_datetime_v2_text_impl<UInt64>(val, rb)) {
-        return Status::InvalidDataFormat("parse date fail, string: '{}'",
-                                         std::string(rb.position(), rb.count()).c_str());
+               !read_datetime_v2_text_impl<UInt64>(val, rb, scale)) {
+        return Status::InvalidArgument("parse date fail, string: '{}'",
+                                       std::string(rb.position(), rb.count()).c_str());
     }
     column_data.insert_value(val);
     return Status::OK();
be/src/vec/exec/vjdbc_connector.cpp (27 changes: 26 additions & 1 deletion)
@@ -156,7 +156,11 @@ Status JdbcConnector::open(RuntimeState* state, bool read) {
     ctor_params.__set_jdbc_password(_conn_param.passwd);
     ctor_params.__set_jdbc_driver_class(_conn_param.driver_class);
     ctor_params.__set_driver_path(local_location);
-    ctor_params.__set_batch_size(read ? state->batch_size() : 0);
+    if (state == nullptr) {
+        ctor_params.__set_batch_size(read ? 1 : 0);
+    } else {
+        ctor_params.__set_batch_size(read ? state->batch_size() : 0);
+    }
     ctor_params.__set_op(read ? TJdbcOperation::READ : TJdbcOperation::WRITE);
     ctor_params.__set_table_type(_conn_param.table_type);
     ctor_params.__set_connection_pool_min_size(_conn_param.connection_pool_min_size);
@@ -185,6 +189,23 @@ Status JdbcConnector::open(RuntimeState* state, bool read) {
     return Status::OK();
 }
 
+Status JdbcConnector::test_connection() {
+    RETURN_IF_ERROR(open(nullptr, true));
+
+    JNIEnv* env = nullptr;
+    RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
+
+    env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_test_connection_id);
+    return JniUtil::GetJniExceptionMsg(env);
+}
+
+Status JdbcConnector::clean_datasource() {
+    JNIEnv* env = nullptr;
+    RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
+    env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_clean_datasource_id);
+    return JniUtil::GetJniExceptionMsg(env);
+}
+
 Status JdbcConnector::query() {
     if (!_is_open) {
         return Status::InternalError("Query before open of JdbcConnector.");
@@ -840,6 +861,10 @@ Status JdbcConnector::_register_func_id(JNIEnv* env) {
                                 JDBC_EXECUTOR_TRANSACTION_SIGNATURE, _executor_abort_trans_id));
     RETURN_IF_ERROR(register_id(_executor_clazz, "getResultColumnTypeNames",
                                 JDBC_EXECUTOR_GET_TYPES_SIGNATURE, _executor_get_types_id));
+    RETURN_IF_ERROR(
+            register_id(_executor_clazz, "testConnection", "()V", _executor_test_connection_id));
+    RETURN_IF_ERROR(
+            register_id(_executor_clazz, "cleanDataSource", "()V", _executor_clean_datasource_id));
    return Status::OK();
 }
 
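On the JNI side, "()V" is the standard signature string for a method taking no arguments and returning void, and CallNonvirtualVoidMethod invokes the resolved jmethodID directly on the executor object. The generic register-then-invoke shape these two methods follow, sketched with raw JNI (Doris wraps the exception handling in JniUtil::GetJniExceptionMsg):

    // Raw-JNI sketch: resolve a no-arg void method once, invoke it, then
    // surface any pending Java exception.
    jmethodID mid = env->GetMethodID(clazz, "testConnection", "()V"); // "()V": no args, void return
    if (mid != nullptr) {
        env->CallNonvirtualVoidMethod(obj, clazz, mid);
    }
    if (env->ExceptionCheck()) {
        env->ExceptionDescribe(); // Doris converts the exception to a Status instead
        env->ExceptionClear();
    }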
be/src/vec/exec/vjdbc_connector.h (5 changes: 5 additions & 0 deletions)
@@ -103,6 +103,9 @@ class JdbcConnector : public TableConnector {
 
     Status close() override;
 
+    Status test_connection();
+    Status clean_datasource();
+
 private:
     Status _register_func_id(JNIEnv* env);
     Status _check_column_type();
@@ -165,6 +168,8 @@ class JdbcConnector : public TableConnector {
     jmethodID _executor_begin_trans_id;
     jmethodID _executor_finish_trans_id;
     jmethodID _executor_abort_trans_id;
+    jmethodID _executor_test_connection_id;
+    jmethodID _executor_clean_datasource_id;
     std::map<int, int> _map_column_idx_to_cast_idx;
     std::vector<DataTypePtr> _input_array_string_types;
     std::vector<MutableColumnPtr>