Commit

xx

eldenmoon committed Aug 7, 2024
1 parent e298997 commit 5820cda
Showing 30 changed files with 787 additions and 254 deletions.
21 changes: 0 additions & 21 deletions be/src/olap/merger.cpp
@@ -219,34 +219,13 @@ void Merger::vertical_split_columns(const TabletSchema& tablet_schema,
auto&& cluster_key_idxes = tablet_schema.cluster_key_idxes();

std::vector<uint32_t> value_columns;
std::string nested_prefix;

for (uint32_t i = num_key_cols; i < total_cols; ++i) {
if (i == sequence_col_idx || i == delete_sign_idx ||
cluster_key_idxes.end() !=
std::find(cluster_key_idxes.begin(), cluster_key_idxes.end(), i)) {
continue;
}
// Merge all subcolumns of a single nested column into one group; these subcolumns need offset alignment
if (tablet_schema.column(i).is_nested_subcolumn()) {
if (nested_prefix != tablet_schema.column(i).path_info_ptr()->get_nested_prefix()) {
nested_prefix = tablet_schema.column(i).path_info_ptr()->get_nested_prefix();
if (!value_columns.empty()) {
column_groups->push_back(value_columns);
value_columns.clear();
}
}
value_columns.push_back(i);
continue;
}

if (!nested_prefix.empty()) {
column_groups->push_back(value_columns);
value_columns.clear();
value_columns.push_back(i);
nested_prefix.clear();
continue;
}

if (!value_columns.empty() &&
value_columns.size() % config::vertical_compaction_num_columns_per_group == 0) {
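For context, the loop removed here grouped all subcolumns of a single nested column into one vertical-compaction column group so that their array offsets could be kept aligned. Below is a simplified, standalone sketch of that grouping rule; plain strings stand in for PathInData::get_nested_prefix(), the column ids are made up, and the per-group column-count limit (vertical_compaction_num_columns_per_group) is omitted.

#include <cstdint>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

int main() {
    // Each value column id paired with its nested prefix ("" = not a nested subcolumn).
    std::vector<std::pair<uint32_t, std::string>> value_cols = {
            {3, ""}, {4, "v.nested."}, {5, "v.nested."}, {6, ""}};

    std::vector<std::vector<uint32_t>> column_groups;
    std::vector<uint32_t> current;
    std::string nested_prefix;
    for (const auto& [cid, prefix] : value_cols) {
        // Start a new group whenever the nested prefix changes, so all subcolumns
        // of one nested column land in the same group and can align their offsets.
        if (prefix != nested_prefix && !current.empty()) {
            column_groups.push_back(current);
            current.clear();
        }
        nested_prefix = prefix;
        current.push_back(cid);
    }
    if (!current.empty()) {
        column_groups.push_back(current);
    }

    for (const auto& group : column_groups) {
        for (uint32_t cid : group) {
            std::cout << cid << ' ';
        }
        std::cout << '\n'; // prints: "3", then "4 5", then "6"
    }
    return 0;
}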
49 changes: 49 additions & 0 deletions be/src/olap/rowset/segment_v2/column_reader.cpp
@@ -1663,4 +1663,53 @@ Status VariantRootColumnIterator::read_by_rowids(const rowid_t* rowids, const si
return Status::OK();
}

Status DefaultNestedColumnIterator::next_batch(size_t* n, vectorized::MutableColumnPtr& dst) {
bool has_null = false;
return next_batch(n, dst, &has_null);
}

static void fill_nested_with_defaults(vectorized::MutableColumnPtr& dst,
vectorized::MutableColumnPtr& sibling_column, size_t nrows) {
const auto* sibling_array = vectorized::check_and_get_column<vectorized::ColumnArray>(
remove_nullable(sibling_column->get_ptr()));
CHECK(sibling_array) << "Expected array column, but met " << sibling_column->get_name();
const auto* dst_array = vectorized::check_and_get_column<vectorized::ColumnArray>(
remove_nullable(dst->get_ptr()));
if (!dst_array || !sibling_array) {
throw doris::Exception(ErrorCode::INTERNAL_ERROR,
                       "Expected array column, but met {} and {}", dst->get_name(),
                       sibling_column->get_name());
}
auto new_nested =
dst_array->get_data_ptr()->clone_resized(sibling_array->get_data_ptr()->size());
auto new_array = make_nullable(vectorized::ColumnArray::create(
new_nested->assume_mutable(), sibling_array->get_offsets_ptr()->assume_mutable()));
dst->insert_range_from(*new_array, 0, new_array->size());
}

Status DefaultNestedColumnIterator::next_batch(size_t* n, vectorized::MutableColumnPtr& dst,
bool* has_null) {
if (_sibling_iter) {
vectorized::MutableColumnPtr sibling_column = _file_column_type->create_column();
RETURN_IF_ERROR(_sibling_iter->next_batch(n, sibling_column, has_null));
fill_nested_with_defaults(dst, sibling_column, *n);
} else {
dst->insert_many_defaults(*n);
}
return Status::OK();
}

Status DefaultNestedColumnIterator::read_by_rowids(const rowid_t* rowids, const size_t count,
vectorized::MutableColumnPtr& dst) {
if (_sibling_iter) {
vectorized::MutableColumnPtr sibling_column = _file_column_type->create_column();
RETURN_IF_ERROR(_sibling_iter->read_by_rowids(rowids, count, sibling_column));
fill_nested_with_defaults(dst, sibling_column, count);
} else {
dst->insert_many_defaults(count);
}
return Status::OK();
}

} // namespace doris::segment_v2
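The fill_nested_with_defaults helper above relies on one invariant: a subcolumn that is missing from a segment must still share the array offsets of its sibling subcolumns from the same Nested group. A minimal standalone sketch of that idea follows, using plain std::vector instead of the Doris column types; the subcolumn names item.a and item.b are hypothetical and only illustrate the alignment.

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

int main() {
    // Sibling subcolumn of the same Nested group (e.g. a hypothetical "item.a"):
    // two rows, the first holding 2 nested items, the second holding 1.
    std::vector<int64_t> sibling_data = {1, 2, 3};
    std::vector<uint64_t> sibling_offsets = {2, 3}; // cumulative item counts per row

    // A subcolumn missing from this segment (e.g. "item.b") is padded with one
    // default value per nested item and reuses the sibling's offsets, so every
    // subcolumn of the Nested group stays row- and item-aligned.
    std::vector<int64_t> missing_data(sibling_data.size(), 0);
    std::vector<uint64_t> missing_offsets = sibling_offsets;

    std::cout << missing_data.size() << " default items shared across "
              << missing_offsets.size() << " rows\n";
    for (size_t row = 0; row < missing_offsets.size(); ++row) {
        uint64_t begin = row == 0 ? 0 : missing_offsets[row - 1];
        std::cout << "row " << row << ": " << missing_offsets[row] - begin
                  << " default items\n";
    }
    return 0;
}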
54 changes: 38 additions & 16 deletions be/src/olap/rowset/segment_v2/column_reader.h
@@ -47,6 +47,7 @@
#include "vec/columns/column.h"
#include "vec/columns/column_array.h" // ColumnArray
#include "vec/columns/subcolumn_tree.h"
#include "vec/common/hash_table/hash_map_context_creator.h"
#include "vec/data_types/data_type.h"
#include "vec/json/path_in_data.h"

@@ -718,37 +719,58 @@ class DefaultValueColumnIterator : public ColumnIterator {
};

// This iterator is used to read default value column
class EmptyColumnIterator : public ColumnIterator {
class DefaultNestedColumnIterator : public ColumnIterator {
public:
EmptyColumnIterator() = default;
DefaultNestedColumnIterator(std::unique_ptr<ColumnIterator>&& sibling,
DataTypePtr file_column_type)
: _sibling_iter(std::move(sibling)), _file_column_type(std::move(file_column_type)) {}

Status init(const ColumnIteratorOptions& opts) override { return Status::OK(); }
Status init(const ColumnIteratorOptions& opts) override {
if (_sibling_iter) {
return _sibling_iter->init(opts);
}
return Status::OK();
}

Status seek_to_first() override { return Status::NotSupported("Not supported seek_to_first"); }
Status seek_to_first() override {
_current_rowid = 0;
if (_sibling_iter) {
return _sibling_iter->seek_to_first();
}
return Status::OK();
}

Status seek_to_ordinal(ordinal_t ord_idx) override {
return Status::NotSupported("Not supported seek_to_ordinal");
_current_rowid = ord_idx;
if (_sibling_iter) {
return _sibling_iter->seek_to_ordinal(ord_idx);
}
return Status::OK();
}

Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst) { return Status::OK(); }
Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst);

Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool* has_null) override {
return Status::OK();
}
Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool* has_null) override;

Status read_by_rowids(const rowid_t* rowids, const size_t count,
vectorized::MutableColumnPtr& dst) override;

Status next_batch_of_zone_map(size_t* n, vectorized::MutableColumnPtr& dst) override {
return Status::NotSupported("Not supported next_batch_of_zone_map");
}

Status read_by_rowids(const rowid_t* rowids, const size_t count,
vectorized::MutableColumnPtr& dst) override {
return Status::OK();
}

ordinal_t get_current_ordinal() const override {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"Not supported get_current_ordinal");
if (_sibling_iter) {
return _sibling_iter->get_current_ordinal();
}
return _current_rowid;
}

private:
std::unique_ptr<ColumnIterator> _sibling_iter;
std::shared_ptr<const vectorized::IDataType> _file_column_type;
// current rowid
ordinal_t _current_rowid = 0;
};

} // namespace segment_v2
40 changes: 1 addition & 39 deletions be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
@@ -122,7 +122,7 @@ class HierarchicalDataReader : public ColumnIterator {
auto type = root_var.get_root_type();
container_variant.add_sub_column({}, std::move(column), type);
}
// parent -> subcolumns
// parent path -> subcolumns
std::map<PathInData, PathsWithColumnAndType> nested_subcolumns;
PathsWithColumnAndType non_nested_subcolumns;
RETURN_IF_ERROR(tranverse([&](SubstreamReaderTree::Node& node) {
@@ -182,44 +182,6 @@ class HierarchicalDataReader : public ColumnIterator {
container_variant.add_sub_column(entry.first, array->assume_mutable(),
ColumnObject::NESTED_TYPE);
}
// if (has_nested) {
// // rewrite nested nodes
// container_variant.finalize_if_not();
// MutableColumnPtr nested_object = ColumnObject::create(true, false);
// MutableColumnPtr offset =
// check_and_get_column<ColumnArray>(
// *remove_nullable(container_variant.get_subcolumns()
// .get_leaves()[0]
// ->data.get_finalized_column_ptr()))
// ->get_offsets_ptr()
// ->assume_mutable();
// auto* nested_object_ptr = assert_cast<ColumnObject*>(nested_object.get());
// // flatten nested arrays
// for (const auto& entry : container_variant.get_subcolumns()) {
// auto& column = entry->data.get_finalized_column_ptr();
// const auto& type = entry->data.get_least_common_type();
// if (!remove_nullable(column)->is_column_array()) {
// return Status::InvalidArgument(
// "Meet none array column when flatten nested array, path {}, type {}",
// entry->path.get_path(), entry->data.get_finalized_column().get_name());
// }
// MutableColumnPtr flattend_column =
// check_and_get_column<ColumnArray>(
// remove_nullable(entry->data.get_finalized_column_ptr()).get())
// ->get_data_ptr()
// ->assume_mutable();
// DataTypePtr flattend_type =
// check_and_get_data_type<DataTypeArray>(remove_nullable(type).get())
// ->get_nested_type();
// nested_object_ptr->add_sub_column(entry->path, std::move(flattend_column),
// std::move(flattend_type));
// }
// nested_object = make_nullable(nested_object->get_ptr())->assume_mutable();
// auto array =
// make_nullable(ColumnArray::create(std::move(nested_object), std::move(offset)));
// container_variant.clear();
// container_variant.create_root(ColumnObject::NESTED_TYPE, array->assume_mutable());
// }

// TODO select v:b -> v.b / v.b.c but v.d maybe in v
// copy container variant to dst variant, todo avoid copy
32 changes: 27 additions & 5 deletions be/src/olap/rowset/segment_v2/segment.cpp
@@ -441,16 +441,20 @@ Status Segment::_load_index_impl() {
// Return the storage data type of the column related to the field.
// Return nullptr, meaning there is no such storage information for this column.
vectorized::DataTypePtr Segment::get_data_type_of(vectorized::PathInDataPtr path, bool is_nullable,
bool ignore_children) const {
bool read_flat_leaves) const {
// Path has higher priority
if (path != nullptr && !path->empty()) {
auto node = _sub_column_tree.find_leaf(*path);
auto sparse_node = _sparse_column_tree.find_exact(*path);
if (node) {
if (ignore_children || (node->children.empty() && sparse_node == nullptr)) {
if (read_flat_leaves || (node->children.empty() && sparse_node == nullptr)) {
return node->data.file_column_type;
}
}
// missing in storage: return nullptr so the caller falls back to the input data type
if (read_flat_leaves && !node && !sparse_node) {
return nullptr;
}
// it contains children or the column is missing in storage, so treat it as variant
return is_nullable
? vectorized::make_nullable(std::make_shared<vectorized::DataTypeObject>())
@@ -609,7 +613,25 @@ Status Segment::new_column_iterator_with_path(const TabletColumn& tablet_column,
auto new_default_iter = [&]() {
if (tablet_column.is_nested_subcolumn() &&
type_to_read_flat_leaves(opt->io_ctx.reader_type)) {
*iter = std::make_unique<EmptyColumnIterator>();
// Find the node that represents the same Nested type as the path.
const auto* parent = _sub_column_tree.find_best_match(*tablet_column.path_info_ptr());
VLOG_DEBUG << "find with path " << tablet_column.path_info_ptr()->get_path()
<< " parent " << (parent ? parent->path.get_path() : "nullptr") << ", type "
<< TabletColumn::get_string_by_field_type(tablet_column.type());
if (parent && parent->is_nested()) {
/// Find any leaf of the Nested subcolumn.
const auto* leaf = SubcolumnColumnReaders::find_leaf(
parent, [](const auto& node) { return node.path.has_nested_part(); });
assert(leaf);
std::unique_ptr<ColumnIterator> sibling_iter;
ColumnIterator* sibling_iter_ptr;
RETURN_IF_ERROR(leaf->data.reader->new_iterator(&sibling_iter_ptr));
sibling_iter.reset(sibling_iter_ptr);
*iter = std::make_unique<DefaultNestedColumnIterator>(std::move(sibling_iter),
leaf->data.file_column_type);
} else {
*iter = std::make_unique<DefaultNestedColumnIterator>(nullptr, nullptr);
}
return Status::OK();
}
return new_default_iterator(tablet_column, iter);
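The branch added above looks for a sibling leaf under the same Nested parent and hands its reader to DefaultNestedColumnIterator, so the missing subcolumn can copy that sibling's offsets. A rough standalone sketch of that lookup is shown below; the string-prefix heuristic and path names only approximate the PathInData / find_best_match logic used by the reader and are not part of the patch.

#include <iostream>
#include <optional>
#include <string>
#include <vector>

// Simplified stand-in for "find the best-matching Nested parent, then any leaf
// under it": given the subcolumn paths stored in the segment and a requested
// path that is missing, pick a sibling that shares the nested prefix.
std::optional<std::string> find_sibling_leaf(const std::vector<std::string>& stored_paths,
                                             const std::string& requested_path) {
    // Assumes the requested path contains a '.', purely for illustration.
    const std::string prefix = requested_path.substr(0, requested_path.rfind('.') + 1);
    for (const auto& path : stored_paths) {
        if (path.rfind(prefix, 0) == 0) {
            return path; // any stored leaf under the same parent will do
        }
    }
    return std::nullopt; // no sibling: fall back to plain defaults
}

int main() {
    std::vector<std::string> stored = {"v.nested.a", "v.nested.c"};
    auto sibling = find_sibling_leaf(stored, "v.nested.b");
    std::cout << (sibling ? *sibling : std::string("<none>")) << "\n"; // prints v.nested.a
    return 0;
}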
@@ -874,9 +896,9 @@ Status Segment::read_key_by_rowid(uint32_t row_id, std::string* key) {
}

bool Segment::same_with_storage_type(int32_t cid, const Schema& schema,
bool ignore_children) const {
bool read_flat_leaves) const {
auto file_column_type = get_data_type_of(schema.column(cid)->path(),
schema.column(cid)->is_nullable(), ignore_children);
schema.column(cid)->is_nullable(), read_flat_leaves);
auto expected_type = Schema::get_data_type_ptr(*schema.column(cid));
#ifndef NDEBUG
if (file_column_type && !file_column_type->equals(*expected_type)) {
4 changes: 2 additions & 2 deletions be/src/olap/rowset/segment_v2/segment.h
@@ -162,10 +162,10 @@ class Segment : public std::enable_shared_from_this<Segment> {
// nullptr will be returned if the storage type does not contain such a column
std::shared_ptr<const vectorized::IDataType> get_data_type_of(vectorized::PathInDataPtr path,
bool is_nullable,
bool ignore_children) const;
bool read_flat_leaves) const;

// Check whether the schema read type equals the storage column type
bool same_with_storage_type(int32_t cid, const Schema& schema, bool ignore_children) const;
bool same_with_storage_type(int32_t cid, const Schema& schema, bool read_flat_leaves) const;

// If the column in the segment has the same type as in the schema, it is safe to apply the predicate
template <typename Predicate>
78 changes: 1 addition & 77 deletions be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -18,6 +18,7 @@
#include "olap/rowset/segment_v2/segment_iterator.h"

#include <assert.h>
#include <fmt/core.h>
#include <gen_cpp/Exprs_types.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/olap_file.pb.h>
@@ -2397,81 +2398,6 @@ Status SegmentIterator::copy_column_data_by_selector(vectorized::IColumn* input_
return input_col_ptr->filter_by_selector(sel_rowid_idx, select_size, output_col);
}

vectorized::SubcolumnsTree<const vectorized::ColumnArray::Offsets64*>
SegmentIterator::_get_nested_array_offsets_columns(const vectorized::Block& block) {
vectorized::SubcolumnsTree<const vectorized::ColumnArray::Offsets64*> offsets;
for (size_t i = 0; i < block.columns(); ++i) {
auto cid = _schema->column_id(i);
const auto* column_desc = _schema->column(cid);
if (column_desc->path() != nullptr && column_desc->path()->has_nested_part() &&
column_desc->type() == FieldType::OLAP_FIELD_TYPE_ARRAY) {
offsets.add(*column_desc->path(),
&vectorized::check_and_get_column<vectorized::ColumnArray>(
remove_nullable(block.get_by_position(i).column).get())
->get_offsets());
LOG(INFO) << "fuck";
}
}
return offsets;
}

Status SegmentIterator::_fill_missing_columns(vectorized::Block* block) {
vectorized::SubcolumnsTree<const vectorized::ColumnArray::Offsets64*> offsets =
_get_nested_array_offsets_columns(*block);
for (size_t i = 0; i < block->columns(); ++i) {
auto cid = _schema->column_id(i);
const auto* column_desc = _schema->column(cid);
int64_t current_size = block->get_by_position(i).column->size();
if (column_desc->path() != nullptr && column_desc->path()->has_nested_part() &&
current_size < block->rows()) {
const auto* leaf = offsets.get_leaf_of_the_same_nested(
*column_desc->path(), [](const auto& node) { return node.data->size(); },
current_size);
if (!leaf) {
VLOG_DEBUG << "Not found any subcolumns column_desc: "
<< column_desc->path()->get_path() << ", current_size: " << current_size
<< ", block_rows: " << block->rows();
block->get_by_position(i).column->assume_mutable()->insert_many_defaults(
block->rows() - current_size);
continue;
}
LOG(INFO) << "fuck";
const vectorized::ColumnArray::Offsets64* offset = leaf->data;
int64_t nested_padding_size = offset->back() - (*offset)[current_size - 1];
auto nested_column =
vectorized::check_and_get_data_type<vectorized::DataTypeArray>(
remove_nullable(block->get_by_position(i).type).get())
->get_nested_type()
->create_column_const_with_default_value(nested_padding_size);
auto nested_new_offset = vectorized::ColumnArray::ColumnOffsets::create();
nested_new_offset->reserve(block->rows() - current_size);
for (size_t i = current_size; i < block->rows(); ++i) {
nested_new_offset->get_data().push_back_without_reserve(
(*offset)[i] - (*offset)[current_size - 1]);
}
vectorized::ColumnPtr nested_padding_column =
vectorized::ColumnArray::create(nested_column, std::move(nested_new_offset));
if (block->get_by_position(i).column->is_nullable()) {
nested_padding_column = vectorized::make_nullable(nested_padding_column);
}
block->get_by_position(i).column->assume_mutable()->insert_range_from(
*nested_padding_column, 0, nested_padding_column->size());
}
}

#ifndef NDEBUG
// check offsets aligned
for (size_t i = 0; i < block->columns(); ++i) {
auto cid = _schema->column_id(i);
const auto* column_desc = _schema->column(cid);
if (column_desc->path() != nullptr && column_desc->path()->has_nested_part() &&
column_desc->type() == FieldType::OLAP_FIELD_TYPE_ARRAY) {
}
}
#endif
return Status::OK();
}

Status SegmentIterator::_next_batch_internal(vectorized::Block* block) {
bool is_mem_reuse = block->mem_reuse();
DCHECK(is_mem_reuse);
@@ -2736,8 +2662,6 @@ Status SegmentIterator::_next_batch_internal(vectorized::Block* block) {
// shrink char_type suffix zero data
block->shrink_char_type_column_suffix_zero(_char_type_idx);

RETURN_IF_ERROR(_fill_missing_columns(block));

#ifndef NDEBUG
size_t rows = block->rows();
for (const auto& entry : *block) {