Skip to content

Commit

Permalink
[feature](load) refactor CSV reading process during scanning, and sup…
Browse files Browse the repository at this point in the history
…port enclose and escape for stream load (#22539)

 ## Proposed changes

Refactor thoughts: close #22383
Descriptions about `enclose` and `escape`: #22385

2023-08-09:
It's a pity that experiment shows that the original way for parsing plain CSV is faster. Therefor, the refactor is only applied on enclose related code. The plain CSV parser use the original logic.

Fallback of performance is unavoidable anyway. From the `CSV reader`'s perspective, the real weak point may be the write column behavior, proved by the flame graph.

Trimming escape will be enable after fix: #22411 is merged

Cases should be discussed:

1. When an incomplete enclose appears in the beginning of a large scale data, the line delimiter will be unreachable till the EOF, will the buffer become extremely large?
2. What if an infinite line occurs in the case? Essentially,  `1.` is equivalent to this.

Only support stream load as trial in this PR, avoid too many unrelated changes. Docs will be added when `enclose` and `escape` is available for all kinds of load.
  • Loading branch information
TangSiyang2001 authored and xiaokang committed Aug 17, 2023
1 parent a38de55 commit 8eac2be
Show file tree
Hide file tree
Showing 30 changed files with 836 additions and 171 deletions.
2 changes: 1 addition & 1 deletion be/src/exec/line_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace doris {
namespace io {
class IOContext;
}
// This class is used for CSV scanner, to read content line by line
// This class is used to read content line by line
class LineReader {
public:
virtual ~LineReader() = default;
Expand Down
5 changes: 4 additions & 1 deletion be/src/exec/text_converter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,12 @@ TextConverter::TextConverter(char escape_char, char collection_delimiter, char m

void TextConverter::write_string_column(const SlotDescriptor* slot_desc,
vectorized::MutableColumnPtr* column_ptr, const char* data,
size_t len) {
size_t len, bool need_escape) {
DCHECK(column_ptr->get()->is_nullable());
auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(column_ptr->get());
if (need_escape) {
unescape_string_on_spot(data, &len);
}
if ((len == 2 && data[0] == '\\' && data[1] == 'N') || len == SQL_NULL_DATA) {
nullable_column->get_null_map_data().push_back(1);
reinterpret_cast<vectorized::ColumnString&>(nullable_column->get_nested_column())
Expand Down
14 changes: 11 additions & 3 deletions be/src/exec/text_converter.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

#pragma once

#include <stddef.h>
#include <cstddef>

#include "vec/columns/column.h"

Expand All @@ -33,9 +33,15 @@ class TextConverter {

TextConverter(char escape_char, char collection_delimiter = '\2', char map_kv_delimiter = '\3');

inline void write_string_column(const SlotDescriptor* slot_desc,
vectorized::MutableColumnPtr* column_ptr, const char* data,
size_t len) {
return write_string_column(slot_desc, column_ptr, data, len, false);
}

void write_string_column(const SlotDescriptor* slot_desc,
vectorized::MutableColumnPtr* column_ptr, const char* data,
size_t len);
vectorized::MutableColumnPtr* column_ptr, const char* data, size_t len,
bool need_escape);

inline bool write_column(const SlotDescriptor* slot_desc,
vectorized::MutableColumnPtr* column_ptr, const char* data, size_t len,
Expand All @@ -62,6 +68,8 @@ class TextConverter {
}
void set_map_kv_delimiter(char mapkv_delimiter) { _map_kv_delimiter = mapkv_delimiter; }

inline void set_escape_char(const char escape) { this->_escape_char = escape; }

private:
bool _write_data(const TypeDescriptor& type_desc, vectorized::IColumn* nullable_col_ptr,
const char* data, size_t len, bool copy_string, bool need_escape, size_t rows,
Expand Down
6 changes: 6 additions & 0 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,12 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
if (!http_req->header(HTTP_LINE_DELIMITER).empty()) {
request.__set_line_delimiter(http_req->header(HTTP_LINE_DELIMITER));
}
if (!http_req->header(HTTP_ENCLOSE).empty() && http_req->header(HTTP_ENCLOSE).size() > 0) {
request.__set_enclose(http_req->header(HTTP_ENCLOSE)[0]);
}
if (!http_req->header(HTTP_ESCAPE).empty() && http_req->header(HTTP_ESCAPE).size() > 0) {
request.__set_escape(http_req->header(HTTP_ESCAPE)[0]);
}
if (!http_req->header(HTTP_PARTITIONS).empty()) {
request.__set_partitions(http_req->header(HTTP_PARTITIONS));
request.__set_isTempPartition(false);
Expand Down
2 changes: 2 additions & 0 deletions be/src/http/http_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ static const std::string HTTP_COLUMNS = "columns";
static const std::string HTTP_WHERE = "where";
static const std::string HTTP_COLUMN_SEPARATOR = "column_separator";
static const std::string HTTP_LINE_DELIMITER = "line_delimiter";
static const std::string HTTP_ENCLOSE = "enclose";
static const std::string HTTP_ESCAPE = "escape";
static const std::string HTTP_MAX_FILTER_RATIO = "max_filter_ratio";
static const std::string HTTP_TIMEOUT = "timeout";
static const std::string HTTP_PARTITIONS = "partitions";
Expand Down
Loading

0 comments on commit 8eac2be

Please sign in to comment.