[GLUTEN-3704][CH]Enable orc/parquet subcolumn pruning (#3707)
* apply orc/parquet subcolumn pruning

* fix failed uts
taiyang-li authored Nov 23, 2023
1 parent 02db7fd commit df3eb37
Showing 7 changed files with 24 additions and 180 deletions.
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Parser/TypeParser.h
@@ -44,7 +44,7 @@ class TypeParser

static DB::Block buildBlockFromNamedStruct(const substrait::NamedStruct & struct_);

/// Build block from substrait NamedStruct without following DFS rules.
/// Build block from substrait NamedStruct without flattening nested types via DFS, unlike buildBlockFromNamedStruct.
static DB::Block buildBlockFromNamedStructWithoutDFS(const substrait::NamedStruct & struct_);

static bool isTypeMatched(const substrait::Type & substrait_type, const DB::DataTypePtr & ch_type);
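To make the difference between the two builders concrete, here is a minimal, self-contained sketch on toy types (plain C++, not the actual Gluten/ClickHouse API; `NamedField` and `collectLeaves` are hypothetical names). DFS flattening turns every leaf of a named struct into its own top-level column, while the non-DFS variant keeps the struct as a single column:

```cpp
#include <iostream>
#include <string>
#include <vector>

// Hypothetical stand-in for a substrait NamedStruct: a field name plus
// optional children (a non-empty children vector models a struct type).
struct NamedField
{
    std::string name;
    std::vector<NamedField> children;
};

// DFS flattening (what buildBlockFromNamedStruct conceptually does):
// every leaf becomes its own top-level column, e.g. "c.x" and "c.y".
void collectLeaves(const NamedField & f, const std::string & prefix, std::vector<std::string> & out)
{
    std::string full = prefix.empty() ? f.name : prefix + "." + f.name;
    if (f.children.empty())
        out.push_back(full);
    else
        for (const auto & child : f.children)
            collectLeaves(child, full, out);
}

int main()
{
    NamedField c{"c", {{"x", {}}, {"y", {}}}};

    std::vector<std::string> dfs;
    collectLeaves(c, "", dfs);
    for (const auto & name : dfs)
        std::cout << name << '\n';   // c.x, c.y

    // Without DFS (buildBlockFromNamedStructWithoutDFS): the struct stays
    // one top-level column.
    std::cout << c.name << '\n';     // c
}
```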
1 change: 0 additions & 1 deletion cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.h
@@ -57,7 +57,6 @@ class FormatFile
/// Create a new input format for reading this file
virtual InputFormatPtr createInputFormat(const DB::Block & header) = 0;

/// Get schema which describes the columns of this file
virtual DB::NamesAndTypesList getSchema() const
{
const auto & schema = file_info.schema();
Expand Down
158 changes: 20 additions & 138 deletions cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp
@@ -61,59 +61,22 @@ namespace local_engine
// Build blocks with a const virtual column to indicate how many rows are in them.
static DB::Block getRealHeader(const DB::Block & header)
{
if (header.columns())
return header;
return BlockUtil::buildRowCountHeader();
return header ? header : BlockUtil::buildRowCountHeader();
}
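The simplified ternary behaves the same as the old branch. As a quick illustration of why the fallback exists (count(*)/count(1) scans project no real columns, yet downstream operators still need row cardinality), here is a toy analogue; `Header` and the dummy column name are illustrative, not the real BlockUtil API:

```cpp
#include <iostream>
#include <string>
#include <vector>

// Toy header: just a list of column names.
using Header = std::vector<std::string>;

// Stand-in for BlockUtil::buildRowCountHeader(): a single dummy column
// whose only job is to carry the number of rows.
Header buildRowCountHeader() { return {"__dummy_row_count__"}; }

// Analogue of getRealHeader: an empty header (the count(*)/count(1) case)
// is replaced by the dummy header so the pipeline always has at least one
// column to carry row cardinality.
Header getRealHeader(const Header & header)
{
    return header.empty() ? buildRowCountHeader() : header;
}

int main()
{
    std::cout << getRealHeader({}).front() << '\n';        // __dummy_row_count__
    std::cout << getRealHeader({"a", "b"}).size() << '\n'; // 2
}
```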

SubstraitFileSource::SubstraitFileSource(
DB::ContextPtr context_, const DB::Block & header_, const substrait::ReadRel::LocalFiles & file_infos)
: DB::SourceWithKeyCondition(getRealHeader(header_), false), context(context_), output_header(header_)
: DB::SourceWithKeyCondition(getRealHeader(header_), false), context(context_), output_header(header_), to_read_header(output_header)
{
if (file_infos.items_size())
{
/// Initialize files
Poco::URI file_uri(file_infos.items().Get(0).uri_file());
read_buffer_builder = ReadBufferBuilderFactory::instance().createBuilder(file_uri.getScheme(), context);
for (const auto & item : file_infos.items())
files.emplace_back(FormatFileUtil::createFile(context, read_buffer_builder, item));

/// Decide which tuple type columns in output_header should skip flattening.
file_schema = files[0]->getSchema();
for (size_t i = 0; i < output_header.columns(); ++i)
{
const auto & col = output_header.getByPosition(i);

/// Find the matching column in the file schema; if the two tuple columns have the same number of elements, skip flattening it.
for (const auto & pair : file_schema)
{
if (boost::iequals(pair.name, col.name))
{
auto type_in_file = DB::removeNullable(pair.type);
auto type_in_header = DB::removeNullable(col.type);

const auto * tuple_type_in_file = typeid_cast<const DB::DataTypeTuple *>(type_in_file.get());
const auto * tuple_type_in_header = typeid_cast<const DB::DataTypeTuple *>(type_in_header.get());
if (tuple_type_in_file && tuple_type_in_header && tuple_type_in_file->haveExplicitNames() && tuple_type_in_header->haveExplicitNames()
&& tuple_type_in_file->getElements().size() == tuple_type_in_header->getElements().size())
columns_to_skip_flatten.insert(i);
}
}
}
}

/**
* We may query only part of the fields of a struct column. For example, given a column c of type
* struct{x:int, y:int, z:int}, we may want only fields c.x and c.y. In the substrait plan, we then get
* a column c described as struct{x:int, y:int}, which does not match the original
* struct type and causes exceptions. To work around this, we flatten all struct columns into
* independent field columns recursively, and fold the field columns back into struct columns
* at the end.
*/
flatten_output_header = BlockUtil::flattenBlock(output_header, BlockUtil::FLAT_STRUCT, true, columns_to_skip_flatten);
to_read_header = flatten_output_header;
if (file_infos.items_size())
{
/// file partition keys are read from the file path
/// File partition keys are read from the file path
auto partition_keys = files[0]->getFilePartitionKeys();
for (const auto & key : partition_keys)
{
@@ -139,16 +102,6 @@ void SubstraitFileSource::setKeyCondition(const DB::ActionsDAG::NodeRawConstPtrs
DB::NameSet{});
}

std::vector<String> SubstraitFileSource::getPartitionKeys() const
{
return files.size() > 0 ? files[0]->getFilePartitionKeys() : std::vector<String>();
}

DB::String SubstraitFileSource::getFileFormat() const
{
return files.size() > 0 ? files[0]->getFileFormat() : "Unknown";
}

DB::Chunk SubstraitFileSource::generate()
{
while (true)
@@ -161,19 +114,7 @@ DB::Chunk SubstraitFileSource::generate()

DB::Chunk chunk;
if (file_reader->pull(chunk))
{
if (output_header.columns())
{
auto block = foldFlattenColumns(chunk.detachColumns(), output_header, columns_to_skip_flatten);
auto columns = block.getColumns();
return DB::Chunk(columns, block.rows());
}
else
{
// The count(*)/count(1) case
return chunk;
}
}
return chunk;

/// try to read from next file
file_reader.reset();
@@ -198,86 +139,26 @@ bool SubstraitFileSource::tryPrepareReader()
return true;
}

if (!to_read_header.columns())
if (!to_read_header)
{
auto total_rows = current_file->getTotalRows();
if (total_rows)
file_reader = std::make_unique<ConstColumnsFileReader>(current_file, context, flatten_output_header, *total_rows);
if (total_rows.has_value())
file_reader = std::make_unique<ConstColumnsFileReader>(current_file, context, output_header, *total_rows);
else
{
/// For text/json format files, we can't get the total number of rows from file metadata.
/// So we add a dummy column to indicate the number of rows.
auto dummy_header = BlockUtil::buildRowCountHeader();
auto flatten_output_header_contains_dummy = flatten_output_header;
flatten_output_header_contains_dummy.insertUnique(dummy_header.getByPosition(0));
file_reader = std::make_unique<NormalFileReader>(current_file, context, dummy_header, flatten_output_header_contains_dummy);
file_reader
= std::make_unique<NormalFileReader>(current_file, context, getRealHeader(to_read_header), getRealHeader(output_header));
}
}
else
file_reader = std::make_unique<NormalFileReader>(current_file, context, to_read_header, flatten_output_header);
file_reader = std::make_unique<NormalFileReader>(current_file, context, to_read_header, output_header);

file_reader->applyKeyCondition(key_condition);
return true;
}
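After this change, tryPrepareReader chooses between two readers: when to_read_header is empty (every requested column is a partition key, or the query is count(*)-like), a ConstColumnsFileReader driven by the metadata row count suffices; otherwise a NormalFileReader decodes only to_read_header's columns. Below is a toy mirror of that branch, under the assumption that a missing metadata row count (text/json) forces a normal scan with a dummy row-count column appended; the returned strings only label the choices:

```cpp
#include <iostream>
#include <optional>
#include <string>

// Toy decision mirror of tryPrepareReader's branch structure.
std::string chooseReader(bool has_columns_to_read, std::optional<size_t> total_rows)
{
    if (has_columns_to_read)
        return "NormalFileReader(to_read_header, output_header)";
    if (total_rows.has_value())
        // All requested columns are partition keys/constants: the row count
        // from file metadata is enough, no need to decode the file body.
        return "ConstColumnsFileReader(output_header, *total_rows)";
    // text/json: no row count in metadata, so fall back to scanning the
    // file with a dummy row-count column appended.
    return "NormalFileReader(dummy_header, output_header + dummy)";
}

int main()
{
    std::cout << chooseReader(true, 100) << '\n';
    std::cout << chooseReader(false, 100) << '\n';
    std::cout << chooseReader(false, std::nullopt) << '\n';
}
```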

DB::Block SubstraitFileSource::foldFlattenColumns(
const DB::Columns & cols, const DB::Block & header, const std::unordered_set<size_t> & columns_to_skip_flatten)
{
DB::ColumnsWithTypeAndName result_cols;

size_t pos = 0;
for (size_t i = 0; i < header.columns(); ++i)
{
const auto & named_col = header.getByPosition(i);

DB::ColumnWithTypeAndName result_col;
if (columns_to_skip_flatten.contains(i)) [[unlikely]]
{
result_col.name = named_col.name;
result_col.type = named_col.type;
result_col.column = cols[pos];
++pos;
}
else
result_col = foldFlattenColumn(named_col.type, named_col.name, pos, cols);

result_cols.emplace_back(std::move(result_col));
}

return DB::Block(std::move(result_cols));
}

DB::ColumnWithTypeAndName
SubstraitFileSource::foldFlattenColumn(DB::DataTypePtr col_type, const std::string & col_name, size_t & pos, const DB::Columns & cols)
{
DB::DataTypePtr nested_type = DB::removeNullable(col_type);
const DB::DataTypeTuple * type_tuple = typeid_cast<const DB::DataTypeTuple *>(nested_type.get());
if (type_tuple && type_tuple->haveExplicitNames())
{
const auto & field_types = type_tuple->getElements();
const auto & field_names = type_tuple->getElementNames();

size_t fields_num = field_names.size();
DB::Columns tuple_cols;
for (size_t i = 0; i < fields_num; ++i)
{
auto named_col = foldFlattenColumn(field_types[i], field_names[i], pos, cols);
tuple_cols.push_back(named_col.column);
}
auto tuple_col = DB::ColumnTuple::create(std::move(tuple_cols));

// The original type col_type may be wrapped by nullable, so add a cast here.
DB::ColumnWithTypeAndName ret_col(std::move(tuple_col), nested_type, col_name);
ret_col.column = DB::castColumn(ret_col, col_type);
ret_col.type = col_type;
return ret_col;
}

size_t curr_pos = pos;
++pos;
return DB::ColumnWithTypeAndName(cols[curr_pos], col_type, col_name);
}
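The deleted comment and the two foldFlatten* helpers above implemented the old workaround: flatten every struct column into dotted leaf columns before reading, then fold them back afterwards. Here is a self-contained sketch of the fold step on toy types (one nesting level only; the names are illustrative, and this is not the removed DB:: implementation):

```cpp
#include <cstddef>
#include <iostream>
#include <map>
#include <string>
#include <vector>

// Toy model of flattened columns: leaf columns are identified by dotted
// paths, e.g. a column c of type struct{x:int, y:int, z:int} queried for
// only c.x and c.y yields the field columns "c.x" and "c.y".
using FlatColumns = std::vector<std::string>;

// Fold the flattened field columns back into per-struct groups, the
// inverse step the old code performed after reading.
std::map<std::string, std::vector<std::string>> fold(const FlatColumns & flat)
{
    std::map<std::string, std::vector<std::string>> structs;
    for (const auto & path : flat)
    {
        auto dot = path.find('.');
        if (dot == std::string::npos)
            structs[path];                                            // plain column
        else
            structs[path.substr(0, dot)].push_back(path.substr(dot + 1));
    }
    return structs;
}

int main()
{
    FlatColumns flat{"a", "c.x", "c.y"};
    for (const auto & [name, fields] : fold(flat))
    {
        std::cout << name;
        if (!fields.empty())
        {
            std::cout << " = struct{";
            for (std::size_t i = 0; i < fields.size(); ++i)
                std::cout << (i ? ", " : "") << fields[i];
            std::cout << "}";
        }
        std::cout << '\n';   // a, then c = struct{x, y}
    }
}
```

The round-trip becomes unnecessary once the ORC/Parquet readers can prune subcolumns natively, which is what this commit enables.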

DB::ColumnPtr FileReaderWrapper::createConstColumn(DB::DataTypePtr data_type, const DB::Field & field, size_t rows)
{
auto nested_type = DB::removeNullable(data_type);
@@ -437,12 +318,12 @@ bool ConstColumnsFileReader::pull(DB::Chunk & chunk)
remained_rows -= block_size;
}
DB::Columns res_columns;
size_t columns_num = header.columns();
if (columns_num)
size_t col_num = header.columns();
if (col_num)
{
res_columns.reserve(columns_num);
res_columns.reserve(col_num);
const auto & partition_values = file->getFilePartitionValues();
for (size_t pos = 0; pos < columns_num; ++pos)
for (size_t pos = 0; pos < col_num; ++pos)
{
auto col_with_name_and_type = header.getByPosition(pos);
auto type = col_with_name_and_type.type;
@@ -477,20 +358,21 @@ NormalFileReader::NormalFileReader(

bool NormalFileReader::pull(DB::Chunk & chunk)
{
DB::Chunk tmp_chunk;
auto status = reader->pull(tmp_chunk);
DB::Chunk raw_chunk;
auto status = reader->pull(raw_chunk);
if (!status)
return false;

size_t rows = tmp_chunk.getNumRows();
size_t rows = raw_chunk.getNumRows();
if (!rows)
return false;

auto read_columns = tmp_chunk.detachColumns();
auto read_columns = raw_chunk.detachColumns();
auto columns_with_name_and_type = output_header.getColumnsWithTypeAndName();
auto partition_values = file->getFilePartitionValues();

DB::Columns res_columns;
res_columns.reserve(columns_with_name_and_type.size());
for (auto & column : columns_with_name_and_type)
{
if (to_read_header.has(column.name))
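The hunk is truncated here, but the visible head of the loop shows the assembly rule: a column present in to_read_header comes from the decoded chunk, and anything else is a partition key materialized as a constant column from the file path. A toy sketch of that rule (hypothetical names and types, not the real implementation; it assumes every output column is found in one of the two maps):

```cpp
#include <iostream>
#include <map>
#include <string>
#include <vector>

struct Column { std::string name; std::string source; };

// Toy version of the loop at the end of NormalFileReader::pull: for every
// output column, take the decoded file column when it was read, otherwise
// materialize the file-path partition value as a constant column.
std::vector<Column> assembleOutput(
    const std::vector<std::string> & output_header,
    const std::map<std::string, std::string> & read_columns,      // name -> decoded data
    const std::map<std::string, std::string> & partition_values)  // name -> value from path
{
    std::vector<Column> res;
    res.reserve(output_header.size());
    for (const auto & name : output_header)
    {
        if (auto it = read_columns.find(name); it != read_columns.end())
            res.push_back({name, "file:" + it->second});
        else
            res.push_back({name, "const:" + partition_values.at(name)});
    }
    return res;
}

int main()
{
    auto out = assembleOutput({"a", "b", "dt"}, {{"a", "col_a"}, {"b", "col_b"}}, {{"dt", "2023-11-23"}});
    for (const auto & c : out)
        std::cout << c.name << " <- " << c.source << '\n';
}
```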
26 changes: 2 additions & 24 deletions cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.h
@@ -104,41 +104,19 @@ class SubstraitFileSource : public DB::SourceWithKeyCondition

void setKeyCondition(const DB::ActionsDAG::NodeRawConstPtrs & nodes, DB::ContextPtr context_) override;

std::vector<String> getPartitionKeys() const;
DB::String getFileFormat() const;

protected:
DB::Chunk generate() override;

private:
DB::ContextPtr context;
DB::Block output_header; /// Sample header before flattening; may contain partition keys
DB::Block flatten_output_header; // Sample header after flattening, including partition keys
DB::Block to_read_header; // Sample header after flattening, excluding partition keys
DB::Block output_header; /// Sample header; may contain partition keys
DB::Block to_read_header; // Sample header excluding partition keys
FormatFiles files;
DB::NamesAndTypesList file_schema; /// The column names and types in the file

/// The columns in output_header for which flattening is skipped.
/// Notice that not all tuple type columns need to be flattened.
/// E.g. if the parquet file schema is `info struct<name string, age int>` and output_header is `info Tuple(name String, age Int32)`,
/// then there is no need to flatten the `info` column: flattening would represent a null `info` value as null values of `info.name` and `info.age`, which is obviously wrong.
std::unordered_set<size_t> columns_to_skip_flatten;

UInt32 current_file_index = 0;
std::unique_ptr<FileReaderWrapper> file_reader;
ReadBufferBuilderPtr read_buffer_builder;

bool tryPrepareReader();

// E.g. we have flattened columns corresponding to header {a:int, b.x.i: int, b.x.j: string, b.y: string},
// but we want to fold all the flattened struct columns back into struct columns:
// {a:int, b: {x: {i: int, j: string}, y: string}}
// Note: lists of named structs are not supported. ClickHouse may take advantage of them to support
// nested tables, but that is not the case in Spark.
static DB::Block
foldFlattenColumns(const DB::Columns & cols, const DB::Block & header, const std::unordered_set<size_t> & columns_to_skip_flatten);

static DB::ColumnWithTypeAndName
foldFlattenColumn(DB::DataTypePtr col_type, const std::string & col_name, size_t & pos, const DB::Columns & cols);
};
}
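The removed columns_to_skip_flatten machinery encoded the rule from the comment above: when the file schema and the output header agree on a named tuple's element count, nothing was pruned, and flattening would actually be harmful, since a null struct value would be smeared into nulls of its fields. A toy version of that check (illustrative types, not the removed DB::DataTypeTuple logic):

```cpp
#include <iostream>
#include <string>
#include <vector>

// Toy tuple type: named elements only.
struct TupleType { std::vector<std::string> element_names; };

// Sketch of the removed skip-flatten rule: if the file schema and the
// output header describe the same named-tuple column with the same number
// of elements, no subfield was pruned, so the column must stay intact;
// flattening it would turn a null `info` into nulls of `info.name` and
// `info.age`, losing the struct-level null.
bool shouldSkipFlatten(const TupleType & in_file, const TupleType & in_header)
{
    return !in_file.element_names.empty() && !in_header.element_names.empty()
        && in_file.element_names.size() == in_header.element_names.size();
}

int main()
{
    TupleType file_info{{"name", "age"}};
    TupleType header_full{{"name", "age"}};
    TupleType header_pruned{{"name"}};

    std::cout << shouldSkipFlatten(file_info, header_full) << '\n';   // 1: skip flattening
    std::cout << shouldSkipFlatten(file_info, header_pruned) << '\n'; // 0: flatten
}
```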
@@ -46,17 +46,6 @@ namespace local_engine
SubstraitFileSourceStep::SubstraitFileSourceStep(DB::ContextPtr context_, DB::Pipe pipe_, const String &)
: SourceStepWithFilter(DB::DataStream{.header = pipe_.getHeader()}), pipe(std::move(pipe_)), context(context_)
{
DB::Processors processors = pipe.getProcessors();
for (size_t i = 0; i < processors.size(); ++i)
{
DB::ProcessorPtr processor = processors[i];
const SubstraitFileSource * source = static_cast<const SubstraitFileSource *>(processor.get());
if (source)
{
partition_keys = source->getPartitionKeys();
file_format = source->getFileFormat();
}
}
}

void SubstraitFileSourceStep::initializePipeline(DB::QueryPipelineBuilder & pipeline, const DB::BuildQueryPipelineSettings &)
@@ -39,8 +39,6 @@ class SubstraitFileSourceStep : public DB::SourceStepWithFilter
private:
DB::Pipe pipe;
DB::ContextPtr context;
std::vector<DB::String> partition_keys;
DB::String file_format;
};

}
@@ -42,9 +42,7 @@ FormatFile::InputFormatPtr TextFormatFile::createInputFormat(const DB::Block & h

/// Initialize format settings
DB::FormatSettings format_settings = DB::getFormatSettings(context);
const auto & schema = file_info.schema();
for (const auto & name : schema.names())
format_settings.hive_text.input_field_names.push_back(name);
format_settings.hive_text.input_field_names = getSchema().getNames();

std::string text_field_delimiter = file_info.text().field_delimiter();
format_settings.hive_text.fields_delimiter = file_info.text().field_delimiter()[0];
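The replaced loop and its one-line substitute both feed the Hive text parser the column names from the file schema; note as well that fields_delimiter takes only the first character of the configured delimiter string. A toy sketch of this configuration step (illustrative types; `getNames` mirrors what NamesAndTypesList::getNames() returns, and a non-empty delimiter is assumed):

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Toy schema: (column name, type name) pairs, standing in for NamesAndTypesList.
using Schema = std::vector<std::pair<std::string, std::string>>;

struct HiveTextSettings
{
    std::vector<std::string> input_field_names;
    char fields_delimiter = '\t';
};

// Mirrors the new one-liner: input_field_names = getSchema().getNames().
void configure(HiveTextSettings & settings, const Schema & schema, const std::string & delimiter)
{
    settings.input_field_names.clear();
    settings.input_field_names.reserve(schema.size());
    for (const auto & [name, type] : schema)
        settings.input_field_names.push_back(name);
    settings.fields_delimiter = delimiter[0]; // only the first character is used
}

int main()
{
    HiveTextSettings settings;
    configure(settings, {{"id", "Int64"}, {"name", "String"}}, "|");
    assert(settings.input_field_names.size() == 2);
    assert(settings.fields_delimiter == '|');
}
```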
