Skip to content

Commit

Permalink
try to enhance performance
Browse files Browse the repository at this point in the history
  • Loading branch information
TangSiyang2001 committed Aug 3, 2023
1 parent bf6819e commit 8e4b51d
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 66 deletions.
24 changes: 18 additions & 6 deletions be/src/vec/exec/format/csv/csv_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,15 @@ Status CsvReader::init_reader(bool is_load) {
}
_text_converter->set_escape_char(_escape);

_text_line_reader_ctx = std::make_shared<CsvLineReaderContext>(
_line_delimiter, _line_delimiter_length, _value_separator, _value_separator_length,
_file_slot_descs.size() - 1, _enclose, _escape);
if (_enclose == 0) {
_text_line_reader_ctx = std::make_shared<CsvLineReaderContext>(
_line_delimiter, _line_delimiter_length, _value_separator, _value_separator_length,
_file_slot_descs.size() - 1);
} else {
_text_line_reader_ctx = std::make_shared<EncloseCsvLineReaderContext>(
_line_delimiter, _line_delimiter_length, _value_separator, _value_separator_length,
_file_slot_descs.size() - 1, _enclose, _escape);
}

//get array delimiter
_array_delimiter = _params.file_attributes.text_params.array_delimiter;
Expand Down Expand Up @@ -651,9 +657,15 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) {
// _decompressor may be nullptr if this is not a compressed file
RETURN_IF_ERROR(_create_decompressor());

_text_line_reader_ctx = std::make_shared<CsvLineReaderContext>(
_line_delimiter, _line_delimiter_length, _value_separator, _value_separator_length,
_file_slot_descs.size() - 1, _enclose, _escape);
if (_enclose == 0) {
_text_line_reader_ctx = std::make_shared<CsvLineReaderContext>(
_line_delimiter, _line_delimiter_length, _value_separator, _value_separator_length,
_file_slot_descs.size() - 1);
} else {
_text_line_reader_ctx = std::make_shared<EncloseCsvLineReaderContext>(
_line_delimiter, _line_delimiter_length, _value_separator, _value_separator_length,
_file_slot_descs.size() - 1, _enclose, _escape);
}

_line_reader =
NewPlainTextLineReader::create_unique(_profile, _file_reader, _decompressor.get(),
Expand Down
104 changes: 67 additions & 37 deletions be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@

#include <gen_cpp/Metrics_types.h>
#include <glog/logging.h>
#include <string.h>

#include <algorithm>
#include <cstddef>
#include <memory>
#include <ostream>

#include "common/compiler_util.h"
#include "exec/decompressor.h"
#include "gutil/strings/memutil.h"
#include "io/fs/file_reader.h"
Expand All @@ -43,7 +43,23 @@

namespace doris {

size_t CsvLineReaderContext::_extend_reading_range(const uint8_t* start) {
const uint8_t* CsvLineReaderContext::read_line(const uint8_t* start, const size_t length) {
_total_len = length;
size_t bound = update_reading_bound(start);
read_line_impl(start, bound);
return _result;
}

// Default scan: repeatedly look for a column separator between the cursor (_idx) and
// `bound`. look_for_column_sep() records each hit and advances _idx past it, so the
// loop needs no body; it stops on the first lookup that finds nothing.
void CsvLineReaderContext::read_line_impl(const uint8_t* start, size_t& bound) {
    while (look_for_column_sep(start + _idx, bound - _idx)) {
    }
}

size_t CsvLineReaderContext::update_reading_bound(const uint8_t* start) {
_result = (uint8_t*)memmem(start + _idx, _total_len - _idx, line_delimiter.c_str(),
line_delimiter_len);
size_t len;
Expand All @@ -55,76 +71,89 @@ size_t CsvLineReaderContext::_extend_reading_range(const uint8_t* start) {
return len;
}

const uint8_t* CsvLineReaderContext::read_line(const uint8_t* start, const size_t total_len) {
_total_len = total_len;
// Locates the first column separator in [curr_start, curr_start + curr_len).
//   SingleChar == true  : only column_sep[0] is searched for, using memchr
//                         (column_sep_len is not consulted).
//   SingleChar == false : the column_sep_len-byte sequence at column_sep is searched
//                         for, using memmem.
// Returns a pointer to the first byte of the match, or nullptr if there is no match.
template <bool SingleChar>
const uint8_t* CsvLineReaderContext::look_for_column_sep_pos(const uint8_t* curr_start,
                                                             size_t curr_len,
                                                             const char* column_sep,
                                                             size_t column_sep_len) {
    if constexpr (SingleChar) {
        return static_cast<const uint8_t*>(memchr(curr_start, column_sep[0], curr_len));
    } else {
        return static_cast<const uint8_t*>(
                memmem(curr_start, curr_len, column_sep, column_sep_len));
    }
}

// Explicit instantiations of the single-character and multi-character variants.
template const uint8_t* CsvLineReaderContext::look_for_column_sep_pos<true>(
        const uint8_t* curr_start, size_t curr_len, const char* column_sep,
        size_t column_sep_len);
template const uint8_t* CsvLineReaderContext::look_for_column_sep_pos<false>(
        const uint8_t* curr_start, size_t curr_len, const char* column_sep,
        size_t column_sep_len);

size_t len = _extend_reading_range(start);
// Searches [curr_start, curr_start + curr_len) for the next column separator through
// find_col_sep_func.
// On a hit: the separator's position (_idx plus its distance from curr_start) is appended
// to _column_sep_positions, _idx is moved just past the separator, and true is returned.
// On a miss: nothing is modified and false is returned.
bool CsvLineReaderContext::look_for_column_sep(const uint8_t* curr_start, size_t curr_len) {
    const uint8_t* sep_begin =
            find_col_sep_func(curr_start, curr_len, _column_sep.c_str(), _column_sep_len);
    if (sep_begin == nullptr) [[unlikely]] {
        return false;
    }

    const size_t offset = sep_begin - curr_start;
    _column_sep_positions.push_back(_idx + offset);
    _idx += (offset + _column_sep_len);
    return true;
}

while (_idx != len) {
void EncloseCsvLineReaderContext::read_line_impl(const uint8_t* start, size_t& bound) {
while (_idx != bound) {
switch (_state.curr_state) {
case ReaderState::START: {
on_start(start, len);
_on_start(start, bound);
break;
}
case ReaderState::NORMAL: {
on_normal(start, len);
_on_normal(start, bound);
break;
}
case ReaderState::PRE_MATCH_ENCLOSE: {
on_pre_match_enclose(start, len);
_on_pre_match_enclose(start, bound);
break;
}
case ReaderState::MATCH_ENCLOSE: {
on_match_enclose(start, len);
_on_match_enclose(start, bound);
break;
}
}
}
return _result;
}

void CsvLineReaderContext::on_start(const uint8_t* start, size_t& len) {
if (UNLIKELY(start[_idx] == _enclose)) {
_left_enclose_pos = _idx;
void EncloseCsvLineReaderContext::_on_start(const uint8_t* start, size_t& len) {
if (start[_idx] == _enclose) [[unlikely]] {
_state.forward_to(ReaderState::PRE_MATCH_ENCLOSE);
++_idx;
} else {
_state.forward_to(ReaderState::NORMAL);
}
}

void CsvLineReaderContext::on_normal(const uint8_t* start, size_t& len) {
void EncloseCsvLineReaderContext::_on_normal(const uint8_t* start, size_t& len) {
const uint8_t* curr_start = start + _idx;
size_t curr_len = len - _idx;
if (LIKELY(_look_for_column_sep(curr_start, curr_len))) {
if (look_for_column_sep(curr_start, curr_len)) [[likely]] {
_state.forward_to(ReaderState::START);
return;
}
_idx = len;
}

bool CsvLineReaderContext::_look_for_column_sep(const uint8_t* curr_start, size_t curr_len) {
const uint8_t* col_sep_pos =
(uint8_t*)memmem(curr_start, curr_len, _column_sep.c_str(), _column_sep_len);
if (LIKELY(col_sep_pos != nullptr)) {
const size_t forward_distance = col_sep_pos - curr_start;
_idx += forward_distance;
_column_sep_positions.push_back(_idx);
_idx += _column_sep_len;
return true;
}
return false;
}

void CsvLineReaderContext::on_pre_match_enclose(const uint8_t* start, size_t& len) {
void EncloseCsvLineReaderContext::_on_pre_match_enclose(const uint8_t* start, size_t& len) {
bool should_escape = false;
do {
do {
if (start[_idx] == _escape) [[unlikely]] {
should_escape = !should_escape;
} else if (UNLIKELY(should_escape)) {
} else if (should_escape) [[unlikely]] {
should_escape = false;
} else if (UNLIKELY(start[_idx] == _enclose)) {
} else if (start[_idx] == _enclose) [[unlikely]] {
_state.forward_to(ReaderState::MATCH_ENCLOSE);
++_idx;
return;
Expand All @@ -133,15 +162,16 @@ void CsvLineReaderContext::on_pre_match_enclose(const uint8_t* start, size_t& le
} while (_idx != len);

if (_idx != _total_len) {
len = _extend_reading_range(start);
continue;
len = update_reading_bound(start);
} else {
break;
}
} while (false);
} while (true);
}

void CsvLineReaderContext::on_match_enclose(const uint8_t* start, size_t& len) {
void EncloseCsvLineReaderContext::_on_match_enclose(const uint8_t* start, size_t& len) {
const uint8_t* curr_start = start + _idx;
if (LIKELY(_look_for_column_sep(curr_start, _column_sep_len))) {
if (look_for_column_sep(curr_start, _column_sep_len)) [[likely]] {
_state.forward_to(ReaderState::START);
return;
}
Expand Down
79 changes: 56 additions & 23 deletions be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
// under the License.

#pragma once

#include <stdint.h>

#include <cstddef>
#include <functional>
#include <memory>
#include <string>
#include <vector>
Expand Down Expand Up @@ -76,8 +76,8 @@ class PlainTexLineReaderCtx : public TextLineReaderContextIf {
const size_t line_delimiter_len_)
: line_delimiter(line_delimiter_), line_delimiter_len(line_delimiter_len_) {}

inline const uint8_t* read_line(const uint8_t* start, const size_t len) override {
return (uint8_t*)memmem(start, len, line_delimiter.c_str(), line_delimiter_len);
inline const uint8_t* read_line(const uint8_t* start, const size_t length) override {
return (uint8_t*)memmem(start, length, line_delimiter.c_str(), line_delimiter_len);
}

[[nodiscard]] inline size_t line_delimiter_length() const override {
Expand All @@ -91,25 +91,31 @@ class PlainTexLineReaderCtx : public TextLineReaderContextIf {
const size_t line_delimiter_len;
};

class CsvLineReaderContext final : public PlainTexLineReaderCtx {
class CsvLineReaderContext : public PlainTexLineReaderCtx {
public:
explicit CsvLineReaderContext(const std::string& line_delimiter_,
const size_t line_delimiter_len_, const std::string& column_sep_,
const size_t column_sep_len_, const size_t column_sep_num,
const char enclose, const char escape)
const size_t column_sep_len_, const size_t column_sep_num_)
: PlainTexLineReaderCtx(line_delimiter_, line_delimiter_len_),
_enclose(enclose),
_escape(escape),
_column_sep(column_sep_),
_column_sep_len(column_sep_len_) {
_column_sep_positions.reserve(column_sep_num);
_column_sep_len(column_sep_len_),
_is_single_char_col_sep(column_sep_len_ == 1) {
_column_sep_positions.reserve(column_sep_num_);
if (column_sep_len_ == 1) {
find_col_sep_func = std::bind(&CsvLineReaderContext::look_for_column_sep_pos<true>,
std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3, std::placeholders::_4);
} else {
find_col_sep_func = std::bind(&CsvLineReaderContext::look_for_column_sep_pos<false>,
std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3, std::placeholders::_4);
}
}

const uint8_t* read_line(const uint8_t* start, const size_t len) override;
const uint8_t* read_line(const uint8_t* start, const size_t length) override;

inline void refresh() override {
_idx = 0;
_left_enclose_pos = 0;
_state.reset();
_result = nullptr;
_column_sep_positions.clear();
Expand All @@ -120,31 +126,58 @@ class CsvLineReaderContext final : public PlainTexLineReaderCtx {
}

protected:
void on_start(const uint8_t* start, size_t& len);
void on_normal(const uint8_t* start, size_t& len);
void on_pre_match_enclose(const uint8_t* start, size_t& len);
void on_match_enclose(const uint8_t* start, size_t& len);
virtual void read_line_impl(const uint8_t* start, size_t& bound);

private:
bool _look_for_column_sep(const uint8_t* curr_start, size_t curr_len);
bool _look_for_line_delim(const uint8_t* curr_start, size_t curr_len);
size_t _extend_reading_range(const uint8_t* start);
size_t update_reading_bound(const uint8_t* start);

bool look_for_column_sep(const uint8_t* curr_start, size_t curr_len);

std::function<const uint8_t*(const uint8_t*, size_t, const char*, size_t)> find_col_sep_func;

template <bool SingleChar>
static const uint8_t* look_for_column_sep_pos(const uint8_t* curr_start, size_t curr_len,
const char* column_sep, size_t column_sep_len);

const char _enclose;
const char _escape;
const std::string _column_sep;
const size_t _column_sep_len;
const bool _is_single_char_col_sep;

size_t _total_len;

size_t _idx = 0;
size_t _left_enclose_pos = 0;
ReaderStateWrapper _state;
const uint8_t* _result = nullptr;
// record the start pos of each column sep
std::vector<size_t> _column_sep_positions;
};

// CSV line reader context used when an enclose character is configured.
// It inherits the column-separator bookkeeping of CsvLineReaderContext and replaces
// read_line_impl() with a scan driven by the reader state (one private handler per
// ReaderState value: START, NORMAL, PRE_MATCH_ENCLOSE, MATCH_ENCLOSE).
class EncloseCsvLineReaderContext final : public CsvLineReaderContext {
public:
    // line_delimiter_ / line_delimiter_len_ : line delimiter and its length
    // column_sep_ / column_sep_len_         : column separator and its length
    // column_sep_num_                       : number of separator positions to reserve for
    // enclose / escape                      : enclose and escape characters
    //
    // The first five arguments are forwarded unchanged to CsvLineReaderContext, whose
    // constructor already reserves column_sep_num_ entries in _column_sep_positions,
    // so the reserve is not repeated here.
    explicit EncloseCsvLineReaderContext(const std::string& line_delimiter_,
                                         const size_t line_delimiter_len_,
                                         const std::string& column_sep_,
                                         const size_t column_sep_len_, const size_t column_sep_num_,
                                         const char enclose, const char escape)
            : CsvLineReaderContext(line_delimiter_, line_delimiter_len_, column_sep_,
                                   column_sep_len_, column_sep_num_),
              _enclose(enclose),
              _escape(escape) {}

protected:
    // Scans from the current cursor up to `bound`; `bound` is taken by reference and
    // is passed on to the state handlers below.
    void read_line_impl(const uint8_t* start, size_t& bound) override;

private:
    // State handlers invoked by read_line_impl(); `len` is the current reading bound.
    void _on_start(const uint8_t* start, size_t& len);
    void _on_normal(const uint8_t* start, size_t& len);
    void _on_pre_match_enclose(const uint8_t* start, size_t& len);
    void _on_match_enclose(const uint8_t* start, size_t& len);

    const char _enclose;
    const char _escape;
};

using TextLineReaderCtxPtr = std::shared_ptr<TextLineReaderContextIf>;

class NewPlainTextLineReader : public LineReader {
Expand Down

0 comments on commit 8e4b51d

Please sign in to comment.