Skip to content

Commit

Permalink
[Fix](orc-reader) Fix the issue when string col has mixed plain and d…
Browse files Browse the repository at this point in the history
…ict encoding in different stripes. (#34146) (#34248) (#34302)

backport #34146
  • Loading branch information
kaka11chen authored Apr 30, 2024
1 parent 1c55cf8 commit 3da43a1
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 72 deletions.
152 changes: 81 additions & 71 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ OrcReader::OrcReader(RuntimeProfile* profile, RuntimeState* state,
_is_hive(params.__isset.slot_name_to_schema_pos),
_io_ctx(io_ctx),
_enable_lazy_mat(enable_lazy_mat),
_is_dict_cols_converted(false),
_dict_cols_has_converted(false),
_unsupported_pushdown_types(unsupported_pushdown_types) {
TimezoneUtils::find_cctz_time_zone(ctz, _time_zone);
VecDateTimeValue t;
Expand All @@ -171,7 +171,7 @@ OrcReader::OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc& r
_file_system(nullptr),
_io_ctx(io_ctx),
_enable_lazy_mat(enable_lazy_mat),
_is_dict_cols_converted(false) {
_dict_cols_has_converted(false) {
_init_system_properties();
_init_file_description();
}
Expand Down Expand Up @@ -1527,23 +1527,26 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
}
}

for (auto& dict_filter_cols : _dict_filter_cols) {
MutableColumnPtr dict_col_ptr = ColumnVector<Int32>::create();
size_t pos = block->get_position_by_name(dict_filter_cols.first);
auto& column_with_type_and_name = block->get_by_position(pos);
auto& column_type = column_with_type_and_name.type;
if (column_type->is_nullable()) {
block->get_by_position(pos).type =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>());
block->replace_by_position(
pos, ColumnNullable::create(std::move(dict_col_ptr),
ColumnUInt8::create(dict_col_ptr->size(), 0)));
} else {
block->get_by_position(pos).type = std::make_shared<DataTypeInt32>();
block->replace_by_position(pos, std::move(dict_col_ptr));
if (!_dict_cols_has_converted && !_dict_filter_cols.empty()) {
for (auto& dict_filter_cols : _dict_filter_cols) {
MutableColumnPtr dict_col_ptr = ColumnVector<Int32>::create();
size_t pos = block->get_position_by_name(dict_filter_cols.first);
auto& column_with_type_and_name = block->get_by_position(pos);
auto& column_type = column_with_type_and_name.type;
if (column_type->is_nullable()) {
block->get_by_position(pos).type =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>());
block->replace_by_position(
pos,
ColumnNullable::create(std::move(dict_col_ptr),
ColumnUInt8::create(dict_col_ptr->size(), 0)));
} else {
block->get_by_position(pos).type = std::make_shared<DataTypeInt32>();
block->replace_by_position(pos, std::move(dict_col_ptr));
}
}
_dict_cols_has_converted = true;
}
_is_dict_cols_converted = true;

std::vector<orc::ColumnVectorBatch*> batch_vec;
_fill_batch_vec(batch_vec, _batch.get(), 0);
Expand Down Expand Up @@ -1673,23 +1676,25 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s
Block* block = (Block*)arg;
size_t origin_column_num = block->columns();

for (auto& dict_filter_cols : _dict_filter_cols) {
MutableColumnPtr dict_col_ptr = ColumnVector<Int32>::create();
size_t pos = block->get_position_by_name(dict_filter_cols.first);
auto& column_with_type_and_name = block->get_by_position(pos);
auto& column_type = column_with_type_and_name.type;
if (column_type->is_nullable()) {
block->get_by_position(pos).type =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>());
block->replace_by_position(
pos, ColumnNullable::create(std::move(dict_col_ptr),
ColumnUInt8::create(dict_col_ptr->size(), 0)));
} else {
block->get_by_position(pos).type = std::make_shared<DataTypeInt32>();
block->replace_by_position(pos, std::move(dict_col_ptr));
if (!_dict_cols_has_converted && !_dict_filter_cols.empty()) {
for (auto& dict_filter_cols : _dict_filter_cols) {
MutableColumnPtr dict_col_ptr = ColumnVector<Int32>::create();
size_t pos = block->get_position_by_name(dict_filter_cols.first);
auto& column_with_type_and_name = block->get_by_position(pos);
auto& column_type = column_with_type_and_name.type;
if (column_type->is_nullable()) {
block->get_by_position(pos).type =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>());
block->replace_by_position(
pos, ColumnNullable::create(std::move(dict_col_ptr),
ColumnUInt8::create(dict_col_ptr->size(), 0)));
} else {
block->get_by_position(pos).type = std::make_shared<DataTypeInt32>();
block->replace_by_position(pos, std::move(dict_col_ptr));
}
}
_dict_cols_has_converted = true;
}
_is_dict_cols_converted = true;
std::vector<orc::ColumnVectorBatch*> batch_vec;
_fill_batch_vec(batch_vec, &data, 0);
std::vector<string> col_names;
Expand Down Expand Up @@ -1760,6 +1765,7 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s
block->get_by_name(col.first).column->assume_mutable()->clear();
}
Block::erase_useless_column(block, origin_column_num);
static_cast<void>(_convert_dict_cols_to_string_cols(block, nullptr));
}

uint16_t new_size = 0;
Expand Down Expand Up @@ -2088,52 +2094,56 @@ Status OrcReader::_rewrite_dict_conjuncts(std::vector<int32_t>& dict_codes, int

Status OrcReader::_convert_dict_cols_to_string_cols(
Block* block, const std::vector<orc::ColumnVectorBatch*>* batch_vec) {
if (!_is_dict_cols_converted) {
if (!_dict_cols_has_converted) {
return Status::OK();
}
for (auto& dict_filter_cols : _dict_filter_cols) {
size_t pos = block->get_position_by_name(dict_filter_cols.first);
ColumnWithTypeAndName& column_with_type_and_name = block->get_by_position(pos);
const ColumnPtr& column = column_with_type_and_name.column;
auto orc_col_idx = _colname_to_idx.find(dict_filter_cols.first);
if (orc_col_idx == _colname_to_idx.end()) {
return Status::InternalError("Wrong read column '{}' in orc file",
dict_filter_cols.first);
}
if (auto* nullable_column = check_and_get_column<ColumnNullable>(*column)) {
const ColumnPtr& nested_column = nullable_column->get_nested_column_ptr();
const ColumnInt32* dict_column = assert_cast<const ColumnInt32*>(nested_column.get());
DCHECK(dict_column);
const NullMap& null_map = nullable_column->get_null_map_data();

MutableColumnPtr string_column;
if (batch_vec != nullptr) {
string_column = _convert_dict_column_to_string_column(
dict_column, &null_map, (*batch_vec)[orc_col_idx->second],
_col_orc_type[orc_col_idx->second]);
} else {
string_column = ColumnString::create();
if (!_dict_filter_cols.empty()) {
for (auto& dict_filter_cols : _dict_filter_cols) {
size_t pos = block->get_position_by_name(dict_filter_cols.first);
ColumnWithTypeAndName& column_with_type_and_name = block->get_by_position(pos);
const ColumnPtr& column = column_with_type_and_name.column;
auto orc_col_idx = _colname_to_idx.find(dict_filter_cols.first);
if (orc_col_idx == _colname_to_idx.end()) {
return Status::InternalError("Wrong read column '{}' in orc file",
dict_filter_cols.first);
}
if (auto* nullable_column = check_and_get_column<ColumnNullable>(*column)) {
const ColumnPtr& nested_column = nullable_column->get_nested_column_ptr();
const ColumnInt32* dict_column =
assert_cast<const ColumnInt32*>(nested_column.get());
DCHECK(dict_column);
const NullMap& null_map = nullable_column->get_null_map_data();

MutableColumnPtr string_column;
if (batch_vec != nullptr) {
string_column = _convert_dict_column_to_string_column(
dict_column, &null_map, (*batch_vec)[orc_col_idx->second],
_col_orc_type[orc_col_idx->second]);
} else {
string_column = ColumnString::create();
}

column_with_type_and_name.type =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
block->replace_by_position(
pos, ColumnNullable::create(std::move(string_column),
nullable_column->get_null_map_column_ptr()));
} else {
const ColumnInt32* dict_column = assert_cast<const ColumnInt32*>(column.get());
MutableColumnPtr string_column;
if (batch_vec != nullptr) {
string_column = _convert_dict_column_to_string_column(
dict_column, nullptr, (*batch_vec)[orc_col_idx->second],
_col_orc_type[orc_col_idx->second]);
column_with_type_and_name.type =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
block->replace_by_position(
pos, ColumnNullable::create(std::move(string_column),
nullable_column->get_null_map_column_ptr()));
} else {
string_column = ColumnString::create();
}
const ColumnInt32* dict_column = assert_cast<const ColumnInt32*>(column.get());
MutableColumnPtr string_column;
if (batch_vec != nullptr) {
string_column = _convert_dict_column_to_string_column(
dict_column, nullptr, (*batch_vec)[orc_col_idx->second],
_col_orc_type[orc_col_idx->second]);
} else {
string_column = ColumnString::create();
}

column_with_type_and_name.type = std::make_shared<DataTypeString>();
block->replace_by_position(pos, std::move(string_column));
column_with_type_and_name.type = std::make_shared<DataTypeString>();
block->replace_by_position(pos, std::move(string_column));
}
}
_dict_cols_has_converted = false;
}
return Status::OK();
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/format/orc/vorc_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ class OrcReader : public GenericReader {
std::vector<std::pair<std::string, int>> _dict_filter_cols;
std::shared_ptr<ObjectPool> _obj_pool;
std::unique_ptr<orc::StringDictFilter> _string_dict_filter;
bool _is_dict_cols_converted;
bool _dict_cols_has_converted = false;
bool _has_complex_type = false;
std::vector<orc::TypeKind>* _unsupported_pushdown_types;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1788,6 +1788,26 @@ LOCATION

msck repair table partition_table;

CREATE TABLE `string_col_dict_plain_mixed_orc`(
`col0` int,
`col1` string,
`col2` double,
`col3` boolean,
`col4` string,
`col5` int)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION
'/user/doris/preinstalled_data/orc_table/string_col_dict_plain_mixed_orc'
TBLPROPERTIES (
'orc.compress'='ZLIB');

msck repair table string_col_dict_plain_mixed_orc;

show tables;


Expand Down
Binary file not shown.
9 changes: 9 additions & 0 deletions regression-test/data/external_table_p0/hive/test_hive_orc.out
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,15 @@ tablets tinyint_col 179 182 182 187 183 181 177 183 177 187 183 202 202 186 528
4 0.0000 0.000000 123.123456789876 12 0E-9 123
5 1.1234 12.123456 0E-12 0 1234.123456789 0

-- !string_col_dict_plain_mixed1 --
0

-- !string_col_dict_plain_mixed2 --
0

-- !string_col_dict_plain_mixed3 --
10240

-- !select_top50 --
4 55 999742610 400899305488827731 false 6.5976813E8 7.8723304616937395E17 \N base tennis pit vertical friday 2022-08-19T07:29:58 \N tablets smallint_col 2019-02-07 [7.53124931825377e+17] ["NbSSBtwzpxNSkkwga"] tablets smallint_col
2 49 999613702 105493714032727452 \N 6.3322381E8 9.8642324410240179E17 Unveil bright recruit participate. Suspect impression camera mathematical revelation. Fault live2 elbow debt west hydrogen current. how literary 2022-09-03T17:20:21 481707.1065 tablets boolean_col 2020-01-12 [] ["HoMrAnn", "wteEFvIwoZsVpVQdscMb", null, "zcGFmv", "kGEBBckbMtX", "hrEtCGFdPWZK"] tablets boolean_col
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ suite("test_hive_orc", "all_types,p0,external,hive,external_docker,external_dock
qt_decimals4 """select * from orc_decimal_table where id > 3 order by id;"""
}

// string col dict plain encoding mixed in different stripes
def string_col_dict_plain_mixed = {
qt_string_col_dict_plain_mixed1 """select count(col2) from string_col_dict_plain_mixed_orc where col4 = 'Additional data' and col1 like '%Test%' and col3 like '%2%';"""
qt_string_col_dict_plain_mixed2 """select count(col2) from string_col_dict_plain_mixed_orc where col4 = 'Additional data' and col3 like '%2%';"""
qt_string_col_dict_plain_mixed3 """select count(col2) from string_col_dict_plain_mixed_orc where col1 like '%Test%';"""
}

String enabled = context.config.otherConfigs.get("enableHiveTest")
if (enabled != null && enabled.equalsIgnoreCase("true")) {
try {
Expand All @@ -95,6 +102,7 @@ suite("test_hive_orc", "all_types,p0,external,hive,external_docker,external_dock
search_mix()
only_partition_col()
decimals()
string_col_dict_plain_mixed()

sql """drop catalog if exists ${catalog_name}"""

Expand Down

0 comments on commit 3da43a1

Please sign in to comment.