Skip to content

Commit

Permalink
[feature](load) support compressed JSON format data for broker load (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
elvestar authored and Doris-Extras committed Apr 10, 2024
1 parent 1a2177a commit b0b5f84
Show file tree
Hide file tree
Showing 14 changed files with 149 additions and 83 deletions.
97 changes: 88 additions & 9 deletions be/src/exec/decompressor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,39 +19,42 @@

#include <strings.h>

#include <memory>
#include <ostream>

#include "common/logging.h"
#include "common/status.h"
#include "gutil/endian.h"
#include "gutil/strings/substitute.h"

namespace doris {

Status Decompressor::create_decompressor(CompressType type, Decompressor** decompressor) {
Status Decompressor::create_decompressor(CompressType type,
std::unique_ptr<Decompressor>* decompressor) {
switch (type) {
case CompressType::UNCOMPRESSED:
*decompressor = nullptr;
decompressor->reset(nullptr);
break;
case CompressType::GZIP:
*decompressor = new GzipDecompressor(false);
decompressor->reset(new GzipDecompressor(false));
break;
case CompressType::DEFLATE:
*decompressor = new GzipDecompressor(true);
decompressor->reset(new GzipDecompressor(true));
break;
case CompressType::BZIP2:
*decompressor = new Bzip2Decompressor();
decompressor->reset(new Bzip2Decompressor());
break;
case CompressType::LZ4FRAME:
*decompressor = new Lz4FrameDecompressor();
decompressor->reset(new Lz4FrameDecompressor());
break;
case CompressType::LZ4BLOCK:
*decompressor = new Lz4BlockDecompressor();
decompressor->reset(new Lz4BlockDecompressor());
break;
case CompressType::SNAPPYBLOCK:
*decompressor = new SnappyBlockDecompressor();
decompressor->reset(new SnappyBlockDecompressor());
break;
case CompressType::LZOP:
*decompressor = new LzopDecompressor();
decompressor->reset(new LzopDecompressor());
break;
default:
return Status::InternalError("Unknown compress type: {}", type);
Expand All @@ -65,6 +68,82 @@ Status Decompressor::create_decompressor(CompressType type, Decompressor** decom
return st;
}

// Creates a decompressor for a thrift-level file compress type.
//
// Each TFileCompressType value is translated to the internal CompressType and
// construction is delegated to the CompressType overload, whose status is
// returned unchanged. PLAIN and UNKNOWN are both handled as UNCOMPRESSED, and
// LZO and LZOP both map to LZOP. Any value not listed below yields an
// InternalError.
Status Decompressor::create_decompressor(TFileCompressType::type type,
                                         std::unique_ptr<Decompressor>* decompressor) {
    switch (type) {
    case TFileCompressType::PLAIN:
    case TFileCompressType::UNKNOWN:
        return Decompressor::create_decompressor(CompressType::UNCOMPRESSED, decompressor);
    case TFileCompressType::GZ:
        return Decompressor::create_decompressor(CompressType::GZIP, decompressor);
    case TFileCompressType::LZO:
    case TFileCompressType::LZOP:
        return Decompressor::create_decompressor(CompressType::LZOP, decompressor);
    case TFileCompressType::BZ2:
        return Decompressor::create_decompressor(CompressType::BZIP2, decompressor);
    case TFileCompressType::LZ4FRAME:
        return Decompressor::create_decompressor(CompressType::LZ4FRAME, decompressor);
    case TFileCompressType::LZ4BLOCK:
        return Decompressor::create_decompressor(CompressType::LZ4BLOCK, decompressor);
    case TFileCompressType::DEFLATE:
        return Decompressor::create_decompressor(CompressType::DEFLATE, decompressor);
    case TFileCompressType::SNAPPYBLOCK:
        return Decompressor::create_decompressor(CompressType::SNAPPYBLOCK, decompressor);
    default:
        return Status::InternalError<false>("unknown compress type: {}", type);
    }
}

// Creates a decompressor for a thrift-level file format type.
//
// The compression is derived from the format itself (e.g. FORMAT_CSV_GZ ->
// GZIP). FORMAT_PROTO and FORMAT_CSV_PLAIN are handled as UNCOMPRESSED.
// Construction is delegated to the CompressType overload and its status is
// returned unchanged. A format type not listed below yields an InternalError
// that names it as an unknown *format* type, so it is distinguishable from the
// error raised by the TFileCompressType overload.
Status Decompressor::create_decompressor(TFileFormatType::type type,
                                         std::unique_ptr<Decompressor>* decompressor) {
    CompressType compress_type;
    switch (type) {
    case TFileFormatType::FORMAT_PROTO:
        [[fallthrough]];
    case TFileFormatType::FORMAT_CSV_PLAIN:
        compress_type = CompressType::UNCOMPRESSED;
        break;
    case TFileFormatType::FORMAT_CSV_GZ:
        compress_type = CompressType::GZIP;
        break;
    case TFileFormatType::FORMAT_CSV_BZ2:
        compress_type = CompressType::BZIP2;
        break;
    case TFileFormatType::FORMAT_CSV_LZ4FRAME:
        compress_type = CompressType::LZ4FRAME;
        break;
    case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
        compress_type = CompressType::LZ4BLOCK;
        break;
    case TFileFormatType::FORMAT_CSV_LZOP:
        compress_type = CompressType::LZOP;
        break;
    case TFileFormatType::FORMAT_CSV_DEFLATE:
        compress_type = CompressType::DEFLATE;
        break;
    case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
        compress_type = CompressType::SNAPPYBLOCK;
        break;
    default:
        // The value here is a file format, not a compress type; say so in the message.
        return Status::InternalError<false>("unknown format type: {}", type);
    }
    return Decompressor::create_decompressor(compress_type, decompressor);
}

// Assembles an unsigned 32-bit value from the four bytes at buf[0..3], with
// buf[0] as the most significant byte (big-endian order).
//
// buf must point to at least four readable bytes.
//
// Each byte is widened to uint32_t before shifting. Without the cast the
// uint8_t operand is promoted to signed int, and shifting a byte >= 0x80 left
// by 24 relies on an implementation-defined conversion before C++20.
uint32_t Decompressor::_read_int32(uint8_t* buf) {
    return (static_cast<uint32_t>(buf[0]) << 24) | (static_cast<uint32_t>(buf[1]) << 16) |
           (static_cast<uint32_t>(buf[2]) << 8) | static_cast<uint32_t>(buf[3]);
}
Expand Down
11 changes: 10 additions & 1 deletion be/src/exec/decompressor.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@
#include <stdint.h>
#include <zlib.h>

#include <memory>
#include <string>

#include "common/status.h"
#include "gen_cpp/PlanNodes_types.h"

namespace doris {

Expand Down Expand Up @@ -57,7 +59,14 @@ class Decompressor {
size_t* more_output_bytes) = 0;

public:
static Status create_decompressor(CompressType type, Decompressor** decompressor);
static Status create_decompressor(CompressType type,
std::unique_ptr<Decompressor>* decompressor);

static Status create_decompressor(TFileCompressType::type type,
std::unique_ptr<Decompressor>* decompressor);

static Status create_decompressor(TFileFormatType::type type,
std::unique_ptr<Decompressor>* decompressor);

virtual std::string debug_info();

Expand Down
2 changes: 1 addition & 1 deletion be/src/util/block_compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ class HadoopLz4BlockCompression : public Lz4BlockCompression {
}

private:
Decompressor* _decompressor;
std::unique_ptr<Decompressor> _decompressor;
};
// Used for LZ4 frame format, decompress speed is two times faster than LZ4.
class Lz4fBlockCompression : public BlockCompressionCodec {
Expand Down
65 changes: 2 additions & 63 deletions be/src/vec/exec/format/csv/csv_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -571,72 +571,11 @@ Status CsvReader::get_parsed_schema(std::vector<std::string>* col_names,
}

Status CsvReader::_create_decompressor() {
CompressType compress_type;
if (_file_compress_type != TFileCompressType::UNKNOWN) {
switch (_file_compress_type) {
case TFileCompressType::PLAIN:
compress_type = CompressType::UNCOMPRESSED;
break;
case TFileCompressType::GZ:
compress_type = CompressType::GZIP;
break;
case TFileCompressType::LZO:
case TFileCompressType::LZOP:
compress_type = CompressType::LZOP;
break;
case TFileCompressType::BZ2:
compress_type = CompressType::BZIP2;
break;
case TFileCompressType::LZ4FRAME:
compress_type = CompressType::LZ4FRAME;
break;
case TFileCompressType::LZ4BLOCK:
compress_type = CompressType::LZ4BLOCK;
break;
case TFileCompressType::DEFLATE:
compress_type = CompressType::DEFLATE;
break;
case TFileCompressType::SNAPPYBLOCK:
compress_type = CompressType::SNAPPYBLOCK;
break;
default:
return Status::InternalError<false>("unknown compress type: {}", _file_compress_type);
}
RETURN_IF_ERROR(Decompressor::create_decompressor(_file_compress_type, &_decompressor));
} else {
switch (_file_format_type) {
case TFileFormatType::FORMAT_PROTO:
[[fallthrough]];
case TFileFormatType::FORMAT_CSV_PLAIN:
compress_type = CompressType::UNCOMPRESSED;
break;
case TFileFormatType::FORMAT_CSV_GZ:
compress_type = CompressType::GZIP;
break;
case TFileFormatType::FORMAT_CSV_BZ2:
compress_type = CompressType::BZIP2;
break;
case TFileFormatType::FORMAT_CSV_LZ4FRAME:
compress_type = CompressType::LZ4FRAME;
break;
case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
compress_type = CompressType::LZ4BLOCK;
break;
case TFileFormatType::FORMAT_CSV_LZOP:
compress_type = CompressType::LZOP;
break;
case TFileFormatType::FORMAT_CSV_DEFLATE:
compress_type = CompressType::DEFLATE;
break;
case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
compress_type = CompressType::SNAPPYBLOCK;
break;
default:
return Status::InternalError<false>("unknown format type: {}", _file_format_type);
}
RETURN_IF_ERROR(Decompressor::create_decompressor(_file_format_type, &_decompressor));
}
Decompressor* decompressor;
RETURN_IF_ERROR(Decompressor::create_decompressor(compress_type, &decompressor));
_decompressor.reset(decompressor);

return Status::OK();
}
Expand Down
20 changes: 19 additions & 1 deletion be/src/vec/exec/format/json/new_json_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ NewJsonReader::NewJsonReader(RuntimeState* state, RuntimeProfile* profile, Scann
_file_reader(nullptr),
_line_reader(nullptr),
_reader_eof(false),
_decompressor(nullptr),
_skip_first_line(false),
_next_row(0),
_total_rows(0),
Expand All @@ -99,6 +100,12 @@ NewJsonReader::NewJsonReader(RuntimeState* state, RuntimeProfile* profile, Scann
_bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES);
_read_timer = ADD_TIMER(_profile, "ReadTime");
_file_read_timer = ADD_TIMER(_profile, "FileReadTime");
if (_range.__isset.compress_type) {
// for compatibility
_file_compress_type = _range.compress_type;
} else {
_file_compress_type = _params.compress_type;
}
_init_system_properties();
_init_file_description();
}
Expand All @@ -115,13 +122,20 @@ NewJsonReader::NewJsonReader(RuntimeProfile* profile, const TFileScanRangeParams
_file_slot_descs(file_slot_descs),
_line_reader(nullptr),
_reader_eof(false),
_decompressor(nullptr),
_skip_first_line(false),
_next_row(0),
_total_rows(0),
_value_allocator(_value_buffer, sizeof(_value_buffer)),
_parse_allocator(_parse_buffer, sizeof(_parse_buffer)),
_origin_json_doc(&_value_allocator, sizeof(_parse_buffer), &_parse_allocator),
_io_ctx(io_ctx) {
if (_range.__isset.compress_type) {
// for compatibility
_file_compress_type = _range.compress_type;
} else {
_file_compress_type = _params.compress_type;
}
_init_system_properties();
_init_file_description();
}
Expand Down Expand Up @@ -155,6 +169,10 @@ Status NewJsonReader::init_reader(
// generate _col_default_value_map
RETURN_IF_ERROR(_get_column_default_value(_file_slot_descs, col_default_value_ctx));

// create decompressor.
// _decompressor may be nullptr if this is not a compressed file
RETURN_IF_ERROR(Decompressor::create_decompressor(_file_compress_type, &_decompressor));

#ifdef __AVX2__
if (config::enable_simdjson_reader) {
RETURN_IF_ERROR(_simdjson_init_reader());
Expand Down Expand Up @@ -402,7 +420,7 @@ Status NewJsonReader::_open_line_reader() {
_skip_first_line = false;
}
_line_reader = NewPlainTextLineReader::create_unique(
_profile, _file_reader, nullptr,
_profile, _file_reader, _decompressor.get(),
std::make_shared<PlainTextLineReaderCtx>(_line_delimiter, _line_delimiter_length), size,
_current_offset);
return Status::OK();
Expand Down
3 changes: 3 additions & 0 deletions be/src/vec/exec/format/json/new_json_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include <vector>

#include "common/status.h"
#include "exec/decompressor.h"
#include "exec/line_reader.h"
#include "exprs/json_functions.h"
#include "io/file_factory.h"
Expand Down Expand Up @@ -212,6 +213,8 @@ class NewJsonReader : public GenericReader {
io::FileReaderSPtr _file_reader;
std::unique_ptr<LineReader> _line_reader;
bool _reader_eof;
std::unique_ptr<Decompressor> _decompressor;
TFileCompressType::type _file_compress_type;

// When we fetch range doesn't start from 0 will always skip the first line
bool _skip_first_line;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ WITH BROKER broker_name
Specifies the file type. CSV, PARQUET and ORC formats are supported; the default is CSV.

- `COMPRESS_TYPE AS`
Specifies the file compress type, GZ/LZO/BZ2/LZ4FRAME/DEFLATE/LZOP
Specifies the file compression type: GZ/LZO/BZ2/LZ4FRAME/DEFLATE/LZOP. Only valid for the CSV and JSON formats.

- `column list`

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ WITH BROKER broker_name
指定文件类型,支持 CSV、PARQUET 和 ORC 格式。默认为 CSV。

- `COMPRESS_TYPE AS`
指定文件压缩类型, 支持GZ/BZ2/LZ4FRAME。
指定文件压缩类型,支持 GZ/BZ2/LZ4FRAME。仅在 CSV 或 JSON 格式下生效。

- `column list`

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,8 @@ public void createScanRangeLocations(FileLoadScanNode.ParamCreateContext context
if (tmpBytes > bytesPerInstance && jobType != JobType.STREAM_LOAD) {
// Now only support split plain text
if (compressType == TFileCompressType.PLAIN
&& (formatType == TFileFormatType.FORMAT_CSV_PLAIN && fileStatus.isSplitable)
|| formatType == TFileFormatType.FORMAT_JSON) {
&& ((formatType == TFileFormatType.FORMAT_CSV_PLAIN && fileStatus.isSplitable)
|| formatType == TFileFormatType.FORMAT_JSON)) {
long rangeBytes = bytesPerInstance - curInstanceBytes;
TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, rangeBytes,
columnsFromPath);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
240
360

Binary file not shown.
Binary file not shown.
Binary file not shown.
Loading

0 comments on commit b0b5f84

Please sign in to comment.