Skip to content

Commit

Permalink
Merge branch 'master' into fix-0807
Browse files Browse the repository at this point in the history
  • Loading branch information
felixwluo authored Sep 5, 2024
2 parents 24876e9 + 8e513ff commit 5d544f8
Show file tree
Hide file tree
Showing 108 changed files with 4,420 additions and 1,310 deletions.
3 changes: 0 additions & 3 deletions .asf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ github:
strict: false
contexts:
- License Check
- Clang Formatter
- CheckStyle
- P0 Regression (Doris Regression)
- External Regression (Doris External Regression)
Expand Down Expand Up @@ -87,7 +86,6 @@ github:
strict: false
contexts:
- License Check
- Clang Formatter
- CheckStyle
- Build Broker
- ShellCheck
Expand All @@ -109,7 +107,6 @@ github:
strict: false
contexts:
- License Check
- Clang Formatter
- CheckStyle
- P0 Regression (Doris Regression)
- External Regression (Doris External Regression)
Expand Down
5 changes: 5 additions & 0 deletions .github/workflows/clang-format.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ jobs:
git checkout 6adbe14579e5b8e19eb3e31e5ff2479f3bd302c7
popd &>/dev/null
- name: Install Python dependencies
uses: actions/setup-python@v5
with:
python-version: '3.10' # Adjust if needed

- name: "Format it!"
if: ${{ steps.filter.outputs.changes == 'true' }}
uses: ./.github/actions/clang-format-lint-action
Expand Down
7 changes: 6 additions & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,12 @@ DEFINE_mInt32(max_fill_rate, "2");

DEFINE_mInt32(double_resize_threshold, "23");

DEFINE_Int64(max_sys_mem_available_low_water_mark_bytes, "6871947673");
// The low water mark of the system `MemAvailable` (as reported in `/proc/meminfo`), in bytes. Default: -1.
// If it is -1, the low water mark = min(MemTotal - MemLimit, MemTotal * 5%), which is 3.2G on a 64G machine.
// Increasing this value reserves more memory buffer for Memory GC.
// Decreasing this value lets the process use as much memory as possible.
// Note: the `max_` prefix should be removed, but it is kept for compatibility.
DEFINE_Int64(max_sys_mem_available_low_water_mark_bytes, "-1");

DEFINE_Int64(memtable_limiter_reserved_memory_bytes, "838860800");

Expand Down
8 changes: 4 additions & 4 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,11 @@ DECLARE_mInt32(max_fill_rate);

DECLARE_mInt32(double_resize_threshold);

// The maximum low water mark of the system `/proc/meminfo/MemAvailable`, Unit byte, default 6.4G,
// actual low water mark=min(6.4G, MemTotal * 5%), avoid wasting too much memory on machines
// with large memory larger than 128G.
// Turn up max. On machines with more than 128G memory, more memory buffers will be reserved for Full GC.
// The low water mark of the system `MemAvailable` (as reported in `/proc/meminfo`), in bytes. Default: -1.
// If it is -1, the low water mark = min(MemTotal - MemLimit, MemTotal * 5%), which is 3.2G on a 64G machine.
// Increasing this value reserves more memory buffer for Memory GC.
// Decreasing this value lets the process use as much memory as possible.
// Note: the `max_` prefix should be removed, but it is kept for compatibility.
DECLARE_Int64(max_sys_mem_available_low_water_mark_bytes);

// reserve a small amount of memory so we do not trigger MinorGC
Expand Down
10 changes: 4 additions & 6 deletions be/src/exec/decompressor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ Status Lz4BlockDecompressor::decompress(uint8_t* input, size_t input_len, size_t
}

std::size_t decompressed_large_block_len = 0;
do {
while (remaining_decompressed_large_block_len > 0) {
// Make sure at least sizeof(uint32_t) bytes of input are available; otherwise report how many more bytes are needed.
if (input_len < sizeof(uint32_t)) {
*more_input_bytes = sizeof(uint32_t) - input_len;
Expand Down Expand Up @@ -505,8 +505,7 @@ Status Lz4BlockDecompressor::decompress(uint8_t* input, size_t input_len, size_t
output_ptr += decompressed_small_block_len;
remaining_decompressed_large_block_len -= decompressed_small_block_len;
decompressed_large_block_len += decompressed_small_block_len;

} while (remaining_decompressed_large_block_len > 0);
};

if (*more_input_bytes != 0) {
// Need more input buffer
Expand Down Expand Up @@ -586,7 +585,7 @@ Status SnappyBlockDecompressor::decompress(uint8_t* input, size_t input_len,
}

std::size_t decompressed_large_block_len = 0;
do {
while (remaining_decompressed_large_block_len > 0) {
// Make sure at least sizeof(uint32_t) bytes of input are available; otherwise report how many more bytes are needed.
if (input_len < sizeof(uint32_t)) {
*more_input_bytes = sizeof(uint32_t) - input_len;
Expand Down Expand Up @@ -630,8 +629,7 @@ Status SnappyBlockDecompressor::decompress(uint8_t* input, size_t input_len,
output_ptr += decompressed_small_block_len;
remaining_decompressed_large_block_len -= decompressed_small_block_len;
decompressed_large_block_len += decompressed_small_block_len;

} while (remaining_decompressed_large_block_len > 0);
};

if (*more_input_bytes != 0) {
// Need more input buffer
Expand Down
68 changes: 1 addition & 67 deletions be/src/olap/cumulative_compaction_time_series_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,11 @@ namespace doris {

uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(Tablet* tablet) {
uint32_t score = 0;
uint32_t level0_score = 0;
bool base_rowset_exist = false;
const int64_t point = tablet->cumulative_layer_point();

int64_t level0_total_size = 0;
RowsetMetaSharedPtr first_meta;
int64_t first_version = INT64_MAX;
std::list<RowsetMetaSharedPtr> checked_rs_metas;
// NOTE: tablet._meta_lock is held
auto& rs_metas = tablet->tablet_meta()->all_rs_metas();
// check the base rowset and collect the rowsets of cumulative part
Expand All @@ -53,12 +50,6 @@ uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(
} else {
// collect the rowsets of cumulative part
score += rs_meta->get_compaction_score();
if (rs_meta->compaction_level() == 0) {
level0_total_size += rs_meta->total_disk_size();
level0_score += rs_meta->get_compaction_score();
} else {
checked_rs_metas.push_back(rs_meta);
}
}
}

Expand All @@ -73,64 +64,7 @@ uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(
return 0;
}

// Condition 1: the size of input files for compaction meets the requirement of parameter compaction_goal_size
int64_t compaction_goal_size_mbytes =
tablet->tablet_meta()->time_series_compaction_goal_size_mbytes();
if (level0_total_size >= compaction_goal_size_mbytes * 1024 * 1024) {
return score;
}

// Condition 2: the number of input files reaches the threshold specified by parameter compaction_file_count_threshold
if (level0_score >= tablet->tablet_meta()->time_series_compaction_file_count_threshold()) {
return score;
}

// Condition 3: level1 achieve compaction_goal_size
if (tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2) {
checked_rs_metas.sort([](const RowsetMetaSharedPtr& a, const RowsetMetaSharedPtr& b) {
return a->version().first < b->version().first;
});
int32_t rs_meta_count = 0;
int64_t continuous_size = 0;
for (const auto& rs_meta : checked_rs_metas) {
rs_meta_count++;
continuous_size += rs_meta->total_disk_size();
if (rs_meta_count >= 2) {
if (continuous_size >= compaction_goal_size_mbytes * 1024 * 1024) {
return score;
}
}
}
}

int64_t now = UnixMillis();
int64_t last_cumu = tablet->last_cumu_compaction_success_time();
if (last_cumu != 0) {
int64_t cumu_interval = now - last_cumu;

// Condition 4: the time interval since the last successful cumulative compaction exceeds the value specified by parameter time_series_compaction_time_threshold_seconds
if (cumu_interval >
(tablet->tablet_meta()->time_series_compaction_time_threshold_seconds() * 1000)) {
return score;
}
} else if (score > 0) {
// If no compaction has ever succeeded, the condition that triggers compaction based on
// the last successful compaction time (condition 4) would never be met, so record the current time.
tablet->set_last_cumu_compaction_success_time(now);
}

// Condition 5: If there is a continuous set of empty rowsets, prioritize merging.
std::vector<RowsetSharedPtr> input_rowsets;
std::vector<RowsetSharedPtr> candidate_rowsets =
tablet->pick_candidate_rowsets_to_cumulative_compaction();
tablet->calc_consecutive_empty_rowsets(
&input_rowsets, candidate_rowsets,
tablet->tablet_meta()->time_series_compaction_empty_rowsets_threshold());
if (!input_rowsets.empty()) {
return score;
}

return 0;
return score;
}

void TimeSeriesCumulativeCompactionPolicy::calculate_cumulative_point(
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,8 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
segment_pos);

} else {
if (!_opts.rowset_ctx->partial_update_info->can_insert_new_rows_in_partial_update) {
if (!_opts.rowset_ctx->partial_update_info->can_insert_new_rows_in_partial_update &&
!have_delete_sign) {
std::string error_column;
for (auto cid : _opts.rowset_ctx->partial_update_info->missing_cids) {
const TabletColumn& col = _tablet_schema->column(cid);
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,8 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
DeleteBitmap::TEMP_VERSION_COMMON},
segment_pos);
} else {
if (!_opts.rowset_ctx->partial_update_info->can_insert_new_rows_in_partial_update) {
if (!_opts.rowset_ctx->partial_update_info->can_insert_new_rows_in_partial_update &&
!have_delete_sign) {
std::string error_column;
for (auto cid : _opts.rowset_ctx->partial_update_info->missing_cids) {
const TabletColumn& col = _tablet_schema->column(cid);
Expand Down
4 changes: 4 additions & 0 deletions be/src/pipeline/exec/result_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ struct ResultFileOptions {
std::string file_suffix;
// Include a BOM when exporting to CSV format
bool with_bom = false;
int64_t orc_writer_version = 0;

ResultFileOptions(const TResultFileSinkOptions& t_opt) {
file_path = t_opt.file_path;
Expand Down Expand Up @@ -108,6 +109,9 @@ struct ResultFileOptions {
if (t_opt.__isset.orc_compression_type) {
orc_compression_type = t_opt.orc_compression_type;
}
if (t_opt.__isset.orc_writer_version) {
orc_writer_version = t_opt.orc_writer_version;
}
}
};

Expand Down
5 changes: 0 additions & 5 deletions be/src/runtime/buffer_control_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size, int
: _fragment_id(id),
_is_close(false),
_is_cancelled(false),
_buffer_rows(0),
_buffer_limit(buffer_size),
_packet_num(0),
_batch_size(batch_size) {
Expand Down Expand Up @@ -135,7 +134,6 @@ Status BufferControlBlock::add_batch(RuntimeState* state,
_instance_rows_in_queue.emplace_back();
_fe_result_batch_queue.push_back(std::move(result));
}
_buffer_rows += num_rows;
_instance_rows[state->fragment_instance_id()] += num_rows;
_instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows;
} else {
Expand All @@ -162,7 +160,6 @@ Status BufferControlBlock::add_arrow_batch(RuntimeState* state,
// TODO: merge RecordBatch, ToStructArray -> Make again

_arrow_flight_batch_queue.push_back(std::move(result));
_buffer_rows += num_rows;
_instance_rows_in_queue.emplace_back();
_instance_rows[state->fragment_instance_id()] += num_rows;
_instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows;
Expand All @@ -187,7 +184,6 @@ void BufferControlBlock::get_batch(GetResultBatchCtx* ctx) {
// get result
std::unique_ptr<TFetchDataResult> result = std::move(_fe_result_batch_queue.front());
_fe_result_batch_queue.pop_front();
_buffer_rows -= result->result_batch.rows.size();
for (auto it : _instance_rows_in_queue.front()) {
_instance_rows[it.first] -= it.second;
}
Expand Down Expand Up @@ -228,7 +224,6 @@ Status BufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch>*
if (!_arrow_flight_batch_queue.empty()) {
*result = std::move(_arrow_flight_batch_queue.front());
_arrow_flight_batch_queue.pop_front();
_buffer_rows -= (*result)->num_rows();
for (auto it : _instance_rows_in_queue.front()) {
_instance_rows[it.first] -= it.second;
}
Expand Down
1 change: 0 additions & 1 deletion be/src/runtime/buffer_control_block.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ class BufferControlBlock {
bool _is_close;
std::atomic_bool _is_cancelled;
Status _status;
std::atomic_int _buffer_rows;
const int _buffer_limit;
int64_t _packet_num;

Expand Down
18 changes: 18 additions & 0 deletions be/src/service/point_query_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@
#include "runtime/thread_context.h"
#include "util/key_util.h"
#include "util/runtime_profile.h"
#include "util/simd/bits.h"
#include "util/thrift_util.h"
#include "vec/columns/columns_number.h"
#include "vec/data_types/serde/data_type_serde.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
Expand Down Expand Up @@ -143,6 +145,9 @@ Status Reusable::init(const TDescriptorTable& t_desc_tbl, const std::vector<TExp
extract_slot_ref(expr->root(), tuple_desc(), output_slot_descs);
}

// get the delete sign idx in block
_delete_sign_idx = _col_uid_to_idx[schema.columns()[schema.delete_sign_idx()]->unique_id()];

if (schema.have_column(BeConsts::ROW_STORE_COL)) {
const auto& column = *DORIS_TRY(schema.column(BeConsts::ROW_STORE_COL));
_row_store_column_ids = column.unique_id();
Expand Down Expand Up @@ -483,6 +488,19 @@ Status PointQueryExecutor::_lookup_row_data() {
}
}
}
// filter rows by delete sign
if (_row_hits > 0 && _reusable->delete_sign_idx() != -1) {
vectorized::ColumnPtr delete_filter_columns =
_result_block->get_columns()[_reusable->delete_sign_idx()];
const auto& filter =
assert_cast<const vectorized::ColumnInt8*>(delete_filter_columns.get())->get_data();
size_t count = filter.size() - simd::count_zero_num((int8_t*)filter.data(), filter.size());
if (count == filter.size()) {
_result_block->clear();
} else if (count > 0) {
return Status::NotSupported("Not implemented since only single row at present");
}
}
return Status::OK();
}

Expand Down
5 changes: 5 additions & 0 deletions be/src/service/point_query_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ class Reusable {

RuntimeState* runtime_state() { return _runtime_state.get(); }

// delete sign idx in block
int32_t delete_sign_idx() const { return _delete_sign_idx; }

private:
// caching TupleDescriptor, output_expr, etc...
std::unique_ptr<RuntimeState> _runtime_state;
Expand All @@ -118,6 +121,8 @@ class Reusable {
std::unordered_set<int32_t> _missing_col_uids;
// included cids in rowstore(column group)
std::unordered_set<int32_t> _include_col_uids;
// delete sign idx in block
int32_t _delete_sign_idx = -1;
};

// RowCache is a LRU cache for row store
Expand Down
12 changes: 6 additions & 6 deletions be/src/util/mem_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,12 +320,12 @@ void MemInfo::init() {
// https://serverfault.com/questions/940196/why-is-memavailable-a-lot-less-than-memfreebufferscached
// https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/commit/?id=34e431b0ae398fc54ea69ff85ec700722c9da773
//
// upper sys_mem_available_low_water_mark, avoid wasting too much memory.
_s_sys_mem_available_low_water_mark = std::max<int64_t>(
std::min<int64_t>(std::min<int64_t>(_s_physical_mem - _s_mem_limit,
int64_t(_s_physical_mem * 0.05)),
config::max_sys_mem_available_low_water_mark_bytes),
0);
// smaller sys_mem_available_low_water_mark can avoid wasting too much memory.
_s_sys_mem_available_low_water_mark =
config::max_sys_mem_available_low_water_mark_bytes != -1
? config::max_sys_mem_available_low_water_mark_bytes
: std::min<int64_t>(_s_physical_mem - _s_mem_limit,
int64_t(_s_physical_mem * 0.05));
_s_sys_mem_available_warning_water_mark = _s_sys_mem_available_low_water_mark * 2;
}

Expand Down
5 changes: 3 additions & 2 deletions be/src/vec/data_types/data_type_decimal.h
Original file line number Diff line number Diff line change
Expand Up @@ -595,10 +595,11 @@ void convert_from_decimal(typename ToDataType::FieldType* dst,
dst[i] = static_cast<ToFieldType>(src[i].value) / multiplier.value;
}
}
FromDataType from_data_type(precision, scale);
if constexpr (narrow_integral) {
FromDataType from_data_type(precision, scale);
for (size_t i = 0; i < size; i++) {
if (dst[i] < min_result || dst[i] > max_result) {
if (std::isnan(dst[i]) || std::isinf(dst[i]) || dst[i] < min_result ||
dst[i] > max_result) {
THROW_DECIMAL_CONVERT_OVERFLOW_EXCEPTION(from_data_type.to_string(src[i]),
from_data_type.get_name(),
ToDataType {}.get_name());
Expand Down
1 change: 0 additions & 1 deletion be/src/vec/exprs/vexpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,6 @@ class VExpr {
uint32_t _index_unique_id = 0;
bool _can_fast_execute = false;
bool _enable_inverted_index_query = true;
uint32_t _in_list_value_count_threshold = 10;
};

} // namespace vectorized
Expand Down
Loading

0 comments on commit 5d544f8

Please sign in to comment.