Skip to content

Commit

Permalink
try to enhance the performance
Browse files Browse the repository at this point in the history
  • Loading branch information
TangSiyang2001 committed Aug 2, 2023
1 parent 2cf0dbe commit 98934ea
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 125 deletions.
16 changes: 8 additions & 8 deletions be/src/vec/exec/format/csv/csv_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ CsvReader::CsvReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounte
}
_size = _range.size;

_split_values.reserve(sizeof(Slice) * _file_slot_descs.size());
_split_values.reserve(_file_slot_descs.size());
_init_system_properties();
_init_file_description();
}
Expand Down Expand Up @@ -211,7 +211,7 @@ Status CsvReader::init_reader(bool is_load) {

_text_line_reader_ctx = std::make_shared<CsvLineReaderContext>(
_line_delimiter, _line_delimiter_length, _value_separator, _value_separator_length,
_enclose, _escape);
_file_slot_descs.size() - 1, _enclose, _escape);

//get array delimiter
_array_delimiter = _params.file_attributes.text_params.array_delimiter;
Expand All @@ -220,6 +220,8 @@ Status CsvReader::init_reader(bool is_load) {
if (_params.file_attributes.__isset.trim_double_quotes) {
_trim_double_quotes = _params.file_attributes.trim_double_quotes;
}
bool _trim_tailing_spaces =
(_state != nullptr && _state->trim_tailing_spaces_for_external_table_query());

// create decompressor.
// _decompressor may be nullptr if this is not a compressed file
Expand Down Expand Up @@ -424,8 +426,8 @@ Status CsvReader::_fill_dest_columns(const Slice& line, Block* block,
col_idx < _split_values.size() ? _split_values[col_idx] : _s_null_slice;
// For load task, we always read "string" from file, so use "write_string_column"
// TODO(tsy): use escape = true
_text_converter->write_string_column(src_slot_desc, &columns[i], value.data,
value.size, true);
_text_converter->write_string_column(src_slot_desc, &columns[i], value.data, value.size,
_escape != 0);
}
} else {
// if _split_values.size > _file_slot_descs.size()
Expand Down Expand Up @@ -528,9 +530,7 @@ void CsvReader::_split_line(const Slice& line) {
}

void CsvReader::_process_value_field(const char* data, size_t start_offset, size_t value_len) {
const bool should_trim_tailing_spaces =
(_state != nullptr && _state->trim_tailing_spaces_for_external_table_query());
if (should_trim_tailing_spaces) {
if (_trim_tailing_spaces) {
while (value_len > 0 && *(data + start_offset + value_len - 1) == ' ') {
--value_len;
}
Expand Down Expand Up @@ -653,7 +653,7 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) {

_text_line_reader_ctx = std::make_shared<CsvLineReaderContext>(
_line_delimiter, _line_delimiter_length, _value_separator, _value_separator_length,
_enclose, _escape);
_file_slot_descs.size() - 1, _enclose, _escape);

_line_reader =
NewPlainTextLineReader::create_unique(_profile, _file_reader, _decompressor.get(),
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/format/csv/csv_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ class CsvReader : public GenericReader {
int _value_separator_length;
int _line_delimiter_length;
bool _trim_double_quotes = false;
bool _trim_tailing_spaces = false;

io::IOContext* _io_ctx;

Expand Down
172 changes: 58 additions & 114 deletions be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
#include <glog/logging.h>

#include <algorithm>
#include <cstddef>
#include <memory>
#include <ostream>

#include "common/compiler_util.h"
#include "exec/decompressor.h"
#include "gutil/strings/memutil.h"
#include "io/fs/file_reader.h"
#include "util/slice.h"

Expand Down Expand Up @@ -56,6 +58,10 @@ const uint8_t* CsvLineReaderContext::read_line(const uint8_t* start, const size_
on_match_column_sep(start, len);
break;
}
case ReaderState::FOUND_LINE: {
on_found_line(start, len);
break;
}
case ReaderState::PRE_MATCH_ENCLOSE: {
on_pre_match_enclose(start, len);
break;
Expand All @@ -76,10 +82,6 @@ const uint8_t* CsvLineReaderContext::read_line(const uint8_t* start, const size_
on_match_escape(start, len);
break;
}
case ReaderState::FOUND_LINE: {
on_found_line(start, len);
break;
}
}
}
if (UNLIKELY(_state == ReaderState::FOUND_LINE && _delimiter_match_len == line_delimiter_len)) {
Expand All @@ -90,139 +92,81 @@ const uint8_t* CsvLineReaderContext::read_line(const uint8_t* start, const size_
}

void CsvLineReaderContext::on_start(const uint8_t* start, size_t len) {
DCHECK_EQ(_delimiter_match_len, 0);
if (start[_idx] == _enclose) {
if (UNLIKELY(start[_idx] == _enclose)) {
_left_enclose_pos = _idx - 1;
_state.forward_to(ReaderState::PRE_MATCH_ENCLOSE);
} else if (UNLIKELY(start[_idx] == _column_sep[0])) {
++_delimiter_match_len;
_state.forward_to(ReaderState::MATCH_COLUMN_SEP);
} else if (UNLIKELY(start[_idx] == line_delimiter[0])) {
++_delimiter_match_len;
_state.forward_to(ReaderState::FOUND_LINE);
++_idx;
} else {
_state.forward_to(ReaderState::NORMAL);
}
++_idx;
}

void CsvLineReaderContext::on_normal(const uint8_t* start, size_t len) {
DCHECK_EQ(_delimiter_match_len, 0);
if (UNLIKELY(start[_idx] == _column_sep[0])) {
++_delimiter_match_len;
_state.forward_to(ReaderState::MATCH_COLUMN_SEP);
} else if (UNLIKELY(start[_idx] == line_delimiter[0])) {
++_delimiter_match_len;
_state.forward_to(ReaderState::FOUND_LINE);
const uint8_t* curr_start = start + _idx;
const size_t curr_len = len - _idx;
if (LIKELY(_look_for_column_sep(curr_start, curr_len))) {
return;
}
++_idx;
}

void CsvLineReaderContext::on_match_column_sep(const uint8_t* start, size_t len) {
DCHECK_NE(_delimiter_match_len, 0);
if (_delimiter_match_len == _column_sep_len) {
// found column sep
_column_sep_positions.push_back(_idx - _column_sep_len);
_delimiter_match_len = 0;
_state.forward_to(ReaderState::START);
} else if (start[_idx] == _column_sep[_delimiter_match_len]) {
// matching multi-char col sep
DCHECK_GT(_column_sep_len, 1);
++_delimiter_match_len;
++_idx;
if (_look_for_line_delim(curr_start, curr_len)) {
_idx = _result - start + line_delimiter_len;
} else {
// not found
_delimiter_match_len = 0;
_state.forward_to(ReaderState::NORMAL);
_idx = len;
}
}

void CsvLineReaderContext::on_pre_match_enclose(const uint8_t* start, size_t len) {
if (_state.prev_state == ReaderState::START) {
DCHECK_GT(_idx, 0);
_left_enclose_pos = _idx - 1;
_state.forward_to(_state.curr_state);
return;
}
if (start[_idx] == _escape) {
_state.forward_to(ReaderState::MATCH_ESCAPE);
} else if (start[_idx] == _enclose) {
_state.forward_to(ReaderState::MATCH_ENCLOSE);
bool CsvLineReaderContext::_look_for_column_sep(const uint8_t* curr_start, size_t curr_len) {
const uint8_t* col_sep_pos =
(uint8_t*)memmem(curr_start, curr_len, _column_sep.c_str(), _column_sep_len);
if (LIKELY(col_sep_pos != nullptr)) {
const size_t forward_distance = col_sep_pos - curr_start;
_idx += forward_distance;
_column_sep_positions.push_back(_idx);
_idx += _column_sep_len;
_state.forward_to(ReaderState::START);
return true;
}
++_idx;
return false;
}

void CsvLineReaderContext::on_match_enclose(const uint8_t* start, size_t len) {
if (LIKELY(start[_idx] == _column_sep[0])) {
_state.forward_to(ReaderState::POST_ENCLOSE_COLUMN_SEP);
++_delimiter_match_len;
} else if (start[_idx] == line_delimiter[0]) {
_state.forward_to(ReaderState::POST_ENCLOSE_LINE_DELIMITER);
++_delimiter_match_len;
} else {
// unfortunately, we hit a corner case (suppose `,` is the delimiter and `"` is the enclose): ,"part1"part2,
// so reset to the left enclose idx and re-parse in normal state
_delimiter_match_len = 0;
_state.forward_to(ReaderState::NORMAL);
_idx = _left_enclose_pos;
return;
bool CsvLineReaderContext::_look_for_line_delim(const uint8_t* curr_start, size_t curr_len) {
const uint8_t* line_delimiter_pos =
(uint8_t*)memmem(curr_start, curr_len, line_delimiter.c_str(), line_delimiter_len);
if (line_delimiter_pos == nullptr) {
return false;
}
++_idx;
}

void CsvLineReaderContext::on_match_escape(const uint8_t* start, size_t len) {
_state.forward_to(_state.prev_state);
++_idx;
_result = line_delimiter_pos;
return true;
}

void CsvLineReaderContext::on_post_match_enclose_column_sep(const uint8_t* start, size_t len) {
if (_delimiter_match_len == _column_sep_len) {
_column_sep_positions.push_back(_idx - _column_sep_len);
_delimiter_match_len = 0;
_state.forward_to(ReaderState::START);
} else if (start[_idx] == _column_sep[_delimiter_match_len]) {
DCHECK_GT(_column_sep_len, 1);
++_delimiter_match_len;
void CsvLineReaderContext::on_pre_match_enclose(const uint8_t* start, size_t len) {
bool should_escape = false;
do {
if (UNLIKELY(start[_idx] == _escape)) {
should_escape = !should_escape;
} else if (UNLIKELY(should_escape)) {
should_escape = false;
} else if (UNLIKELY(start[_idx] == _enclose)) {
_state.forward_to(ReaderState::MATCH_ENCLOSE);
++_idx;
return;
}
++_idx;
} else {
// unfortunately, we hit a corner case (suppose `,` is the delimiter and `"` is the enclose): ,"part1"part2,
// so reset to the left enclose idx and re-parse in normal state
_delimiter_match_len = 0;
_state.forward_to(ReaderState::NORMAL);
_idx = _left_enclose_pos;
}
} while (_idx != len);
}

void CsvLineReaderContext::on_post_match_enclose_line_delimiter(const uint8_t* start, size_t len) {
if (_delimiter_match_len == line_delimiter_len) {
_result = start + _idx - line_delimiter_len;
} else if (start[_idx] == line_delimiter[_delimiter_match_len]) {
DCHECK_GT(line_delimiter_len, 1);
++_delimiter_match_len;
++_idx;
} else {
// unfortunately, we hit a corner case (suppose `,` is the delimiter and `"` is the enclose): ,"part1"part2,
// so reset to the left enclose idx and re-parse in normal state
_delimiter_match_len = 0;
_state.forward_to(ReaderState::NORMAL);
_idx = _left_enclose_pos;
void CsvLineReaderContext::on_match_enclose(const uint8_t* start, size_t len) {
const uint8_t* curr_start = start + _idx;
if (_look_for_column_sep(curr_start, _column_sep_len)) {
return;
}
}

void CsvLineReaderContext::on_found_line(const uint8_t* start, size_t len) {
DCHECK_NE(_delimiter_match_len, 0);
if (_delimiter_match_len == line_delimiter_len) {
// found line delimiter
_result = start + _idx - line_delimiter_len;
_delimiter_match_len = 0;
} else if (start[_idx] == line_delimiter[_delimiter_match_len]) {
// matching multi-char line delimiter
DCHECK_GT(line_delimiter_len, 1);
++_delimiter_match_len;
++_idx;
} else {
_delimiter_match_len = 0;
_state.forward_to(ReaderState::NORMAL);
if (_look_for_line_delim(curr_start, line_delimiter_len)) {
_idx = _result - start + line_delimiter_len;
return;
}
// unfortunately, we hit a corner case (suppose `,` is the delimiter and `"` is the enclose): ,"part1"part2,
// so reset to the left enclose idx and re-parse in normal state
_idx = _left_enclose_pos;
_state.forward_to(ReaderState::NORMAL);
}

NewPlainTextLineReader::NewPlainTextLineReader(RuntimeProfile* profile,
Expand Down
13 changes: 10 additions & 3 deletions be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,16 @@ class CsvLineReaderContext final : public PlainTexLineReaderCtx {
public:
explicit CsvLineReaderContext(const std::string& line_delimiter_,
const size_t line_delimiter_len_, const std::string& column_sep_,
const size_t column_sep_len_, const char enclose,
const char escape)
const size_t column_sep_num, const size_t column_sep_len_,
const char enclose, const char escape)
: PlainTexLineReaderCtx(line_delimiter_, line_delimiter_len_),
_enclose(enclose),
_escape(escape),
_column_sep(column_sep_),
_column_sep_len(column_sep_len_) {}
_column_sep_len(column_sep_len_),
_column_sep_num(column_sep_num) {
_column_sep_positions.reserve(column_sep_num);
}

const uint8_t* read_line(const uint8_t* start, const size_t len) override;

Expand Down Expand Up @@ -140,10 +143,14 @@ class CsvLineReaderContext final : public PlainTexLineReaderCtx {
void on_found_line(const uint8_t* start, size_t len);

private:
bool _look_for_column_sep(const uint8_t* curr_start, size_t len);
bool _look_for_line_delim(const uint8_t* curr_start, size_t len);

const char _enclose;
const char _escape;
const std::string _column_sep;
const size_t _column_sep_len;
const size_t _column_sep_num;

size_t _idx = 0;
size_t _delimiter_match_len = 0;
Expand Down

0 comments on commit 98934ea

Please sign in to comment.