Skip to content

Commit

Permalink
Merge branch 'master' into 0716-source
Browse files Browse the repository at this point in the history
  • Loading branch information
hello-stephen authored Jul 26, 2024
2 parents 2368ea2 + 9741887 commit 3fe4364
Show file tree
Hide file tree
Showing 4,032 changed files with 162,438 additions and 13,492 deletions.
The diff you're trying to view is too large. We only load the first 3000 changed files.
1 change: 1 addition & 0 deletions .asf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ github:
- P1 Regression (Doris Regression)
- External Regression (Doris External Regression)
- cloud_p1 (Doris Cloud Regression)
- cloud_p0 (Doris Cloud Regression)
- FE UT (Doris FE UT)
- BE UT (Doris BE UT)
- Build Broker
Expand Down
47 changes: 47 additions & 0 deletions .github/workflows/lfs-warning.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
---
name: 'Check Large File'

on: [push, pull_request_target]

jobs:
large-file-checker:
name: "Check large file"
runs-on: ubuntu-latest
steps:
- name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )"
uses: actions/checkout@v3
with:
persist-credentials: false
submodules: recursive

- name: "Checkout lfs-warning commit"
run: |
rm -rf ./.github/actions/lfs-warning
git clone https://github.com/ppremk/lfs-warning .github/actions/lfs-warning
pushd .github/actions/lfs-warning &>/dev/null
git checkout 4b98a8a5e6c429c23c34eee02d71553bca216425
popd &>/dev/null
- name: "Check Large File"
uses: ./.github/actions/lfs-warning
with:
token: ${{ secrets.GITHUB_TOKEN }}
filesizelimit: 1MB

7 changes: 1 addition & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,12 @@ under the License.
[![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html)
[![GitHub release](https://img.shields.io/github/release/apache/doris.svg)](https://github.com/apache/doris/releases)
[![OSSRank](https://shields.io/endpoint?url=https://ossrank.com/shield/516)](https://ossrank.com/p/516)
[![Jenkins Vec](https://img.shields.io/jenkins/tests?compact_message&jobUrl=https://ci-builds.apache.org/job/Doris/job/doris_daily_enable_vectorized&label=VectorizedEngine)](https://ci-builds.apache.org/job/Doris/job/doris_daily_enable_vectorized)
[![Total Line](https://img.shields.io/badge/Total_Line-GitHub-blue)]((https://github.com/apache/doris))
[![Join the chat at https://gitter.im/apache-doris/Lobby](https://badges.gitter.im/apache-doris/Lobby.svg)](https://gitter.im/apache-doris/Lobby?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
[![Commit activity](https://img.shields.io/github/commit-activity/m/apache/doris)](https://github.com/apache/doris/commits/master/)
[![EN doc](https://img.shields.io/badge/Docs-English-blue.svg)](https://doris.apache.org/docs/get-starting/quick-start)
[![CN doc](https://img.shields.io/badge/文档-中文版-blue.svg)](https://doris.apache.org/zh-CN/docs/get-starting/quick-start/)



<div>


[![Official Website](<https://img.shields.io/badge/-Visit%20the%20Official%20Website%20%E2%86%92-rgb(15,214,106)?style=for-the-badge>)](https://doris.apache.org/)
[![Quick Download](<https://img.shields.io/badge/-Quick%20%20Download%20%E2%86%92-rgb(66,56,255)?style=for-the-badge>)](https://doris.apache.org/download)

Expand Down
28 changes: 16 additions & 12 deletions be/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,14 @@ else()
find_package(Boost ${BOOST_VERSION} COMPONENTS system container)
endif()

# Set if use libazure or not
option(BUILD_AZURE "ON for building azure support for BE or OFF for not" OFF)
message(STATUS "build azure: ${BUILD_AZURE}")
if(BUILD_AZURE STREQUAL "ON")
add_definitions(-DUSE_AZURE)
endif()


set(GPERFTOOLS_HOME "${THIRDPARTY_DIR}/gperftools")

include (cmake/thirdparty.cmake)
Expand Down Expand Up @@ -358,40 +366,36 @@ if (USE_UNWIND)
endif()
endif()

if (ENABLE_STACKTRACE)
add_definitions(-DENABLE_STACKTRACE)
endif()

if (USE_DWARF)
add_compile_options(-gdwarf-5)
endif()

# For CMAKE_BUILD_TYPE=Debug
if (OS_MACOSX AND ARCH_ARM)
# Using -O0 may meet ARM64 branch out of range errors when linking with tcmalloc.
set(CXX_FLAGS_DEBUG "${CXX_GCC_FLAGS} -Og")
set(CXX_FLAGS_DEBUG "-Og")
else()
set(CXX_FLAGS_DEBUG "${CXX_GCC_FLAGS} -O0")
set(CXX_FLAGS_DEBUG "-O0")
endif()

# For CMAKE_BUILD_TYPE=Release
# -O3: Enable all compiler optimizations
# -DNDEBUG: Turn off dchecks/asserts/debug only code.
set(CXX_FLAGS_RELEASE "${CXX_GCC_FLAGS} -O3 -DNDEBUG")
set(CXX_FLAGS_ASAN "${CXX_GCC_FLAGS} -O0 -fsanitize=address -fsanitize=undefined -fno-strict-aliasing -fno-sanitize=alignment,signed-integer-overflow,float-cast-overflow -DUNDEFINED_BEHAVIOR_SANITIZER -DADDRESS_SANITIZER")
set(CXX_FLAGS_LSAN "${CXX_GCC_FLAGS} -O0 -fsanitize=leak -DLEAK_SANITIZER")
set(CXX_FLAGS_RELEASE "-O3 -DNDEBUG")
set(CXX_FLAGS_ASAN "-O0 -fsanitize=address -fsanitize=undefined -fno-strict-aliasing -fno-sanitize=alignment,signed-integer-overflow,float-cast-overflow -DUNDEFINED_BEHAVIOR_SANITIZER -DADDRESS_SANITIZER")
set(CXX_FLAGS_LSAN "-O0 -fsanitize=leak -DLEAK_SANITIZER")
## Use for BE-UT
set(CXX_FLAGS_ASAN_UT "${CXX_GCC_FLAGS} -O0 -fsanitize=address -DADDRESS_SANITIZER")
set(CXX_FLAGS_ASAN_UT "-O0 -fsanitize=address -DADDRESS_SANITIZER")

# Set the flags to the undefined behavior sanitizer, also known as "ubsan"
# Turn on sanitizer and debug symbols to get stack traces:
set(CXX_FLAGS_UBSAN "${CXX_GCC_FLAGS} -O0 -fno-wrapv -mcmodel=medium -fsanitize=undefined -DUNDEFINED_BEHAVIOR_SANITIZER")
set(CXX_FLAGS_UBSAN "-O0 -fno-wrapv -mcmodel=medium -fsanitize=undefined -DUNDEFINED_BEHAVIOR_SANITIZER")

# Set the flags to the thread sanitizer, also known as "tsan"
# Turn on sanitizer and debug symbols to get stack traces:
# Use -Wno-builtin-declaration-mismatch to mute warnings like "new declaration ‘__tsan_atomic16 __tsan_atomic16_fetch_nand(..."
# If use -O0 to compile, BE will stack overflow when start. https://github.com/apache/doris/issues/8868
set(CXX_FLAGS_TSAN "${CXX_GCC_FLAGS} -O1 -fsanitize=thread -DTHREAD_SANITIZER -Wno-missing-declarations")
set(CXX_FLAGS_TSAN "-O1 -fsanitize=thread -DTHREAD_SANITIZER -Wno-missing-declarations")

# Set compile flags based on the build type.
if ("${CMAKE_BUILD_TYPE}" STREQUAL "DEBUG")
Expand Down
10 changes: 6 additions & 4 deletions be/cmake/thirdparty.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,12 @@ if (NOT OS_MACOSX)
add_thirdparty(aws-s2n LIBNAME "lib/libs2n.a")
endif()

add_thirdparty(azure-core)
add_thirdparty(azure-identity)
add_thirdparty(azure-storage-blobs)
add_thirdparty(azure-storage-common)
if(BUILD_AZURE STREQUAL "ON")
add_thirdparty(azure-core)
add_thirdparty(azure-identity)
add_thirdparty(azure-storage-blobs)
add_thirdparty(azure-storage-common)
endif()

add_thirdparty(minizip LIB64)
add_thirdparty(simdjson LIB64)
Expand Down
6 changes: 5 additions & 1 deletion be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include <sstream>
#include <string>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>

Expand Down Expand Up @@ -1384,9 +1385,12 @@ void update_s3_resource(const TStorageResource& param, io::RemoteFileSystemSPtr
auto client = static_cast<io::S3FileSystem*>(existed_fs.get())->client_holder();
auto new_s3_conf = S3Conf::get_s3_conf(param.s3_storage_param);
S3ClientConf conf {
.endpoint {},
.region {},
.ak = std::move(new_s3_conf.client_conf.ak),
.sk = std::move(new_s3_conf.client_conf.sk),
.token = std::move(new_s3_conf.client_conf.token),
.bucket {},
.provider = new_s3_conf.client_conf.provider,
};
st = client->reset(conf);
Expand Down Expand Up @@ -1789,7 +1793,7 @@ void PublishVersionWorkerPool::publish_version_callback(const TAgentTaskRequest&
if (tablet->exceed_version_limit(config::max_tablet_version_num * 2 / 3) &&
published_count % 20 == 0) {
auto st = _engine.submit_compaction_task(
tablet, CompactionType::CUMULATIVE_COMPACTION, true);
tablet, CompactionType::CUMULATIVE_COMPACTION, true, false);
if (!st.ok()) [[unlikely]] {
LOG(WARNING) << "trigger compaction failed, tablet_id=" << tablet_id
<< ", published=" << published_count << " : " << st;
Expand Down
3 changes: 3 additions & 0 deletions be/src/agent/workload_group_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topi
// 4 create and update task scheduler
wg->upsert_task_scheduler(&workload_group_info, _exec_env);

// 5 upsert io throttle
wg->upsert_scan_io_throttle(&workload_group_info);

LOG(INFO) << "[topic_publish_wg]update workload group finish, wg info="
<< wg->debug_string() << ", enable_cpu_hard_limit="
<< (_exec_env->workload_group_mgr()->enable_cpu_hard_limit() ? "true" : "false")
Expand Down
8 changes: 8 additions & 0 deletions be/src/cloud/cloud_backend_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
#include "common/status.h"
#include "io/cache/block_file_cache_downloader.h"
#include "io/cache/block_file_cache_factory.h"
#include "runtime/stream_load/stream_load_context.h"
#include "runtime/stream_load/stream_load_recorder.h"
#include "util/brpc_client_cache.h" // BrpcClientCache
#include "util/thrift_server.h"

Expand Down Expand Up @@ -186,4 +188,10 @@ void CloudBackendService::check_warm_up_cache_async(TCheckWarmUpCacheAsyncRespon
response.status = t_status;
}

void CloudBackendService::get_stream_load_record(TStreamLoadRecordResult& result,
int64_t last_stream_record_time) {
BaseBackendService::get_stream_load_record(result, last_stream_record_time,
_engine.get_stream_load_recorder());
}

} // namespace doris
3 changes: 3 additions & 0 deletions be/src/cloud/cloud_backend_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ class CloudBackendService final : public BaseBackendService {
void check_warm_up_cache_async(TCheckWarmUpCacheAsyncResponse& response,
const TCheckWarmUpCacheAsyncRequest& request) override;

void get_stream_load_record(TStreamLoadRecordResult& result,
int64_t last_stream_record_time) override;

private:
CloudStorageEngine& _engine;
};
Expand Down
4 changes: 4 additions & 0 deletions be/src/cloud/cloud_storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,10 @@ Status CloudStorageEngine::open() {

_tablet_hotspot = std::make_unique<TabletHotspot>();

RETURN_NOT_OK_STATUS_WITH_WARN(
init_stream_load_recorder(ExecEnv::GetInstance()->store_paths()[0].path),
"init StreamLoadRecorder failed");

return ThreadPoolBuilder("SyncLoadForTabletsThreadPool")
.set_max_threads(config::sync_load_for_tablets_thread)
.set_min_threads(config::sync_load_for_tablets_thread)
Expand Down
12 changes: 6 additions & 6 deletions be/src/cloud/cloud_stream_load_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@ Status CloudStreamLoadExecutor::operate_txn_2pc(StreamLoadContext* ctx) {
Status st = Status::InternalError<false>("impossible branch reached, " + op_info);

if (ctx->txn_operation.compare("commit") == 0) {
if (topt == TxnOpParamType::WITH_TXN_ID) {
if (!config::enable_stream_load_commit_txn_on_be) {
VLOG_DEBUG << "2pc commit stream load txn with FE support: " << op_info;
st = StreamLoadExecutor::operate_txn_2pc(ctx);
} else if (topt == TxnOpParamType::WITH_TXN_ID) {
VLOG_DEBUG << "2pc commit stream load txn directly: " << op_info;
st = _exec_env->storage_engine().to_cloud().meta_mgr().commit_txn(*ctx, true);
} else if (topt == TxnOpParamType::WITH_LABEL) {
Expand Down Expand Up @@ -93,12 +96,9 @@ Status CloudStreamLoadExecutor::operate_txn_2pc(StreamLoadContext* ctx) {
}

Status CloudStreamLoadExecutor::commit_txn(StreamLoadContext* ctx) {
if (ctx->load_type == TLoadType::ROUTINE_LOAD) {
return StreamLoadExecutor::commit_txn(ctx);
}

// forward to fe to excute commit transaction for MoW table
if (ctx->is_mow_table()) {
if (ctx->is_mow_table() || !config::enable_stream_load_commit_txn_on_be ||
ctx->load_type == TLoadType::ROUTINE_LOAD) {
Status st;
int retry_times = 0;
while (retry_times < config::mow_stream_load_commit_retry_times) {
Expand Down
19 changes: 19 additions & 0 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@

#include <atomic>
#include <memory>
#include <shared_mutex>
#include <unordered_map>
#include <vector>

#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_storage_engine.h"
Expand All @@ -42,8 +44,10 @@
#include "olap/rowset/rowset_writer.h"
#include "olap/rowset/segment_v2/inverted_index_desc.h"
#include "olap/storage_policy.h"
#include "olap/tablet_schema.h"
#include "olap/txn_manager.h"
#include "util/debug_points.h"
#include "vec/common/schema_util.h"

namespace doris {
using namespace ErrorCode;
Expand Down Expand Up @@ -131,6 +135,19 @@ Status CloudTablet::sync_rowsets(int64_t query_version, bool warmup_delta_data)
return st;
}

TabletSchemaSPtr CloudTablet::merged_tablet_schema() const {
std::shared_lock rdlock(_meta_lock);
TabletSchemaSPtr target_schema;
std::vector<TabletSchemaSPtr> schemas;
for (const auto& [_, rowset] : _rs_version_map) {
schemas.push_back(rowset->tablet_schema());
}
// get the max version schema and merge all schema
static_cast<void>(
vectorized::schema_util::get_least_common_schema(schemas, nullptr, target_schema));
return target_schema;
}

// Sync tablet meta and all rowset meta if not running.
// This could happen when BE didn't finish schema change job and another BE committed this schema change job.
// It should be a quite rare situation.
Expand Down Expand Up @@ -227,6 +244,7 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
{
.expiration_time = expiration_time,
},
.download_done {},
});
}
#endif
Expand Down Expand Up @@ -463,6 +481,7 @@ int64_t CloudTablet::get_cloud_base_compaction_score() const {
if (_tablet_meta->compaction_policy() == CUMULATIVE_TIME_SERIES_POLICY) {
bool has_delete = false;
int64_t point = cumulative_layer_point();
std::shared_lock<std::shared_mutex> rlock(_meta_lock);
for (const auto& rs_meta : _tablet_meta->all_rs_metas()) {
if (rs_meta->start_version() >= point) {
continue;
Expand Down
15 changes: 3 additions & 12 deletions be/src/cloud/cloud_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,18 +147,6 @@ class CloudTablet final : public BaseTablet {

std::vector<RowsetSharedPtr> pick_candidate_rowsets_to_base_compaction();

void traverse_rowsets(std::function<void(const RowsetSharedPtr&)> visitor,
bool include_stale = false) {
std::shared_lock rlock(_meta_lock);
for (auto& [v, rs] : _rs_version_map) {
visitor(rs);
}
if (!include_stale) return;
for (auto& [v, rs] : _stale_rs_version_map) {
visitor(rs);
}
}

inline Version max_version() const {
std::shared_lock rdlock(_meta_lock);
return _tablet_meta->max_version();
Expand Down Expand Up @@ -206,6 +194,9 @@ class CloudTablet final : public BaseTablet {
int64_t last_cumu_compaction_success_time_ms = 0;
int64_t last_cumu_no_suitable_version_ms = 0;

// Return merged extended schema
TabletSchemaSPtr merged_tablet_schema() const override;

private:
// FIXME(plat1ko): No need to record base size if rowsets are ordered by version
void update_base_size(const Rowset& rs);
Expand Down
Loading

0 comments on commit 3fe4364

Please sign in to comment.