Skip to content

Commit

Permalink
Merge branch 'master' into 20240920_fix_memory
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz authored Sep 29, 2024
2 parents 3bf659f + cf5a159 commit b1d85f0
Show file tree
Hide file tree
Showing 207 changed files with 3,186 additions and 625 deletions.
3 changes: 2 additions & 1 deletion be/src/cloud/cloud_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "cloud/cloud_rowset_writer.h"

#include "common/status.h"
#include "io/cache/block_file_cache_factory.h"
#include "io/fs/file_system.h"
#include "olap/rowset/rowset_factory.h"
Expand All @@ -34,7 +35,7 @@ Status CloudRowsetWriter::init(const RowsetWriterContext& rowset_writer_context)
if (_context.is_local_rowset()) {
// In cloud mode, this branch implies it is an intermediate rowset for external merge sort,
// we use `global_local_filesystem` to write data to `tmp_file_dir`(see `local_segment_path`).
_context.tablet_path = io::FileCacheFactory::instance()->get_cache_path();
_context.tablet_path = io::FileCacheFactory::instance()->pick_one_cache_path();
} else {
_rowset_meta->set_remote_storage_resource(*_context.storage_resource);
}
Expand Down
11 changes: 11 additions & 0 deletions be/src/cloud/injection_point_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,17 @@ void register_suites() {
arg0->second = true;
});
});
// curl 'be_ip:http_port/api/injection_point/apply_suite?name=Segment::parse_footer:magic_number_corruption'
suite_map.emplace("Segment::parse_footer:magic_number_corruption", [] {
auto* sp = SyncPoint::get_instance();
sp->set_call_back("Segment::parse_footer:magic_number_corruption_inj", [](auto&& args) {
if (auto p = std::any_cast<uint8_t*>(args[0])) {
memset(p, 0, 12);
} else {
std::cerr << "Failed to cast std::any to uint8_t*" << std::endl;
}
});
});
}

void set_sleep(const std::string& point, HttpRequest* req) {
Expand Down
73 changes: 73 additions & 0 deletions be/src/common/cast_set.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <limits>
#include <type_traits>

#include "common/exception.h"
#include "common/status.h"

namespace doris {

/// Checks that the value `b` of source type U fits into destination type T.
///
/// The range test is selected at compile time from the signedness of U and T:
///   - U unsigned:           only the upper bound of T is compared;
///   - U signed, T unsigned: `b` must be non-negative and not above T's maximum;
///   - otherwise:            `b` is compared against both bounds of T.
///
/// @throws doris::Exception with ErrorCode::INVALID_ARGUMENT when `b` is out of range.
template <typename T, typename U>
void check_cast_value(U b) {
    bool out_of_range = false;
    if constexpr (std::is_unsigned_v<U>) {
        out_of_range = b > std::numeric_limits<T>::max();
    } else if constexpr (std::is_unsigned_v<T>) {
        // `b < 0` is evaluated first so the comparison against the unsigned
        // maximum only happens for non-negative values.
        out_of_range = b < 0 || b > std::numeric_limits<T>::max();
    } else {
        out_of_range = b < std::numeric_limits<T>::min() || b > std::numeric_limits<T>::max();
    }

    if (out_of_range) {
        throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
                               "value {} cast to type {} out of range [{},{}]", b,
                               typeid(T).name(), std::numeric_limits<T>::min(),
                               std::numeric_limits<T>::max());
    }
}

template <typename T, typename U, bool need_check_value = true>
requires std::is_integral_v<T> && std::is_integral_v<U>
void cast_set(T& a, U b) {
if constexpr (need_check_value) {
check_cast_value<T>(b);
}
a = static_cast<T>(b);
}

template <typename T, typename U, bool need_check_value = true>
requires std::is_integral_v<T> && std::is_integral_v<U>
T cast_set(U b) {
if constexpr (need_check_value) {
check_cast_value<T>(b);
}
return static_cast<T>(b);
}

} // namespace doris
24 changes: 24 additions & 0 deletions be/src/common/compile_check_begin.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

// Clang only: save the current diagnostic state, then treat
// -Wshorten-64-to-32 as an error for everything that follows. Nothing in this
// header restores the saved state; a matching `#pragma clang diagnostic pop`
// is required afterwards (common/compile_check_end.h issues one).
// NOTE(review): this header also carries `#pragma once`, so a second inclusion
// in the same translation unit will not push/enable again — confirm that
// wrapping more than one region per translation unit is not expected.
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic error "-Wshorten-64-to-32"
#endif
//#include "common/compile_check_begin.h"
23 changes: 23 additions & 0 deletions be/src/common/compile_check_end.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

// Clang only: restore the diagnostic state saved by the most recent
// `#pragma clang diagnostic push`. This header performs no push itself, so it
// must be preceded by one (common/compile_check_begin.h issues it).
#ifdef __clang__
#pragma clang diagnostic pop
#endif
// #include "common/compile_check_end.h"
3 changes: 3 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1012,6 +1012,9 @@ DEFINE_mInt64(file_cache_ttl_valid_check_interval_second, "0"); // zero for not
DEFINE_Bool(enable_ttl_cache_evict_using_lru, "true");
// rename ttl filename to new format during read, with some performance cost
DEFINE_mBool(translate_to_new_ttl_format_during_read, "false");
DEFINE_mBool(enbale_dump_error_file, "true");
// limit the max size of error log on disk
DEFINE_mInt64(file_cache_error_log_limit_bytes, "209715200"); // 200MB

DEFINE_mInt32(index_cache_entry_stay_time_after_lookup_s, "1800");
DEFINE_mInt32(inverted_index_cache_stale_sweep_time_sec, "600");
Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1060,6 +1060,9 @@ DECLARE_mInt64(file_cache_ttl_valid_check_interval_second);
DECLARE_Bool(enable_ttl_cache_evict_using_lru);
// rename ttl filename to new format during read, with some performance cost
DECLARE_Bool(translate_to_new_ttl_format_during_read);
DECLARE_mBool(enbale_dump_error_file);
// limit the max size of error log on disk
DECLARE_mInt64(file_cache_error_log_limit_bytes);

// inverted index searcher cache
// cache entry stay time after lookup
Expand Down
2 changes: 1 addition & 1 deletion be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1227,7 +1227,7 @@ void IRuntimeFilter::update_state() {
auto execution_timeout = _state->execution_timeout * 1000;
auto runtime_filter_wait_time_ms = _state->runtime_filter_wait_time_ms;
// bitmap filter is precise filter and only filter once, so it must be applied.
int64_t wait_times_ms = _wrapper->get_real_type() == RuntimeFilterType::BITMAP_FILTER
int64_t wait_times_ms = _runtime_filter_type == RuntimeFilterType::BITMAP_FILTER
? execution_timeout
: runtime_filter_wait_time_ms;
auto expected = _rf_state_atomic.load(std::memory_order_acquire);
Expand Down
13 changes: 13 additions & 0 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,19 @@ void StreamLoadAction::handle(HttpRequest* req) {
_save_stream_load_record(ctx, str);
}
#endif

LOG(INFO) << "finished to execute stream load. label=" << ctx->label
<< ", txn_id=" << ctx->txn_id << ", query_id=" << ctx->id
<< ", load_cost_ms=" << ctx->load_cost_millis << ", receive_data_cost_ms="
<< (ctx->receive_and_read_data_cost_nanos - ctx->read_data_cost_nanos) / 1000000
<< ", read_data_cost_ms=" << ctx->read_data_cost_nanos / 1000000
<< ", write_data_cost_ms=" << ctx->write_data_cost_nanos / 1000000
<< ", commit_and_publish_txn_cost_ms="
<< ctx->commit_and_publish_txn_cost_nanos / 1000000
<< ", number_total_rows=" << ctx->number_total_rows
<< ", number_loaded_rows=" << ctx->number_loaded_rows
<< ", receive_bytes=" << ctx->receive_bytes << ", loaded_bytes=" << ctx->loaded_bytes;

// update statistics
streaming_load_requests_total->increment(1);
streaming_load_duration_ms->increment(ctx->load_cost_millis);
Expand Down
5 changes: 4 additions & 1 deletion be/src/io/cache/block_file_cache_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
#pragma once

#include <memory>
#include <optional>
#include <string>
#include <string_view>
#include <vector>

#include "common/status.h"
Expand All @@ -46,7 +48,8 @@ class FileCacheFactory {

size_t try_release(const std::string& base_path);

const std::string& get_cache_path() {
// Returns the base path of one entry of `_caches`, rotating between entries:
// every call increments `_next_index` and uses the previous value modulo the
// number of caches as the index.
// `_caches` must not be empty; otherwise the modulo below divides by zero.
// NOTE(review): the DCHECK is presumably compiled out in release builds, so it
// does not protect against that case there — confirm callers guarantee it.
// NOTE(review): the returned view is only valid while the string behind
// get_base_path() is alive — confirm it returns a reference to storage owned
// by the cache rather than a temporary.
std::string_view pick_one_cache_path() {
DCHECK(!_caches.empty());
size_t cur_index = _next_index.fetch_add(1);
return _caches[cur_index % _caches.size()]->get_base_path();
}
Expand Down
86 changes: 85 additions & 1 deletion be/src/olap/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
#include <memory>
#include <utility>

#include "cloud/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "cpp/sync_point.h"
#include "io/cache/block_file_cache.h"
#include "io/cache/block_file_cache_factory.h"
#include "io/cache/cached_remote_file_reader.h"
#include "io/fs/file_reader.h"
#include "io/fs/file_system.h"
#include "io/io_common.h"
Expand Down Expand Up @@ -294,6 +296,67 @@ Status Segment::new_iterator(SchemaSPtr schema, const StorageReadOptions& read_o
return iter->get()->init(read_options);
}

// Dumps diagnostic copies of a segment file whose footer failed validation.
//
// Does nothing (returns OK) unless `config::enbale_dump_error_file` is set and
// the process runs in cloud mode. Otherwise, under
// "<first file cache base path>/error_file/" it writes, if not already present:
//   1. "<rowset_id>_<segment_id>.dat.part_offset_<offset>": the `bytes_read`
//      bytes starting at `data` (the bytes the caller found to be bad);
//   2. "<rowset_id>_<segment_id>.dat": the first `file_size` bytes of the file,
//      re-read through the remote reader behind the CachedRemoteFileReader.
// Nothing is written once the directory size exceeds
// `config::file_cache_error_log_limit_bytes`.
//
// @param file_size   number of bytes to read from the remote file for the full dump
// @param offset      only used to build the name of the part file
// @param bytes_read  number of bytes available at `data`
// @param data        buffer holding the bytes to store in the part file
// @param io_ctx      IO context forwarded to the remote read
// @return OK when dumping is disabled, skipped, already done or succeeded;
//         the failing status of any filesystem/read operation; InternalError
//         when no cache base path exists or `_file_reader` is not a
//         CachedRemoteFileReader.
Status Segment::_write_error_file(size_t file_size, size_t offset, size_t bytes_read, char* data,
                                  io::IOContext& io_ctx) {
    if (!config::enbale_dump_error_file || !doris::config::is_cloud_mode()) {
        return Status::OK();
    }

    // Guard the `[0]` access below: indexing an empty path list is undefined behavior.
    const auto& base_paths = io::FileCacheFactory::instance()->get_base_paths();
    if (base_paths.empty()) {
        return Status::InternalError("no file cache base path to write error file");
    }

    const std::string file_name =
            _rowset_id.to_string() + "_" + std::to_string(_segment_id) + ".dat";
    const std::string dir_path = base_paths[0] + "/error_file/";
    Status create_st = io::global_local_filesystem()->create_directory(dir_path, true);
    if (!create_st.ok() && !create_st.is<ErrorCode::ALREADY_EXIST>()) {
        LOG(WARNING) << "failed to create error file dir: " << create_st.to_string();
        return create_st;
    }
    size_t dir_size = 0;
    RETURN_IF_ERROR(io::global_local_filesystem()->directory_size(dir_path, &dir_size));
    if (dir_size > config::file_cache_error_log_limit_bytes) {
        LOG(WARNING) << "error file dir size is too large: " << dir_size;
        return Status::OK();
    }

    // Step 1: persist the bytes the caller already holds.
    const std::string part_path =
            dir_path + file_name + ".part_offset_" + std::to_string(offset);
    LOG(WARNING) << "writer error part to " << part_path;
    bool is_part_exist = false;
    RETURN_IF_ERROR(io::global_local_filesystem()->exists(part_path, &is_part_exist));
    if (is_part_exist) {
        LOG(WARNING) << "error part already exists: " << part_path;
    } else {
        std::unique_ptr<io::FileWriter> part_writer;
        RETURN_IF_ERROR(io::global_local_filesystem()->create_file(part_path, &part_writer));
        RETURN_IF_ERROR(part_writer->append(Slice(data, bytes_read)));
        RETURN_IF_ERROR(part_writer->close());
    }

    // Step 2: persist the whole file. Check for an existing dump first so the
    // (potentially large) remote read and buffer allocation are skipped when
    // there is nothing left to write.
    const std::string file_path = dir_path + file_name;
    LOG(WARNING) << "writer error file to " << file_path;
    bool is_file_exist = false;
    RETURN_IF_ERROR(io::global_local_filesystem()->exists(file_path, &is_file_exist));
    if (is_file_exist) {
        LOG(WARNING) << "error file already exists: " << file_path;
        return Status::OK();
    }

    auto* cached_reader = dynamic_cast<io::CachedRemoteFileReader*>(_file_reader.get());
    if (cached_reader == nullptr) {
        return Status::InternalError("file reader is not CachedRemoteFileReader");
    }
    std::string error_file;
    error_file.resize(file_size);
    size_t error_file_bytes_read = 0;
    RETURN_IF_ERROR(cached_reader->get_remote_reader()->read_at(
            0, Slice(error_file.data(), file_size), &error_file_bytes_read, &io_ctx));
    DCHECK(error_file_bytes_read == file_size);

    std::unique_ptr<io::FileWriter> writer;
    RETURN_IF_ERROR(io::global_local_filesystem()->create_file(file_path, &writer));
    // Write only what was actually read so a short read does not pad the dump
    // with bytes that never came from the remote file.
    RETURN_IF_ERROR(writer->append(Slice(error_file.data(), error_file_bytes_read)));
    RETURN_IF_ERROR(writer->close());
    return Status::OK();
}

Status Segment::_parse_footer(SegmentFooterPB* footer) {
// Footer := SegmentFooterPB, FooterPBSize(4), FooterPBChecksum(4), MagicNumber(4)
auto file_size = _file_reader->size();
Expand All @@ -310,8 +373,14 @@ Status Segment::_parse_footer(SegmentFooterPB* footer) {
RETURN_IF_ERROR(
_file_reader->read_at(file_size - 12, Slice(fixed_buf, 12), &bytes_read, &io_ctx));
DCHECK_EQ(bytes_read, 12);

TEST_SYNC_POINT_CALLBACK("Segment::parse_footer:magic_number_corruption", fixed_buf);
TEST_INJECTION_POINT_CALLBACK("Segment::parse_footer:magic_number_corruption_inj", fixed_buf);
if (memcmp(fixed_buf + 8, k_segment_magic, k_segment_magic_length) != 0) {
Status st =
_write_error_file(file_size, file_size - 12, bytes_read, (char*)fixed_buf, io_ctx);
if (!st.ok()) {
LOG(WARNING) << "failed to write error file: " << st.to_string();
}
return Status::Corruption(
"Bad segment file {}: file_size: {}, magic number not match, cache_key: {}",
_file_reader->path().native(), file_size,
Expand All @@ -321,6 +390,11 @@ Status Segment::_parse_footer(SegmentFooterPB* footer) {
// read footer PB
uint32_t footer_length = decode_fixed32_le(fixed_buf);
if (file_size < 12 + footer_length) {
Status st =
_write_error_file(file_size, file_size - 12, bytes_read, (char*)fixed_buf, io_ctx);
if (!st.ok()) {
LOG(WARNING) << "failed to write error file: " << st.to_string();
}
return Status::Corruption("Bad segment file {}: file size {} < {}, cache_key: {}",
_file_reader->path().native(), file_size, 12 + footer_length,
file_cache_key_str(_file_reader->path().native()));
Expand All @@ -336,6 +410,11 @@ Status Segment::_parse_footer(SegmentFooterPB* footer) {
uint32_t expect_checksum = decode_fixed32_le(fixed_buf + 4);
uint32_t actual_checksum = crc32c::Value(footer_buf.data(), footer_buf.size());
if (actual_checksum != expect_checksum) {
Status st = _write_error_file(file_size, file_size - 12 - footer_length, bytes_read,
footer_buf.data(), io_ctx);
if (!st.ok()) {
LOG(WARNING) << "failed to write error file: " << st.to_string();
}
return Status::Corruption(
"Bad segment file {}: file_size = {}, footer checksum not match, actual={} "
"vs expect={}, cache_key: {}",
Expand All @@ -345,6 +424,11 @@ Status Segment::_parse_footer(SegmentFooterPB* footer) {

// deserialize footer PB
if (!footer->ParseFromString(footer_buf)) {
Status st = _write_error_file(file_size, file_size - 12 - footer_length, bytes_read,
footer_buf.data(), io_ctx);
if (!st.ok()) {
LOG(WARNING) << "failed to write error file: " << st.to_string();
}
return Status::Corruption(
"Bad segment file {}: file_size = {}, failed to parse SegmentFooterPB, cache_key: ",
_file_reader->path().native(), file_size,
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/rowset/segment_v2/segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ class Segment : public std::enable_shared_from_this<Segment> {
std::unique_ptr<ColumnIterator>* iter,
const SubcolumnColumnReaders::Node* root,
vectorized::DataTypePtr target_type_hint);
Status _write_error_file(size_t file_size, size_t offset, size_t bytes_read, char* data,
io::IOContext& io_ctx);

Status _load_index_impl();
Status _open_inverted_index();
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/wal/wal_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ Status WalTable::_relay_wal_one_by_one() {
auto msg = st.msg();
if (st.ok() || st.is<ErrorCode::PUBLISH_TIMEOUT>() || st.is<ErrorCode::NOT_FOUND>() ||
st.is<ErrorCode::DATA_QUALITY_ERROR>() ||
(msg.find("LabelAlreadyUsedException") != msg.npos &&
(msg.find("[COMMITTED]") != msg.npos || msg.find("[VISIBLE]") != msg.npos))) {
(msg.find("has already been used") != msg.npos &&
(msg.find("COMMITTED") != msg.npos || msg.find("VISIBLE") != msg.npos))) {
LOG(INFO) << "succeed to replay wal=" << wal_info->get_wal_path()
<< ", st=" << st.to_string();
// delete wal
Expand Down
Loading

0 comments on commit b1d85f0

Please sign in to comment.