Skip to content

Commit

Permalink
[fix](parquet)Fix the be core issue when reading parquet unsigned typ…
Browse files Browse the repository at this point in the history
…es. (#39926) (#40123)

bp #39926
  • Loading branch information
hubgeter authored Aug 29, 2024
1 parent 841565a commit a7156ee
Show file tree
Hide file tree
Showing 11 changed files with 673 additions and 26 deletions.
18 changes: 16 additions & 2 deletions be/src/vec/exec/format/parquet/parquet_column_convert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ ColumnPtr PhysicalToLogicalConverter::get_physical_column(tparquet::Type::type s
src_physical_type = tparquet::Type::INT32;
src_logical_type = TypeDescriptor(PrimitiveType::TYPE_INT);
}
if (is_consistent() && _logical_converter->is_consistent()) {

if (!_convert_params->is_type_compatibility && is_consistent() &&
_logical_converter->is_consistent()) {
if (_cached_src_physical_type == nullptr) {
_cached_src_physical_type = DataTypeFactory::instance().create_data_type(
src_logical_type, dst_logical_type->is_nullable());
Expand Down Expand Up @@ -246,7 +248,19 @@ std::unique_ptr<PhysicalToLogicalConverter> PhysicalToLogicalConverter::get_conv
}
PrimitiveType src_logical_primitive = src_logical_type.type;

if (is_parquet_native_type(src_logical_primitive)) {
if (field_schema->is_type_compatibility) {
if (src_logical_type == TYPE_SMALLINT) {
physical_converter.reset(new UnsignedIntegerConverter<TYPE_SMALLINT>());
} else if (src_logical_type == TYPE_INT) {
physical_converter.reset(new UnsignedIntegerConverter<TYPE_INT>());
} else if (src_logical_type == TYPE_BIGINT) {
physical_converter.reset(new UnsignedIntegerConverter<TYPE_BIGINT>());
} else if (src_logical_type == TYPE_LARGEINT) {
physical_converter.reset(new UnsignedIntegerConverter<TYPE_LARGEINT>());
} else {
physical_converter.reset(new UnsupportedConverter(src_physical_type, src_logical_type));
}
} else if (is_parquet_native_type(src_logical_primitive)) {
if (is_string_type(src_logical_primitive) &&
src_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
// for FixedSizeBinary
Expand Down
65 changes: 65 additions & 0 deletions be/src/vec/exec/format/parquet/parquet_column_convert.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ struct ConvertParams {
DecimalScaleParams decimal_scale;
FieldSchema* field_schema = nullptr;

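// True when the parquet column holds an unsigned integer, which is read into the next wider
// signed type: UInt8 -> Int16, UInt16 -> Int32, UInt32 -> Int64, UInt64 -> Int128.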
bool is_type_compatibility = false;

/**
* Some frameworks, such as Paimon, may write non-standard parquet files in which a timestamp field
* has no logicalType or converted_type to indicate its precision. We have to reset the time mask.
Expand Down Expand Up @@ -108,6 +111,7 @@ struct ConvertParams {
t.from_unixtime(0, *ctz);
offset_days = t.day() == 31 ? -1 : 0;
}
is_type_compatibility = field_schema_->is_type_compatibility;
}

template <typename DecimalPrimitiveType>
Expand Down Expand Up @@ -273,6 +277,67 @@ class LittleIntPhysicalConverter : public PhysicalToLogicalConverter {
}
};

template <PrimitiveType type>
struct UnsignedTypeTraits;

template <>
struct UnsignedTypeTraits<TYPE_SMALLINT> {
using UnsignedCppType = UInt8;
//https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#unsigned-integers
//INT(8, false), INT(16, false), and INT(32, false) must annotate an int32 primitive type and INT(64, false)
//must annotate an int64 primitive type.
using StorageCppType = Int32;
using StorageColumnType = vectorized::ColumnInt32;
};

template <>
struct UnsignedTypeTraits<TYPE_INT> {
using UnsignedCppType = UInt16;
using StorageCppType = Int32;
using StorageColumnType = vectorized::ColumnInt32;
};

template <>
struct UnsignedTypeTraits<TYPE_BIGINT> {
using UnsignedCppType = UInt32;
using StorageCppType = Int32;
using StorageColumnType = vectorized::ColumnInt32;
};

template <>
struct UnsignedTypeTraits<TYPE_LARGEINT> {
using UnsignedCppType = UInt64;
using StorageCppType = Int64;
using StorageColumnType = vectorized::ColumnInt64;
};

// Converts a parquet unsigned integer column into a signed Doris integer column.
//
// IntPrimitiveType is the destination Doris type. UnsignedTypeTraits<IntPrimitiveType> supplies
// the unsigned type the value is cast to, and the signed type / column class it is stored in.
template <PrimitiveType IntPrimitiveType>
class UnsignedIntegerConverter : public PhysicalToLogicalConverter {
    // Appends every value of src_physical_col to src_logical_column.
    //
    // remove_nullable() is applied to both columns first. The destination is grown by the
    // number of source rows, so values already present in it are kept. Always returns
    // Status::OK().
    Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) override {
        using UnsignedCppType = typename UnsignedTypeTraits<IntPrimitiveType>::UnsignedCppType;
        using StorageCppType = typename UnsignedTypeTraits<IntPrimitiveType>::StorageCppType;
        using StorageColumnType = typename UnsignedTypeTraits<IntPrimitiveType>::StorageColumnType;
        using DstColumnType = typename PrimitiveTypeTraits<IntPrimitiveType>::ColumnType;

        ColumnPtr from_col = remove_nullable(src_physical_col);
        MutableColumnPtr to_col = remove_nullable(src_logical_column)->assume_mutable();
        auto& src_data = static_cast<const StorageColumnType*>(from_col.get())->get_data();

        const size_t rows = src_data.size();
        const size_t start_idx = to_col->size();
        to_col->resize(start_idx + rows);
        auto& data = static_cast<DstColumnType&>(*to_col.get()).get_data();

        // The index is size_t to match `rows`; an int index mixes signed and unsigned in the
        // comparison and would overflow for columns with more than INT_MAX rows.
        for (size_t i = 0; i < rows; ++i) {
            const StorageCppType src_value = src_data[i];
            // Cast the signed storage value to the unsigned type first, so that the
            // assignment to the destination element starts from a non-negative value.
            data[start_idx + i] = static_cast<UnsignedCppType>(src_value);
        }

        return Status::OK();
    }
};

class FixedSizeBinaryConverter : public PhysicalToLogicalConverter {
private:
int _type_length;
Expand Down
58 changes: 40 additions & 18 deletions be/src/vec/exec/format/parquet/schema_desc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,16 +191,19 @@ void FieldDescriptor::parse_physical_field(const tparquet::SchemaElement& physic
physical_field->physical_type = physical_schema.type;
_physical_fields.push_back(physical_field);
physical_field->physical_column_index = _physical_fields.size() - 1;
physical_field->type = get_doris_type(physical_schema);
auto type = get_doris_type(physical_schema);
physical_field->type = type.first;
physical_field->is_type_compatibility = type.second;
}

TypeDescriptor FieldDescriptor::get_doris_type(const tparquet::SchemaElement& physical_schema) {
TypeDescriptor type;
type.type = INVALID_TYPE;
std::pair<TypeDescriptor, bool> FieldDescriptor::get_doris_type(
const tparquet::SchemaElement& physical_schema) {
std::pair<TypeDescriptor, bool> ans = {INVALID_TYPE, false};
TypeDescriptor& type = ans.first;
if (physical_schema.__isset.logicalType) {
type = convert_to_doris_type(physical_schema.logicalType);
ans = convert_to_doris_type(physical_schema.logicalType);
} else if (physical_schema.__isset.converted_type) {
type = convert_to_doris_type(physical_schema);
ans = convert_to_doris_type(physical_schema);
}
// use physical type instead
if (type.type == INVALID_TYPE) {
Expand Down Expand Up @@ -233,7 +236,7 @@ TypeDescriptor FieldDescriptor::get_doris_type(const tparquet::SchemaElement& ph
break;
}
}
return type;
return ans;
}

// Copy from org.apache.iceberg.avro.AvroSchemaUtil#validAvroName
Expand Down Expand Up @@ -302,8 +305,11 @@ void FieldDescriptor::iceberg_sanitize(const std::vector<std::string>& read_colu
}
}

TypeDescriptor FieldDescriptor::convert_to_doris_type(tparquet::LogicalType logicalType) {
TypeDescriptor type;
std::pair<TypeDescriptor, bool> FieldDescriptor::convert_to_doris_type(
tparquet::LogicalType logicalType) {
std::pair<TypeDescriptor, bool> ans = {INVALID_TYPE, false};
TypeDescriptor& type = ans.first;
bool& is_type_compatibility = ans.second;
if (logicalType.__isset.STRING) {
type = TypeDescriptor(TYPE_STRING);
} else if (logicalType.__isset.DECIMAL) {
Expand All @@ -313,16 +319,25 @@ TypeDescriptor FieldDescriptor::convert_to_doris_type(tparquet::LogicalType logi
type = TypeDescriptor(TYPE_DATEV2);
} else if (logicalType.__isset.INTEGER) {
if (logicalType.INTEGER.isSigned) {
if (logicalType.INTEGER.bitWidth <= 32) {
if (logicalType.INTEGER.bitWidth <= 8) {
type = TypeDescriptor(TYPE_TINYINT);
} else if (logicalType.INTEGER.bitWidth <= 16) {
type = TypeDescriptor(TYPE_SMALLINT);
} else if (logicalType.INTEGER.bitWidth <= 32) {
type = TypeDescriptor(TYPE_INT);
} else {
type = TypeDescriptor(TYPE_BIGINT);
}
} else {
if (logicalType.INTEGER.bitWidth <= 16) {
is_type_compatibility = true;
if (logicalType.INTEGER.bitWidth <= 8) {
type = TypeDescriptor(TYPE_SMALLINT);
} else if (logicalType.INTEGER.bitWidth <= 16) {
type = TypeDescriptor(TYPE_INT);
} else {
} else if (logicalType.INTEGER.bitWidth <= 32) {
type = TypeDescriptor(TYPE_BIGINT);
} else {
type = TypeDescriptor(TYPE_LARGEINT);
}
}
} else if (logicalType.__isset.TIME) {
Expand All @@ -344,12 +359,14 @@ TypeDescriptor FieldDescriptor::convert_to_doris_type(tparquet::LogicalType logi
} else {
type = TypeDescriptor(INVALID_TYPE);
}
return type;
return ans;
}

TypeDescriptor FieldDescriptor::convert_to_doris_type(
std::pair<TypeDescriptor, bool> FieldDescriptor::convert_to_doris_type(
const tparquet::SchemaElement& physical_schema) {
TypeDescriptor type;
std::pair<TypeDescriptor, bool> ans = {INVALID_TYPE, false};
TypeDescriptor& type = ans.first;
bool& is_type_compatibility = ans.second;
switch (physical_schema.converted_type) {
case tparquet::ConvertedType::type::UTF8:
type = TypeDescriptor(TYPE_STRING);
Expand Down Expand Up @@ -378,28 +395,33 @@ TypeDescriptor FieldDescriptor::convert_to_doris_type(
type = TypeDescriptor(TYPE_TINYINT);
break;
case tparquet::ConvertedType::type::UINT_8:
is_type_compatibility = true;
[[fallthrough]];
case tparquet::ConvertedType::type::INT_16:
type = TypeDescriptor(TYPE_SMALLINT);
break;
case tparquet::ConvertedType::type::UINT_16:
is_type_compatibility = true;
[[fallthrough]];
case tparquet::ConvertedType::type::INT_32:
type = TypeDescriptor(TYPE_INT);
break;
case tparquet::ConvertedType::type::UINT_32:
[[fallthrough]];
case tparquet::ConvertedType::type::UINT_64:
is_type_compatibility = true;
[[fallthrough]];
case tparquet::ConvertedType::type::INT_64:
type = TypeDescriptor(TYPE_BIGINT);
break;
case tparquet::ConvertedType::type::UINT_64:
is_type_compatibility = true;
type = TypeDescriptor(TYPE_LARGEINT);
break;
default:
LOG(WARNING) << "Not supported parquet ConvertedType: " << physical_schema.converted_type;
type = TypeDescriptor(INVALID_TYPE);
break;
}
return type;
return ans;
}

Status FieldDescriptor::parse_group_field(const std::vector<tparquet::SchemaElement>& t_schemas,
Expand Down
10 changes: 7 additions & 3 deletions be/src/vec/exec/format/parquet/schema_desc.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ struct FieldSchema {
int16_t repeated_parent_def_level = 0;
std::vector<FieldSchema> children;

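// True when the parquet column holds an unsigned integer, which is read into the next wider
// signed type: UInt8 -> Int16, UInt16 -> Int32, UInt32 -> Int64, UInt64 -> Int128.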
bool is_type_compatibility = false;

FieldSchema() = default;
~FieldSchema() = default;
FieldSchema(const FieldSchema& fieldSchema) = default;
Expand Down Expand Up @@ -84,12 +87,13 @@ class FieldDescriptor {
Status parse_node_field(const std::vector<tparquet::SchemaElement>& t_schemas, size_t curr_pos,
FieldSchema* node_field);

TypeDescriptor convert_to_doris_type(tparquet::LogicalType logicalType);
std::pair<TypeDescriptor, bool> convert_to_doris_type(tparquet::LogicalType logicalType);

TypeDescriptor convert_to_doris_type(const tparquet::SchemaElement& physical_schema);
std::pair<TypeDescriptor, bool> convert_to_doris_type(
const tparquet::SchemaElement& physical_schema);

public:
TypeDescriptor get_doris_type(const tparquet::SchemaElement& physical_schema);
std::pair<TypeDescriptor, bool> get_doris_type(const tparquet::SchemaElement& physical_schema);

// org.apache.iceberg.avro.AvroSchemaUtil#sanitize will encode special characters,
// we have to decode these characters
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,12 @@ id int Yes false \N NONE
9 1 string 27 false 5 true 1

-- !desc_s3 --
__add_5 int Yes false \N NONE
__bit_or_7 int Yes false \N NONE
__add_5 smallint Yes false \N NONE
__bit_or_7 tinyint Yes false \N NONE
__cast_3 bigint Yes false \N NONE
__greater_than_4 boolean Yes false \N NONE
__in_predicate_6 boolean Yes false \N NONE
__literal_1 int Yes false \N NONE
__literal_1 tinyint Yes false \N NONE
__literal_2 text Yes false \N NONE
id int Yes false \N NONE

Expand Down
Loading

0 comments on commit a7156ee

Please sign in to comment.