Skip to content

Commit

Permalink
[feature](Paimon) support deletion vector for Paimon native reader (#3…
Browse files Browse the repository at this point in the history
…4743) (#35241)

bp #34743
Co-authored-by: 苏小刚 <[email protected]>
  • Loading branch information
morningman authored May 22, 2024
1 parent 50f50cf commit adc364a
Show file tree
Hide file tree
Showing 20 changed files with 585 additions and 169 deletions.
2 changes: 1 addition & 1 deletion be/src/io/fs/hdfs_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ uint64 HdfsFileSystemCache::_hdfs_hash_code(const THdfsParams& hdfs_params,
} else if (hdfs_params.__isset.fs_name) {
hash_code ^= Fingerprint(hdfs_params.fs_name);
}

if (hdfs_params.__isset.user) {
hash_code ^= Fingerprint(hdfs_params.user);
}
Expand Down
81 changes: 81 additions & 0 deletions be/src/util/deletion_vector.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <algorithm>
#include <cstdint>
#include <cstring>
#include <stdexcept>

#include "common/status.h"
#include "roaring/roaring.hh"

namespace doris {
/// A set of deleted row positions backed by a 32-bit Roaring bitmap.
///
/// Serialized layout accepted by deserialize():
///   bytes [0, 4)  big-endian uint32: number of bytes that follow (magic + bitmap)
///   bytes [4, 8)  big-endian uint32: MAGIC_NUMBER
///   bytes [8, ..) Roaring bitmap in its portable serialized form
///
/// No destructor or copy/move operations are declared on purpose: the implicit ones are
/// correct and keep the type cheaply movable.
class DeletionVector {
public:
    /// Magic number expected in the second header field of a serialized deletion vector.
    const static uint32_t MAGIC_NUMBER = 1581511376;

    /// Takes ownership of @p roaring_bitmap; every value it contains is a deleted position.
    DeletionVector(roaring::Roaring roaring_bitmap) : _roaring_bitmap(std::move(roaring_bitmap)) {}

    /// Marks @p postition as deleted and returns the result of roaring::Roaring::addChecked().
    bool checked_delete(uint32_t postition) { return _roaring_bitmap.addChecked(postition); }

    /// Returns true if @p postition is marked as deleted.
    bool is_delete(uint32_t postition) const { return _roaring_bitmap.contains(postition); }

    /// Returns true if no position is marked as deleted.
    bool is_empty() const { return _roaring_bitmap.isEmpty(); }

    /// Largest deleted position. Check is_empty() first: for an empty vector this is
    /// whatever roaring::Roaring::maximum() reports.
    uint32_t maximum() const { return _roaring_bitmap.maximum(); }

    /// Smallest deleted position. Check is_empty() first: for an empty vector this is
    /// whatever roaring::Roaring::minimum() reports.
    uint32_t minimum() const { return _roaring_bitmap.minimum(); }

    /// Parses a serialized deletion vector (see the class comment for the layout).
    ///
    /// @param buf    start of the serialized bytes, beginning with the length field
    /// @param length total number of readable bytes at @p buf, header included
    /// @return the parsed DeletionVector, or a RuntimeError status when the buffer is too
    ///         short, the length field or magic number does not match, or the bitmap
    ///         payload cannot be decoded
    static Result<DeletionVector> deserialize(const char* buf, size_t length) {
        constexpr size_t FIELD_SIZE = sizeof(uint32_t);
        constexpr size_t HEADER_SIZE = 2 * FIELD_SIZE;
        // Both header fields are read unconditionally below, so reject anything that cannot
        // hold them. This also keeps `length - FIELD_SIZE` from wrapping around.
        if (buf == nullptr || length < HEADER_SIZE) {
            return ResultError(Status::RuntimeError(
                    "DeletionVector deserialize error: buffer too short, length: {}", length));
        }

        const uint32_t actual_length = _decode_big_endian_u32(buf);
        buf += FIELD_SIZE;
        if (actual_length != length - FIELD_SIZE) {
            return ResultError(
                    Status::RuntimeError("DeletionVector deserialize error: length not match, "
                                         "actual length: {}, expect length: {}",
                                         actual_length, length - FIELD_SIZE));
        }

        const uint32_t magic_number = _decode_big_endian_u32(buf);
        buf += FIELD_SIZE;
        if (magic_number != MAGIC_NUMBER) {
            return ResultError(Status::RuntimeError(
                    "DeletionVector deserialize error: invalid magic number {}", magic_number));
        }

        roaring::Roaring roaring_bitmap;
        try {
            // Only the bytes after the header belong to the bitmap; passing the full
            // `length` here would let readSafe() run past the end of the buffer.
            roaring_bitmap = roaring::Roaring::readSafe(buf, length - HEADER_SIZE);
        } catch (const std::runtime_error&) {
            return ResultError(Status::RuntimeError(
                    "DeletionVector deserialize error: failed to deserialize roaring bitmap"));
        }
        return DeletionVector(std::move(roaring_bitmap));
    }

private:
    /// Decodes the big-endian uint32 stored in the first four bytes at @p buf.
    /// Assembling the value byte by byte gives the same result on any host byte order.
    static uint32_t _decode_big_endian_u32(const char* buf) {
        unsigned char bytes[sizeof(uint32_t)];
        std::memcpy(bytes, buf, sizeof(bytes));
        return (static_cast<uint32_t>(bytes[0]) << 24) | (static_cast<uint32_t>(bytes[1]) << 16) |
               (static_cast<uint32_t>(bytes[2]) << 8) | static_cast<uint32_t>(bytes[3]);
    }

    roaring::Roaring _roaring_bitmap;
};
} // namespace doris
20 changes: 15 additions & 5 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ void OrcReader::_collect_profile_before_close() {
COUNTER_UPDATE(_orc_profile.set_fill_column_time, _statistics.set_fill_column_time);
COUNTER_UPDATE(_orc_profile.decode_value_time, _statistics.decode_value_time);
COUNTER_UPDATE(_orc_profile.decode_null_map_time, _statistics.decode_null_map_time);
COUNTER_UPDATE(_orc_profile.filter_block_time, _statistics.filter_block_time);

if (_file_input_stream != nullptr) {
_file_input_stream->collect_profile_before_close();
Expand Down Expand Up @@ -227,6 +228,8 @@ void OrcReader::_init_profile() {
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DecodeValueTime", orc_profile, 1);
_orc_profile.decode_null_map_time =
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DecodeNullMapTime", orc_profile, 1);
_orc_profile.filter_block_time =
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "FilterBlockTime", orc_profile, 1);
}
}

Expand Down Expand Up @@ -1608,8 +1611,11 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
return Status::OK();
}
_execute_filter_position_delete_rowids(*_filter);

RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(block, columns_to_filter, *_filter));
{
    // Filtering the block is accounted to filter_block_time, like the other
    // filter_block_internal() call sites in this function, not to decode_null_map_time.
    SCOPED_RAW_TIMER(&_statistics.filter_block_time);
    RETURN_IF_CATCH_EXCEPTION(
            Block::filter_block_internal(block, columns_to_filter, *_filter));
}
if (!_not_single_slot_filter_conjuncts.empty()) {
static_cast<void>(_convert_dict_cols_to_string_cols(block, &batch_vec));
RETURN_IF_CATCH_EXCEPTION(
Expand Down Expand Up @@ -1736,8 +1742,11 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
return Status::OK();
}
_execute_filter_position_delete_rowids(result_filter);
RETURN_IF_CATCH_EXCEPTION(
Block::filter_block_internal(block, columns_to_filter, result_filter));
{
SCOPED_RAW_TIMER(&_statistics.filter_block_time);
RETURN_IF_CATCH_EXCEPTION(
Block::filter_block_internal(block, columns_to_filter, result_filter));
}
if (!_not_single_slot_filter_conjuncts.empty()) {
static_cast<void>(_convert_dict_cols_to_string_cols(block, &batch_vec));
RETURN_IF_CATCH_EXCEPTION(
Expand All @@ -1751,15 +1760,16 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
} else {
if (_delete_rows_filter_ptr) {
_execute_filter_position_delete_rowids(*_delete_rows_filter_ptr);
SCOPED_RAW_TIMER(&_statistics.filter_block_time);
RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(block, columns_to_filter,
(*_delete_rows_filter_ptr)));
} else {
std::unique_ptr<IColumn::Filter> filter(new IColumn::Filter(block->rows(), 1));
_execute_filter_position_delete_rowids(*filter);
SCOPED_RAW_TIMER(&_statistics.filter_block_time);
RETURN_IF_CATCH_EXCEPTION(
Block::filter_block_internal(block, columns_to_filter, (*filter)));
}

Block::erase_useless_column(block, column_to_keep);
static_cast<void>(_convert_dict_cols_to_string_cols(block, &batch_vec));
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/exec/format/orc/vorc_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ class OrcReader : public GenericReader {
int64_t set_fill_column_time = 0;
int64_t decode_value_time = 0;
int64_t decode_null_map_time = 0;
int64_t filter_block_time = 0;
};

OrcReader(RuntimeProfile* profile, RuntimeState* state, const TFileScanRangeParams& params,
Expand Down Expand Up @@ -222,6 +223,7 @@ class OrcReader : public GenericReader {
RuntimeProfile::Counter* set_fill_column_time = nullptr;
RuntimeProfile::Counter* decode_value_time = nullptr;
RuntimeProfile::Counter* decode_null_map_time = nullptr;
RuntimeProfile::Counter* filter_block_time = nullptr;
};

class ORCFilterImpl : public orc::ORCFilter {
Expand Down
11 changes: 0 additions & 11 deletions be/src/vec/exec/format/table/iceberg_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,17 +151,6 @@ Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool*
return _shrink_block_if_need(block);
}

// Forwards the partition and missing column descriptions unchanged to the wrapped
// file format reader and returns its status.
Status IcebergTableReader::set_fill_columns(
const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
partition_columns,
const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) {
return _file_format_reader->set_fill_columns(partition_columns, missing_columns);
}

// Reports whatever the wrapped file format reader answers for fill_all_columns().
bool IcebergTableReader::fill_all_columns() const {
    return _file_format_reader->fill_all_columns();
}

Status IcebergTableReader::get_columns(
std::unordered_map<std::string, TypeDescriptor>* name_to_type,
std::unordered_set<std::string>* missing_cols) {
Expand Down
7 changes: 0 additions & 7 deletions be/src/vec/exec/format/table/iceberg_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,6 @@ class IcebergTableReader : public TableFormatReader {

Status get_next_block(Block* block, size_t* read_rows, bool* eof) final;

Status set_fill_columns(
const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
partition_columns,
const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) final;

bool fill_all_columns() const final;

Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
std::unordered_set<std::string>* missing_cols) final;

Expand Down
94 changes: 94 additions & 0 deletions be/src/vec/exec/format/table/paimon_jni_reader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "paimon_jni_reader.h"

#include <map>
#include <ostream>

#include "runtime/descriptors.h"
#include "runtime/types.h"
#include "vec/core/types.h"

namespace doris {
class RuntimeProfile;
class RuntimeState;

namespace vectorized {
class Block;
} // namespace vectorized
} // namespace doris

namespace doris::vectorized {

// Prefix prepended to the key of every Paimon option before it is placed in the
// scanner parameter map built by the constructor.
const std::string PaimonJniReader::PAIMON_OPTION_PREFIX = "paimon_option_prefix.";

// Builds the parameter map for the Java-side PaimonJniScanner from the scan range and
// the requested slots, then creates (but does not open) the JNI connector.
PaimonJniReader::PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_descs,
                                 RuntimeState* state, RuntimeProfile* profile,
                                 const TFileRangeDesc& range)
        : _file_slot_descs(file_slot_descs), _state(state), _profile(profile) {
    // Column names and their JNI type strings, in slot order.
    std::vector<std::string> required_fields;
    std::vector<std::string> jni_types;
    for (auto* slot : _file_slot_descs) {
        required_fields.emplace_back(slot->col_name());
        jni_types.emplace_back(JniConnector::get_jni_type(slot->type()));
    }

    const auto& paimon = range.table_format_params.paimon_params;
    std::map<String, String> scanner_params;

    // Table identity and split description.
    scanner_params["db_name"] = paimon.db_name;
    scanner_params["table_name"] = paimon.table_name;
    scanner_params["paimon_split"] = paimon.paimon_split;
    scanner_params["paimon_column_names"] = paimon.paimon_column_names;
    scanner_params["paimon_predicate"] = paimon.paimon_predicate;

    // Numeric fields are passed as decimal strings.
    scanner_params["ctl_id"] = std::to_string(paimon.ctl_id);
    scanner_params["db_id"] = std::to_string(paimon.db_id);
    scanner_params["tbl_id"] = std::to_string(paimon.tbl_id);
    scanner_params["last_update_time"] = std::to_string(paimon.last_update_time);

    // Projection: names joined with ',' and JNI types joined with '#'.
    scanner_params["required_fields"] = join(required_fields, ",");
    scanner_params["columns_types"] = join(jni_types, "#");

    // Used to create paimon option
    for (const auto& option : paimon.paimon_options) {
        scanner_params[PAIMON_OPTION_PREFIX + option.first] = option.second;
    }

    _jni_connector = std::make_unique<JniConnector>("org/apache/doris/paimon/PaimonJniScanner",
                                                    scanner_params, required_fields);
}

// Reads the next block through the JNI connector. Once the connector reports eof it is
// closed right away, and the outcome of that close becomes the returned status.
Status PaimonJniReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
    RETURN_IF_ERROR(_jni_connector->get_next_block(block, read_rows, eof));
    if (!*eof) {
        return Status::OK();
    }
    return _jni_connector->close();
}

// Publishes the name and type of every requested slot into name_to_type.
// missing_cols is left untouched. Always returns OK.
Status PaimonJniReader::get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
                                    std::unordered_set<std::string>* missing_cols) {
    for (auto* slot : _file_slot_descs) {
        name_to_type->emplace(slot->col_name(), slot->type());
    }
    return Status::OK();
}

// Remembers the column value ranges, initializes the JNI connector with them and then
// opens it. The first failing step decides the returned status.
Status PaimonJniReader::init_reader(
        std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
    _colname_to_value_range = colname_to_value_range;
    if (Status init_status = _jni_connector->init(colname_to_value_range); !init_status.ok()) {
        return init_status;
    }
    return _jni_connector->open(_state, _profile);
}
} // namespace doris::vectorized
77 changes: 77 additions & 0 deletions be/src/vec/exec/format/table/paimon_jni_reader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <cstddef>
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include "common/status.h"
#include "exec/olap_common.h"
#include "vec/exec/format/generic_reader.h"
#include "vec/exec/format/table/table_format_reader.h"
#include "vec/exec/jni_connector.h"

namespace doris {
class RuntimeProfile;
class RuntimeState;
class SlotDescriptor;
namespace vectorized {
class Block;
} // namespace vectorized
struct TypeDescriptor;
} // namespace doris

namespace doris::vectorized {

/**
 * Reads Paimon table data through a JniConnector.
 *
 * The constructor collects the scan-range parameters and the requested slots,
 * init_reader() initializes and opens the connector, and get_next_block() pulls
 * blocks from it.
 *
 * The slot descriptor vector passed to the constructor is stored by reference, so it
 * must outlive this reader.
 */
class PaimonJniReader : public GenericReader {
    ENABLE_FACTORY_CREATOR(PaimonJniReader);

public:
    /// Prefix used for the keys of Paimon options in the scanner parameter map.
    static const std::string PAIMON_OPTION_PREFIX;

    /// @param file_slot_descs slots to read; kept by reference (see class comment)
    /// @param state           runtime state handed to the connector in init_reader()
    /// @param profile         runtime profile handed to the connector in init_reader()
    /// @param range           scan range carrying the Paimon table format parameters
    PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state,
                    RuntimeProfile* profile, const TFileRangeDesc& range);

    ~PaimonJniReader() override = default;

    /// Reads the next block from the connector and reports end of data through @p eof.
    Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;

    /// Fills @p name_to_type with the name and type of every requested slot.
    Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
                       std::unordered_set<std::string>* missing_cols) override;

    /// Stores @p colname_to_value_range and prepares the connector for reading.
    /// Must be called before get_next_block().
    Status init_reader(
            std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);

private:
    // Not owned; refers to the vector given to the constructor.
    const std::vector<SlotDescriptor*>& _file_slot_descs;
    RuntimeState* _state = nullptr;
    RuntimeProfile* _profile = nullptr;
    // Not owned; set by init_reader(). Initialized so it is never read as garbage.
    std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range = nullptr;
    std::unique_ptr<JniConnector> _jni_connector;
};

} // namespace doris::vectorized
Loading

0 comments on commit adc364a

Please sign in to comment.