Skip to content

Commit

Permalink
Merge branch 'master' into master-new-job
Browse files Browse the repository at this point in the history
  • Loading branch information
CalvinKirs authored Oct 10, 2024
2 parents b195bd7 + c0749bc commit 20e6f03
Show file tree
Hide file tree
Showing 1,736 changed files with 138,085 additions and 6,518 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/code-checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ jobs:
popd
export PATH="${DEFAULT_DIR}/ldb-toolchain/bin/:$(pwd)/thirdparty/installed/bin/:${PATH}"
DISABLE_BE_JAVA_EXTENSIONS=ON DO_NOT_CHECK_JAVA_ENV=ON DORIS_TOOLCHAIN=clang ENABLE_PCH=OFF OUTPUT_BE_BINARY=0 ./build.sh --be --cloud
DISABLE_BE_JAVA_EXTENSIONS=ON DO_NOT_CHECK_JAVA_ENV=ON DORIS_TOOLCHAIN=clang ENABLE_PCH=OFF OUTPUT_BE_BINARY=0 ./build.sh --be
fi
echo "should_check=${{ steps.filter.outputs.cpp_changes }}" >>${GITHUB_OUTPUT}
Expand Down
11 changes: 6 additions & 5 deletions .github/workflows/scope-label.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
---
name: Add Scope Labeler

on:
pull_request_target:
types:
- opened
- synchronize
# This action has some error, skip it temporarily
#on:
# pull_request_target:
# types:
# - opened
# - synchronize

jobs:
process:
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ thirdparty/doris-thirdparty*.tar.xz
docker/thirdparties/docker-compose/mysql/data
docker/thirdparties/docker-compose/hive/scripts/tpch1.db/
docker/thirdparties/docker-compose/hive/scripts/paimon1
docker/thirdparties/docker-compose/hive/scripts/tvf_data

fe_plugins/output
fe_plugins/**/.factorypath
Expand Down
1 change: 1 addition & 0 deletions be/src/agent/be_exec_version_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ void BeExecVersionManager::check_function_compatibility(int current_be_exec_vers
* c. change FunctionIsIPAddressInRange from AlwaysNotNullable to DependOnArguments
* d. change some agg function nullable property: PR #37215
* e. change variant serde to fix PR #38413
* f. support const column in serialize/deserialize function: PR #41175
*/
const int BeExecVersionManager::max_be_exec_version = 7;
const int BeExecVersionManager::min_be_exec_version = 0;
Expand Down
2 changes: 2 additions & 0 deletions be/src/agent/be_exec_version_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ constexpr inline int AGG_FUNCTION_NULLABLE = 5; // change some agg nullable prop
constexpr inline int VARIANT_SERDE = 6; // change variant serde to fix PR #38413
constexpr inline int AGGREGATION_2_1_VERSION =
6; // some aggregation changed the data format after this version
constexpr inline int USE_CONST_SERDE =
7; // support const column in serialize/deserialize function: PR #41175

class BeExecVersionManager {
public:
Expand Down
17 changes: 7 additions & 10 deletions be/src/agent/heartbeat_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -240,20 +240,12 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) {
master_info.network_address.hostname, master_info.network_address.port);
}

if (need_report) {
LOG(INFO) << "Master FE is changed or restarted. report tablet and disk info immediately";
_engine.notify_listeners();
}

if (master_info.__isset.meta_service_endpoint != config::is_cloud_mode()) {
LOG(WARNING) << "Detected mismatch in cloud mode configuration between FE and BE. "
<< "FE cloud mode: "
<< (master_info.__isset.meta_service_endpoint ? "true" : "false")
<< ", BE cloud mode: " << (config::is_cloud_mode() ? "true" : "false");
return Status::InvalidArgument<false>(
"fe and be do not work in same mode, fe cloud mode: {},"
" be cloud mode: {}",
master_info.__isset.meta_service_endpoint, config::is_cloud_mode());
<< ", BE cloud mode: " << (config::is_cloud_mode() ? "true" : "false")
<< ". If fe is earlier than version 3.0.2, the message can be ignored.";
}

if (master_info.__isset.meta_service_endpoint) {
Expand Down Expand Up @@ -283,6 +275,11 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) {
LOG(INFO) << "set config cloud_unique_id " << master_info.cloud_unique_id << " " << st;
}

if (need_report) {
LOG(INFO) << "Master FE is changed or restarted. report tablet and disk info immediately";
_engine.notify_listeners();
}

return Status::OK();
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ Status CloudBaseCompaction::modify_rowsets() {
int64_t initiator = HashUtil::hash64(_uuid.data(), _uuid.size(), 0) &
std::numeric_limits<int64_t>::max();
RETURN_IF_ERROR(cloud_tablet()->calc_delete_bitmap_for_compaction(
_input_rowsets, _output_rowset, _rowid_conversion, compaction_type(),
_input_rowsets, _output_rowset, *_rowid_conversion, compaction_type(),
_stats.merged_rows, _stats.filtered_rows, initiator, output_rowset_delete_bitmap,
_allow_delete_in_cumu_compaction));
LOG_INFO("update delete bitmap in CloudBaseCompaction, tablet_id={}, range=[{}-{}]",
Expand Down
11 changes: 10 additions & 1 deletion be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,13 +255,22 @@ Status CloudCumulativeCompaction::modify_rowsets() {
compaction_job->add_txn_id(_output_rowset->txn_id());
compaction_job->add_output_rowset_ids(_output_rowset->rowset_id().to_string());

DBUG_EXECUTE_IF("CloudCumulativeCompaction::modify_rowsets.enable_spin_wait", {
LOG(INFO) << "CloudCumulativeCompaction::modify_rowsets.enable_spin_wait, start";
while (DebugPoints::instance()->is_enable(
"CloudCumulativeCompaction::modify_rowsets.block")) {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
LOG(INFO) << "CloudCumulativeCompaction::modify_rowsets.enable_spin_wait, exit";
});

DeleteBitmapPtr output_rowset_delete_bitmap = nullptr;
int64_t initiator =
HashUtil::hash64(_uuid.data(), _uuid.size(), 0) & std::numeric_limits<int64_t>::max();
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write()) {
RETURN_IF_ERROR(cloud_tablet()->calc_delete_bitmap_for_compaction(
_input_rowsets, _output_rowset, _rowid_conversion, compaction_type(),
_input_rowsets, _output_rowset, *_rowid_conversion, compaction_type(),
_stats.merged_rows, _stats.filtered_rows, initiator, output_rowset_delete_bitmap,
_allow_delete_in_cumu_compaction));
LOG_INFO("update delete bitmap in CloudCumulativeCompaction, tablet_id={}, range=[{}-{}]",
Expand Down
6 changes: 4 additions & 2 deletions be/src/cloud/cloud_delete_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@ Status CloudDeleteTask::execute(CloudStorageEngine& engine, const TPushReq& requ
LOG_WARNING("tablet exceeds max version num limit")
.tag("limit", config::max_tablet_version_num)
.tag("tablet_id", tablet->tablet_id());
return Status::Error<TOO_MANY_VERSION>("too many versions, versions={} tablet={}",
config::max_tablet_version_num, tablet->tablet_id());
return Status::Error<TOO_MANY_VERSION>(
"too many versions, versions={} tablet={}. Please reduce the frequency of loading "
"data or adjust the max_tablet_version_num in be.conf to a larger value.",
config::max_tablet_version_num, tablet->tablet_id());
}

// check delete condition if push for delete
Expand Down
3 changes: 2 additions & 1 deletion be/src/cloud/cloud_rowset_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ Status CloudRowsetBuilder::check_tablet_version_count() {
if (version_count > config::max_tablet_version_num) {
return Status::Error<TOO_MANY_VERSION>(
"failed to init rowset builder. version count: {}, exceed limit: {}, "
"tablet: {}",
"tablet: {}. Please reduce the frequency of loading data or adjust the "
"max_tablet_version_num in be.conf to a larger value.",
version_count, config::max_tablet_version_num, _tablet->tablet_id());
}
return Status::OK();
Expand Down
5 changes: 3 additions & 2 deletions be/src/cloud/cloud_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "cloud/cloud_rowset_writer.h"

#include "common/status.h"
#include "io/cache/block_file_cache_factory.h"
#include "io/fs/file_system.h"
#include "olap/rowset/rowset_factory.h"
Expand All @@ -34,7 +35,7 @@ Status CloudRowsetWriter::init(const RowsetWriterContext& rowset_writer_context)
if (_context.is_local_rowset()) {
// In cloud mode, this branch implies it is an intermediate rowset for external merge sort,
// we use `global_local_filesystem` to write data to `tmp_file_dir`(see `local_segment_path`).
_context.tablet_path = io::FileCacheFactory::instance()->get_cache_path();
_context.tablet_path = io::FileCacheFactory::instance()->pick_one_cache_path();
} else {
_rowset_meta->set_remote_storage_resource(*_context.storage_resource);
}
Expand Down Expand Up @@ -93,7 +94,7 @@ Status CloudRowsetWriter::build(RowsetSharedPtr& rowset) {
// transfer 0 (PREPARED -> COMMITTED): finish writing a rowset and the rowset' meta will not be changed
// transfer 1 (PREPARED -> BEGIN_PARTIAL_UPDATE): finish writing a rowset, but may append new segments later and the rowset's meta may be changed
// transfer 2 (BEGIN_PARTIAL_UPDATE -> VISIBLE): finish adding new segments and the rowset' meta will not be changed, the rowset is visible to users
if (_context.partial_update_info && _context.partial_update_info->is_partial_update) {
if (_context.partial_update_info && _context.partial_update_info->is_partial_update()) {
_rowset_meta->set_rowset_state(BEGIN_PARTIAL_UPDATE);
} else {
_rowset_meta->set_rowset_state(COMMITTED);
Expand Down
2 changes: 2 additions & 0 deletions be/src/cloud/cloud_stream_load_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "common/logging.h"
#include "common/status.h"
#include "runtime/stream_load/stream_load_context.h"
#include "util/debug_points.h"

namespace doris {

Expand Down Expand Up @@ -96,6 +97,7 @@ Status CloudStreamLoadExecutor::operate_txn_2pc(StreamLoadContext* ctx) {
}

Status CloudStreamLoadExecutor::commit_txn(StreamLoadContext* ctx) {
DBUG_EXECUTE_IF("StreamLoadExecutor.commit_txn.block", DBUG_BLOCK);
// forward to fe to excute commit transaction for MoW table
if (ctx->is_mow_table() || !config::enable_stream_load_commit_txn_on_be ||
ctx->load_type == TLoadType::ROUTINE_LOAD) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,7 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx
RETURN_IF_ERROR(_engine.txn_delete_bitmap_cache().update_tablet_txn_info(
txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::PREPARE));

if (txn_info->partial_update_info && txn_info->partial_update_info->is_partial_update &&
if (txn_info->partial_update_info && txn_info->partial_update_info->is_partial_update() &&
rowset_writer->num_rows() > 0) {
const auto& rowset_meta = rowset->rowset_meta();
RETURN_IF_ERROR(_engine.meta_mgr().update_tmp_rowset(*rowset_meta));
Expand Down
28 changes: 22 additions & 6 deletions be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,18 @@ Status CloudTxnDeleteBitmapCache::get_tablet_txn_info(
*publish_status = iter->second.publish_status;
*previous_publish_info = iter->second.publish_info;
}
RETURN_IF_ERROR(
get_delete_bitmap(transaction_id, tablet_id, delete_bitmap, rowset_ids, nullptr));
return Status::OK();

auto st = get_delete_bitmap(transaction_id, tablet_id, delete_bitmap, rowset_ids, nullptr);

if (st.is<ErrorCode::NOT_FOUND>()) {
// Because of the rowset_ids become empty, all delete bitmap
// will be recalculate in CalcDeleteBitmapTask
if (delete_bitmap != nullptr) {
*delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id);
}
return Status::OK();
}
return st;
}

Status CloudTxnDeleteBitmapCache::get_delete_bitmap(
Expand All @@ -95,6 +104,13 @@ Status CloudTxnDeleteBitmapCache::get_delete_bitmap(
CacheKey key(key_str);
Cache::Handle* handle = lookup(key);

DBUG_EXECUTE_IF("CloudTxnDeleteBitmapCache::get_delete_bitmap.cache_miss", {
handle = nullptr;
LOG(INFO) << "CloudTxnDeleteBitmapCache::get_delete_bitmap.cache_miss, make cache missed "
"when get delete bitmap, txn_id:"
<< transaction_id << ", tablet_id: " << tablet_id;
});

DeleteBitmapCacheValue* val =
handle == nullptr ? nullptr : reinterpret_cast<DeleteBitmapCacheValue*>(value(handle));
if (val) {
Expand All @@ -109,9 +125,9 @@ Status CloudTxnDeleteBitmapCache::get_delete_bitmap(
LOG_INFO("cache missed when get delete bitmap")
.tag("txn_id", transaction_id)
.tag("tablet_id", tablet_id);
// Because of the rowset_ids become empty, all delete bitmap
// will be recalculate in CalcDeleteBitmapTask
*delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id);
return Status::Error<ErrorCode::NOT_FOUND, false>(
"cache missed when get delete bitmap, tablet_id={}, transaction_id={}", tablet_id,
transaction_id);
}
return Status::OK();
}
Expand Down
11 changes: 11 additions & 0 deletions be/src/cloud/injection_point_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,17 @@ void register_suites() {
arg0->second = true;
});
});
// curl be_ip:http_port/api/injection_point/apply_suite?name=Segment::parse_footer:magic_number_corruption'
suite_map.emplace("Segment::parse_footer:magic_number_corruption", [] {
auto* sp = SyncPoint::get_instance();
sp->set_call_back("Segment::parse_footer:magic_number_corruption_inj", [](auto&& args) {
if (auto p = std::any_cast<uint8_t*>(args[0])) {
memset(p, 0, 12);
} else {
std::cerr << "Failed to cast std::any to uint8_t*" << std::endl;
}
});
});
}

void set_sleep(const std::string& point, HttpRequest* req) {
Expand Down
4 changes: 4 additions & 0 deletions be/src/cloud/pb_convert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ void doris_tablet_schema_to_cloud(TabletSchemaCloudPB* out, const TabletSchemaPB
out->mutable_row_store_column_unique_ids()->CopyFrom(in.row_store_column_unique_ids());
out->set_inverted_index_storage_format(in.inverted_index_storage_format());
out->set_enable_variant_flatten_nested(in.variant_enable_flatten_nested());
out->set_skip_bitmap_col_idx(in.skip_bitmap_col_idx());
}

void doris_tablet_schema_to_cloud(TabletSchemaCloudPB* out, TabletSchemaPB&& in) {
Expand Down Expand Up @@ -313,6 +314,7 @@ void doris_tablet_schema_to_cloud(TabletSchemaCloudPB* out, TabletSchemaPB&& in)
out->mutable_row_store_column_unique_ids()->Swap(in.mutable_row_store_column_unique_ids());
out->set_inverted_index_storage_format(in.inverted_index_storage_format());
out->set_enable_variant_flatten_nested(in.variant_enable_flatten_nested());
out->set_skip_bitmap_col_idx(in.skip_bitmap_col_idx());
}

TabletSchemaPB cloud_tablet_schema_to_doris(const TabletSchemaCloudPB& in) {
Expand Down Expand Up @@ -353,6 +355,7 @@ void cloud_tablet_schema_to_doris(TabletSchemaPB* out, const TabletSchemaCloudPB
out->mutable_row_store_column_unique_ids()->CopyFrom(in.row_store_column_unique_ids());
out->set_inverted_index_storage_format(in.inverted_index_storage_format());
out->set_variant_enable_flatten_nested(in.enable_variant_flatten_nested());
out->set_skip_bitmap_col_idx(in.skip_bitmap_col_idx());
}

void cloud_tablet_schema_to_doris(TabletSchemaPB* out, TabletSchemaCloudPB&& in) {
Expand Down Expand Up @@ -381,6 +384,7 @@ void cloud_tablet_schema_to_doris(TabletSchemaPB* out, TabletSchemaCloudPB&& in)
out->mutable_row_store_column_unique_ids()->Swap(in.mutable_row_store_column_unique_ids());
out->set_inverted_index_storage_format(in.inverted_index_storage_format());
out->set_variant_enable_flatten_nested(in.enable_variant_flatten_nested());
out->set_skip_bitmap_col_idx(in.skip_bitmap_col_idx());
}

TabletMetaCloudPB doris_tablet_meta_to_cloud(const TabletMetaPB& in) {
Expand Down
73 changes: 73 additions & 0 deletions be/src/common/cast_set.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <limits>
#include <type_traits>

#include "common/exception.h"
#include "common/status.h"

namespace doris {

template <typename T, typename U>
void check_cast_value(U b) {
if constexpr (std::is_unsigned_v<U>) {
if (b > std::numeric_limits<T>::max()) {
throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
"value {} cast to type {} out of range [{},{}]", b,
typeid(T).name(), std::numeric_limits<T>::min(),
std::numeric_limits<T>::max());
}
} else if constexpr (std::is_unsigned_v<T>) {
if (b < 0 || b > std::numeric_limits<T>::max()) {
throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
"value {} cast to type {} out of range [{},{}]", b,
typeid(T).name(), std::numeric_limits<T>::min(),
std::numeric_limits<T>::max());
}
} else {
if (b < std::numeric_limits<T>::min() || b > std::numeric_limits<T>::max()) {
throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
"value {} cast to type {} out of range [{},{}]", b,
typeid(T).name(), std::numeric_limits<T>::min(),
std::numeric_limits<T>::max());
}
}
}

template <typename T, typename U, bool need_check_value = true>
requires std::is_integral_v<T> && std::is_integral_v<U>
void cast_set(T& a, U b) {
if constexpr (need_check_value) {
check_cast_value<T>(b);
}
a = static_cast<T>(b);
}

template <typename T, typename U, bool need_check_value = true>
requires std::is_integral_v<T> && std::is_integral_v<U>
T cast_set(U b) {
if constexpr (need_check_value) {
check_cast_value<T>(b);
}
return static_cast<T>(b);
}

} // namespace doris
24 changes: 24 additions & 0 deletions be/src/common/compile_check_begin.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic error "-Wshorten-64-to-32"
#endif
//#include "common/compile_check_begin.h"
Loading

0 comments on commit 20e6f03

Please sign in to comment.