From 98934eaa3e0f765338fee28de7f5c78382176adf Mon Sep 17 00:00:00 2001 From: tsy Date: Wed, 2 Aug 2023 10:06:03 +0800 Subject: [PATCH] try to enhance the performance --- be/src/vec/exec/format/csv/csv_reader.cpp | 16 +- be/src/vec/exec/format/csv/csv_reader.h | 1 + .../new_plain_text_line_reader.cpp | 172 ++++++------------ .../file_reader/new_plain_text_line_reader.h | 13 +- 4 files changed, 77 insertions(+), 125 deletions(-) diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index b798bc25f26a4c7..d0da5989cd4d580 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -98,7 +98,7 @@ CsvReader::CsvReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounte } _size = _range.size; - _split_values.reserve(sizeof(Slice) * _file_slot_descs.size()); + _split_values.reserve(_file_slot_descs.size()); _init_system_properties(); _init_file_description(); } @@ -211,7 +211,7 @@ Status CsvReader::init_reader(bool is_load) { _text_line_reader_ctx = std::make_shared( _line_delimiter, _line_delimiter_length, _value_separator, _value_separator_length, - _enclose, _escape); + _file_slot_descs.size() - 1, _enclose, _escape); //get array delimiter _array_delimiter = _params.file_attributes.text_params.array_delimiter; @@ -220,6 +220,8 @@ Status CsvReader::init_reader(bool is_load) { if (_params.file_attributes.__isset.trim_double_quotes) { _trim_double_quotes = _params.file_attributes.trim_double_quotes; } + bool _trim_tailing_spaces = + (_state != nullptr && _state->trim_tailing_spaces_for_external_table_query()); // create decompressor. // _decompressor may be nullptr if this is not a compressed file @@ -424,8 +426,8 @@ Status CsvReader::_fill_dest_columns(const Slice& line, Block* block, col_idx < _split_values.size() ? _split_values[col_idx] : _s_null_slice; // For load task, we always read "string" from file, so use "write_string_column" // TODO(tsy): use escape = true - _text_converter->write_string_column(src_slot_desc, &columns[i], value.data, - value.size, true); + _text_converter->write_string_column(src_slot_desc, &columns[i], value.data, value.size, + _escape != 0); } } else { // if _split_values.size > _file_slot_descs.size() @@ -528,9 +530,7 @@ void CsvReader::_split_line(const Slice& line) { } void CsvReader::_process_value_field(const char* data, size_t start_offset, size_t value_len) { - const bool should_trim_tailing_spaces = - (_state != nullptr && _state->trim_tailing_spaces_for_external_table_query()); - if (should_trim_tailing_spaces) { + if (_trim_tailing_spaces) { while (value_len > 0 && *(data + start_offset + value_len - 1) == ' ') { --value_len; } @@ -653,7 +653,7 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) { _text_line_reader_ctx = std::make_shared( _line_delimiter, _line_delimiter_length, _value_separator, _value_separator_length, - _enclose, _escape); + _file_slot_descs.size() - 1, _enclose, _escape); _line_reader = NewPlainTextLineReader::create_unique(_profile, _file_reader, _decompressor.get(), diff --git a/be/src/vec/exec/format/csv/csv_reader.h b/be/src/vec/exec/format/csv/csv_reader.h index 5ee83823deb42b2..3ca421e8cc388a8 100644 --- a/be/src/vec/exec/format/csv/csv_reader.h +++ b/be/src/vec/exec/format/csv/csv_reader.h @@ -152,6 +152,7 @@ class CsvReader : public GenericReader { int _value_separator_length; int _line_delimiter_length; bool _trim_double_quotes = false; + bool _trim_tailing_spaces = false; io::IOContext* _io_ctx; diff --git a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp index afeeb418a83651a..cf32c79ed0df33b 100644 --- a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp +++ b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp @@ -21,11 +21,13 @@ #include #include +#include #include #include #include "common/compiler_util.h" #include "exec/decompressor.h" +#include "gutil/strings/memutil.h" #include "io/fs/file_reader.h" #include "util/slice.h" @@ -56,6 +58,10 @@ const uint8_t* CsvLineReaderContext::read_line(const uint8_t* start, const size_ on_match_column_sep(start, len); break; } + case ReaderState::FOUND_LINE: { + on_found_line(start, len); + break; + } case ReaderState::PRE_MATCH_ENCLOSE: { on_pre_match_enclose(start, len); break; @@ -76,10 +82,6 @@ const uint8_t* CsvLineReaderContext::read_line(const uint8_t* start, const size_ on_match_escape(start, len); break; } - case ReaderState::FOUND_LINE: { - on_found_line(start, len); - break; - } } } if (UNLIKELY(_state == ReaderState::FOUND_LINE && _delimiter_match_len == line_delimiter_len)) { @@ -90,139 +92,81 @@ const uint8_t* CsvLineReaderContext::read_line(const uint8_t* start, const size_ } void CsvLineReaderContext::on_start(const uint8_t* start, size_t len) { - DCHECK_EQ(_delimiter_match_len, 0); - if (start[_idx] == _enclose) { + if (UNLIKELY(start[_idx] == _enclose)) { + _left_enclose_pos = _idx - 1; _state.forward_to(ReaderState::PRE_MATCH_ENCLOSE); - } else if (UNLIKELY(start[_idx] == _column_sep[0])) { - ++_delimiter_match_len; - _state.forward_to(ReaderState::MATCH_COLUMN_SEP); - } else if (UNLIKELY(start[_idx] == line_delimiter[0])) { - ++_delimiter_match_len; - _state.forward_to(ReaderState::FOUND_LINE); + ++_idx; } else { _state.forward_to(ReaderState::NORMAL); } - ++_idx; } void CsvLineReaderContext::on_normal(const uint8_t* start, size_t len) { - DCHECK_EQ(_delimiter_match_len, 0); - if (UNLIKELY(start[_idx] == _column_sep[0])) { - ++_delimiter_match_len; - _state.forward_to(ReaderState::MATCH_COLUMN_SEP); - } else if (UNLIKELY(start[_idx] == line_delimiter[0])) { - ++_delimiter_match_len; - _state.forward_to(ReaderState::FOUND_LINE); + const uint8_t* curr_start = start + _idx; + const size_t curr_len = len - _idx; + if (LIKELY(_look_for_column_sep(curr_start, curr_len))) { + return; } - ++_idx; -} - -void CsvLineReaderContext::on_match_column_sep(const uint8_t* start, size_t len) { - DCHECK_NE(_delimiter_match_len, 0); - if (_delimiter_match_len == _column_sep_len) { - // found colum sep - _column_sep_positions.push_back(_idx - _column_sep_len); - _delimiter_match_len = 0; - _state.forward_to(ReaderState::START); - } else if (start[_idx] == _column_sep[_delimiter_match_len]) { - // matching multi-char col sep - DCHECK_GT(_column_sep_len, 1); - ++_delimiter_match_len; - ++_idx; + if (_look_for_line_delim(curr_start, curr_len)) { + _idx = _result - start + line_delimiter_len; } else { - // not found - _delimiter_match_len = 0; - _state.forward_to(ReaderState::NORMAL); + _idx = len; } } -void CsvLineReaderContext::on_pre_match_enclose(const uint8_t* start, size_t len) { - if (_state.prev_state == ReaderState::START) { - DCHECK_GT(_idx, 0); - _left_enclose_pos = _idx - 1; - _state.forward_to(_state.curr_state); - return; - } - if (start[_idx] == _escape) { - _state.forward_to(ReaderState::MATCH_ESCAPE); - } else if (start[_idx] == _enclose) { - _state.forward_to(ReaderState::MATCH_ENCLOSE); +bool CsvLineReaderContext::_look_for_column_sep(const uint8_t* curr_start, size_t curr_len) { + const uint8_t* col_sep_pos = + (uint8_t*)memmem(curr_start, curr_len, _column_sep.c_str(), _column_sep_len); + if (LIKELY(col_sep_pos != nullptr)) { + const size_t forward_distance = col_sep_pos - curr_start; + _idx += forward_distance; + _column_sep_positions.push_back(_idx); + _idx += _column_sep_len; + _state.forward_to(ReaderState::START); + return true; } - ++_idx; + return false; } -void CsvLineReaderContext::on_match_enclose(const uint8_t* start, size_t len) { - if (LIKELY(start[_idx] == _column_sep[0])) { - _state.forward_to(ReaderState::POST_ENCLOSE_COLUMN_SEP); - ++_delimiter_match_len; - } else if (start[_idx] == line_delimiter[0]) { - _state.forward_to(ReaderState::POST_ENCLOSE_LINE_DELIMITER); - ++_delimiter_match_len; - } else { - // unfortunately, meet corner case(suppose `,` is delimiter and `"` is enclose): ,"part1"part2, - // will reset to left enclose idx to do parse in nornal state - _delimiter_match_len = 0; - _state.forward_to(ReaderState::NORMAL); - _idx = _left_enclose_pos; - return; +bool CsvLineReaderContext::_look_for_line_delim(const uint8_t* curr_start, size_t curr_len) { + const uint8_t* line_delimiter_pos = + (uint8_t*)memmem(curr_start, curr_len, line_delimiter.c_str(), line_delimiter_len); + if (line_delimiter_pos == nullptr) { + return false; } - ++_idx; -} - -void CsvLineReaderContext::on_match_escape(const uint8_t* start, size_t len) { - _state.forward_to(_state.prev_state); - ++_idx; + _result = line_delimiter_pos; + return true; } -void CsvLineReaderContext::on_post_match_enclose_column_sep(const uint8_t* start, size_t len) { - if (_delimiter_match_len == _column_sep_len) { - _column_sep_positions.push_back(_idx - _column_sep_len); - _delimiter_match_len = 0; - _state.forward_to(ReaderState::START); - } else if (start[_idx] == _column_sep[_delimiter_match_len]) { - DCHECK_GT(_column_sep_len, 1); - ++_delimiter_match_len; +void CsvLineReaderContext::on_pre_match_enclose(const uint8_t* start, size_t len) { + bool should_escape = false; + do { + if (UNLIKELY(start[_idx] == _escape)) { + should_escape = !should_escape; + } else if (UNLIKELY(should_escape)) { + should_escape = false; + } else if (UNLIKELY(start[_idx] == _enclose)) { + _state.forward_to(ReaderState::MATCH_ENCLOSE); + ++_idx; + return; + } ++_idx; - } else { - // unfortunately, meet corner case(suppose `,` is delimiter and `"` is enclose): ,"part1"part2, - // will reset to left enclose idx to do parse in nornal state - _delimiter_match_len = 0; - _state.forward_to(ReaderState::NORMAL); - _idx = _left_enclose_pos; - } + } while (_idx != len); } -void CsvLineReaderContext::on_post_match_enclose_line_delimiter(const uint8_t* start, size_t len) { - if (_delimiter_match_len == line_delimiter_len) { - _result = start + _idx - line_delimiter_len; - } else if (start[_idx] == line_delimiter[_delimiter_match_len]) { - DCHECK_GT(line_delimiter_len, 1); - ++_delimiter_match_len; - ++_idx; - } else { - // unfortunately, meet corner case(suppose `,` is delimiter and `"` is enclose): ,"part1"part2, - // will reset to left enclose idx to do parse in nornal state - _delimiter_match_len = 0; - _state.forward_to(ReaderState::NORMAL); - _idx = _left_enclose_pos; +void CsvLineReaderContext::on_match_enclose(const uint8_t* start, size_t len) { + const uint8_t* curr_start = start + _idx; + if (_look_for_column_sep(curr_start, _column_sep_len)) { + return; } -} - -void CsvLineReaderContext::on_found_line(const uint8_t* start, size_t len) { - DCHECK_NE(_delimiter_match_len, 0); - if (_delimiter_match_len == line_delimiter_len) { - // found line delimiter - _result = start + _idx - line_delimiter_len; - _delimiter_match_len = 0; - } else if (start[_idx] == line_delimiter[_delimiter_match_len]) { - // matching multi-char line delimiter - DCHECK_GT(line_delimiter_len, 1); - ++_delimiter_match_len; - ++_idx; - } else { - _delimiter_match_len = 0; - _state.forward_to(ReaderState::NORMAL); + if (_look_for_line_delim(curr_start, line_delimiter_len)) { + _idx = _result - start + line_delimiter_len; + return; } + // unfortunately, meet corner case(suppose `,` is delimiter and `"` is enclose): ,"part1"part2, + // will reset to left enclose idx to do parse in nornal state + _idx = _left_enclose_pos; + _state.forward_to(ReaderState::NORMAL); } NewPlainTextLineReader::NewPlainTextLineReader(RuntimeProfile* profile, diff --git a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h index c8631835813c49d..8198556d615894f 100644 --- a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h +++ b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h @@ -105,13 +105,16 @@ class CsvLineReaderContext final : public PlainTexLineReaderCtx { public: explicit CsvLineReaderContext(const std::string& line_delimiter_, const size_t line_delimiter_len_, const std::string& column_sep_, - const size_t column_sep_len_, const char enclose, - const char escape) + const size_t column_sep_num, const size_t column_sep_len_, + const char enclose, const char escape) : PlainTexLineReaderCtx(line_delimiter_, line_delimiter_len_), _enclose(enclose), _escape(escape), _column_sep(column_sep_), - _column_sep_len(column_sep_len_) {} + _column_sep_len(column_sep_len_), + _column_sep_num(column_sep_num) { + _column_sep_positions.reserve(column_sep_num); + } const uint8_t* read_line(const uint8_t* start, const size_t len) override; @@ -140,10 +143,14 @@ class CsvLineReaderContext final : public PlainTexLineReaderCtx { void on_found_line(const uint8_t* start, size_t len); private: + bool _look_for_column_sep(const uint8_t* curr_start, size_t len); + bool _look_for_line_delim(const uint8_t* curr_start, size_t len); + const char _enclose; const char _escape; const std::string _column_sep; const size_t _column_sep_len; + const size_t _column_sep_num; size_t _idx = 0; size_t _delimiter_match_len = 0;