Skip to content

Commit

Permalink
fix comment and fix column nothing clear
Browse files Browse the repository at this point in the history
  • Loading branch information
eldenmoon committed Aug 19, 2024
1 parent 7ab923b commit d389f37
Show file tree
Hide file tree
Showing 11 changed files with 102 additions and 97 deletions.
2 changes: 1 addition & 1 deletion be/src/olap/iterators.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ struct IteratorRowRef;
};

namespace segment_v2 {
struct StreamReader;
struct SubstreamIterator;
}

class StorageReadOptions {
Expand Down
2 changes: 0 additions & 2 deletions be/src/olap/rowset/segment_v2/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1672,8 +1672,6 @@ static void fill_nested_with_defaults(vectorized::MutableColumnPtr& dst,
vectorized::MutableColumnPtr& sibling_column, size_t nrows) {
const auto* sibling_array = vectorized::check_and_get_column<vectorized::ColumnArray>(
remove_nullable(sibling_column->get_ptr()));
CHECK(sibling_array) << "Expected array column, but mmet " << sibling_column->get_name();
;
const auto* dst_array = vectorized::check_and_get_column<vectorized::ColumnArray>(
remove_nullable(dst->get_ptr()));
if (!dst_array || !sibling_array) {
Expand Down
10 changes: 5 additions & 5 deletions be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ Status HierarchicalDataReader::create(std::unique_ptr<ColumnIterator>* reader,
if (read_type == ReadType::MERGE_SPARSE) {
ColumnIterator* it;
RETURN_IF_ERROR(root->data.reader->new_iterator(&it));
stream_iter->set_root(std::make_unique<StreamReader>(
stream_iter->set_root(std::make_unique<SubstreamIterator>(
root->data.file_column_type->create_column(), std::unique_ptr<ColumnIterator>(it),
root->data.file_column_type));
}
Expand Down Expand Up @@ -97,7 +97,7 @@ Status HierarchicalDataReader::seek_to_ordinal(ordinal_t ord) {
Status HierarchicalDataReader::next_batch(size_t* n, vectorized::MutableColumnPtr& dst,
bool* has_null) {
return process_read(
[&](StreamReader& reader, const vectorized::PathInData& path,
[&](SubstreamIterator& reader, const vectorized::PathInData& path,
const vectorized::DataTypePtr& type) {
CHECK(reader.inited);
RETURN_IF_ERROR(reader.iterator->next_batch(n, reader.column, has_null));
Expand All @@ -112,7 +112,7 @@ Status HierarchicalDataReader::next_batch(size_t* n, vectorized::MutableColumnPt
Status HierarchicalDataReader::read_by_rowids(const rowid_t* rowids, const size_t count,
vectorized::MutableColumnPtr& dst) {
return process_read(
[&](StreamReader& reader, const vectorized::PathInData& path,
[&](SubstreamIterator& reader, const vectorized::PathInData& path,
const vectorized::DataTypePtr& type) {
CHECK(reader.inited);
RETURN_IF_ERROR(reader.iterator->read_by_rowids(rowids, count, reader.column));
Expand All @@ -134,8 +134,8 @@ Status HierarchicalDataReader::add_stream(const SubcolumnColumnReaders::Node* no
RETURN_IF_ERROR(node->data.reader->new_iterator(&it));
std::unique_ptr<ColumnIterator> it_ptr;
it_ptr.reset(it);
StreamReader reader(node->data.file_column_type->create_column(), std::move(it_ptr),
node->data.file_column_type);
SubstreamIterator reader(node->data.file_column_type->create_column(), std::move(it_ptr),
node->data.file_column_type);
bool added = _substream_reader.add(node->path, std::move(reader));
if (!added) {
return Status::InternalError("Failed to add node path {}", node->path.get_path());
Expand Down
25 changes: 15 additions & 10 deletions be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,11 @@ class HierarchicalDataReader : public ColumnIterator {

Status add_stream(const SubcolumnColumnReaders::Node* node);

void set_root(std::unique_ptr<StreamReader>&& root) { _root_reader = std::move(root); }
void set_root(std::unique_ptr<SubstreamIterator>&& root) { _root_reader = std::move(root); }

private:
SubstreamReaderTree _substream_reader;
std::unique_ptr<StreamReader> _root_reader;
std::unique_ptr<SubstreamIterator> _root_reader;
size_t _rows_read = 0;
vectorized::PathInData _path;

Expand Down Expand Up @@ -132,17 +132,18 @@ class HierarchicalDataReader : public ColumnIterator {
PathsWithColumnAndType non_nested_subcolumns;
RETURN_IF_ERROR(tranverse([&](SubstreamReaderTree::Node& node) {
MutableColumnPtr column = node.data.column->get_ptr();
PathInData real_path = node.path.copy_pop_nfront(_path.get_parts().size());
PathInData relative_path = node.path.copy_pop_nfront(_path.get_parts().size());

if (node.path.has_nested_part()) {
CHECK_EQ(getTypeName(remove_nullable(node.data.type)->get_type_id()),
getTypeName(TypeIndex::Array));
PathInData parent_path = node.path.get_nested_prefix_path().copy_pop_nfront(
_path.get_parts().size());
nested_subcolumns[parent_path].emplace_back(real_path, column->get_ptr(),
nested_subcolumns[parent_path].emplace_back(relative_path, column->get_ptr(),
node.data.type);
} else {
non_nested_subcolumns.emplace_back(real_path, column->get_ptr(), node.data.type);
non_nested_subcolumns.emplace_back(relative_path, column->get_ptr(),
node.data.type);
}
return Status::OK();
}));
Expand All @@ -156,6 +157,10 @@ class HierarchicalDataReader : public ColumnIterator {
entry.type->get_name());
}
}
// Iterate nested subcolumns and flatten them. Each entry contains the nested subcolumns of the same nested parent.
// First we pick the first subcolumn as the base array and use its offset info. Then we flatten all nested subcolumns
// into a new object column and wrap it with an array column using the first element's offsets. The wrapped array column
// will have the type of ColumnObject::NESTED_TYPE, which is Nullable<ColumnArray<NULLABLE(ColumnObject)>>.
for (auto& entry : nested_subcolumns) {
MutableColumnPtr nested_object = ColumnObject::create(true, false);
const auto* base_array =
Expand All @@ -173,15 +178,15 @@ class HierarchicalDataReader : public ColumnIterator {
}
const auto* target_array =
check_and_get_column<ColumnArray>(remove_nullable(subcolumn.column).get());
#ifndef NDEBUG
if (!base_array->has_equal_offsets(*target_array)) {
return Status::InvalidArgument(
"Meet none equal offsets array when flatten nested array, path {}, "
"type {}",
subcolumn.path.get_path(), subcolumn.type->get_name());
}
MutableColumnPtr flattend_column = check_and_get_column<ColumnArray>(target_array)
->get_data_ptr()
->assume_mutable();
#endif
MutableColumnPtr flattend_column = target_array->get_data_ptr()->assume_mutable();
DataTypePtr flattend_type =
check_and_get_data_type<DataTypeArray>(remove_nullable(type).get())
->get_nested_type();
Expand Down Expand Up @@ -256,7 +261,7 @@ class HierarchicalDataReader : public ColumnIterator {
// encodes sparse columns that are not materialized
class ExtractReader : public ColumnIterator {
public:
ExtractReader(const TabletColumn& col, std::unique_ptr<StreamReader>&& root_reader,
ExtractReader(const TabletColumn& col, std::unique_ptr<SubstreamIterator>&& root_reader,
vectorized::DataTypePtr target_type_hint)
: _col(col),
_root_reader(std::move(root_reader)),
Expand All @@ -280,7 +285,7 @@ class ExtractReader : public ColumnIterator {

TabletColumn _col;
// may be shared among different column iterators
std::unique_ptr<StreamReader> _root_reader;
std::unique_ptr<SubstreamIterator> _root_reader;
vectorized::DataTypePtr _target_type_hint;
};

Expand Down
83 changes: 43 additions & 40 deletions be/src/olap/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -568,9 +568,9 @@ Status Segment::_new_iterator_with_variant_root(const TabletColumn& tablet_colum
RETURN_IF_ERROR(root->data.reader->new_iterator(&it));
auto stream_iter = new ExtractReader(
tablet_column,
std::make_unique<StreamReader>(root->data.file_column_type->create_column(),
std::unique_ptr<ColumnIterator>(it),
root->data.file_column_type),
std::make_unique<SubstreamIterator>(root->data.file_column_type->create_column(),
std::unique_ptr<ColumnIterator>(it),
root->data.file_column_type),
target_type_hint);
iter->reset(stream_iter);
return Status::OK();
Expand Down Expand Up @@ -604,42 +604,40 @@ Status Segment::new_column_iterator_with_path(const TabletColumn& tablet_column,
type == ReaderType::READER_FULL_COMPACTION || type == ReaderType::READER_CHECKSUM;
};

auto new_default_iter = [&]() {
if (tablet_column.is_nested_subcolumn() &&
type_to_read_flat_leaves(opt->io_ctx.reader_type)) {
// We find node that represents the same Nested type as path.
const auto* parent = _sub_column_tree.find_best_match(*tablet_column.path_info_ptr());
VLOG_DEBUG << "find with path " << tablet_column.path_info_ptr()->get_path()
<< " parent " << (parent ? parent->path.get_path() : "nullptr") << ", type "
<< ", parent is nested " << (parent ? parent->is_nested() : false) << ", "
<< TabletColumn::get_string_by_field_type(tablet_column.type());
// find it's common parent with nested part
// why not use parent->path->has_nested_part? because parent may not be a leaf node
// none leaf node may not contain path info
// Example:
// {"payload" : {"commits" : [{"issue" : {"id" : 123, "email" : "a@b"}}]}}
// nested node path : payload.commits(NESTED)
// tablet_column path_info : payload.commits.issue.id(SCALAR
// parent path node : payload.commits.issue(TUPLE)
// leaf path_info : payload.commits.issue.email(SCALAR)
if (parent && SubcolumnColumnReaders::find_parent(
parent, [](const auto& node) { return node.is_nested(); })) {
/// Find any leaf of Nested subcolumn.
const auto* leaf = SubcolumnColumnReaders::find_leaf(
parent, [](const auto& node) { return node.path.has_nested_part(); });
assert(leaf);
std::unique_ptr<ColumnIterator> sibling_iter;
ColumnIterator* sibling_iter_ptr;
RETURN_IF_ERROR(leaf->data.reader->new_iterator(&sibling_iter_ptr));
sibling_iter.reset(sibling_iter_ptr);
*iter = std::make_unique<DefaultNestedColumnIterator>(std::move(sibling_iter),
leaf->data.file_column_type);
} else {
*iter = std::make_unique<DefaultNestedColumnIterator>(nullptr, nullptr);
}
return Status::OK();
// find the sibling of the nested column to fill the target nested column
auto new_default_iter_with_same_nested = [&](const TabletColumn& tablet_column,
std::unique_ptr<ColumnIterator>* iter) {
// We find node that represents the same Nested type as path.
const auto* parent = _sub_column_tree.find_best_match(*tablet_column.path_info_ptr());
VLOG_DEBUG << "find with path " << tablet_column.path_info_ptr()->get_path() << " parent "
<< (parent ? parent->path.get_path() : "nullptr") << ", type "
<< ", parent is nested " << (parent ? parent->is_nested() : false) << ", "
<< TabletColumn::get_string_by_field_type(tablet_column.type());
// find its common parent with nested part
// why not use parent->path->has_nested_part? because parent may not be a leaf node
// a non-leaf node may not contain path info
// Example:
// {"payload" : {"commits" : [{"issue" : {"id" : 123, "email" : "a@b"}}]}}
// nested node path : payload.commits(NESTED)
// tablet_column path_info : payload.commits.issue.id(SCALAR)
// parent path node : payload.commits.issue(TUPLE)
// leaf path_info : payload.commits.issue.email(SCALAR)
if (parent && SubcolumnColumnReaders::find_parent(
parent, [](const auto& node) { return node.is_nested(); })) {
/// Find any leaf of Nested subcolumn.
const auto* leaf = SubcolumnColumnReaders::find_leaf(
parent, [](const auto& node) { return node.path.has_nested_part(); });
assert(leaf);
std::unique_ptr<ColumnIterator> sibling_iter;
ColumnIterator* sibling_iter_ptr;
RETURN_IF_ERROR(leaf->data.reader->new_iterator(&sibling_iter_ptr));
sibling_iter.reset(sibling_iter_ptr);
*iter = std::make_unique<DefaultNestedColumnIterator>(std::move(sibling_iter),
leaf->data.file_column_type);
} else {
*iter = std::make_unique<DefaultNestedColumnIterator>(nullptr, nullptr);
}
return new_default_iterator(tablet_column, iter);
return Status::OK();
};

if (opt != nullptr && type_to_read_flat_leaves(opt->io_ctx.reader_type)) {
Expand All @@ -653,7 +651,12 @@ Status Segment::new_column_iterator_with_path(const TabletColumn& tablet_column,
RETURN_IF_ERROR(_new_iterator_with_variant_root(
tablet_column, iter, root, sparse_node->data.file_column_type));
} else {
RETURN_IF_ERROR(new_default_iter());
if (tablet_column.is_nested_subcolumn()) {
// using the sibling of the nested column to fill the target nested column
RETURN_IF_ERROR(new_default_iter_with_same_nested(tablet_column, iter));
} else {
RETURN_IF_ERROR(new_default_iterator(tablet_column, iter));
}
}
return Status::OK();
}
Expand Down Expand Up @@ -690,7 +693,7 @@ Status Segment::new_column_iterator_with_path(const TabletColumn& tablet_column,
sparse_node->data.file_column_type));
} else {
// No such variant column in this segment, get a default one
RETURN_IF_ERROR(new_default_iter());
RETURN_IF_ERROR(new_default_iterator(tablet_column, iter));
}
}

Expand Down
16 changes: 11 additions & 5 deletions be/src/olap/rowset/segment_v2/stream_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,27 @@

namespace doris::segment_v2 {

struct StreamReader {
// This file defines ColumnIterator and ColumnReader wrappers for reading variant subcolumns. The types from read schema and from storage are
// different, so we need to wrap the ColumnIterator from execution phase and storage column reading phase. And we also
// maintain the tree structure to get the full JSON structure for variants.

// Wrapped ColumnIterator from execution phase, the type is from read schema
struct SubstreamIterator {
vectorized::MutableColumnPtr column;
std::unique_ptr<ColumnIterator> iterator;
std::shared_ptr<const vectorized::IDataType> type;
bool inited = false;
size_t rows_read = 0;
StreamReader() = default;
StreamReader(vectorized::MutableColumnPtr&& col, std::unique_ptr<ColumnIterator>&& it,
std::shared_ptr<const vectorized::IDataType> t)
SubstreamIterator() = default;
SubstreamIterator(vectorized::MutableColumnPtr&& col, std::unique_ptr<ColumnIterator>&& it,
std::shared_ptr<const vectorized::IDataType> t)
: column(std::move(col)), iterator(std::move(it)), type(t) {}
};

// path -> SubstreamIterator
using SubstreamReaderTree = vectorized::SubcolumnsTree<StreamReader>;
using SubstreamReaderTree = vectorized::SubcolumnsTree<SubstreamIterator>;

// Reader for the storage layer, the file_column_type indicates the read type of the column in segment file
struct SubcolumnReader {
std::unique_ptr<ColumnReader> reader;
std::shared_ptr<const vectorized::IDataType> file_column_type;
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/columns/column_dummy.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class IColumnDummy : public IColumn {

void insert_data(const char*, size_t) override { ++s; }

void clear() override {};
void clear() override { s = 0; }

StringRef serialize_value_into_arena(size_t /*n*/, Arena& arena,
char const*& begin) const override {
Expand Down
14 changes: 8 additions & 6 deletions be/src/vec/columns/column_object.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ Array create_empty_array_field(size_t num_dimensions) {
static ColumnPtr recreate_column_with_default_values(const ColumnPtr& column, TypeIndex scalar_type,
size_t num_dimensions) {
const auto* column_array = check_and_get_column<ColumnArray>(remove_nullable(column).get());
if (column_array && num_dimensions) {
if (column_array != nullptr && num_dimensions != 0) {
return make_nullable(ColumnArray::create(
recreate_column_with_default_values(column_array->get_data_ptr(), scalar_type,
num_dimensions - 1),
Expand All @@ -496,7 +496,7 @@ static ColumnPtr recreate_column_with_default_values(const ColumnPtr& column, Ty
->clone_resized(column->size());
}

ColumnObject::Subcolumn ColumnObject::Subcolumn::recreate_with_default_values(
ColumnObject::Subcolumn ColumnObject::Subcolumn::clone_with_default_values(
const FieldInfo& field_info) const {
Subcolumn new_subcolumn(*this);
new_subcolumn.least_common_type =
Expand Down Expand Up @@ -998,7 +998,7 @@ void ColumnObject::get(size_t n, Field& res) const {
for (const auto& entry : subcolumns) {
Field field;
entry->data.get(n, field);
// Notice: we treat null as empty field
// Notice: we treat null as empty field, since we do not distinguish null and empty for Variant type.
if (field.get_type() != Field::Types::Null) {
object.try_emplace(entry->path.get_path(), field);
}
Expand All @@ -1019,13 +1019,15 @@ void ColumnObject::add_nested_subcolumn(const PathInData& key, const FieldInfo&
/// We find node that represents the same Nested type as @key.
const auto* nested_node = subcolumns.find_best_match(key);

if (nested_node && nested_node->is_nested()) {
// If we have a nested subcolumn and it contains a nested node in its path
if (nested_node &&
Subcolumns::find_parent(nested_node, [](const auto& node) { return node.is_nested(); })) {
/// Find any leaf of Nested subcolumn.
const auto* leaf = Subcolumns::find_leaf(nested_node, [&](const auto&) { return true; });
assert(leaf);

/// Recreate subcolumn with default values and the same sizes of arrays.
auto new_subcolumn = leaf->data.recreate_with_default_values(field_info);
auto new_subcolumn = leaf->data.clone_with_default_values(field_info);

/// It's possible that we have already inserted value from current row
/// to this subcolumn. So, adjust size to expected.
Expand Down Expand Up @@ -1970,7 +1972,7 @@ bool ColumnObject::try_insert_many_defaults_from_nested(const Subcolumns::NodePt
/// and replace scalar values to the correct
/// default values for given entry.
auto new_subcolumn = leaf->data.cut(old_size, leaf->data.size() - old_size)
.recreate_with_default_values(field_info);
.clone_with_default_values(field_info);

entry->data.insert_range_from(new_subcolumn, 0, new_subcolumn.size());
return true;
Expand Down
Loading

0 comments on commit d389f37

Please sign in to comment.