Skip to content

Commit

Permalink
add table property
Browse files Browse the repository at this point in the history
  • Loading branch information
eldenmoon committed Aug 12, 2024
1 parent fe3897e commit 1efe0be
Show file tree
Hide file tree
Showing 36 changed files with 167 additions and 108 deletions.
4 changes: 4 additions & 0 deletions be/src/cloud/pb_convert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ void doris_tablet_schema_to_cloud(TabletSchemaCloudPB* out, const TabletSchemaPB
out->set_is_dynamic_schema(in.is_dynamic_schema());
out->mutable_row_store_column_unique_ids()->CopyFrom(in.row_store_column_unique_ids());
out->set_inverted_index_storage_format(in.inverted_index_storage_format());
out->set_enable_variant_flatten_nested(in.variant_enable_flatten_nested());
}

void doris_tablet_schema_to_cloud(TabletSchemaCloudPB* out, TabletSchemaPB&& in) {
Expand All @@ -303,6 +304,7 @@ void doris_tablet_schema_to_cloud(TabletSchemaCloudPB* out, TabletSchemaPB&& in)
out->set_is_dynamic_schema(in.is_dynamic_schema());
out->mutable_row_store_column_unique_ids()->Swap(in.mutable_row_store_column_unique_ids());
out->set_inverted_index_storage_format(in.inverted_index_storage_format());
out->set_enable_variant_flatten_nested(in.variant_enable_flatten_nested());
}

TabletSchemaPB cloud_tablet_schema_to_doris(const TabletSchemaCloudPB& in) {
Expand Down Expand Up @@ -342,6 +344,7 @@ void cloud_tablet_schema_to_doris(TabletSchemaPB* out, const TabletSchemaCloudPB
out->set_is_dynamic_schema(in.is_dynamic_schema());
out->mutable_row_store_column_unique_ids()->CopyFrom(in.row_store_column_unique_ids());
out->set_inverted_index_storage_format(in.inverted_index_storage_format());
out->set_variant_enable_flatten_nested(in.enable_variant_flatten_nested());
}

void cloud_tablet_schema_to_doris(TabletSchemaPB* out, TabletSchemaCloudPB&& in) {
Expand Down Expand Up @@ -369,6 +372,7 @@ void cloud_tablet_schema_to_doris(TabletSchemaPB* out, TabletSchemaCloudPB&& in)
out->set_is_dynamic_schema(in.is_dynamic_schema());
out->mutable_row_store_column_unique_ids()->Swap(in.mutable_row_store_column_unique_ids());
out->set_inverted_index_storage_format(in.inverted_index_storage_format());
out->set_variant_enable_flatten_nested(in.enable_variant_flatten_nested());
}

TabletMetaCloudPB doris_tablet_meta_to_cloud(const TabletMetaPB& in) {
Expand Down
1 change: 0 additions & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1005,7 +1005,6 @@ DEFINE_mInt64(workload_group_scan_task_wait_timeout_ms, "10000");
// Will remove after fully test.
DEFINE_Bool(enable_index_apply_preds_except_leafnode_of_andnode, "true");

DEFINE_mBool(variant_enable_flatten_nested, "false");
DEFINE_mDouble(variant_ratio_of_defaults_as_sparse_column, "1");
DEFINE_mInt64(variant_threshold_rows_to_estimate_sparse_column, "1000");
DEFINE_mBool(variant_throw_exeception_on_invalid_json, "false");
Expand Down
3 changes: 0 additions & 3 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1190,9 +1190,6 @@ DECLARE_mInt64(lookup_connection_cache_capacity);

// level of compression when using LZ4_HC, whose defalut value is LZ4HC_CLEVEL_DEFAULT
DECLARE_mInt64(LZ4_HC_compression_level);
// Whether flatten nested arrays in variant column
// Notice: TEST ONLY
DECLARE_mBool(variant_enable_flatten_nested);
// Threshold of a column as sparse column
// Notice: TEST ONLY
DECLARE_mDouble(variant_ratio_of_defaults_as_sparse_column);
Expand Down
8 changes: 5 additions & 3 deletions be/src/olap/rowset/segment_creator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include "vec/core/columns_with_type_and_name.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type.h"
#include "vec/json/json_parser.h"

namespace doris {
using namespace ErrorCode;
Expand Down Expand Up @@ -99,9 +100,10 @@ Status SegmentFlusher::_parse_variant_columns(vectorized::Block& block) {
return Status::OK();
}

vectorized::schema_util::ParseContext ctx;
ctx.record_raw_json_column = _context.tablet_schema->has_row_store_for_all_columns();
RETURN_IF_ERROR(vectorized::schema_util::parse_variant_columns(block, variant_column_pos, ctx));
vectorized::ParseConfig config;
config.enable_flatten_nested = _context.tablet_schema->variant_flatten_nested();
RETURN_IF_ERROR(
vectorized::schema_util::parse_variant_columns(block, variant_column_pos, config));
return Status::OK();
}

Expand Down
10 changes: 0 additions & 10 deletions be/src/olap/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -635,20 +635,10 @@ Status Segment::new_column_iterator_with_path(const TabletColumn& tablet_column,
*iter = std::make_unique<DefaultNestedColumnIterator>(std::move(sibling_iter),
leaf->data.file_column_type);
} else {
if (tablet_column.path_info_ptr()->get_path().find("uploader.node_id") !=
std::string::npos) {
VLOG_DEBUG << "find with path " << tablet_column.path_info_ptr()->get_path()
<< ", fuck1 in default iter";
}
*iter = std::make_unique<DefaultNestedColumnIterator>(nullptr, nullptr);
}
return Status::OK();
}
if (tablet_column.path_info_ptr()->get_path().find("uploader.node_id") !=
std::string::npos) {
VLOG_DEBUG << "find with path " << tablet_column.path_info_ptr()->get_path()
<< ", fuck2 in default iter";
}
return new_default_iterator(tablet_column, iter);
};

Expand Down
7 changes: 0 additions & 7 deletions be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -752,13 +752,6 @@ Status VerticalSegmentWriter::_append_block_with_variant_subcolumns(RowsInBlock&
// already handled by parent column
continue;
}
if (entry->path.get_path().find("xx") != std::string::npos &&
entry->path.get_path().find("nested") == std::string::npos) {
LOG(WARNING) << "path contains xx: " << entry->path.get_path();
}
VLOG_DEBUG << "dump column " << entry->path.get_path()
<< vectorized::Block::dump_column(entry->data.get_finalized_column_ptr(),
entry->data.get_least_common_type());
CHECK(entry->data.is_finalized());
int current_column_id = column_id++;
TabletColumn tablet_column = generate_column_info(entry);
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/tablet_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,10 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id
schema->set_disable_auto_compaction(tablet_schema.disable_auto_compaction);
}

if (tablet_schema.__isset.variant_enable_flatten_nested) {
schema->set_variant_enable_flatten_nested(tablet_schema.variant_enable_flatten_nested);
}

if (tablet_schema.__isset.enable_single_replica_compaction) {
schema->set_enable_single_replica_compaction(
tablet_schema.enable_single_replica_compaction);
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/tablet_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1012,6 +1012,7 @@ void TabletSchema::init_from_pb(const TabletSchemaPB& schema, bool ignore_extrac

_row_store_column_unique_ids.assign(schema.row_store_column_unique_ids().begin(),
schema.row_store_column_unique_ids().end());
_variant_enable_flatten_nested = schema.variant_enable_flatten_nested();
}

void TabletSchema::copy_from(const TabletSchema& tablet_schema) {
Expand Down Expand Up @@ -1063,6 +1064,7 @@ void TabletSchema::build_current_tablet_schema(int64_t index_id, int32_t version
_sort_type = ori_tablet_schema.sort_type();
_sort_col_num = ori_tablet_schema.sort_col_num();
_row_store_page_size = ori_tablet_schema.row_store_page_size();
_variant_enable_flatten_nested = ori_tablet_schema.variant_flatten_nested();

// copy from table_schema_param
_schema_version = version;
Expand Down Expand Up @@ -1221,6 +1223,7 @@ void TabletSchema::to_schema_pb(TabletSchemaPB* tablet_schema_pb) const {
tablet_schema_pb->set_inverted_index_storage_format(_inverted_index_storage_format);
tablet_schema_pb->mutable_row_store_column_unique_ids()->Assign(
_row_store_column_unique_ids.begin(), _row_store_column_unique_ids.end());
tablet_schema_pb->set_variant_enable_flatten_nested(_variant_enable_flatten_nested);
}

size_t TabletSchema::row_size() const {
Expand Down Expand Up @@ -1542,6 +1545,7 @@ bool operator==(const TabletSchema& a, const TabletSchema& b) {
if (a._store_row_column != b._store_row_column) return false;
if (a._row_store_page_size != b._row_store_page_size) return false;
if (a._skip_write_index_on_load != b._skip_write_index_on_load) return false;
if (a._variant_enable_flatten_nested != b._variant_enable_flatten_nested) return false;
return true;
}

Expand Down
5 changes: 5 additions & 0 deletions be/src/olap/tablet_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,10 @@ class TabletSchema {
_disable_auto_compaction = disable_auto_compaction;
}
bool disable_auto_compaction() const { return _disable_auto_compaction; }
void set_variant_enable_flatten_nested(bool flatten_nested) {
_variant_enable_flatten_nested = flatten_nested;
}
bool variant_flatten_nested() const { return _variant_enable_flatten_nested; }
void set_enable_single_replica_compaction(bool enable_single_replica_compaction) {
_enable_single_replica_compaction = enable_single_replica_compaction;
}
Expand Down Expand Up @@ -538,6 +542,7 @@ class TabletSchema {
// Contains column ids of which columns should be encoded into row store.
// ATTN: For compability reason empty cids means all columns of tablet schema are encoded to row column
std::vector<int32_t> _row_store_column_unique_ids;
bool _variant_enable_flatten_nested = false;
};

bool operator==(const TabletSchema& a, const TabletSchema& b);
Expand Down
10 changes: 6 additions & 4 deletions be/src/vec/common/schema_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
#include "vec/data_types/get_least_supertype.h"
#include "vec/functions/function.h"
#include "vec/functions/simple_function_factory.h"
#include "vec/json/json_parser.h"
#include "vec/json/parse2column.h"
#include "vec/json/path_in_data.h"

Expand Down Expand Up @@ -468,7 +469,7 @@ Status get_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas,
}

Status _parse_variant_columns(Block& block, const std::vector<int>& variant_pos,
const ParseContext& ctx) {
const ParseConfig& config) {
for (int i = 0; i < variant_pos.size(); ++i) {
auto column_ref = block.get_by_position(variant_pos[i]).column;
bool is_nullable = column_ref->is_nullable();
Expand Down Expand Up @@ -506,7 +507,7 @@ Status _parse_variant_columns(Block& block, const std::vector<int>& variant_pos,
if (scalar_root_column->is_column_string()) {
variant_column = ColumnObject::create(true);
parse_json_to_variant(*variant_column.get(),
assert_cast<const ColumnString&>(*scalar_root_column));
assert_cast<const ColumnString&>(*scalar_root_column), config);
} else {
// Root maybe other types rather than string like ColumnObject(Int32).
// In this case, we should finlize the root and cast to JSON type
Expand All @@ -529,10 +530,11 @@ Status _parse_variant_columns(Block& block, const std::vector<int>& variant_pos,
}

Status parse_variant_columns(Block& block, const std::vector<int>& variant_pos,
const ParseContext& ctx) {
const ParseConfig& config) {
try {
// Parse each variant column from raw string column
RETURN_IF_ERROR(vectorized::schema_util::_parse_variant_columns(block, variant_pos, ctx));
RETURN_IF_ERROR(
vectorized::schema_util::_parse_variant_columns(block, variant_pos, config));
} catch (const doris::Exception& e) {
// TODO more graceful, max_filter_ratio
LOG(WARNING) << "encounter execption " << e.to_string();
Expand Down
8 changes: 2 additions & 6 deletions be/src/vec/common/schema_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "vec/core/field.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type.h"
#include "vec/json/json_parser.h"
#include "vec/json/path_in_data.h"

namespace doris {
Expand Down Expand Up @@ -79,17 +80,12 @@ struct ExtraInfo {
TabletColumn get_column_by_type(const vectorized::DataTypePtr& data_type, const std::string& name,
const ExtraInfo& ext_info);

struct ParseContext {
// record an extract json column, used for encoding row store
bool record_raw_json_column = false;
};

// three steps to parse and encode variant columns into flatterned columns
// 1. parse variant from raw json string
// 2. finalize variant column to each subcolumn least commn types, default ignore sparse sub columns
// 3. encode sparse sub columns
Status parse_variant_columns(Block& block, const std::vector<int>& variant_pos,
const ParseContext& ctx);
const ParseConfig& config);
Status encode_variant_sparse_subcolumns(ColumnObject& column);

// Pick the tablet schema with the highest schema version as the reference.
Expand Down
9 changes: 5 additions & 4 deletions be/src/vec/json/json_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,15 @@ bool JSONDataParser<ParserImpl, parse_nested>::extract_key(MutableColumns& colum
}

template <typename ParserImpl, bool parse_nested>
std::optional<ParseResult> JSONDataParser<ParserImpl, parse_nested>::parse(const char* begin,
size_t length) {
std::optional<ParseResult> JSONDataParser<ParserImpl, parse_nested>::parse(
const char* begin, size_t length, const ParseConfig& config) {
std::string_view json {begin, length};
Element document;
if (!parser.parse(json, document)) {
return {};
}
ParseContext context;
context.enable_flatten_nested = config.enable_flatten_nested;
traverse(document, context);
ParseResult result;
result.values = std::move(context.values);
Expand All @@ -97,7 +98,7 @@ void JSONDataParser<ParserImpl, parse_nested>::traverse(const Element& element,
} else if (element.isArray()) {
has_nested = false;
checkHasNested(element);
if (has_nested && !parse_nested && !config::variant_enable_flatten_nested) {
if (has_nested && !parse_nested && !ctx.enable_flatten_nested) {
// Parse nested arrays to JsonbField
JsonbWriter writer;
traverseArrayAsJsonb(element.getArray(), writer);
Expand Down Expand Up @@ -203,7 +204,7 @@ void JSONDataParser<ParserImpl, parse_nested>::traverseArrayElement(const Elemen
ParseArrayContext& ctx) {
ParseContext element_ctx;
traverse(element, element_ctx);
auto& [_, paths, values] = element_ctx;
auto& [_, paths, values, flatten_nested] = element_ctx;
size_t size = paths.size();
size_t keys_to_update = ctx.arrays_by_path.size();
for (size_t i = 0; i < size; ++i) {
Expand Down
7 changes: 6 additions & 1 deletion be/src/vec/json/json_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,17 @@ enum class ExtractType {
ToString = 0,
// ...
};

struct ParseConfig {
bool enable_flatten_nested = false;
};
template <typename ParserImpl, bool parse_nested = false>
class JSONDataParser {
public:
using Element = typename ParserImpl::Element;
using JSONObject = typename ParserImpl::Object;
using JSONArray = typename ParserImpl::Array;
std::optional<ParseResult> parse(const char* begin, size_t length);
std::optional<ParseResult> parse(const char* begin, size_t length, const ParseConfig& config);

// extract keys's element into columns
bool extract_key(MutableColumns& columns, StringRef json, const std::vector<StringRef>& keys,
Expand All @@ -137,6 +141,7 @@ class JSONDataParser {
PathInDataBuilder builder;
std::vector<PathInData::Parts> paths;
std::vector<Field> values;
bool enable_flatten_nested = false;
};
using PathPartsWithArray = std::pair<PathInData::Parts, Array>;
using PathToArray = phmap::flat_hash_map<UInt128, PathPartsWithArray, UInt128TrivialHash>;
Expand Down
41 changes: 8 additions & 33 deletions be/src/vec/json/parse2column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,13 @@ class FieldVisitorReplaceScalars : public StaticVisitor<Field> {

template <typename ParserImpl>
void parse_json_to_variant(IColumn& column, const char* src, size_t length,
JSONDataParser<ParserImpl>* parser) {
JSONDataParser<ParserImpl>* parser, const ParseConfig& config) {
auto& column_object = assert_cast<ColumnObject&>(column);
std::optional<ParseResult> result;
/// Treat empty string as an empty object
/// for better CAST from String to Object.
if (length > 0) {
result = parser->parse(src, length);
result = parser->parse(src, length, config);
} else {
result = ParseResult {};
}
Expand Down Expand Up @@ -193,44 +193,19 @@ void parse_json_to_variant(IColumn& column, const char* src, size_t length,
column_object.incr_num_rows();
}

bool extract_key(MutableColumns& columns, StringRef json, const std::vector<StringRef>& keys,
const std::vector<ExtractType>& types, JsonParser* parser) {
return parser->extract_key(columns, json, keys, types);
}

// exposed interfaces
void parse_json_to_variant(IColumn& column, const StringRef& json, JsonParser* parser) {
return parse_json_to_variant(column, json.data, json.size, parser);
void parse_json_to_variant(IColumn& column, const StringRef& json, JsonParser* parser,
const ParseConfig& config) {
return parse_json_to_variant(column, json.data, json.size, parser, config);
}

void parse_json_to_variant(IColumn& column, const ColumnString& raw_json_column) {
void parse_json_to_variant(IColumn& column, const ColumnString& raw_json_column,
const ParseConfig& config) {
auto parser = parsers_pool.get([] { return new JsonParser(); });
for (size_t i = 0; i < raw_json_column.size(); ++i) {
StringRef raw_json = raw_json_column.get_data_at(i);
parse_json_to_variant(column, raw_json.data, raw_json.size, parser.get());
}
}

bool extract_key(MutableColumns& columns, const std::vector<StringRef>& jsons,
const std::vector<StringRef>& keys, const std::vector<ExtractType>& types) {
auto parser = parsers_pool.get([] { return new JsonParser(); });
for (StringRef json : jsons) {
if (!extract_key(columns, json, keys, types, parser.get())) {
return false;
}
}
return true;
}

bool extract_key(MutableColumns& columns, const ColumnString& json_column,
const std::vector<StringRef>& keys, const std::vector<ExtractType>& types) {
auto parser = parsers_pool.get([] { return new JsonParser(); });
for (size_t x = 0; x < json_column.size(); ++x) {
if (!extract_key(columns, json_column.get_data_at(x), keys, types, parser.get())) {
return false;
}
parse_json_to_variant(column, raw_json.data, raw_json.size, parser.get(), config);
}
return true;
}

} // namespace doris::vectorized
7 changes: 5 additions & 2 deletions be/src/vec/json/parse2column.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include "vec/columns/column.h"
#include "vec/common/string_ref.h"
#include "vec/json/json_parser.h"

namespace doris::vectorized {

Expand All @@ -35,10 +36,12 @@ using ColumnString = ColumnStr<UInt32>;
using JsonParser = JSONDataParser<SimdJSONParser, false>;

// parse a batch of json strings into column object, throws doris::Execption when failed
void parse_json_to_variant(IColumn& column, const ColumnString& raw_json_column);
void parse_json_to_variant(IColumn& column, const ColumnString& raw_json_column,
const ParseConfig& config);

// parse a single json, throws doris::Execption when failed
void parse_json_to_variant(IColumn& column, const StringRef& jsons, JsonParser* parser);
void parse_json_to_variant(IColumn& column, const StringRef& jsons, JsonParser* parser,
const ParseConfig& config);

// extract keys columns from json strings into columns
bool extract_key(MutableColumns& columns, const std::vector<StringRef>& jsons,
Expand Down
3 changes: 2 additions & 1 deletion be/test/olap/test_data/header_without_inc_rs.txt
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@
"store_row_column": false,
"is_dynamic_schema": false,
"is_partial_update": false,
"enable_single_replica_compaction": false
"enable_single_replica_compaction": false,
"variant_enable_flatten_nested": false
},
"rs_metas": [
{
Expand Down
Loading

0 comments on commit 1efe0be

Please sign in to comment.