Skip to content

Commit

Permalink
[Feature](partition) Support OLAP table null partition (apache#31827)
Browse files Browse the repository at this point in the history
    For auto partition, support nullable partition columns.
    For auto list partition, support creating a real null partition for null values.
    For auto range partition, a null value will raise an error for now, but maybe we can improve this in the future.
  • Loading branch information
zclllyybb committed Apr 11, 2024
1 parent 029b0be commit b391857
Show file tree
Hide file tree
Showing 36 changed files with 569 additions and 412 deletions.
253 changes: 135 additions & 118 deletions be/src/exec/tablet_info.cpp

Large diffs are not rendered by default.

6 changes: 2 additions & 4 deletions be/src/exec/tablet_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ class VOlapTablePartKeyComparator {

// return true if lhs < rhs
// 'row' == -1 means the maximal boundary
bool operator()(const BlockRowWithIndicator lhs, const BlockRowWithIndicator rhs) const;
bool operator()(const BlockRowWithIndicator& lhs, const BlockRowWithIndicator& rhs) const;

private:
const std::vector<uint16_t>& _slot_locs;
Expand All @@ -168,7 +168,6 @@ class VOlapTablePartitionParam {
int64_t version() const { return _t_param.version; }

// return true if we found this block_row in partition
//TODO: use virtual function to refactor it
ALWAYS_INLINE bool find_partition(vectorized::Block* block, int row,
VOlapTablePartition*& partition) const {
auto it = _is_in_partition ? _partitions_map->find(std::tuple {block, row, true})
Expand Down Expand Up @@ -275,8 +274,6 @@ class VOlapTablePartitionParam {
private:
Status _create_partition_keys(const std::vector<TExprNode>& t_exprs, BlockRow* part_key);

Status _create_partition_key(const TExprNode& t_expr, BlockRow* part_key, uint16_t pos);

// check if this partition contain this key
bool _part_contains(VOlapTablePartition* part, BlockRowWithIndicator key) const;

Expand All @@ -295,6 +292,7 @@ class VOlapTablePartitionParam {
std::vector<VOlapTablePartition*> _partitions;
// For all partition value rows saved in this map, the indicator is false. Whenever we use a value to look up in it, the param (indicator) is true,
// so that we can distinguish which column index to use (origin slots or transformed slots).
// For range partition we ONLY SAVE RIGHT ENDS. When we find a part's RIGHT end by a value, we then check whether the part's left end covers it.
std::unique_ptr<
std::map<BlockRowWithIndicator, VOlapTablePartition*, VOlapTablePartKeyComparator>>
_partitions_map;
Expand Down
17 changes: 8 additions & 9 deletions be/src/olap/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -261,15 +261,14 @@ constexpr bool is_numeric_type(const FieldType& field_type) {
}

// Util used to get string name of thrift enum item
#define EnumToString(enum_type, index, out) \
do { \
std::map<int, const char*>::const_iterator it = \
_##enum_type##_VALUES_TO_NAMES.find(index); \
if (it == _##enum_type##_VALUES_TO_NAMES.end()) { \
out = "NULL"; \
} else { \
out = it->second; \
} \
#define EnumToString(enum_type, index, out) \
do { \
auto it = _##enum_type##_VALUES_TO_NAMES.find(index); \
if (it == _##enum_type##_VALUES_TO_NAMES.end()) { \
out = "NULL"; \
} else { \
out = it->second; \
} \
} while (0)

struct RowLocation {
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/columns/column_const.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ ColumnPtr ColumnConst::index(const IColumn& indexes, size_t limit) const {
}

std::pair<ColumnPtr, size_t> check_column_const_set_readability(const IColumn& column,
const size_t row_num) noexcept {
size_t row_num) noexcept {
std::pair<ColumnPtr, size_t> result;
if (is_column_const(column)) {
result.first = static_cast<const ColumnConst&>(column).get_data_column_ptr();
Expand Down
85 changes: 41 additions & 44 deletions be/src/vec/columns/column_const.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@
#pragma once

#include <glog/logging.h>
#include <stdint.h>
#include <sys/types.h>

#include <concepts>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <initializer_list>
#include <ostream>
Expand All @@ -48,14 +47,43 @@

class SipHash;

namespace doris {
namespace vectorized {
namespace doris::vectorized {

class Arena;
class Block;
} // namespace vectorized
} // namespace doris

namespace doris::vectorized {
/*
* @return first : pointer to column itself if it's not ColumnConst, else to column's data column.
* second : zero if column is ColumnConst, else itself.
*/
std::pair<ColumnPtr, size_t> check_column_const_set_readability(const IColumn& column,
size_t row_num) noexcept;

/*
* Pick the row index to read from a column that may be constant.
*
* @param arg       the row index to use when the column is not constant.
* @param constancy whether the column is constant.
* @return 0 when `constancy` is true, otherwise `arg` unchanged.
* @warning using this function can sometimes cause performance problems in GCC.
*/
template <typename T>
requires std::is_integral_v<T>
T index_check_const(T arg, bool constancy) noexcept {
return constancy ? 0 : arg;
}

/*
* @return first : data_column_ptr for ColumnConst, itself otherwise.
* second : whether it's ColumnConst.
*/
std::pair<const ColumnPtr&, bool> unpack_if_const(const ColumnPtr&) noexcept;

/*
* For functions where some argument columns are almost, but not completely, always const, we use this function to preprocess their parameter columns
* (which are not data columns). When we have two or more columns which only provide parameters, use this to deal with the corner cases. So you can specialize your
* implementations for all const or all parameters const, without considering the case where only some of the parameters are const.
* Do the transformation only for the columns whose arg_indexes are in parameters.
*/
void default_preprocess_parameter_columns(ColumnPtr* columns, const bool* col_const,
const std::initializer_list<size_t>& parameters,
Block& block, const ColumnNumbers& arg_indexes) noexcept;

/** ColumnConst contains another column with single element,
* but looks like a column with arbitrary amount of same elements.
Expand Down Expand Up @@ -142,7 +170,7 @@ class ColumnConst final : public COWHelper<IColumn, ColumnConst> {
}

const char* deserialize_and_insert_from_arena(const char* pos) override {
auto res = data->deserialize_and_insert_from_arena(pos);
const auto* res = data->deserialize_and_insert_from_arena(pos);
data->pop_back(1);
++s;
return res;
Expand Down Expand Up @@ -204,8 +232,9 @@ class ColumnConst final : public COWHelper<IColumn, ColumnConst> {
int compare_at(size_t, size_t, const IColumn& rhs, int nan_direction_hint) const override {
auto rhs_const_column = assert_cast<const ColumnConst&>(rhs);

auto* this_nullable = check_and_get_column<ColumnNullable>(data.get());
auto* rhs_nullable = check_and_get_column<ColumnNullable>(rhs_const_column.data.get());
const auto* this_nullable = check_and_get_column<ColumnNullable>(data.get());
const auto* rhs_nullable =
check_and_get_column<ColumnNullable>(rhs_const_column.data.get());
if (this_nullable && rhs_nullable) {
return data->compare_at(0, 0, *rhs_const_column.data, nan_direction_hint);
} else if (this_nullable) {
Expand All @@ -228,8 +257,9 @@ class ColumnConst final : public COWHelper<IColumn, ColumnConst> {
void for_each_subcolumn(ColumnCallback callback) override { callback(data); }

bool structure_equals(const IColumn& rhs) const override {
if (auto rhs_concrete = typeid_cast<const ColumnConst*>(&rhs))
if (const auto* rhs_concrete = typeid_cast<const ColumnConst*>(&rhs)) {
return data->structure_equals(*rhs_concrete->data);
}
return false;
}

Expand Down Expand Up @@ -264,37 +294,4 @@ class ColumnConst final : public COWHelper<IColumn, ColumnConst> {
LOG(FATAL) << "should not call the method in column const";
}
};

/*
* @return first : pointer to column itself if it's not ColumnConst, else to column's data column.
* second : zero if column is ColumnConst, else itself.
*/
std::pair<ColumnPtr, size_t> check_column_const_set_readability(const IColumn& column,
const size_t row_num) noexcept;

/*
* Pick the row index to read from a column that may be constant.
*
* @param arg       the row index to use when the column is not constant.
* @param constancy whether the column is constant.
* @return 0 when `constancy` is true, otherwise `arg` unchanged.
* @warning using this function can sometimes cause performance problems in GCC.
*/
template <typename T>
requires std::is_integral_v<T>
T index_check_const(T arg, bool constancy) noexcept {
return constancy ? 0 : arg;
}

/*
* @return first : data_column_ptr for ColumnConst, itself otherwise.
* second : whether it's ColumnConst.
*/
std::pair<const ColumnPtr&, bool> unpack_if_const(const ColumnPtr&) noexcept;

/*
* For functions where some argument columns are almost, but not completely, always const, we use this function to preprocess their parameter columns
* (which are not data columns). When we have two or more columns which only provide parameters, use this to deal with the corner cases. So you can specialize your
* implementations for all const or all parameters const, without considering the case where only some of the parameters are const.
* Do the transformation only for the columns whose arg_indexes are in parameters.
*/
void default_preprocess_parameter_columns(ColumnPtr* columns, const bool* col_const,
const std::initializer_list<size_t>& parameters,
Block& block, const ColumnNumbers& arg_indexes) noexcept;
} // namespace doris::vectorized
2 changes: 1 addition & 1 deletion be/src/vec/columns/column_nullable.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class ColumnNullable final : public COWHelper<IColumn, ColumnNullable> {
std::string get_name() const override { return "Nullable(" + nested_column->get_name() + ")"; }
MutableColumnPtr clone_resized(size_t size) const override;
size_t size() const override { return assert_cast<const ColumnUInt8&>(*null_map).size(); }
bool is_null_at(size_t n) const override {
PURE bool is_null_at(size_t n) const override {
return assert_cast<const ColumnUInt8&>(*null_map).get_data()[n] != 0;
}
bool is_default_at(size_t n) const override { return is_null_at(n); }
Expand Down
13 changes: 7 additions & 6 deletions be/src/vec/common/assert_cast.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,24 @@

#pragma once

#include <string>
#include <type_traits>
#include <typeindex>
#include <typeinfo>

#include "common/logging.h"
#include "fmt/format.h"
#include "vec/common/demangle.h"

/** Perform static_cast in release build.
* Checks the type by comparing typeid and throws an exception in debug build.
* The exact match of the type is checked. That is, cast to the ancestor will be unsuccessful.
*/
template <typename To, typename From>
To assert_cast(From&& from) {
PURE To assert_cast(From&& from) {
#ifndef NDEBUG
try {
if constexpr (std::is_pointer_v<To>) {
if (typeid(*from) == typeid(std::remove_pointer_t<To>)) return static_cast<To>(from);
if (typeid(*from) == typeid(std::remove_pointer_t<To>)) {
return static_cast<To>(from);
}
if constexpr (std::is_pointer_v<std::remove_reference_t<From>>) {
if (auto ptr = dynamic_cast<To>(from); ptr != nullptr) {
return ptr;
Expand All @@ -48,7 +47,9 @@ To assert_cast(From&& from) {
demangle(typeid(To).name()));
}
} else {
if (typeid(from) == typeid(To)) return static_cast<To>(from);
if (typeid(from) == typeid(To)) {
return static_cast<To>(from);
}
}
} catch (const std::exception& e) {
LOG(FATAL) << "assert cast err:" << e.what();
Expand Down
60 changes: 37 additions & 23 deletions be/src/vec/sink/vrow_distribution.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include <cstdint>
#include <memory>
#include <sstream>

#include "common/status.h"
#include "runtime/client_cache.h"
Expand All @@ -33,6 +34,7 @@
#include "vec/columns/column_const.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_vector.h"
#include "vec/common/assert_cast.h"
#include "vec/data_types/data_type.h"
#include "vec/sink/writer/vtablet_writer.h"

Expand All @@ -43,17 +45,24 @@ VRowDistribution::_get_partition_function() {
return {_vpartition->get_part_func_ctx(), _vpartition->get_partition_function()};
}

Status VRowDistribution::_save_missing_values(std::vector<std::vector<std::string>>& col_strs,
int col_size, Block* block,
std::vector<int64_t> filter) {
Status VRowDistribution::_save_missing_values(
std::vector<std::vector<std::string>>& col_strs, // non-const ref for move
int col_size, Block* block, std::vector<int64_t> filter,
const std::vector<const NullMap*>& col_null_maps) {
// de-duplication for new partitions but save all rows.
_batching_block->add_rows(block, filter);
std::vector<TStringLiteral> cur_row_values;
std::vector<TNullableStringLiteral> cur_row_values;
for (int row = 0; row < col_strs[0].size(); ++row) {
cur_row_values.clear();
for (int col = 0; col < col_size; ++col) {
TStringLiteral node;
node.value = std::move(col_strs[col][row]);
TNullableStringLiteral node;
const auto* null_map = col_null_maps[col]; // null map for this col
node.__set_is_null((null_map && (*null_map)[filter[row]])
? true
: node.is_null); // if not, dont change(default false)
if (!node.is_null) {
node.__set_value(col_strs[col][row]);
}
cur_row_values.push_back(node);
}
//For duplicate cur_values, they will be filtered in FE
Expand Down Expand Up @@ -293,7 +302,6 @@ Status VRowDistribution::_generate_rows_distribution_for_auto_partition(
auto num_rows = block->rows();
std::vector<uint16_t> partition_keys = _vpartition->get_partition_keys();

//TODO: use loop to create missing_vals for multi column.
auto partition_col = block->get_by_position(partition_keys[0]);
_missing_map.clear();
_missing_map.reserve(partition_col.column->size());
Expand All @@ -313,29 +321,34 @@ Status VRowDistribution::_generate_rows_distribution_for_auto_partition(

if (!_missing_map.empty()) {
// for missing partition keys, calc the missing partition and save in _partitions_need_create
auto [part_ctxs, part_funcs] = _get_partition_function();
auto funcs_size = part_funcs.size();
auto [part_ctxs, part_exprs] = _get_partition_function();
auto part_col_num = part_exprs.size();
// the two vectors are in column-first-order
std::vector<std::vector<std::string>> col_strs;
col_strs.resize(funcs_size);

for (int i = 0; i < funcs_size; ++i) {
auto return_type = part_funcs[i]->data_type();
// expose the data column
vectorized::ColumnPtr range_left_col =
block->get_by_position(partition_cols_idx[i]).column;
if (const auto* nullable =
check_and_get_column<vectorized::ColumnNullable>(*range_left_col)) {
range_left_col = nullable->get_nested_column_ptr();
return_type = assert_cast<const vectorized::DataTypeNullable*>(return_type.get())
->get_nested_type();
std::vector<const NullMap*> col_null_maps;
col_strs.resize(part_col_num);
col_null_maps.reserve(part_col_num);

for (int i = 0; i < part_col_num; ++i) {
auto return_type = part_exprs[i]->data_type();
// expose the data column. the return type would be nullable
const auto& [range_left_col, col_const] =
unpack_if_const(block->get_by_position(partition_cols_idx[i]).column);
if (range_left_col->is_nullable()) {
col_null_maps.push_back(&(assert_cast<const ColumnNullable*>(range_left_col.get())
->get_null_map_data()));
} else {
col_null_maps.push_back(nullptr);
}
for (auto row : _missing_map) {
col_strs[i].push_back(return_type->to_string(*range_left_col, row));
col_strs[i].push_back(
return_type->to_string(*range_left_col, index_check_const(row, col_const)));
}
}

// Calculate the end values and save them. At the end of sending, we will create partitions for them and deal with them.
RETURN_IF_ERROR(_save_missing_values(col_strs, funcs_size, block, _missing_map));
RETURN_IF_ERROR(
_save_missing_values(col_strs, part_col_num, block, _missing_map, col_null_maps));

size_t new_bt_rows = _batching_block->rows();
size_t new_bt_bytes = _batching_block->bytes();
Expand Down Expand Up @@ -420,6 +433,7 @@ Status VRowDistribution::generate_rows_distribution(
auto func_size = part_funcs.size();
for (int i = 0; i < func_size; ++i) {
int result_idx = -1;
// we just calc left range here. leave right to FE to avoid dup calc.
RETURN_IF_ERROR(part_funcs[i]->execute(part_ctxs[i].get(), block.get(), &result_idx));
VLOG_DEBUG << "Partition-calculated block:" << block->dump_data();
partition_cols_idx.push_back(result_idx);
Expand Down
5 changes: 3 additions & 2 deletions be/src/vec/sink/vrow_distribution.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ class VRowDistribution {
std::pair<vectorized::VExprContextSPtrs, vectorized::VExprSPtrs> _get_partition_function();

Status _save_missing_values(std::vector<std::vector<std::string>>& col_strs, int col_size,
Block* block, std::vector<int64_t> filter);
Block* block, std::vector<int64_t> filter,
const std::vector<const NullMap*>& col_null_maps);

void _get_tablet_ids(vectorized::Block* block, int32_t index_idx,
std::vector<int64_t>& tablet_ids);
Expand Down Expand Up @@ -173,7 +174,7 @@ class VRowDistribution {
int _batch_size = 0;

// for auto partitions
std::vector<std::vector<TStringLiteral>> _partitions_need_create;
std::vector<std::vector<TNullableStringLiteral>> _partitions_need_create;

public:
std::unique_ptr<MutableBlock> _batching_block;
Expand Down
Loading

0 comments on commit b391857

Please sign in to comment.