Skip to content

Commit

Permalink
FIRST
Browse files Browse the repository at this point in the history
  • Loading branch information
zhiqiang-hhhh committed Aug 7, 2024
1 parent 44ac169 commit 8e48df9
Show file tree
Hide file tree
Showing 70 changed files with 518 additions and 310 deletions.
4 changes: 3 additions & 1 deletion be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "util/crc32c.h"
#include "util/debug_points.h"
#include "util/doris_metrics.h"
#include "vec/common/assert_cast.h"
#include "vec/common/schema_util.h"
#include "vec/data_types/data_type_factory.hpp"
#include "vec/jsonb/serialize.h"
Expand Down Expand Up @@ -994,7 +995,8 @@ Status BaseTablet::generate_new_block_for_partial_update(
if (rs_column.has_default_value()) {
mutable_column->insert_from(*mutable_default_value_columns[i].get(), 0);
} else if (rs_column.is_nullable()) {
assert_cast<vectorized::ColumnNullable*>(mutable_column.get())
assert_cast<vectorized::ColumnNullable*, TypeCheck::Disable>(
mutable_column.get())
->insert_null_elements(1);
} else {
mutable_column->insert_default();
Expand Down
8 changes: 5 additions & 3 deletions be/src/olap/bloom_filter_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_vector.h"
#include "vec/columns/predicate_column.h"
#include "vec/common/assert_cast.h"
#include "vec/exprs/vruntimefilter_wrapper.h"

namespace doris {
Expand Down Expand Up @@ -69,13 +70,14 @@ class BloomFilterColumnPredicate : public ColumnPredicate {

uint16_t new_size = 0;
if (column.is_column_dictionary()) {
const auto* dict_col = assert_cast<const vectorized::ColumnDictI32*>(&column);
const auto* dict_col =
assert_cast<const vectorized::ColumnDictI32*, TypeCheck::Disable>(&column);
new_size = _specific_filter->template find_dict_olap_engine<is_nullable>(
dict_col, null_map, sel, size);
} else {
const auto& data =
assert_cast<const vectorized::PredicateColumnType<PredicateEvaluateType<T>>*>(
&column)
assert_cast<const vectorized::PredicateColumnType<PredicateEvaluateType<T>>*,
TypeCheck::Disable>(&column)
->get_data();
new_size = _specific_filter->find_fixed_len_olap_engine((char*)data.data(), null_map,
sel, size, data.size() != size);
Expand Down
6 changes: 4 additions & 2 deletions be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,8 @@ Status BlockChanger::change_block(vectorized::Block* ref_block,
// not nullable to nullable
if (new_col_nullable) {
auto* new_nullable_col =
assert_cast<vectorized::ColumnNullable*>(new_col->assume_mutable().get());
assert_cast<vectorized::ColumnNullable*, TypeCheck::Disable>(
new_col->assume_mutable().get());

new_nullable_col->change_nested_column(ref_col);
new_nullable_col->get_null_map_data().resize_fill(ref_col->size());
Expand All @@ -356,7 +357,8 @@ Status BlockChanger::change_block(vectorized::Block* ref_block,
// the cast expr of schema change is `CastExpr(CAST String to Nullable(Int32))`,
// so need to handle nullable to not nullable here
auto* ref_nullable_col =
assert_cast<vectorized::ColumnNullable*>(ref_col->assume_mutable().get());
assert_cast<vectorized::ColumnNullable*, TypeCheck::Disable>(
ref_col->assume_mutable().get());

new_col = ref_nullable_col->get_nested_column_ptr();
}
Expand Down
72 changes: 40 additions & 32 deletions be/src/vec/aggregate_functions/aggregate_function.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,32 +262,34 @@ class IAggregateFunctionHelper : public IAggregateFunction {
}
auto iter = place_rows.begin();
while (iter != place_rows.end()) {
assert_cast<const Derived*>(this)->add_many(iter->first, columns, iter->second,
arena);
assert_cast<const Derived*, TypeCheck::Disable>(this)->add_many(
iter->first, columns, iter->second, arena);
iter++;
}
return;
}
}

for (size_t i = 0; i < batch_size; ++i) {
assert_cast<const Derived*>(this)->add(places[i] + place_offset, columns, i, arena);
assert_cast<const Derived*, TypeCheck::Disable>(this)->add(places[i] + place_offset,
columns, i, arena);
}
}

void add_batch_selected(size_t batch_size, AggregateDataPtr* places, size_t place_offset,
const IColumn** columns, Arena* arena) const override {
for (size_t i = 0; i < batch_size; ++i) {
if (places[i]) {
assert_cast<const Derived*>(this)->add(places[i] + place_offset, columns, i, arena);
assert_cast<const Derived*, TypeCheck::Disable>(this)->add(places[i] + place_offset,
columns, i, arena);
}
}
}

void add_batch_single_place(size_t batch_size, AggregateDataPtr place, const IColumn** columns,
Arena* arena) const override {
for (size_t i = 0; i < batch_size; ++i) {
assert_cast<const Derived*>(this)->add(place, columns, i, arena);
assert_cast<const Derived*, TypeCheck::Disable>(this)->add(place, columns, i, arena);
}
}
//now this is use for sum/count/avg/min/max win function, other win function should override this function in class
Expand All @@ -298,28 +300,30 @@ class IAggregateFunctionHelper : public IAggregateFunction {
frame_start = std::max<int64_t>(frame_start, partition_start);
frame_end = std::min<int64_t>(frame_end, partition_end);
for (int64_t i = frame_start; i < frame_end; ++i) {
assert_cast<const Derived*>(this)->add(place, columns, i, arena);
assert_cast<const Derived*, TypeCheck::Disable>(this)->add(place, columns, i, arena);
}
}

void add_batch_range(size_t batch_begin, size_t batch_end, AggregateDataPtr place,
const IColumn** columns, Arena* arena, bool has_null) override {
for (size_t i = batch_begin; i <= batch_end; ++i) {
assert_cast<const Derived*>(this)->add(place, columns, i, arena);
assert_cast<const Derived*, TypeCheck::Disable>(this)->add(place, columns, i, arena);
}
}

void insert_result_into_vec(const std::vector<AggregateDataPtr>& places, const size_t offset,
IColumn& to, const size_t num_rows) const override {
for (size_t i = 0; i != num_rows; ++i) {
assert_cast<const Derived*>(this)->insert_result_into(places[i] + offset, to);
assert_cast<const Derived*, TypeCheck::Disable>(this)->insert_result_into(
places[i] + offset, to);
}
}

void serialize_vec(const std::vector<AggregateDataPtr>& places, size_t offset,
BufferWritable& buf, const size_t num_rows) const override {
for (size_t i = 0; i != num_rows; ++i) {
assert_cast<const Derived*>(this)->serialize(places[i] + offset, buf);
assert_cast<const Derived*, TypeCheck::Disable>(this)->serialize(places[i] + offset,
buf);
buf.commit();
}
}
Expand All @@ -334,10 +338,11 @@ class IAggregateFunctionHelper : public IAggregateFunction {
const size_t num_rows, Arena* arena) const override {
std::vector<char> place(size_of_data());
for (size_t i = 0; i != num_rows; ++i) {
assert_cast<const Derived*>(this)->create(place.data());
assert_cast<const Derived*, TypeCheck::Disable>(this)->create(place.data());
DEFER({ assert_cast<const Derived*>(this)->destroy(place.data()); });
assert_cast<const Derived*>(this)->add(place.data(), columns, i, arena);
assert_cast<const Derived*>(this)->serialize(place.data(), buf);
assert_cast<const Derived*, TypeCheck::Disable>(this)->add(place.data(), columns, i,
arena);
assert_cast<const Derived*, TypeCheck::Disable>(this)->serialize(place.data(), buf);
buf.commit();
}
}
Expand All @@ -362,12 +367,13 @@ class IAggregateFunctionHelper : public IAggregateFunction {
try {
auto place = places + size_of_data * i;
VectorBufferReader buffer_reader(column->get_data_at(i));
assert_cast<const Derived*>(this)->create(place);
assert_cast<const Derived*>(this)->deserialize(place, buffer_reader, arena);
assert_cast<const Derived*, TypeCheck::Disable>(this)->create(place);
assert_cast<const Derived*, TypeCheck::Disable>(this)->deserialize(
place, buffer_reader, arena);
} catch (...) {
for (int j = 0; j < i; ++j) {
auto place = places + size_of_data * j;
assert_cast<const Derived*>(this)->destroy(place);
assert_cast<const Derived*, TypeCheck::Disable>(this)->destroy(place);
}
throw;
}
Expand All @@ -383,8 +389,8 @@ class IAggregateFunctionHelper : public IAggregateFunction {
try {
auto rhs_place = rhs + size_of_data * i;
VectorBufferReader buffer_reader(column_string->get_data_at(i));
assert_cast<const Derived*>(this)->create(rhs_place);
assert_cast<const Derived*>(this)->deserialize_and_merge(
assert_cast<const Derived*, TypeCheck::Disable>(this)->create(rhs_place);
assert_cast<const Derived*, TypeCheck::Disable>(this)->deserialize_and_merge(
places[i] + offset, rhs_place, buffer_reader, arena);
} catch (...) {
for (int j = 0; j < i; ++j) {
Expand All @@ -394,7 +400,7 @@ class IAggregateFunctionHelper : public IAggregateFunction {
throw;
}
}
assert_cast<const Derived*>(this)->destroy_vec(rhs, num_rows);
assert_cast<const Derived*, TypeCheck::Disable>(this)->destroy_vec(rhs, num_rows);
}

void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset,
Expand All @@ -406,15 +412,15 @@ class IAggregateFunctionHelper : public IAggregateFunction {
try {
auto rhs_place = rhs + size_of_data * i;
VectorBufferReader buffer_reader(column_string->get_data_at(i));
assert_cast<const Derived*>(this)->create(rhs_place);
assert_cast<const Derived*, TypeCheck::Disable>(this)->create(rhs_place);
if (places[i]) {
assert_cast<const Derived*>(this)->deserialize_and_merge(
assert_cast<const Derived*, TypeCheck::Disable>(this)->deserialize_and_merge(
places[i] + offset, rhs_place, buffer_reader, arena);
}
} catch (...) {
for (int j = 0; j < i; ++j) {
auto place = rhs + size_of_data * j;
assert_cast<const Derived*>(this)->destroy(place);
assert_cast<const Derived*, TypeCheck::Disable>(this)->destroy(place);
}
throw;
}
Expand All @@ -431,8 +437,8 @@ class IAggregateFunctionHelper : public IAggregateFunction {
Arena* arena, const size_t num_rows) const override {
const auto size_of_data = assert_cast<const Derived*>(this)->size_of_data();
for (size_t i = 0; i != num_rows; ++i) {
assert_cast<const Derived*>(this)->merge(places[i] + offset, rhs + size_of_data * i,
arena);
assert_cast<const Derived*, TypeCheck::Disable>(this)->merge(
places[i] + offset, rhs + size_of_data * i, arena);
}
}

Expand All @@ -442,8 +448,8 @@ class IAggregateFunctionHelper : public IAggregateFunction {
const auto size_of_data = assert_cast<const Derived*>(this)->size_of_data();
for (size_t i = 0; i != num_rows; ++i) {
if (places[i]) {
assert_cast<const Derived*>(this)->merge(places[i] + offset, rhs + size_of_data * i,
arena);
assert_cast<const Derived*, TypeCheck::Disable>(this)->merge(
places[i] + offset, rhs + size_of_data * i, arena);
}
}
}
Expand All @@ -457,11 +463,13 @@ class IAggregateFunctionHelper : public IAggregateFunction {
auto* deserialized_place = (AggregateDataPtr)deserialized_data.data();
for (size_t i = begin; i <= end; ++i) {
VectorBufferReader buffer_reader(
(assert_cast<const ColumnString&>(column)).get_data_at(i));
assert_cast<const Derived*>(this)->create(deserialized_place);
DEFER({ assert_cast<const Derived*>(this)->destroy(deserialized_place); });
assert_cast<const Derived*>(this)->deserialize_and_merge(place, deserialized_place,
buffer_reader, arena);
(assert_cast<const ColumnString&, TypeCheck::Disable>(column)).get_data_at(i));
assert_cast<const Derived*, TypeCheck::Disable>(this)->create(deserialized_place);
DEFER({
assert_cast<const Derived*, TypeCheck::Disable>(this)->destroy(deserialized_place);
});
assert_cast<const Derived*, TypeCheck::Disable>(this)->deserialize_and_merge(
place, deserialized_place, buffer_reader, arena);
}
}

Expand All @@ -475,8 +483,8 @@ class IAggregateFunctionHelper : public IAggregateFunction {

void deserialize_and_merge(AggregateDataPtr __restrict place, AggregateDataPtr __restrict rhs,
BufferReadable& buf, Arena* arena) const override {
assert_cast<const Derived*>(this)->deserialize(rhs, buf, arena);
assert_cast<const Derived*>(this)->merge(place, rhs, arena);
assert_cast<const Derived*, TypeCheck::Disable>(this)->deserialize(rhs, buf, arena);
assert_cast<const Derived*, TypeCheck::Disable>(this)->merge(place, rhs, arena);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/aggregate_functions/aggregate_function_simple_factory.h"
#include "vec/columns/columns_number.h"
#include "vec/common/assert_cast.h"
#include "vec/common/string_ref.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type_number.h"
Expand Down Expand Up @@ -98,12 +99,13 @@ class AggregateFunctionApproxCountDistinct final
void add(AggregateDataPtr __restrict place, const IColumn** columns, ssize_t row_num,
Arena*) const override {
if constexpr (IsFixLenColumnType<ColumnDataType>::value) {
auto column = assert_cast<const ColumnDataType*>(columns[0]);
auto column = assert_cast<const ColumnDataType*, TypeCheck::Disable>(columns[0]);
auto value = column->get_element(row_num);
this->data(place).add(
HashUtil::murmur_hash64A((char*)&value, sizeof(value), HashUtil::MURMUR_SEED));
} else {
auto value = assert_cast<const ColumnDataType*>(columns[0])->get_data_at(row_num);
auto value = assert_cast<const ColumnDataType*, TypeCheck::Disable>(columns[0])
->get_data_at(row_num);
uint64_t hash_value =
HashUtil::murmur_hash64A(value.data, value.size, HashUtil::MURMUR_SEED);
this->data(place).add(hash_value);
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/aggregate_functions/aggregate_function_avg.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ class AggregateFunctionAvg final
#ifdef __clang__
#pragma clang fp reassociate(on)
#endif
const auto& column = assert_cast<const ColVecType&>(*columns[0]);
const auto& column = assert_cast<const ColVecType&, TypeCheck::Disable>(*columns[0]);
if constexpr (IsDecimalNumber<T>) {
this->data(place).sum += column.get_data()[row_num].value;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ class AggregateFunctionAvgWeight final

void add(AggregateDataPtr __restrict place, const IColumn** columns, ssize_t row_num,
Arena*) const override {
const auto& column = assert_cast<const ColVecType&>(*columns[0]);
const auto& weight = assert_cast<const ColumnFloat64&>(*columns[1]);
const auto& column = assert_cast<const ColVecType&, TypeCheck::Disable>(*columns[0]);
const auto& weight = assert_cast<const ColumnFloat64&, TypeCheck::Disable>(*columns[1]);
this->data(place).add(column.get_data()[row_num], weight.get_element(row_num));
}

Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/aggregate_functions/aggregate_function_bit.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <memory>

#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/common/assert_cast.h"
#include "vec/core/types.h"
#include "vec/io/io_helper.h"

Expand Down Expand Up @@ -114,7 +115,7 @@ class AggregateFunctionBitwise final

void add(AggregateDataPtr __restrict place, const IColumn** columns, ssize_t row_num,
Arena*) const override {
const auto& column = assert_cast<const ColumnVector<T>&>(*columns[0]);
const auto& column = assert_cast<const ColumnVector<T>&, TypeCheck::Disable>(*columns[0]);
this->data(place).add(column.get_data()[row_num]);
}

Expand Down
15 changes: 8 additions & 7 deletions be/src/vec/aggregate_functions/aggregate_function_bitmap.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,10 @@ class AggregateFunctionBitmapSerializationHelper
col.resize(num_rows);
auto* data = col.get_data().data();
for (size_t i = 0; i != num_rows; ++i) {
assert_cast<const Derived*>(this)->create(place);
DEFER({ assert_cast<const Derived*>(this)->destroy(place); });
assert_cast<const Derived*>(this)->add(place, columns, i, arena);
assert_cast<const Derived*, TypeCheck::Disable>(this)->create(place);
DEFER({ assert_cast<const Derived*, TypeCheck::Disable>(this)->destroy(place); });
assert_cast<const Derived*, TypeCheck::Disable>(this)->add(place, columns, i,
arena);
data[i] = std::move(this->data(place).value);
}
} else {
Expand Down Expand Up @@ -304,7 +305,7 @@ class AggregateFunctionBitmapOp final

void add(AggregateDataPtr __restrict place, const IColumn** columns, ssize_t row_num,
Arena*) const override {
const auto& column = assert_cast<const ColVecType&>(*columns[0]);
const auto& column = assert_cast<const ColVecType&, TypeCheck::Disable>(*columns[0]);
this->data(place).add(column.get_data()[row_num]);
}

Expand Down Expand Up @@ -367,12 +368,12 @@ class AggregateFunctionBitmapCount final
if constexpr (arg_is_nullable) {
auto& nullable_column = assert_cast<const ColumnNullable&>(*columns[0]);
if (!nullable_column.is_null_at(row_num)) {
const auto& column =
assert_cast<const ColVecType&>(nullable_column.get_nested_column());
const auto& column = assert_cast<const ColVecType&, TypeCheck::Disable>(
nullable_column.get_nested_column());
this->data(place).add(column.get_data()[row_num]);
}
} else {
const auto& column = assert_cast<const ColVecType&>(*columns[0]);
const auto& column = assert_cast<const ColVecType&, TypeCheck::Disable>(*columns[0]);
this->data(place).add(column.get_data()[row_num]);
}
}
Expand Down
Loading

0 comments on commit 8e48df9

Please sign in to comment.