Skip to content

Commit

Permalink
Merge branch 'master' into gavin-improve-cloud-mgr-retry-rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
gavinchou authored Sep 1, 2024
2 parents 9bb06c2 + f87e51c commit 4d0d206
Show file tree
Hide file tree
Showing 237 changed files with 3,617 additions and 2,076 deletions.
3 changes: 0 additions & 3 deletions .asf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,7 @@ github:
- Clang Formatter
- CheckStyle
- P0 Regression (Doris Regression)
- P1 Regression (Doris Regression)
- External Regression (Doris External Regression)
- cloud_p1 (Doris Cloud Regression)
- cloud_p0 (Doris Cloud Regression)
- FE UT (Doris FE UT)
- BE UT (Doris BE UT)
Expand Down Expand Up @@ -114,7 +112,6 @@ github:
- Clang Formatter
- CheckStyle
- P0 Regression (Doris Regression)
- P1 Regression (Doris Regression)
- External Regression (Doris External Regression)
- FE UT (Doris FE UT)
- BE UT (Doris BE UT)
Expand Down
4 changes: 3 additions & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -971,7 +971,7 @@ DEFINE_mInt64(workload_group_scan_task_wait_timeout_ms, "10000");
DEFINE_Bool(enable_index_apply_preds_except_leafnode_of_andnode, "true");

DEFINE_mDouble(variant_ratio_of_defaults_as_sparse_column, "1");
DEFINE_mInt64(variant_threshold_rows_to_estimate_sparse_column, "1000");
DEFINE_mInt64(variant_threshold_rows_to_estimate_sparse_column, "2048");
DEFINE_mBool(variant_throw_exeception_on_invalid_json, "false");

// block file cache
Expand Down Expand Up @@ -1320,6 +1320,8 @@ DEFINE_mInt64(pipeline_task_leakage_detect_period_secs, "60");
DEFINE_mInt32(snappy_compression_block_size, "262144");
DEFINE_mInt32(lz4_compression_block_size, "262144");

DEFINE_mBool(enable_pipeline_task_leakage_detect, "false");

// clang-format off
#ifdef BE_TEST
// test s3
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1414,6 +1414,8 @@ DECLARE_mInt64(pipeline_task_leakage_detect_period_secs);
DECLARE_mInt32(snappy_compression_block_size);
DECLARE_mInt32(lz4_compression_block_size);

DECLARE_mBool(enable_pipeline_task_leakage_detect);

#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
Expand Down
6 changes: 5 additions & 1 deletion be/src/exec/schema_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,11 @@ Status SchemaScanner::get_next_block_async(RuntimeState* state) {
_opened = true;
}
bool eos = false;
_scanner_status.update(get_next_block_internal(_data_block.get(), &eos));
auto call_next_block_internal = [&]() -> Status {
RETURN_IF_CATCH_EXCEPTION(
{ return get_next_block_internal(_data_block.get(), &eos); });
};
_scanner_status.update(call_next_block_internal());
_eos = eos;
_async_thread_running = false;
_dependency->set_ready();
Expand Down
108 changes: 85 additions & 23 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,14 @@ PColumnType to_proto(PrimitiveType type) {
return PColumnType::COLUMN_TYPE_VARCHAR;
case TYPE_STRING:
return PColumnType::COLUMN_TYPE_STRING;
case TYPE_IPV4:
return PColumnType::COLUMN_TYPE_IPV4;
case TYPE_IPV6:
return PColumnType::COLUMN_TYPE_IPV6;
default:
DCHECK(false) << "Invalid type.";
throw Exception(ErrorCode::INTERNAL_ERROR,
"runtime filter meet invalid PrimitiveType type {}", int(type));
}
DCHECK(false);
return PColumnType::COLUMN_TYPE_INT;
}

// PColumnType->PrimitiveType
Expand Down Expand Up @@ -160,10 +163,14 @@ PrimitiveType to_primitive_type(PColumnType type) {
return TYPE_CHAR;
case PColumnType::COLUMN_TYPE_STRING:
return TYPE_STRING;
case PColumnType::COLUMN_TYPE_IPV4:
return TYPE_IPV4;
case PColumnType::COLUMN_TYPE_IPV6:
return TYPE_IPV6;
default:
DCHECK(false);
throw Exception(ErrorCode::INTERNAL_ERROR,
"runtime filter meet invalid PColumnType type {}", int(type));
}
return TYPE_INT;
}

// PFilterType -> RuntimeFilterType
Expand Down Expand Up @@ -554,14 +561,13 @@ class RuntimePredicateWrapper {
}

Status assign(const PInFilter* in_filter, bool contain_null) {
PrimitiveType type = to_primitive_type(in_filter->column_type());
_context->hybrid_set.reset(create_set(type));
_context->hybrid_set.reset(create_set(_column_return_type));
if (contain_null) {
_context->hybrid_set->set_null_aware(true);
_context->hybrid_set->insert((const void*)nullptr);
}

switch (type) {
switch (_column_return_type) {
case TYPE_BOOLEAN: {
batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) {
bool bool_val = column.boolval();
Expand Down Expand Up @@ -701,9 +707,27 @@ class RuntimePredicateWrapper {
});
break;
}
case TYPE_IPV4: {
batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) {
int32_t tmp = column.intval();
set->insert(&tmp);
});
break;
}
case TYPE_IPV6: {
batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) {
auto string_val = column.stringval();
StringParser::ParseResult result;
auto int128_val = StringParser::string_to_int<uint128_t>(
string_val.c_str(), string_val.length(), &result);
DCHECK(result == StringParser::PARSE_SUCCESS);
set->insert(&int128_val);
});
break;
}
default: {
return Status::InternalError("not support assign to in filter, type: " +
type_to_string(type));
type_to_string(_column_return_type));
}
}
return Status::OK();
Expand All @@ -726,15 +750,14 @@ class RuntimePredicateWrapper {
// used by shuffle runtime filter
// assign this filter by protobuf
Status assign(const PMinMaxFilter* minmax_filter, bool contain_null) {
PrimitiveType type = to_primitive_type(minmax_filter->column_type());
_context->minmax_func.reset(create_minmax_filter(type));
_context->minmax_func.reset(create_minmax_filter(_column_return_type));

if (contain_null) {
_context->minmax_func->set_null_aware(true);
_context->minmax_func->set_contain_null();
}

switch (type) {
switch (_column_return_type) {
case TYPE_BOOLEAN: {
bool min_val = minmax_filter->min_val().boolval();
bool max_val = minmax_filter->max_val().boolval();
Expand Down Expand Up @@ -850,6 +873,23 @@ class RuntimePredicateWrapper {
auto max_val_ref = minmax_filter->max_val().stringval();
return _context->minmax_func->assign(&min_val_ref, &max_val_ref);
}
case TYPE_IPV4: {
int tmp_min = minmax_filter->min_val().intval();
int tmp_max = minmax_filter->max_val().intval();
return _context->minmax_func->assign(&tmp_min, &tmp_max);
}
case TYPE_IPV6: {
auto min_string_val = minmax_filter->min_val().stringval();
auto max_string_val = minmax_filter->max_val().stringval();
StringParser::ParseResult result;
auto min_val = StringParser::string_to_int<uint128_t>(min_string_val.c_str(),
min_string_val.length(), &result);
DCHECK(result == StringParser::PARSE_SUCCESS);
auto max_val = StringParser::string_to_int<uint128_t>(max_string_val.c_str(),
max_string_val.length(), &result);
DCHECK(result == StringParser::PARSE_SUCCESS);
return _context->minmax_func->assign(&min_val, &max_val);
}
default:
break;
}
Expand Down Expand Up @@ -1140,7 +1180,7 @@ Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr) {
merge_filter_request->set_filter_id(_filter_id);
merge_filter_request->set_is_pipeline(true);
auto column_type = _wrapper->column_type();
merge_filter_request->set_column_type(to_proto(column_type));
RETURN_IF_CATCH_EXCEPTION(merge_filter_request->set_column_type(to_proto(column_type)));
merge_filter_callback->cntl_->set_timeout_ms(wait_time_ms());

if (get_ignored()) {
Expand Down Expand Up @@ -1413,13 +1453,10 @@ template <class T>
Status IRuntimeFilter::_create_wrapper(const T* param,
std::unique_ptr<RuntimePredicateWrapper>* wrapper) {
int filter_type = param->request->filter_type();
PrimitiveType column_type = PrimitiveType::INVALID_TYPE;
if (param->request->has_in_filter()) {
column_type = to_primitive_type(param->request->in_filter().column_type());
}
if (param->request->has_column_type()) {
column_type = to_primitive_type(param->request->column_type());
if (!param->request->has_column_type()) {
return Status::InternalError("unknown filter column type");
}
PrimitiveType column_type = to_primitive_type(param->request->column_type());
*wrapper = std::make_unique<RuntimePredicateWrapper>(column_type, get_type(filter_type),
param->request->filter_id());

Expand Down Expand Up @@ -1639,9 +1676,21 @@ void IRuntimeFilter::to_protobuf(PInFilter* filter) {
});
return;
}
case TYPE_IPV4: {
batch_copy<IPv4>(filter, it, [](PColumnValue* column, const IPv4* value) {
column->set_intval(*reinterpret_cast<const int32_t*>(value));
});
return;
}
case TYPE_IPV6: {
batch_copy<IPv6>(filter, it, [](PColumnValue* column, const IPv6* value) {
column->set_stringval(LargeIntValue::to_string(*value));
});
return;
}
default: {
DCHECK(false) << "unknown type";
break;
throw Exception(ErrorCode::INTERNAL_ERROR,
"runtime filter meet invalid PrimitiveType type {}", int(column_type));
}
}
}
Expand Down Expand Up @@ -1755,9 +1804,22 @@ void IRuntimeFilter::to_protobuf(PMinMaxFilter* filter) {
filter->mutable_max_val()->set_stringval(*max_string_value);
break;
}
case TYPE_IPV4: {
filter->mutable_min_val()->set_intval(*reinterpret_cast<const int32_t*>(min_data));
filter->mutable_max_val()->set_intval(*reinterpret_cast<const int32_t*>(max_data));
return;
}
case TYPE_IPV6: {
filter->mutable_min_val()->set_stringval(
LargeIntValue::to_string(*reinterpret_cast<const uint128_t*>(min_data)));
filter->mutable_max_val()->set_stringval(
LargeIntValue::to_string(*reinterpret_cast<const uint128_t*>(max_data)));
return;
}
default: {
DCHECK(false) << "unknown type";
break;
throw Exception(ErrorCode::INTERNAL_ERROR,
"runtime filter meet invalid PrimitiveType type {}",
int(_wrapper->column_type()));
}
}
}
Expand Down
58 changes: 30 additions & 28 deletions be/src/http/action/http_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,37 +237,39 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) {
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());

int64_t start_read_data_time = MonotonicNanos();
Status st = ctx->allocate_schema_buffer();
if (!st.ok()) {
ctx->status = st;
return;
}
while (evbuffer_get_length(evbuf) > 0) {
try {
auto bb = ByteBuffer::allocate(128 * 1024);
auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
bb->pos = remove_bytes;
bb->flip();
auto st = ctx->body_sink->append(bb);
// schema_buffer stores 1M of data for parsing column information
// need to determine whether to cache for the first time
if (ctx->is_read_schema) {
if (ctx->schema_buffer()->pos + remove_bytes < config::stream_tvf_buffer_size) {
ctx->schema_buffer()->put_bytes(bb->ptr, remove_bytes);
} else {
LOG(INFO) << "use a portion of data to request fe to obtain column information";
ctx->is_read_schema = false;
ctx->status = process_put(req, ctx);
}
}
if (!st.ok() && !ctx->status.ok()) {
LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief();
ctx->status = st;
return;
}
ctx->receive_bytes += remove_bytes;
} catch (const doris::Exception& e) {
if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) {
ctx->status = Status::MemoryLimitExceeded(
fmt::format("PreCatch error code:{}, {}, ", e.code(), e.to_string()));
ByteBufferPtr bb;
st = ByteBuffer::allocate(128 * 1024, &bb);
if (!st.ok()) {
ctx->status = st;
return;
}
auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
bb->pos = remove_bytes;
bb->flip();
st = ctx->body_sink->append(bb);
// schema_buffer stores 1M of data for parsing column information
// need to determine whether to cache for the first time
if (ctx->is_read_schema) {
if (ctx->schema_buffer()->pos + remove_bytes < config::stream_tvf_buffer_size) {
ctx->schema_buffer()->put_bytes(bb->ptr, remove_bytes);
} else {
LOG(INFO) << "use a portion of data to request fe to obtain column information";
ctx->is_read_schema = false;
ctx->status = process_put(req, ctx);
}
ctx->status = Status::Error<false>(e.code(), e.to_string());
}
if (!st.ok()) {
LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief();
ctx->status = st;
return;
}
ctx->receive_bytes += remove_bytes;
}
// after all the data has been read and it has not reached 1M, it will execute here
if (ctx->is_read_schema) {
Expand Down
35 changes: 16 additions & 19 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -344,25 +344,22 @@ void StreamLoadAction::on_chunk_data(HttpRequest* req) {

int64_t start_read_data_time = MonotonicNanos();
while (evbuffer_get_length(evbuf) > 0) {
try {
auto bb = ByteBuffer::allocate(128 * 1024);
auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
bb->pos = remove_bytes;
bb->flip();
auto st = ctx->body_sink->append(bb);
if (!st.ok()) {
LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief();
ctx->status = st;
return;
}
ctx->receive_bytes += remove_bytes;
} catch (const doris::Exception& e) {
if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) {
ctx->status = Status::MemoryLimitExceeded(
fmt::format("PreCatch error code:{}, {}, ", e.code(), e.to_string()));
}
ctx->status = Status::Error<false>(e.code(), e.to_string());
}
ByteBufferPtr bb;
Status st = ByteBuffer::allocate(128 * 1024, &bb);
if (!st.ok()) {
ctx->status = st;
return;
}
auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
bb->pos = remove_bytes;
bb->flip();
st = ctx->body_sink->append(bb);
if (!st.ok()) {
LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief();
ctx->status = st;
return;
}
ctx->receive_bytes += remove_bytes;
}
int64_t read_data_time = MonotonicNanos() - start_read_data_time;
int64_t last_receive_and_read_data_cost_nanos = ctx->receive_and_read_data_cost_nanos;
Expand Down
1 change: 1 addition & 0 deletions be/src/io/file_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderS
return Status::InternalError("unknown stream load id: {}", UniqueId(load_id).to_string());
}
if (need_schema) {
RETURN_IF_ERROR(stream_load_ctx->allocate_schema_buffer());
// Here, a portion of the data is processed to parse column information
auto pipe = std::make_shared<io::StreamLoadPipe>(
io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
Expand Down
4 changes: 2 additions & 2 deletions be/src/io/fs/stream_load_pipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ Status StreamLoadPipe::read_one_message(std::unique_ptr<uint8_t[]>* data, size_t
Status StreamLoadPipe::append_and_flush(const char* data, size_t size, size_t proto_byte_size) {
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());
ByteBufferPtr buf;
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(ByteBuffer::create_and_allocate(buf, 128 * 1024));
RETURN_IF_ERROR(ByteBuffer::allocate(128 * 1024, &buf));
buf->put_bytes(data, size);
buf->flip();
return _append(buf, proto_byte_size);
Expand Down Expand Up @@ -148,7 +148,7 @@ Status StreamLoadPipe::append(const char* data, size_t size) {
size_t chunk_size = std::max(_min_chunk_size, size - pos);
chunk_size = BitUtil::RoundUpToPowerOfTwo(chunk_size);
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(ByteBuffer::create_and_allocate(_write_buf, chunk_size));
RETURN_IF_ERROR(ByteBuffer::allocate(chunk_size, &_write_buf));
_write_buf->put_bytes(data + pos, size - pos);
return Status::OK();
}
Expand Down
Loading

0 comments on commit 4d0d206

Please sign in to comment.