Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](hive) support null_format and escape_char for hive text #40291

Merged
merged 13 commits into from
Sep 27, 2024
3 changes: 3 additions & 0 deletions be/src/vec/data_types/serde/data_type_array_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ Status DataTypeArraySerDe::deserialize_one_cell_from_hive_text(
for (int idx = 0, start = 0; idx <= slice.size; idx++) {
char c = (idx == slice.size) ? collection_delimiter : slice[idx];
if (c == collection_delimiter) {
if (options.escape_char != 0 && idx > 0 && slice[idx - 1] == options.escape_char) {
continue;
}
slices.emplace_back(slice.data + start, idx - start);
start = idx + 1;
}
Expand Down
10 changes: 7 additions & 3 deletions be/src/vec/data_types/serde/data_type_map_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,17 @@ Status DataTypeMapSerDe::deserialize_one_cell_from_hive_text(
*
* So i use 'kv <= from' in order to get _map_kv_delimiter that appears first.
* */
if (i < slice.size && slice[i] == map_kv_delimiter && kv <= from) {
if (i < slice.size && slice[i] == map_kv_delimiter && kv <= from &&
(options.escape_char == 0 || i == 0 || slice[i - 1] != options.escape_char)) {
kv = i;
continue;
}
if ((i == slice.size || slice[i] == collection_delimiter) && i >= kv + 1) {
key_slices.push_back({slice.data + from, kv - from});
value_slices.push_back({slice.data + kv + 1, i - 1 - kv});
if (options.escape_char != 0 && i > 0 && slice[i - 1] == options.escape_char) {
continue;
}
key_slices.emplace_back(slice.data + from, kv - from);
value_slices.emplace_back(slice.data + kv + 1, i - 1 - kv);
from = i + 1;
kv = from;
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/data_types/serde/data_type_nullable_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ Status DataTypeNullableSerDe::serialize_one_cell_to_hive_text(

const auto& col_null = assert_cast<const ColumnNullable&>(*ptr);
if (col_null.is_null_at(row_num)) {
bw.write(NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str(), 2);
bw.write(options.null_format, options.null_len);
} else {
RETURN_IF_ERROR(nested_serde->serialize_one_cell_to_hive_text(
col_null.get_nested_column(), row_num, bw, options,
Expand All @@ -101,7 +101,7 @@ Status DataTypeNullableSerDe::deserialize_one_cell_from_hive_text(
IColumn& column, Slice& slice, const FormatOptions& options,
int hive_text_complex_type_delimiter_level) const {
auto& null_column = assert_cast<ColumnNullable&>(column);
if (slice.size == 2 && slice[0] == '\\' && slice[1] == 'N') {
if (slice.compare(Slice(options.null_format, options.null_len)) == 0) {
null_column.insert_data(nullptr, 0);
return Status::OK();
}
Expand Down
17 changes: 11 additions & 6 deletions be/src/vec/data_types/serde/data_type_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ class DataTypeSerDe {
bool converted_from_string = false;

char escape_char = 0;
/**
* flags for each byte to indicate if escape is needed.
*/
bool need_escape[256] = {false};

/**
* only used for export data
Expand All @@ -148,8 +152,8 @@ class DataTypeSerDe {
* NULL
* null
*/
const char* null_format;
int null_len;
const char* null_format = "\\N";
int null_len = 2;

/**
* The wrapper char for string type in nested type.
Expand All @@ -166,7 +170,7 @@ class DataTypeSerDe {
CHECK(0 <= hive_text_complex_type_delimiter_level &&
hive_text_complex_type_delimiter_level <= 153);

char ans = '\002';
char ans;
//https://github.com/apache/hive/blob/master/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySerDeParameters.java#L250
//use only control chars that are very unlikely to be part of the string
// the following might/likely to be used in text files for strings
Expand All @@ -175,8 +179,9 @@ class DataTypeSerDe {
// 12 (form feed, FF, \f, ^L),
// 13 (carriage return, CR, \r, ^M),
// 27 (escape, ESC, \e [GCC only], ^[).

if (hive_text_complex_type_delimiter_level == 1) {
if (hive_text_complex_type_delimiter_level == 0) {
ans = field_delim[0];
} else if (hive_text_complex_type_delimiter_level == 1) {
ans = collection_delim;
} else if (hive_text_complex_type_delimiter_level == 2) {
ans = map_key_delim;
Expand All @@ -192,7 +197,7 @@ class DataTypeSerDe {
} else if (hive_text_complex_type_delimiter_level <= 25) {
// [22, 25] -> [28, 31]
ans = hive_text_complex_type_delimiter_level + 6;
} else if (hive_text_complex_type_delimiter_level <= 153) {
} else {
// [26, 153] -> [-128, -1]
ans = hive_text_complex_type_delimiter_level + (-26 - 128);
}
Expand Down
42 changes: 42 additions & 0 deletions be/src/vec/data_types/serde/data_type_string_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,27 @@ class DataTypeStringSerDeBase : public DataTypeSerDe {
return Status::OK();
}

Status serialize_one_cell_to_hive_text(
        const IColumn& column, int row_num, BufferWritable& bw, FormatOptions& options,
        int hive_text_complex_type_delimiter_level = 1) const override {
    // Serialize one cell in Hive text format. Only string columns may need
    // escaping; every other column type is written verbatim.
    //
    // Resolve const columns and the effective row index before reading.
    auto result = check_column_const_set_readability(column, row_num);
    ColumnPtr ptr = result.first;
    row_num = result.second;
    const auto& value = assert_cast<const ColumnType&>(*ptr).get_data_at(row_num);
    if constexpr (std::is_same_v<ColumnType, ColumnString>) {
        // When an escape char is configured, prefix each byte flagged in
        // options.need_escape (delimiters + the escape char itself) so the
        // reader can split fields unambiguously.
        if (options.escape_char != 0) {
            write_with_escaped_char_to_hive_text(value, bw, options.escape_char,
                                                 options.need_escape);
            return Status::OK();
        }
    }
    // Fast path shared by non-string columns and strings without escaping.
    bw.write(value.data, value.size);
    return Status::OK();
}

inline void write_with_escaped_char_to_json(StringRef value, BufferWritable& bw) const {
for (char it : value) {
switch (it) {
Expand Down Expand Up @@ -126,6 +147,17 @@ class DataTypeStringSerDeBase : public DataTypeSerDe {
}
}

inline void write_with_escaped_char_to_hive_text(StringRef value, BufferWritable& bw,
                                                 char escape_char,
                                                 const bool need_escape[]) const {
    // Emit `value` byte by byte, prefixing every byte flagged in `need_escape`
    // (indexed by the byte's unsigned value) with `escape_char`.
    for (size_t i = 0; i < value.size; ++i) {
        const char ch = value.data[i];
        if (need_escape[static_cast<unsigned char>(ch)]) {
            bw.write(escape_char);
        }
        bw.write(ch);
    }
}

Status serialize_column_to_json(const IColumn& column, int start_idx, int end_idx,
BufferWritable& bw, FormatOptions& options) const override {
SERIALIZE_COLUMN_TO_JSON();
Expand Down Expand Up @@ -154,6 +186,16 @@ class DataTypeStringSerDeBase : public DataTypeSerDe {
return Status::OK();
}

// Deserialize one Hive-text cell into a string-like column.
// When an escape char is configured, escape sequences are resolved in place
// before the raw bytes are inserted into the column.
// NOTE(review): presumably escape_string() unescapes in place and updates
// slice.size accordingly (size taken by reference) — confirm its signature.
Status deserialize_one_cell_from_hive_text(
IColumn& column, Slice& slice, const FormatOptions& options,
int hive_text_complex_type_delimiter_level = 1) const override {
if (options.escape_char != 0) {
escape_string(slice.data, slice.size, options.escape_char);
}
// Insert the (possibly unescaped) bytes verbatim; no copy of the slice is kept.
assert_cast<ColumnType&>(column).insert_data(slice.data, slice.size);
return Status::OK();
}

Status deserialize_column_from_json_vector(IColumn& column, std::vector<Slice>& slices,
int* num_deserialized,
const FormatOptions& options) const override {
Expand Down
3 changes: 3 additions & 0 deletions be/src/vec/data_types/serde/data_type_struct_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,9 @@ Status DataTypeStructSerDe::deserialize_one_cell_from_hive_text(
char* data = slice.data;
for (size_t i = 0, from = 0; i <= slice.size; i++) {
if (i == slice.size || data[i] == struct_delimiter) {
if (options.escape_char != 0 && i > 0 && data[i - 1] == options.escape_char) {
continue;
}
slices.push_back({data + from, i - from});
from = i + 1;
}
Expand Down
34 changes: 31 additions & 3 deletions be/src/vec/exec/format/csv/csv_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,23 @@ void PlainCsvTextFieldSplitter::do_split(const Slice& line, std::vector<Slice>*
}
}

void HiveCsvTextFieldSplitter::do_split(const Slice& line, std::vector<Slice>* splitted_values) {
suxiaogang223 marked this conversation as resolved.
Show resolved Hide resolved
const char* data = line.data;
const size_t size = line.size;
size_t value_start = 0;
for (size_t i = 0; i < size; ++i) {
if (data[i] == _value_sep[0]) {
// hive will escape the field separator in string
if (_escape_char != 0 && i > 0 && data[i - 1] == _escape_char) {
continue;
}
process_value_func(data, value_start, i - value_start, _trimming_char, splitted_values);
value_start = i + _value_sep_len;
}
}
process_value_func(data, value_start, size - value_start, _trimming_char, splitted_values);
}

CsvReader::CsvReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* counter,
const TFileScanRangeParams& params, const TFileRangeDesc& range,
const std::vector<SlotDescriptor*>& file_slot_descs, io::IOContext* io_ctx)
Expand Down Expand Up @@ -352,6 +369,12 @@ Status CsvReader::init_reader(bool is_load) {
} else {
_options.map_key_delim = _params.file_attributes.text_params.mapkv_delimiter[0];
}

if (_params.file_attributes.text_params.__isset.null_format) {
_options.null_format = _params.file_attributes.text_params.null_format.data();
_options.null_len = _params.file_attributes.text_params.null_format.length();
}

_use_nullable_string_opt.resize(_file_slot_descs.size());
for (int i = 0; i < _file_slot_descs.size(); ++i) {
auto data_type_ptr = _file_slot_descs[i]->get_data_type_ptr();
Expand All @@ -376,9 +399,14 @@ Status CsvReader::init_reader(bool is_load) {
if (_enclose == 0) {
text_line_reader_ctx = std::make_shared<PlainTextLineReaderCtx>(
_line_delimiter, _line_delimiter_length, _keep_cr);

_fields_splitter = std::make_unique<PlainCsvTextFieldSplitter>(
_trim_tailing_spaces, false, _value_separator, _value_separator_length, -1);
if (_text_serde_type == TTextSerdeType::HIVE_TEXT_SERDE) {
_fields_splitter = std::make_unique<HiveCsvTextFieldSplitter>(
_trim_tailing_spaces, false, _value_separator, _value_separator_length, -1,
_escape);
} else {
_fields_splitter = std::make_unique<PlainCsvTextFieldSplitter>(
_trim_tailing_spaces, false, _value_separator, _value_separator_length, -1);
}
} else {
text_line_reader_ctx = std::make_shared<EncloseCsvLineReaderContext>(
_line_delimiter, _line_delimiter_length, _value_separator, _value_separator_length,
Expand Down
16 changes: 16 additions & 0 deletions be/src/vec/exec/format/csv/csv_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,22 @@ class PlainCsvTextFieldSplitter : public BaseCsvTextFieldSplitter<PlainCsvTextFi
std::string _value_sep;
};

// Field splitter for Hive text files: behaves like PlainCsvTextFieldSplitter
// but honors Hive's escape character so that escaped field separators inside
// a value do not split the field.
class HiveCsvTextFieldSplitter : public BaseCsvTextFieldSplitter<HiveCsvTextFieldSplitter> {
public:
    // `escape_char` of 0 disables escape handling; only the first byte of
    // `value_sep` is matched when splitting.
    // NOTE: qualified std::string here — headers must not rely on a
    // `using namespace std` pulled in elsewhere.
    explicit HiveCsvTextFieldSplitter(bool trim_tailing_space, bool trim_ends,
                                      const std::string& value_sep, size_t value_sep_len = 1,
                                      char trimming_char = 0, char escape_char = 0)
            : BaseCsvTextFieldSplitter(trim_tailing_space, trim_ends, value_sep_len, trimming_char),
              _value_sep(value_sep),
              _escape_char(escape_char) {}

    // Splits `line` on the field separator, skipping separators preceded by
    // `_escape_char`.
    void do_split(const Slice& line, std::vector<Slice>* splitted_values);

private:
    std::string _value_sep; // field separator (first byte is the split byte)
    char _escape_char;      // 0 means no escape char configured
};

class CsvReader : public GenericReader {
ENABLE_FACTORY_CREATOR(CsvReader);

Expand Down
15 changes: 13 additions & 2 deletions be/src/vec/runtime/vcsv_transformer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,21 @@ VCSVTransformer::VCSVTransformer(RuntimeState* state, doris::io::FileWriter* fil
}

if (_is_text_format) {
_options.field_delim = hive_serde_properties->field_delim;
_options.collection_delim = hive_serde_properties->collection_delim[0];
_options.map_key_delim = hive_serde_properties->mapkv_delim[0];
_options.escape_char = hive_serde_properties->escape_char[0];
_options.null_format = hive_serde_properties->null_format.c_str();
if (hive_serde_properties->__isset.escape_char) {
_options.escape_char = hive_serde_properties->escape_char[0];
}
_options.null_format = hive_serde_properties->null_format.data();
_options.null_len = hive_serde_properties->null_format.length();
// The list of separators + escapeChar are the bytes required to be escaped.
if (_options.escape_char != 0) {
_options.need_escape[_options.escape_char & 0xff] = true;
}
for (int i = 0; i <= 153; i++) {
_options.need_escape[_options.get_collection_delimiter(i) & 0xff] = true;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ CREATE TABLE IF NOT EXISTS `text_all_types`(
`t_decimal_precision_38` decimal(38,16),
`t_binary` binary
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION
'/user/doris/preinstalled_data/text/text_all_types';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,31 +88,21 @@ CREATE TABLE `serde_test7`(
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
'escape.delim' = '|'
)
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';

CREATE TABLE `serde_test8`(
`id` int,
`name` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
'field.delim' = 'a',
'escape.delim' = '|',
'serialization.null.format' = 'null'
)
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';

CREATE TABLE `serde_test8` like `serde_test7`;

insert into serde_test1 values(1, "abc"),(2, "def");
insert into serde_test2 values(1, "abc"),(2, "def");
insert into serde_test3 values(1, "abc"),(2, "def");
insert into serde_test4 values(1, "abc"),(2, "def");
insert into serde_test5 values(1, "abc"),(2, "def");
insert into serde_test6 values(1, "abc"),(2, "def");
insert into serde_test7 values(1, "abc"),(2, "def");
insert into serde_test8 values(1, "abc"),(2, "def");
insert into serde_test7 values(1, null),(2, "|||"),(3, "aaa"),(4, "\"null\"");
Original file line number Diff line number Diff line change
Expand Up @@ -462,21 +462,24 @@ protected TFileAttributes getFileAttributes() throws UserException {
if (serdeParams.containsKey(PROP_QUOTE_CHAR)) {
textParams.setEnclose(serdeParams.get(PROP_QUOTE_CHAR).getBytes()[0]);
}

// TODO: support escape char and null format in csv_reader
Optional<String> escapeChar = HiveMetaStoreClientHelper.getSerdeProperty(hmsTable.getRemoteTable(),
// 6. set escape delimiter
Optional<String> escapeDelim = HiveMetaStoreClientHelper.getSerdeProperty(hmsTable.getRemoteTable(),
PROP_ESCAPE_DELIMITER);
if (escapeChar.isPresent() && !escapeChar.get().equals(DEFAULT_ESCAPE_DELIMIER)) {
throw new UserException(
"not support serde prop " + PROP_ESCAPE_DELIMITER + " in hive text reading");
if (escapeDelim.isPresent()) {
String escape = HiveMetaStoreClientHelper.getByte(
escapeDelim.get());
if (escape != null) {
textParams
.setEscape(escape.getBytes()[0]);
} else {
textParams.setEscape(DEFAULT_ESCAPE_DELIMIER.getBytes()[0]);
}
}

// 7. set null format
Optional<String> nullFormat = HiveMetaStoreClientHelper.getSerdeProperty(hmsTable.getRemoteTable(),
PROP_NULL_FORMAT);
if (nullFormat.isPresent() && !nullFormat.get().equals(DEFAULT_NULL_FORMAT)) {
throw new UserException(
"not support serde prop " + PROP_NULL_FORMAT + " in hive text reading");
}
textParams.setNullFormat(HiveMetaStoreClientHelper.firstPresentOrDefault(
DEFAULT_NULL_FORMAT, nullFormat));

TFileAttributes fileAttributes = new TFileAttributes();
fileAttributes.setTextParams(textParams);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,16 @@ private void setSerDeProperties(THiveTableSink tSink) {
// 5. set escape delimiter
Optional<String> escapeDelim = HiveMetaStoreClientHelper.getSerdeProperty(targetTable.getRemoteTable(),
PROP_ESCAPE_DELIMITER);
serDeProperties
.setEscapeChar(HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
DEFAULT_ESCAPE_DELIMIER, escapeDelim)));
if (escapeDelim.isPresent()) {
String escape = HiveMetaStoreClientHelper.getByte(
escapeDelim.get());
if (escape != null) {
serDeProperties
.setEscapeChar(escape);
} else {
serDeProperties.setEscapeChar(DEFAULT_ESCAPE_DELIMIER);
}
}
// 6. set null format
Optional<String> nullFormat = HiveMetaStoreClientHelper.getSerdeProperty(targetTable.getRemoteTable(),
PROP_NULL_FORMAT);
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/PlanNodes.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ struct TFileTextScanRangeParams {
4: optional string mapkv_delimiter;
5: optional i8 enclose;
6: optional i8 escape;
7: optional string null_format;
}

struct TFileScanSlotInfo {
Expand Down

Large diffs are not rendered by default.

Loading
Loading