Skip to content

Commit

Permalink
Merge branch 'master' into enable-strong-consistency
Browse files Browse the repository at this point in the history
  • Loading branch information
TangSiyang2001 authored Jun 30, 2023
2 parents 9907dd4 + 96aa0e5 commit 6c422e3
Show file tree
Hide file tree
Showing 521 changed files with 17,239 additions and 7,305 deletions.
4 changes: 1 addition & 3 deletions .github/workflows/pr-approve-status.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@ jobs:
approves=()
reviewers_unique=()
for ((i=${#reviewers[@]}-1;i>=0;i--)); do
# shellcheck disable=SC2076
# shellcheck disable=SC2199
if [[ ! "${reviewers_unique[@]}" =~ "${reviewers[$i]}" ]]; then
if ! echo "${reviewers_unique[@]}" | grep -q -w "${reviewers[$i]}" && [ "${statuses[$i]}" != "COMMENTED" ]; then
reviewers_unique+=( "${reviewers[$i]}" )
if [ "${statuses[$i]}" == "APPROVED" ]; then
approves+=( "${reviewers[$i]}" )
Expand Down
6 changes: 4 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ core.*
.DS_Store
.classpath
nohup.out
custom_env.sh
custom_env_mac.sh
/custom_env.sh
/custom_env_mac.sh
derby.log
dependency-reduced-pom.xml
yarn.lock
Expand All @@ -33,6 +33,7 @@ package-lock.json
.cache
.settings/
**/.idea/
!.idea/vcs.xml
**/.vscode/
**/.fleet/

Expand All @@ -42,6 +43,7 @@ docs/.temp

# output, thirdparty, extension
output/
output.bak/
rpc_data/
metastore_db/
thirdparty/src*
Expand Down
32 changes: 32 additions & 0 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 3 additions & 19 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -506,25 +506,6 @@ DEFINE_Int32(min_buffer_size, "1024"); // 1024, The minimum read buffer size (in
// With 1024B through 8MB buffers, this is up to ~2GB of buffers.
DEFINE_Int32(max_free_io_buffers, "128");

// Whether to disable the memory cache pool,
// including MemPool, ChunkAllocator, DiskIO free buffer.
DEFINE_Bool(disable_mem_pools, "false");

// The reserved bytes limit of the Chunk Allocator, usually set as a percentage of mem_limit.
// Defaults to bytes if no unit is given; the number of bytes must be a multiple of 2
// and must be larger than 0. If it is larger than the physical memory size, it is set to the physical memory size.
// Increasing this value can improve performance,
// but it reserves more free memory, which then cannot be used by other modules.
DEFINE_mString(chunk_reserved_bytes_limit, "0");
// 1024, The minimum chunk allocator size (in bytes)
DEFINE_Int32(min_chunk_reserved_bytes, "1024");
// Disable Chunk Allocator in Vectorized Allocator, this will reduce memory cache.
// For high concurrent queries, using Chunk Allocator with vectorized Allocator can reduce the impact
// of gperftools tcmalloc central lock.
// Jemalloc or google tcmalloc have core cache, Chunk Allocator may no longer be needed after replacing
// gperftools tcmalloc.
DEFINE_mBool(disable_chunk_allocator_in_vec, "true");

// The probing algorithm of partitioned hash table.
// Enable quadratic probing hash table
DEFINE_Bool(enable_quadratic_probing, "false");
Expand Down Expand Up @@ -1043,6 +1024,9 @@ DEFINE_Bool(enable_set_in_bitmap_value, "false");
DEFINE_Int64(max_hdfs_file_handle_cache_num, "20000");
DEFINE_Int64(max_external_file_meta_cache_num, "20000");

// max_write_buffer_number for rocksdb
DEFINE_Int32(rocksdb_max_write_buffer_number, "5");

#ifdef BE_TEST
// test s3
DEFINE_String(test_s3_resource, "resource");
Expand Down
22 changes: 3 additions & 19 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -543,25 +543,6 @@ DECLARE_Int32(min_buffer_size); // 1024, The minimum read buffer size (in bytes)
// With 1024B through 8MB buffers, this is up to ~2GB of buffers.
DECLARE_Int32(max_free_io_buffers);

// Whether to disable the memory cache pool,
// including MemPool, ChunkAllocator, DiskIO free buffer.
DECLARE_Bool(disable_mem_pools);

// The reserved bytes limit of the Chunk Allocator, usually set as a percentage of mem_limit.
// Defaults to bytes if no unit is given; the number of bytes must be a multiple of 2
// and must be larger than 0. If it is larger than the physical memory size, it is set to the physical memory size.
// Increasing this value can improve performance,
// but it reserves more free memory, which then cannot be used by other modules.
DECLARE_mString(chunk_reserved_bytes_limit);
// 1024, The minimum chunk allocator size (in bytes)
DECLARE_Int32(min_chunk_reserved_bytes);
// Disable Chunk Allocator in Vectorized Allocator, this will reduce memory cache.
// For high concurrent queries, using Chunk Allocator with vectorized Allocator can reduce the impact
// of gperftools tcmalloc central lock.
// Jemalloc or google tcmalloc have core cache, Chunk Allocator may no longer be needed after replacing
// gperftools tcmalloc.
DECLARE_mBool(disable_chunk_allocator_in_vec);

// The probing algorithm of partitioned hash table.
// Enable quadratic probing hash table
DECLARE_Bool(enable_quadratic_probing);
Expand Down Expand Up @@ -1059,6 +1040,9 @@ DECLARE_Int64(max_hdfs_file_handle_cache_num);
// max number of meta info of external files, such as parquet footer
DECLARE_Int64(max_external_file_meta_cache_num);

// max_write_buffer_number for rocksdb
DECLARE_Int32(rocksdb_max_write_buffer_number);

#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
Expand Down
4 changes: 2 additions & 2 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ void Daemon::memory_gc_thread() {
// No longer full gc and minor gc during sleep.
memory_full_gc_sleep_time_ms = config::memory_gc_sleep_time_ms;
memory_minor_gc_sleep_time_ms = config::memory_gc_sleep_time_ms;
doris::MemTrackerLimiter::print_log_process_usage("process full gc", false);
doris::MemTrackerLimiter::print_log_process_usage("Start Full GC", false);
if (doris::MemInfo::process_full_gc()) {
// If there is not enough memory to be gc, the process memory usage will not be printed in the next continuous gc.
doris::MemTrackerLimiter::enable_print_log_process_usage();
Expand All @@ -255,7 +255,7 @@ void Daemon::memory_gc_thread() {
proc_mem_no_allocator_cache >= doris::MemInfo::soft_mem_limit())) {
// No minor gc during sleep, but full gc is possible.
memory_minor_gc_sleep_time_ms = config::memory_gc_sleep_time_ms;
doris::MemTrackerLimiter::print_log_process_usage("process minor gc", false);
doris::MemTrackerLimiter::print_log_process_usage("Start Minor GC", false);
if (doris::MemInfo::process_minor_gc()) {
doris::MemTrackerLimiter::enable_print_log_process_usage();
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/http/action/pprof_actions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
namespace doris {

// pprof default sample time in seconds.
static const std::string SECOND_KEY = "seconds";
[[maybe_unused]] static const std::string SECOND_KEY = "seconds";
static const int kPprofDefaultSampleSecs = 30;

// Protect, only one thread can work
Expand Down
117 changes: 110 additions & 7 deletions be/src/io/fs/benchmark/base_benchmark.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
#include <vector>

#include "common/status.h"
#include "io/fs/file_reader.h"
#include "io/fs/file_writer.h"
#include "util/slice.h"

namespace doris::io {

Expand All @@ -44,24 +47,22 @@ void bm_log(const std::string& fmt, Args&&... args) {
class BaseBenchmark {
public:
BaseBenchmark(const std::string& name, int threads, int iterations, size_t file_size,
int repetitions, const std::map<std::string, std::string>& conf_map)
const std::map<std::string, std::string>& conf_map)
: _name(name),
_threads(threads),
_iterations(iterations),
_file_size(file_size),
_repetitions(repetitions),
_conf_map(conf_map) {}
virtual ~BaseBenchmark() = default;

virtual Status init() { return Status::OK(); }
virtual Status run(benchmark::State& state) { return Status::OK(); }

void set_repetition(int rep) { _repetitions = rep; }

void register_bm() {
auto bm = benchmark::RegisterBenchmark(_name.c_str(), [&](benchmark::State& state) {
Status st;
if (state.thread_index() == 0) {
st = this->init();
}
Status st = this->init();
if (st != Status::OK()) {
bm_log("Benchmark {} init error: {}", _name, st.to_string());
return;
Expand Down Expand Up @@ -92,12 +93,114 @@ class BaseBenchmark {
});
}

// Builds the path of the test file used by the calling benchmark thread:
// "<base_dir>/test_<thread_index>", where base_dir comes from the "base_dir"
// entry of _conf_map (treated as empty when the entry is absent) and exactly
// one '/' separates the directory from the file name when base_dir does not
// already end with one.
//
// @param state  benchmark state; only state.thread_index() is read.
// @return the generated file path (also logged through bm_log).
virtual std::string get_file_path(benchmark::State& state) {
    // Look the key up with find() rather than operator[]: operator[] inserts
    // an empty entry when the key is missing, which mutates _conf_map.
    // NOTE(review): this looks like it is invoked once per benchmark thread;
    // if so, a read-only lookup also avoids concurrent map modification.
    const auto it = _conf_map.find("base_dir");
    const std::string base_dir = (it != _conf_map.end()) ? it->second : std::string();

    const char* separator = base_dir.ends_with("/") ? "" : "/";
    std::string file_path =
            fmt::format("{}{}test_{}", base_dir, separator, state.thread_index());
    bm_log("file_path: {}", file_path);
    return file_path;
}

// Reads from `reader` sequentially, starting at offset 0, in chunks of at most
// "buffer_size" bytes (taken from _conf_map, default 1000000), and records the
// elapsed time and throughput counters on `state`.
//
// The amount to read is reader->size(), capped by _file_size when
// _file_size > 0. Reading stops early on the first read_at() error or when
// read_at() reports 0 bytes (treated as EOF). The reader is closed only when
// every read succeeded.
//
// @param state   benchmark state receiving the manual iteration time and the
//                "ReadRate(B/S)", "ReadTotal(B)" and "ReadTime(S)" counters.
// @param reader  file reader to read from; must not be null.
// @return the status of the last read_at() call, or of close() when all reads
//         succeeded; INVALID_ARGUMENT when `reader` is null or the configured
//         buffer size is 0.
Status read(benchmark::State& state, FileReaderSPtr reader) {
    // Validate before the first dereference; the original only checked for
    // null after reader->size() and reader->read_at() had already been called.
    if (reader == nullptr) {
        return Status::Error<ErrorCode::INVALID_ARGUMENT>(
                "benchmark {}: file reader is null", _name);
    }
    bm_log("begin to read {}", _name);

    // Single read-only lookup instead of contains() + operator[].
    size_t buffer_size = 1000000L;
    if (const auto it = _conf_map.find("buffer_size"); it != _conf_map.end()) {
        buffer_size = std::stol(it->second);
    }
    if (buffer_size == 0) {
        // A zero-sized buffer can never make progress.
        return Status::Error<ErrorCode::INVALID_ARGUMENT>(
                "benchmark {}: buffer_size must be greater than 0", _name);
    }
    std::vector<char> buffer(buffer_size);
    doris::Slice data = {buffer.data(), buffer.size()};

    size_t read_size = reader->size();
    if (_file_size > 0) {
        read_size = std::min(read_size, _file_size);
    }

    // `offset` doubles as the number of bytes actually read so far.
    size_t offset = 0;
    Status status;
    const auto start = std::chrono::high_resolution_clock::now();
    while (offset < read_size) {
        size_t bytes_read = 0;
        data.size = std::min(buffer_size, read_size - offset);
        status = reader->read_at(offset, data, &bytes_read);
        if (!status.ok()) {
            bm_log("reader read_at error: {}", status.to_string());
            break;
        }
        if (bytes_read == 0) { // EOF
            break;
        }
        offset += bytes_read;
    }
    const auto end = std::chrono::high_resolution_clock::now();
    const auto elapsed_seconds =
            std::chrono::duration_cast<std::chrono::duration<double>>(end - start);

    // Report the bytes that were really read, so an early error/EOF does not
    // inflate the throughput numbers. On full success offset == read_size.
    state.SetIterationTime(elapsed_seconds.count());
    state.counters["ReadRate(B/S)"] = benchmark::Counter(offset, benchmark::Counter::kIsRate);
    state.counters["ReadTotal(B)"] = offset;
    state.counters["ReadTime(S)"] = elapsed_seconds.count();

    if (status.ok()) {
        status = reader->close();
    }
    bm_log("finish to read {}, size {}, seconds: {}, status: {}", _name, offset,
           elapsed_seconds.count(), status);
    return status;
}

// Appends _file_size bytes to `writer` in chunks of at most "buffer_size"
// bytes (taken from _conf_map, default 1000000), closes the writer when every
// append succeeded, and records the elapsed time (including close()) and
// throughput counters on `state`.
//
// The payload is a zero-filled buffer; only the amount of data matters.
//
// @param state   benchmark state receiving the manual iteration time and the
//                "WriteRate(B/S)", "WriteTotal(B)" and "WriteTime(S)" counters.
// @param writer  file writer to append to; must not be null.
// @return the status of the failing append(), or of close() when all appends
//         succeeded; INVALID_ARGUMENT when `writer` is null or the configured
//         buffer size is 0.
Status write(benchmark::State& state, FileWriter* writer) {
    // Validate before the first dereference; the original only checked for
    // null after writer->append() had already been called.
    if (writer == nullptr) {
        return Status::Error<ErrorCode::INVALID_ARGUMENT>(
                "benchmark {}: file writer is null", _name);
    }
    bm_log("begin to write {}, size: {}", _name, _file_size);

    // Single read-only lookup instead of contains() + operator[].
    size_t buffer_size = 1000000L;
    if (const auto it = _conf_map.find("buffer_size"); it != _conf_map.end()) {
        buffer_size = std::stol(it->second);
    }
    if (buffer_size == 0) {
        // With a zero-sized buffer every append writes nothing and the loop
        // below would never terminate.
        return Status::Error<ErrorCode::INVALID_ARGUMENT>(
                "benchmark {}: buffer_size must be greater than 0", _name);
    }
    std::vector<char> buffer(buffer_size);
    doris::Slice data = {buffer.data(), buffer.size()};

    size_t written = 0;
    Status status;
    const auto start = std::chrono::high_resolution_clock::now();
    while (written < _file_size) {
        data.size = std::min(buffer_size, _file_size - written);
        status = writer->append(data);
        if (!status.ok()) {
            bm_log("writer append error: {}", status.to_string());
            break;
        }
        written += data.size;
    }
    if (status.ok()) {
        status = writer->close();
    }
    const auto end = std::chrono::high_resolution_clock::now();
    const auto elapsed_seconds =
            std::chrono::duration_cast<std::chrono::duration<double>>(end - start);

    // Report the bytes that were really appended, so a failed append does not
    // inflate the throughput numbers. On full success written == _file_size.
    state.SetIterationTime(elapsed_seconds.count());
    state.counters["WriteRate(B/S)"] = benchmark::Counter(written, benchmark::Counter::kIsRate);
    state.counters["WriteTotal(B)"] = written;
    state.counters["WriteTime(S)"] = elapsed_seconds.count();

    bm_log("finish to write {}, size: {}, seconds: {}, status: {}", _name, written,
           elapsed_seconds.count(), status);
    return status;
}

protected:
std::string _name;
int _threads;
int _iterations;
size_t _file_size;
int _repetitions = 1;
int _repetitions = 3;
std::map<std::string, std::string> _conf_map;
};

Expand Down
14 changes: 12 additions & 2 deletions be/src/io/fs/benchmark/benchmark_factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,18 @@ Status BenchmarkFactory::getBm(const std::string fs_type, const std::string op_t
const std::map<std::string, std::string>& conf_map,
BaseBenchmark** bm) {
if (fs_type == "s3") {
if (op_type == "read") {
*bm = new S3ReadBenchmark(threads, iterations, file_size, conf_map);
if (op_type == "create_write") {
*bm = new S3CreateWriteBenchmark(threads, iterations, file_size, conf_map);
} else if (op_type == "open_read") {
*bm = new S3OpenReadBenchmark(threads, iterations, file_size, conf_map);
} else if (op_type == "single_read") {
*bm = new S3SingleReadBenchmark(threads, iterations, file_size, conf_map);
} else if (op_type == "rename") {
*bm = new S3RenameBenchmark(threads, iterations, file_size, conf_map);
} else if (op_type == "exists") {
*bm = new S3ExistsBenchmark(threads, iterations, file_size, conf_map);
} else if (op_type == "list") {
*bm = new S3ListBenchmark(threads, iterations, file_size, conf_map);
} else {
return Status::Error<ErrorCode::INVALID_ARGUMENT>(
"unknown params: fs_type: {}, op_type: {}, iterations: {}", fs_type, op_type,
Expand Down
11 changes: 11 additions & 0 deletions be/src/io/fs/benchmark/fs_benchmark_tool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include <fstream>

#include "io/fs/benchmark/benchmark_factory.hpp"
#include "io/fs/s3_file_write_bufferpool.h"
#include "util/threadpool.h"

DEFINE_string(fs_type, "hdfs", "Supported File System: s3, hdfs");
DEFINE_string(operation, "create_write",
Expand Down Expand Up @@ -107,6 +109,15 @@ int main(int argc, char** argv) {
return 1;
}

// init s3 write buffer pool
std::unique_ptr<doris::ThreadPool> buffered_reader_prefetch_thread_pool;
doris::ThreadPoolBuilder("BufferedReaderPrefetchThreadPool")
.set_min_threads(16)
.set_max_threads(64)
.build(&buffered_reader_prefetch_thread_pool);
doris::io::S3FileBufferPool* s3_buffer_pool = doris::io::S3FileBufferPool::GetInstance();
s3_buffer_pool->init(524288000, 5242880, buffered_reader_prefetch_thread_pool.get());

try {
doris::io::MultiBenchmark multi_bm(FLAGS_fs_type, FLAGS_operation, std::stoi(FLAGS_threads),
std::stoi(FLAGS_iterations), std::stol(FLAGS_file_size),
Expand Down
Loading

0 comments on commit 6c422e3

Please sign in to comment.