Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature](load) refactor CSV reading process during scanning, and support enclose and escape for stream load #22382

Closed
wants to merge 14 commits into from
Closed
2 changes: 1 addition & 1 deletion be/src/exec/line_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace doris {
namespace io {
class IOContext;
}
// This class is used for CSV scanner, to read content line by line
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add docs for enclose and escape.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll make it when it is sure to be correct.

// This class is used to read content line by line
class LineReader {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Split this PR into two PRs:
one PR for refactoring, the other PR for adding support for enclose and escape to stream load.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll try to.

public:
virtual ~LineReader() = default;
Expand Down
6 changes: 6 additions & 0 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,12 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
if (!http_req->header(HTTP_LINE_DELIMITER).empty()) {
request.__set_line_delimiter(http_req->header(HTTP_LINE_DELIMITER));
}
if (!http_req->header(HTTP_ENCLOSE).empty() && http_req->header(HTTP_ENCLOSE).size() > 0) {
request.__set_enclose(http_req->header(HTTP_ENCLOSE)[0]);
}
if (!http_req->header(HTTP_ESCAPE).empty() && http_req->header(HTTP_ESCAPE).size() > 0) {
request.__set_escape(http_req->header(HTTP_ESCAPE)[0]);
}
if (!http_req->header(HTTP_PARTITIONS).empty()) {
request.__set_partitions(http_req->header(HTTP_PARTITIONS));
request.__set_isTempPartition(false);
Expand Down
2 changes: 2 additions & 0 deletions be/src/http/http_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ static const std::string HTTP_COLUMNS = "columns";
static const std::string HTTP_WHERE = "where";
static const std::string HTTP_COLUMN_SEPARATOR = "column_separator";
static const std::string HTTP_LINE_DELIMITER = "line_delimiter";
static const std::string HTTP_ENCLOSE = "enclose";
static const std::string HTTP_ESCAPE = "escape";
static const std::string HTTP_MAX_FILTER_RATIO = "max_filter_ratio";
static const std::string HTTP_TIMEOUT = "timeout";
static const std::string HTTP_PARTITIONS = "partitions";
Expand Down
173 changes: 63 additions & 110 deletions be/src/vec/exec/format/csv/csv_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
#include <glog/logging.h>

#include <algorithm>
#include <cstddef>
#include <map>
#include <memory>
#include <new>
#include <ostream>
#include <utility>
Expand Down Expand Up @@ -199,6 +201,12 @@ Status CsvReader::init_reader(bool is_load) {
_value_separator_length = _value_separator.size();
_line_delimiter = _params.file_attributes.text_params.line_delimiter;
_line_delimiter_length = _line_delimiter.size();
if (_params.file_attributes.text_params.__isset.enclose) {
_enclose = _params.file_attributes.text_params.enclose;
}
if (_params.file_attributes.text_params.__isset.escape) {
_escape = _params.file_attributes.text_params.escape;
}

//get array delimiter
_array_delimiter = _params.file_attributes.text_params.array_delimiter;
Expand All @@ -212,6 +220,10 @@ Status CsvReader::init_reader(bool is_load) {
// _decompressor may be nullptr if this is not a compressed file
RETURN_IF_ERROR(_create_decompressor());

_text_line_reader_ctx = std::make_shared<CsvLineReaderContext>(
_line_delimiter, _line_delimiter_length, _value_separator, _value_separator_length,
_enclose, _escape);

switch (_file_format_type) {
case TFileFormatType::FORMAT_CSV_PLAIN:
[[fallthrough]];
Expand All @@ -224,9 +236,9 @@ Status CsvReader::init_reader(bool is_load) {
case TFileFormatType::FORMAT_CSV_LZOP:
[[fallthrough]];
case TFileFormatType::FORMAT_CSV_DEFLATE:
_line_reader = NewPlainTextLineReader::create_unique(
_profile, _file_reader, _decompressor.get(), _size, _line_delimiter,
_line_delimiter_length, start_offset);
_line_reader =
NewPlainTextLineReader::create_unique(_profile, _file_reader, _decompressor.get(),
_text_line_reader_ctx, _size, start_offset);

break;
case TFileFormatType::FORMAT_PROTO:
Expand Down Expand Up @@ -453,11 +465,7 @@ Status CsvReader::_line_split_to_values(const Slice& line, bool* success) {
}
}

if (_value_separator_length == 1) {
_split_line_for_single_char_delimiter(line);
} else {
_split_line(line);
}
_split_line(line);

if (_is_load) {
// Only check for load task. For query task, the non exist column will be filled "null".
Expand Down Expand Up @@ -499,114 +507,49 @@ void CsvReader::_split_line_for_proto_format(const Slice& line) {
}
}

void CsvReader::_split_line_for_single_char_delimiter(const Slice& line) {
void CsvReader::_split_line(const Slice& line) {
_split_values.clear();
if (_file_format_type == TFileFormatType::FORMAT_PROTO) {
_split_line_for_proto_format(line);
} else {
const char* value = line.data;
size_t cur_pos = 0;
size_t start_field = 0;
const size_t size = line.size;
for (; cur_pos < size; ++cur_pos) {
if (*(value + cur_pos) == _value_separator[0]) {
size_t non_space = cur_pos;
if (_state != nullptr && _state->trim_tailing_spaces_for_external_table_query()) {
while (non_space > start_field && *(value + non_space - 1) == ' ') {
non_space--;
}
}
if (_trim_double_quotes && non_space > (start_field + 1) &&
*(value + start_field) == '\"' && *(value + non_space - 1) == '\"') {
start_field++;
non_space--;
}
_split_values.emplace_back(value + start_field, non_space - start_field);
start_field = cur_pos + 1;
}
}
return;
}

CHECK(cur_pos == line.size) << cur_pos << " vs " << line.size;
size_t non_space = cur_pos;
if (_state != nullptr && _state->trim_tailing_spaces_for_external_table_query()) {
while (non_space > start_field && *(value + non_space - 1) == ' ') {
non_space--;
}
}
if (_trim_double_quotes && non_space > (start_field + 1) &&
*(value + start_field) == '\"' && *(value + non_space - 1) == '\"') {
start_field++;
non_space--;
}
_split_values.emplace_back(value + start_field, non_space - start_field);
const char* data = line.data;
auto& column_sep_positions = _text_line_reader_ctx->column_sep_positions();
size_t value_start_offset = 0;
for (auto idx : column_sep_positions) {
_process_value_field(data, value_start_offset, idx - value_start_offset);
value_start_offset = idx + _value_separator_length;
}
// process the last column
_process_value_field(data, value_start_offset, line.size - value_start_offset);
}

void CsvReader::_split_line(const Slice& line) {
_split_values.clear();
if (_file_format_type == TFileFormatType::FORMAT_PROTO) {
_split_line_for_proto_format(line);
} else {
const char* value = line.data;
size_t start = 0; // point to the start pos of next col value.
size_t curpos = 0; // point to the start pos of separator matching sequence.
size_t p1 = 0; // point to the current pos of separator matching sequence.
size_t non_space = 0; // point to the last pos of non_space charactor.

// Separator: AAAA
//
// p1
// ▼
// AAAA
// 1000AAAA2000AAAA
// ▲ ▲
// Start │
// curpos

while (curpos < line.size) {
if (curpos + p1 == line.size || *(value + curpos + p1) != _value_separator[p1]) {
// Not match, move forward:
curpos += (p1 == 0 ? 1 : p1);
p1 = 0;
} else {
p1++;
if (p1 == _value_separator_length) {
// Match a separator
non_space = curpos;
// Trim tailing spaces. Be consistent with hive and trino's behavior.
if (_state != nullptr &&
_state->trim_tailing_spaces_for_external_table_query()) {
while (non_space > start && *(value + non_space - 1) == ' ') {
non_space--;
}
}
if (_trim_double_quotes && (non_space - 1) > start &&
*(value + start) == '\"' && *(value + non_space - 1) == '\"') {
start++;
non_space--;
}
_split_values.emplace_back(value + start, non_space - start);
start = curpos + _value_separator_length;
curpos = start;
p1 = 0;
non_space = 0;
}
}
void CsvReader::_process_value_field(const char* data, size_t start_offset, size_t value_len) {
const bool should_trim_tailing_spaces =
(_state != nullptr && _state->trim_tailing_spaces_for_external_table_query());
if (should_trim_tailing_spaces) {
while (value_len > 0 && *(data + start_offset + value_len - 1) == ' ') {
--value_len;
}
}

CHECK(curpos == line.size) << curpos << " vs " << line.size;
non_space = curpos;
if (_state != nullptr && _state->trim_tailing_spaces_for_external_table_query()) {
while (non_space > start && *(value + non_space - 1) == ' ') {
non_space--;
}
}
if (_trim_double_quotes && (non_space - 1) > start && *(value + start) == '\"' &&
*(value + non_space - 1) == '\"') {
start++;
non_space--;
}
_split_values.emplace_back(value + start, non_space - start);
// `should_not_trim` is to manage the case that: user do not expect to trim double quotes but enclose is double quotes
const bool should_not_trim = !_trim_double_quotes && _enclose == '\"';
if (!should_not_trim) {
_trim_ends(data, &start_offset, &value_len, _enclose);
_trim_ends(data, &start_offset, &value_len, '\"');
}
_split_values.emplace_back(data + start_offset, value_len);
}

void CsvReader::_trim_ends(const char* data, size_t* start_offset, size_t* value_len,
                           const char c) const {
    // Strip exactly one matching pair of `c` from both ends of the field,
    // adjusting the caller's offset/length in place.
    // A field shorter than two characters is left untouched, so a lone
    // enclose/quote character is kept and an enclosed empty field (two
    // adjacent `c` characters) shrinks to a zero-length value.
    if (*value_len <= 1) {
        return;
    }
    const char head = data[*start_offset];
    const char tail = data[*start_offset + *value_len - 1];
    if (head == c && tail == c) {
        ++(*start_offset);
        *value_len -= 2;
    }
}

Expand Down Expand Up @@ -688,6 +631,12 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) {
_value_separator_length = _value_separator.size();
_line_delimiter = _params.file_attributes.text_params.line_delimiter;
_line_delimiter_length = _line_delimiter.size();
if (_params.file_attributes.text_params.__isset.enclose) {
_enclose = _params.file_attributes.text_params.enclose;
}
if (_params.file_attributes.text_params.__isset.escape) {
_escape = _params.file_attributes.text_params.escape;
}

//get array delimiter
_array_delimiter = _params.file_attributes.text_params.array_delimiter;
Expand All @@ -697,9 +646,13 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) {
// _decompressor may be nullptr if this is not a compressed file
RETURN_IF_ERROR(_create_decompressor());

_line_reader = NewPlainTextLineReader::create_unique(
_profile, _file_reader, _decompressor.get(), _size, _line_delimiter,
_line_delimiter_length, start_offset);
_text_line_reader_ctx = std::make_shared<CsvLineReaderContext>(
_line_delimiter, _line_delimiter_length, _value_separator, _value_separator_length,
_enclose, _escape);

_line_reader =
NewPlainTextLineReader::create_unique(_profile, _file_reader, _decompressor.get(),
_text_line_reader_ctx, _size, start_offset);

return Status::OK();
}
Expand Down
7 changes: 6 additions & 1 deletion be/src/vec/exec/format/csv/csv_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "io/fs/file_reader_writer_fwd.h"
#include "util/slice.h"
#include "vec/data_types/data_type.h"
#include "vec/exec/format/file_reader/new_plain_text_line_reader.h"
#include "vec/exec/format/generic_reader.h"

namespace doris {
Expand Down Expand Up @@ -87,7 +88,6 @@ class CsvReader : public GenericReader {
std::vector<MutableColumnPtr>& columns, size_t* rows);
Status _line_split_to_values(const Slice& line, bool* success);
void _split_line(const Slice& line);
void _split_line_for_single_char_delimiter(const Slice& line);
void _split_line_for_proto_format(const Slice& line);
Status _check_array_format(std::vector<Slice>& split_values, bool* is_success);
bool _is_null(const Slice& slice);
Expand All @@ -102,6 +102,8 @@ class CsvReader : public GenericReader {
Status _parse_col_names(std::vector<std::string>* col_names);
// TODO(ftw): parse type
Status _parse_col_types(size_t col_nums, std::vector<TypeDescriptor>* col_types);
void _process_value_field(const char* data, size_t start_offset, size_t value_len);
void _trim_ends(const char* data, size_t* start_offset, size_t* value_len, const char c) const;

RuntimeState* _state;
RuntimeProfile* _profile;
Expand All @@ -127,6 +129,7 @@ class CsvReader : public GenericReader {
std::shared_ptr<io::FileSystem> _file_system;
io::FileReaderSPtr _file_reader;
std::unique_ptr<LineReader> _line_reader;
std::shared_ptr<CsvLineReaderContext> _text_line_reader_ctx;
bool _line_reader_eof;
std::unique_ptr<TextConverter> _text_converter;
std::unique_ptr<Decompressor> _decompressor;
Expand All @@ -143,6 +146,8 @@ class CsvReader : public GenericReader {
std::string _value_separator;
std::string _line_delimiter;
std::string _array_delimiter;
char _enclose = '\"';
char _escape = '\\';

int _value_separator_length;
int _line_delimiter_length;
Expand Down
Loading
Loading