Skip to content

Commit

Permalink
Merge branch 'branch-2.0' into branch_2.0_clang_format
Browse files Browse the repository at this point in the history
  • Loading branch information
Vallishp authored Sep 19, 2024
2 parents 6de9201 + 3a7a80b commit f7b275e
Show file tree
Hide file tree
Showing 29 changed files with 26,726 additions and 5,201 deletions.
277 changes: 154 additions & 123 deletions be/src/exec/es/es_scroll_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,159 @@ Status insert_int_value(const rapidjson::Value& col, PrimitiveType type,
return parse_and_insert_data(col);
}

// Generic parse path: treat the JSON value as an integral of type T and
// store the parsed result into `val`. Non-integral types use the explicit
// specializations below.
template <typename T>
Status handle_value(const rapidjson::Value& col, PrimitiveType sub_type, bool pure_doc_value,
                    T& val) {
    return get_int_value<T>(col, sub_type, &val, pure_doc_value);
}

// float specialization: route through the floating-point parser instead of
// the integral one.
template <>
Status handle_value<float>(const rapidjson::Value& col, PrimitiveType sub_type, bool pure_doc_value,
                           float& val) {
    return get_float_value<float>(col, sub_type, &val, pure_doc_value);
}

// double specialization: route through the floating-point parser instead of
// the integral one.
template <>
Status handle_value<double>(const rapidjson::Value& col, PrimitiveType sub_type,
                            bool pure_doc_value, double& val) {
    return get_float_value<double>(col, sub_type, &val, pure_doc_value);
}

// std::string specialization: JSON strings are taken verbatim; any other
// scalar (number, bool, ...) is serialized to its textual form.
template <>
Status handle_value<std::string>(const rapidjson::Value& col, PrimitiveType sub_type,
                                 bool pure_doc_value, std::string& val) {
    RETURN_ERROR_IF_COL_IS_ARRAY(col, sub_type, true);
    val = col.IsString() ? col.GetString() : json_value_to_string(col);
    return Status::OK();
}

// bool specialization. Accepts, in order of preference:
//   - a native JSON bool,
//   - a JSON number (non-zero => true),
//   - in doc-value mode, a single-element array wrapping a bool or a string,
//   - otherwise a string parsed via StringParser::string_to_bool.
template <>
Status handle_value<bool>(const rapidjson::Value& col, PrimitiveType sub_type, bool pure_doc_value,
                          bool& val) {
    if (col.IsBool()) {
        val = col.GetBool();
        return Status::OK();
    }

    if (col.IsNumber()) {
        // NOTE(review): GetInt() is only valid when the number fits an int;
        // a fractional payload would trip rapidjson's assertion — confirm
        // upstream guarantees integral booleans here.
        val = col.GetInt();
        return Status::OK();
    }

    bool is_nested_str = false;
    if (pure_doc_value && col.IsArray()) {
        if (!col.Empty() && col[0].IsBool()) {
            val = col[0].GetBool();
            return Status::OK();
        }
        if (!col.Empty() && col[0].IsString()) {
            is_nested_str = true;
        } else {
            return Status::InternalError(ERROR_INVALID_COL_DATA, "BOOLEAN");
        }
    }

    const rapidjson::Value& str_col = is_nested_str ? col[0] : col;
    // Guard before GetString(): rapidjson asserts (UB in release builds) when
    // the value is not a string, e.g. null or an object.
    if (!str_col.IsString()) {
        return Status::InternalError(ERROR_INVALID_COL_DATA, "BOOLEAN");
    }
    StringParser::ParseResult result;
    // Parse directly from the rapidjson buffer; no temporary std::string copy.
    val = StringParser::string_to_bool(str_col.GetString(), str_col.GetStringLength(), &result);
    RETURN_ERROR_IF_PARSING_FAILED(result, str_col, sub_type);
    return Status::OK();
}

// Parse one scalar JSON value as T and append it to `array`.
template <typename T>
Status process_single_column(const rapidjson::Value& col, PrimitiveType sub_type,
                             bool pure_doc_value, vectorized::Array& array) {
    T parsed {};
    RETURN_IF_ERROR(handle_value<T>(col, sub_type, pure_doc_value, parsed));
    array.push_back(parsed);
    return Status::OK();
}

// Parse every element of a JSON array as T, appending each to `array`.
// Stops at the first element that fails to parse.
template <typename T>
Status process_column_array(const rapidjson::Value& col, PrimitiveType sub_type,
                            bool pure_doc_value, vectorized::Array& array) {
    for (const rapidjson::Value& element : col.GetArray()) {
        RETURN_IF_ERROR(process_single_column<T>(element, sub_type, pure_doc_value, array));
    }
    return Status::OK();
}

// Dispatch: a JSON array is parsed element-by-element, anything else is
// treated as a single scalar.
template <typename T>
Status process_column(const rapidjson::Value& col, PrimitiveType sub_type, bool pure_doc_value,
                      vectorized::Array& array) {
    if (col.IsArray()) {
        return process_column_array<T>(col, sub_type, pure_doc_value, array);
    }
    return process_single_column<T>(col, sub_type, pure_doc_value, array);
}

// Parse a date/datetime column (scalar or array) into `array`. DateType
// selects the Doris date representation; RT is the raw integer storage type.
template <typename DateType, typename RT>
Status process_date_column(const rapidjson::Value& col, PrimitiveType sub_type, bool pure_doc_value,
                           vectorized::Array& array, const cctz::time_zone& time_zone) {
    // Shared per-value parse step, used for both the scalar and array shapes.
    auto parse_one = [&](const rapidjson::Value& value) -> Status {
        RT data;
        RETURN_IF_ERROR(
                (get_date_int<DateType, RT>(value, sub_type, pure_doc_value, &data, time_zone)));
        array.push_back(data);
        return Status::OK();
    };

    if (col.IsArray()) {
        for (const auto& element : col.GetArray()) {
            RETURN_IF_ERROR(parse_one(element));
        }
        return Status::OK();
    }
    return parse_one(col);
}

// Parse an ES column value (scalar or JSON array) of element type `sub_type`
// and append the parsed element(s) to `array`.
// `pure_doc_value` selects the doc-value wire format (values nested in
// single-element arrays). `time_zone` is used only for date/datetime parsing.
// Returns InternalError for unsupported element types.
Status ScrollParser::parse_column(const rapidjson::Value& col, PrimitiveType sub_type,
                                  bool pure_doc_value, vectorized::Array& array,
                                  const cctz::time_zone& time_zone) {
    switch (sub_type) {
    case TYPE_CHAR:
    case TYPE_VARCHAR:
    case TYPE_STRING:
        return process_column<std::string>(col, sub_type, pure_doc_value, array);
    case TYPE_TINYINT:
        return process_column<int8_t>(col, sub_type, pure_doc_value, array);
    case TYPE_SMALLINT:
        return process_column<int16_t>(col, sub_type, pure_doc_value, array);
    case TYPE_INT:
        // int32_t for consistency with the other fixed-width integer cases.
        return process_column<int32_t>(col, sub_type, pure_doc_value, array);
    case TYPE_BIGINT:
        return process_column<int64_t>(col, sub_type, pure_doc_value, array);
    case TYPE_LARGEINT:
        return process_column<__int128>(col, sub_type, pure_doc_value, array);
    case TYPE_FLOAT:
        return process_column<float>(col, sub_type, pure_doc_value, array);
    case TYPE_DOUBLE:
        return process_column<double>(col, sub_type, pure_doc_value, array);
    case TYPE_BOOLEAN:
        return process_column<bool>(col, sub_type, pure_doc_value, array);
    // date/datetime v2 is the default type for catalog table,
    // see https://github.com/apache/doris/pull/16304
    // No need to support date and datetime types.
    case TYPE_DATEV2: {
        return process_date_column<vectorized::DateV2Value<vectorized::DateV2ValueType>, uint32_t>(
                col, sub_type, pure_doc_value, array, time_zone);
    }
    case TYPE_DATETIMEV2: {
        return process_date_column<vectorized::DateV2Value<vectorized::DateTimeV2ValueType>,
                                   uint64_t>(col, sub_type, pure_doc_value, array, time_zone);
    }
    default:
        LOG(ERROR) << "Do not support Array type: " << sub_type;
        return Status::InternalError("Unsupported type");
    }
}

// NOTE(review): `doc_value_mode` is accepted but not used to initialize any
// member here — confirm whether it is a leftover parameter or should be stored.
ScrollParser::ScrollParser(bool doc_value_mode) : _size(0), _line_index(0) {}

ScrollParser::~ScrollParser() = default;
Expand Down Expand Up @@ -684,129 +837,7 @@ Status ScrollParser::fill_columns(const TupleDescriptor* tuple_desc,
case TYPE_ARRAY: {
vectorized::Array array;
const auto& sub_type = tuple_desc->slots()[i]->type().children[0].type;
RETURN_ERROR_IF_COL_IS_ARRAY(col, type, false);
for (const auto& sub_col : col.GetArray()) {
switch (sub_type) {
case TYPE_CHAR:
case TYPE_VARCHAR:
case TYPE_STRING: {
std::string val;
RETURN_ERROR_IF_COL_IS_ARRAY(sub_col, sub_type, true);
if (!sub_col.IsString()) {
val = json_value_to_string(sub_col);
} else {
val = sub_col.GetString();
}
array.push_back(val);
break;
}
case TYPE_TINYINT: {
int8_t val;
RETURN_IF_ERROR(get_int_value<int8_t>(sub_col, sub_type, &val, pure_doc_value));
array.push_back(val);
break;
}
case TYPE_SMALLINT: {
int16_t val;
RETURN_IF_ERROR(
get_int_value<int16_t>(sub_col, sub_type, &val, pure_doc_value));
array.push_back(val);
break;
}
case TYPE_INT: {
int32 val;
RETURN_IF_ERROR(get_int_value<int32>(sub_col, sub_type, &val, pure_doc_value));
array.push_back(val);
break;
}
case TYPE_BIGINT: {
int64_t val;
RETURN_IF_ERROR(
get_int_value<int64_t>(sub_col, sub_type, &val, pure_doc_value));
array.push_back(val);
break;
}
case TYPE_LARGEINT: {
__int128 val;
RETURN_IF_ERROR(
get_int_value<__int128>(sub_col, sub_type, &val, pure_doc_value));
array.push_back(val);
break;
}
case TYPE_FLOAT: {
float val {};
RETURN_IF_ERROR(
get_float_value<float>(sub_col, sub_type, &val, pure_doc_value));
array.push_back(val);
break;
}
case TYPE_DOUBLE: {
double val {};
RETURN_IF_ERROR(
get_float_value<double>(sub_col, sub_type, &val, pure_doc_value));
array.push_back(val);
break;
}
case TYPE_BOOLEAN: {
if (sub_col.IsBool()) {
array.push_back(sub_col.GetBool());
break;
}

if (sub_col.IsNumber()) {
array.push_back(sub_col.GetInt());
break;
}

bool is_nested_str = false;
if (pure_doc_value && sub_col.IsArray() && !sub_col.Empty() &&
sub_col[0].IsBool()) {
array.push_back(sub_col[0].GetBool());
break;
} else if (pure_doc_value && sub_col.IsArray() && !sub_col.Empty() &&
sub_col[0].IsString()) {
is_nested_str = true;
} else if (pure_doc_value && sub_col.IsArray()) {
return Status::InternalError(ERROR_INVALID_COL_DATA, "BOOLEAN");
}

const rapidjson::Value& str_col = is_nested_str ? sub_col[0] : sub_col;

const std::string& val = str_col.GetString();
size_t val_size = str_col.GetStringLength();
StringParser::ParseResult result;
bool b = StringParser::string_to_bool(val.c_str(), val_size, &result);
RETURN_ERROR_IF_PARSING_FAILED(result, str_col, type);
array.push_back(b);
break;
}
// date/datetime v2 is the default type for catalog table,
// see https://github.com/apache/doris/pull/16304
// No need to support date and datetime types.
case TYPE_DATEV2: {
uint32_t data;
RETURN_IF_ERROR(
(get_date_int<vectorized::DateV2Value<vectorized::DateV2ValueType>,
uint32_t>(sub_col, sub_type, pure_doc_value, &data,
time_zone)));
array.push_back(data);
break;
}
case TYPE_DATETIMEV2: {
uint64_t data;
RETURN_IF_ERROR(
(get_date_int<vectorized::DateV2Value<vectorized::DateTimeV2ValueType>,
uint64_t>(sub_col, sub_type, pure_doc_value, &data,
time_zone)));
array.push_back(data);
break;
}
default: {
LOG(ERROR) << "Do not support Array type: " << sub_type;
break;
}
}
}
RETURN_IF_ERROR(parse_column(col, sub_type, pure_doc_value, array, time_zone));
col_ptr->insert(array);
break;
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/es/es_scroll_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class ScrollParser {
int get_size() const;

private:
Status parse_column(const rapidjson::Value& col, PrimitiveType sub_type, bool pure_doc_value,
vectorized::Array& array, const cctz::time_zone& time_zone);
std::string _scroll_id;
int _size;
rapidjson::SizeType _line_index;
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ void SegmentWriter::_serialize_block_to_row_column(vectorized::Block& block) {
// 3. set columns to data convertor and then write all columns
Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* block,
size_t row_pos, size_t num_rows) {
if (block->columns() <= _tablet_schema->num_key_columns() ||
if (block->columns() < _tablet_schema->num_key_columns() ||
block->columns() >= _tablet_schema->num_columns()) {
return Status::InternalError(
fmt::format("illegal partial update block columns: {}, num key columns: {}, total "
Expand Down
2 changes: 0 additions & 2 deletions be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -851,8 +851,6 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
pair.first.to_string(), max_rowset->end_version());
}
}
std::vector<RowsetSharedPtr> empty_vec;
new_tablet->modify_rowsets(empty_vec, rowsets_to_delete);
new_tablet->delete_rowsets(rowsets_to_delete, false);
// inherit cumulative_layer_point from base_tablet
// check if new_tablet.ce_point > base_tablet.ce_point?
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
{"name": "Andy", "sports": "soccer"}
{"name": "Betty", "sports": "pingpong ball"}
{"name": "Cindy", "sports": "武术"}
{"name": "David", "sports": ["volleyball"]}
{"name": "Emily", "sports": ["baseball", "golf", "hockey"]}
{"name": "Frank", "sports": ["rugby", "cricket", "boxing"]}
{"name": "Grace", "sports": ["table tennis", "badminton", "athletics"]}
{"name": "Henry", "sports": ["archery", "fencing", "weightlifting"]}
{"name": "Ivy", "sports": ["judo", "karate", "taekwondo"]}
{"name": "Jack", "sports": ["wrestling", "gymnastics", "surfing"]}
{"name": "Andy", "sports": "soccer", "scores": 100}
{"name": "Betty", "sports": "pingpong ball", "scores": 90}
{"name": "Cindy", "sports": "武术", "scores": 89}
{"name": "David", "sports": ["volleyball"], "scores": [77]}
{"name": "Emily", "sports": ["baseball", "golf", "hockey"], "scores": [56, 78, 99]}
{"name": "Frank", "sports": ["rugby", "cricket", "boxing"], "scores": [45, 67, 88]}
{"name": "Grace", "sports": ["table tennis", "badminton", "athletics"], "scores": [34, 56, 78]}
{"name": "Henry", "sports": ["archery", "fencing", "weightlifting"], "scores": [23, 45, 67]}
{"name": "Ivy", "sports": ["judo", "karate", "taekwondo"], "scores": [12, 34, 56]}
{"name": "Jack", "sports": ["wrestling", "gymnastics", "surfing"], "scores": [1, 23, 45]}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"_meta": {
"doris":{
"array_fields":[
"sports"
"sports",
"scores"
]
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
"doc": {
"properties": {
"name": { "type": "keyword" },
"sports": { "type": "keyword", "doc_values": false}
"sports": { "type": "keyword", "doc_values": false},
"scores": { "type": "integer", "doc_values": false}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
"mappings": {
"properties": {
"name": { "type": "keyword" },
"sports": { "type": "keyword", "doc_values": false}
"sports": { "type": "keyword", "doc_values": false},
"scores": { "type": "integer", "doc_values": false}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -222,13 +222,6 @@ public String toSql(OlapTable table, List<Long> partitionId) {
}
sb.append(")");

Optional.ofNullable(this.idToStoragePolicy.get(entry.getKey())).ifPresent(p -> {
if (!p.equals("")) {
sb.append("PROPERTIES (\"STORAGE POLICY\" = \"");
sb.append(p).append("\")");
}
});

if (partitionId != null) {
partitionId.add(entry.getKey());
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,13 +278,6 @@ public String toSql(OlapTable table, List<Long> partitionId) {
sb.append(range.lowerEndpoint().toSql());
sb.append(", ").append(range.upperEndpoint().toSql()).append(")");

Optional.ofNullable(this.idToStoragePolicy.get(entry.getKey())).ifPresent(p -> {
if (!p.equals("")) {
sb.append("PROPERTIES (\"STORAGE POLICY\" = \"");
sb.append(p).append("\")");
}
});

if (partitionId != null) {
partitionId.add(entry.getKey());
break;
Expand Down
Loading

0 comments on commit f7b275e

Please sign in to comment.