Skip to content

Commit

Permalink
nested
Browse files Browse the repository at this point in the history
  • Loading branch information
eldenmoon committed Aug 7, 2024
1 parent 9039c60 commit e298997
Show file tree
Hide file tree
Showing 10 changed files with 266 additions and 71 deletions.
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,7 @@ class EmptyColumnIterator : public ColumnIterator {

Status read_by_rowids(const rowid_t* rowids, const size_t count,
vectorized::MutableColumnPtr& dst) override {
return Status::NotSupported("Not supported read_by_rowids");
return Status::OK();
}

ordinal_t get_current_ordinal() const override {
Expand Down
96 changes: 72 additions & 24 deletions be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
#include "vec/columns/column_object.h"
#include "vec/columns/subcolumn_tree.h"
#include "vec/common/assert_cast.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/core/columns_with_type_and_name.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_array.h"
#include "vec/data_types/data_type_nullable.h"
Expand Down Expand Up @@ -119,59 +122,104 @@ class HierarchicalDataReader : public ColumnIterator {
auto type = root_var.get_root_type();
container_variant.add_sub_column({}, std::move(column), type);
}
bool nested = false;
// parent -> subcolumns
std::map<PathInData, PathsWithColumnAndType> nested_subcolumns;
PathsWithColumnAndType non_nested_subcolumns;
RETURN_IF_ERROR(tranverse([&](SubstreamReaderTree::Node& node) {
MutableColumnPtr column = node.data.column->get_ptr();
PathInData real_path = node.path.copy_pop_nfront(_path.get_parts().size());
bool add =
container_variant.add_sub_column(real_path, std::move(column), node.data.type);
if (!add) {
return Status::InternalError("Duplicated {}, type {}", node.path.get_path(),
node.data.type->get_name());
}

if (node.parent->is_nested()) {
nested = true;
CHECK_EQ(getTypeName(remove_nullable(node.data.type)->get_type_id()),
getTypeName(TypeIndex::Array));
nested_subcolumns[real_path.copy_pop_back()].emplace_back(
real_path, column->get_ptr(), node.data.type);
} else {
non_nested_subcolumns.emplace_back(real_path, column->get_ptr(), node.data.type);
}
return Status::OK();
}));

if (nested) {
container_variant.finalize_if_not();
for (auto& entry : non_nested_subcolumns) {
bool add = container_variant.add_sub_column(entry.path, entry.column->assume_mutable(),
entry.type);
if (!add) {
return Status::InternalError("Duplicated {}, type {}", entry.path.get_path(),
entry.type->get_name());
}
}
for (auto& entry : nested_subcolumns) {
MutableColumnPtr nested_object = ColumnObject::create(true, false);
MutableColumnPtr offset =
check_and_get_column<ColumnArray>(
*remove_nullable(container_variant.get_subcolumns()
.get_leaves()[0]
->data.get_finalized_column_ptr()))
check_and_get_column<ColumnArray>(*remove_nullable(entry.second[0].column))
->get_offsets_ptr()
->assume_mutable();
auto* nested_object_ptr = assert_cast<ColumnObject*>(nested_object.get());
// flatten nested arrays
for (const auto& entry : container_variant.get_subcolumns()) {
auto& column = entry->data.get_finalized_column_ptr();
const auto& type = entry->data.get_least_common_type();
for (const auto& subcolumn : entry.second) {
const auto& column = subcolumn.column;
const auto& type = subcolumn.type;
if (!remove_nullable(column)->is_column_array()) {
return Status::InvalidArgument(
"Meet none array column when flatten nested array, path {}, type {}",
entry->path.get_path(), entry->data.get_finalized_column().get_name());
subcolumn.path.get_path(), subcolumn.type->get_name());
}
MutableColumnPtr flattend_column =
check_and_get_column<ColumnArray>(
remove_nullable(entry->data.get_finalized_column_ptr()).get())
check_and_get_column<ColumnArray>(remove_nullable(subcolumn.column).get())
->get_data_ptr()
->assume_mutable();
DataTypePtr flattend_type =
check_and_get_data_type<DataTypeArray>(remove_nullable(type).get())
->get_nested_type();
nested_object_ptr->add_sub_column(entry->path, std::move(flattend_column),
std::move(flattend_type));
// add path without parent prefix
nested_object_ptr->add_sub_column(
subcolumn.path.copy_pop_nfront(entry.first.get_parts().size()),
std::move(flattend_column), std::move(flattend_type));
}
nested_object = make_nullable(nested_object->get_ptr())->assume_mutable();
auto array =
make_nullable(ColumnArray::create(std::move(nested_object), std::move(offset)));
container_variant.clear();
container_variant.create_root(ColumnObject::NESTED_TYPE, array->assume_mutable());
container_variant.add_sub_column(entry.first, array->assume_mutable(),
ColumnObject::NESTED_TYPE);
}
// if (has_nested) {
// // rewrite nested nodes
// container_variant.finalize_if_not();
// MutableColumnPtr nested_object = ColumnObject::create(true, false);
// MutableColumnPtr offset =
// check_and_get_column<ColumnArray>(
// *remove_nullable(container_variant.get_subcolumns()
// .get_leaves()[0]
// ->data.get_finalized_column_ptr()))
// ->get_offsets_ptr()
// ->assume_mutable();
// auto* nested_object_ptr = assert_cast<ColumnObject*>(nested_object.get());
// // flatten nested arrays
// for (const auto& entry : container_variant.get_subcolumns()) {
// auto& column = entry->data.get_finalized_column_ptr();
// const auto& type = entry->data.get_least_common_type();
// if (!remove_nullable(column)->is_column_array()) {
// return Status::InvalidArgument(
// "Meet none array column when flatten nested array, path {}, type {}",
// entry->path.get_path(), entry->data.get_finalized_column().get_name());
// }
// MutableColumnPtr flattend_column =
// check_and_get_column<ColumnArray>(
// remove_nullable(entry->data.get_finalized_column_ptr()).get())
// ->get_data_ptr()
// ->assume_mutable();
// DataTypePtr flattend_type =
// check_and_get_data_type<DataTypeArray>(remove_nullable(type).get())
// ->get_nested_type();
// nested_object_ptr->add_sub_column(entry->path, std::move(flattend_column),
// std::move(flattend_type));
// }
// nested_object = make_nullable(nested_object->get_ptr())->assume_mutable();
// auto array =
// make_nullable(ColumnArray::create(std::move(nested_object), std::move(offset)));
// container_variant.clear();
// container_variant.create_root(ColumnObject::NESTED_TYPE, array->assume_mutable());
// }

// TODO select v:b -> v.b / v.b.c but v.d maybe in v
// copy container variant to dst variant, todo avoid copy
Expand Down
78 changes: 77 additions & 1 deletion be/src/olap/rowset/segment_v2/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
#include "util/key_util.h"
#include "util/simd/bits.h"
#include "vec/columns/column.h"
#include "vec/columns/column_array.h"
#include "vec/columns/column_const.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_object.h"
Expand All @@ -81,17 +82,20 @@
#include "vec/common/schema_util.h"
#include "vec/common/string_ref.h"
#include "vec/common/typeid_cast.h"
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/core/field.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_array.h"
#include "vec/data_types/data_type_factory.hpp"
#include "vec/data_types/data_type_number.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/exprs/vliteral.h"
#include "vec/exprs/vslot_ref.h"
#include "vec/functions/array/function_array_index.h"
#include "vec/functions/function_helpers.h"
#include "vec/json/path_in_data.h"

namespace doris {
Expand Down Expand Up @@ -2316,7 +2320,6 @@ Status SegmentIterator::next_batch(vectorized::Block* block) {
auto status = [&]() {
RETURN_IF_CATCH_EXCEPTION({
RETURN_IF_ERROR(_next_batch_internal(block));
RETURN_IF_ERROR(_fill_missing_columns(block));

// reverse block row order if read_orderby_key_reverse is true for key topn
// it should be processed for all success _next_batch_internal
Expand Down Expand Up @@ -2394,7 +2397,78 @@ Status SegmentIterator::copy_column_data_by_selector(vectorized::IColumn* input_
return input_col_ptr->filter_by_selector(sel_rowid_idx, select_size, output_col);
}

vectorized::SubcolumnsTree<const vectorized::ColumnArray::Offsets64*>
SegmentIterator::_get_nested_array_offsets_columns(const vectorized::Block& block) {
vectorized::SubcolumnsTree<const vectorized::ColumnArray::Offsets64*> offsets;
for (size_t i = 0; i < block.columns(); ++i) {
auto cid = _schema->column_id(i);
const auto* column_desc = _schema->column(cid);
if (column_desc->path() != nullptr && column_desc->path()->has_nested_part() &&
column_desc->type() == FieldType::OLAP_FIELD_TYPE_ARRAY) {
offsets.add(*column_desc->path(),
&vectorized::check_and_get_column<vectorized::ColumnArray>(
remove_nullable(block.get_by_position(i).column).get())
->get_offsets());
LOG(INFO) << "fuck";
}
}
return offsets;
}

Status SegmentIterator::_fill_missing_columns(vectorized::Block* block) {
vectorized::SubcolumnsTree<const vectorized::ColumnArray::Offsets64*> offsets =
_get_nested_array_offsets_columns(*block);
for (size_t i = 0; i < block->columns(); ++i) {
auto cid = _schema->column_id(i);
const auto* column_desc = _schema->column(cid);
int64_t current_size = block->get_by_position(i).column->size();
if (column_desc->path() != nullptr && column_desc->path()->has_nested_part() &&
current_size < block->rows()) {
const auto* leaf = offsets.get_leaf_of_the_same_nested(
*column_desc->path(), [](const auto& node) { return node.data->size(); },
current_size);
if (!leaf) {
VLOG_DEBUG << "Not found any subcolumns column_desc: "
<< column_desc->path()->get_path() << ", current_size: " << current_size
<< ", block_rows: " << block->rows();
block->get_by_position(i).column->assume_mutable()->insert_many_defaults(
block->rows() - current_size);
continue;
}
LOG(INFO) << "fuck";
const vectorized::ColumnArray::Offsets64* offset = leaf->data;
int64_t nested_padding_size = offset->back() - (*offset)[current_size - 1];
auto nested_column =
vectorized::check_and_get_data_type<vectorized::DataTypeArray>(
remove_nullable(block->get_by_position(i).type).get())
->get_nested_type()
->create_column_const_with_default_value(nested_padding_size);
auto nested_new_offset = vectorized::ColumnArray::ColumnOffsets::create();
nested_new_offset->reserve(block->rows() - current_size);
for (size_t i = current_size; i < block->rows(); ++i) {
nested_new_offset->get_data().push_back_without_reserve(
(*offset)[i] - (*offset)[current_size - 1]);
}
vectorized::ColumnPtr nested_padding_column =
vectorized::ColumnArray::create(nested_column, std::move(nested_new_offset));
if (block->get_by_position(i).column->is_nullable()) {
nested_padding_column = vectorized::make_nullable(nested_padding_column);
}
block->get_by_position(i).column->assume_mutable()->insert_range_from(
*nested_padding_column, 0, nested_padding_column->size());
}
}

#ifndef NDEBUG
// check offsets aligned
for (size_t i = 0; i < block->columns(); ++i) {
auto cid = _schema->column_id(i);
const auto* column_desc = _schema->column(cid);
if (column_desc->path() != nullptr && column_desc->path()->has_nested_part() &&
column_desc->type() == FieldType::OLAP_FIELD_TYPE_ARRAY) {
}
}
#endif
return Status::OK();
}

Expand Down Expand Up @@ -2662,6 +2736,8 @@ Status SegmentIterator::_next_batch_internal(vectorized::Block* block) {
// shrink char_type suffix zero data
block->shrink_char_type_column_suffix_zero(_char_type_idx);

RETURN_IF_ERROR(_fill_missing_columns(block));

#ifndef NDEBUG
size_t rows = block->rows();
for (const auto& entry : *block) {
Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/rowset/segment_v2/segment_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include "util/runtime_profile.h"
#include "util/slice.h"
#include "vec/columns/column.h"
#include "vec/columns/column_array.h"
#include "vec/common/schema_util.h"
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
Expand Down Expand Up @@ -226,6 +227,8 @@ class SegmentIterator : public RowwiseIterator {
void _vec_init_char_column_id(vectorized::Block* block);
bool _has_char_type(const Field& column_desc);

vectorized::SubcolumnsTree<const vectorized::ColumnArray::Offsets64*>
_get_nested_array_offsets_columns(const vectorized::Block& block);
Status _fill_missing_columns(vectorized::Block* block);

uint32_t segment_id() const { return _segment->id(); }
Expand Down
36 changes: 3 additions & 33 deletions be/src/vec/columns/column_object.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1866,42 +1866,12 @@ ColumnObject::Subcolumn ColumnObject::Subcolumn::cut(size_t start, size_t length

const ColumnObject::Subcolumns::Node* ColumnObject::get_leaf_of_the_same_nested(
const Subcolumns::NodePtr& entry) const {
if (!entry->path.has_nested_part()) {
return nullptr;
}

size_t old_size = entry->data.size();
const auto* current_node = subcolumns.find_leaf(entry->path);
const Subcolumns::Node* leaf = nullptr;

while (current_node) {
/// Try to find the first Nested up to the current node.
const auto* node_nested = Subcolumns::find_parent(
current_node, [](const auto& candidate) -> bool { return candidate.is_nested(); });

if (!node_nested) {
break;
}

/// Find the leaf with subcolumn that contains values
/// for the last rows.
/// If there are no leaves, skip current node and find
/// the next node up to the current.
leaf = Subcolumns::find_leaf(node_nested, [&](const auto& candidate) {
return candidate.data.size() > old_size;
});

if (leaf) {
break;
}

current_node = node_nested->parent;
}

const auto* leaf = subcolumns.get_leaf_of_the_same_nested(
entry->path, [](const Subcolumns::Node& node) { return node.data.size(); },
entry->data.size());
if (leaf && is_nothing(leaf->data.get_least_common_typeBase())) {
return nullptr;
}

return leaf;
}

Expand Down
Loading

0 comments on commit e298997

Please sign in to comment.