[GLUTEN-3704][CH]Enable orc/parquet subcolumn pruning #3707

Merged 2 commits on Nov 23, 2023
Changes from all commits
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Parser/TypeParser.h
@@ -44,7 +44,7 @@ class TypeParser

static DB::Block buildBlockFromNamedStruct(const substrait::NamedStruct & struct_);

- /// Build block from substrait NamedStruct without following DFS rules.
+ /// Build block from substrait NamedStruct without following DFS rules, unlike buildBlockFromNamedStruct.
static DB::Block buildBlockFromNamedStructWithoutDFS(const substrait::NamedStruct & struct_);

static bool isTypeMatched(const substrait::Type & substrait_type, const DB::DataTypePtr & ch_type);
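Context for this change: a Substrait `NamedStruct` carries all field names, nested ones included, in a single depth-first list, so the two builders differ in how names are paired with fields. Below is a self-contained sketch of that pairing under my reading of the comment above; `ToyType`, `collectLeaves`, and `skipNames` are illustrative stand-ins, not the real `TypeParser` or substrait API.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Toy stand-in for a Substrait type tree: a field is either a scalar or a
// struct with children. Field names live in one separate depth-first list,
// as in substrait::NamedStruct. Illustrative only, not the real API.
struct ToyType
{
    std::vector<ToyType> children; // empty => scalar field
};

// DFS pairing: every field, nested or not, consumes one name from the
// list, yielding one (possibly dotted) name per leaf.
static void collectLeaves(
    const ToyType & type, const std::vector<std::string> & names, std::size_t & pos,
    const std::string & prefix, std::vector<std::string> & out)
{
    std::string name = prefix.empty() ? names[pos] : prefix + "." + names[pos];
    ++pos;
    if (type.children.empty())
        out.push_back(name);
    for (const auto & child : type.children)
        collectLeaves(child, names, pos, name, out);
}

// Non-DFS pairing: nested names are still consumed from the list, but only
// top-level fields contribute columns.
static void skipNames(const ToyType & type, std::size_t & pos)
{
    ++pos;
    for (const auto & child : type.children)
        skipNames(child, pos);
}

int main()
{
    // Schema: a : int, c : struct { x : int, y : int }
    std::vector<ToyType> fields = {ToyType{}, ToyType{{ToyType{}, ToyType{}}}};
    std::vector<std::string> names = {"a", "c", "x", "y"};

    std::vector<std::string> leaves;
    std::size_t pos = 0;
    for (const auto & f : fields)
        collectLeaves(f, names, pos, "", leaves);
    assert((leaves == std::vector<std::string>{"a", "c.x", "c.y"}));

    std::vector<std::string> top_level;
    pos = 0;
    for (const auto & f : fields)
    {
        top_level.push_back(names[pos]);
        skipNames(f, pos);
    }
    assert((top_level == std::vector<std::string>{"a", "c"}));
    return 0;
}
```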
1 change: 0 additions & 1 deletion cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.h
@@ -57,7 +57,6 @@ class FormatFile
/// Create a new input format for reading this file
virtual InputFormatPtr createInputFormat(const DB::Block & header) = 0;

- /// Get schema which describes the columns of this file
virtual DB::NamesAndTypesList getSchema() const
{
const auto & schema = file_info.schema();
158 changes: 20 additions & 138 deletions cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp
@@ -61,59 +61,22 @@ namespace local_engine
// Build blocks with a const virtual column to indicate how many rows are in them.
static DB::Block getRealHeader(const DB::Block & header)
{
- if (header.columns())
-     return header;
- return BlockUtil::buildRowCountHeader();
+ return header ? header : BlockUtil::buildRowCountHeader();
}

SubstraitFileSource::SubstraitFileSource(
DB::ContextPtr context_, const DB::Block & header_, const substrait::ReadRel::LocalFiles & file_infos)
- : DB::SourceWithKeyCondition(getRealHeader(header_), false), context(context_), output_header(header_)
+ : DB::SourceWithKeyCondition(getRealHeader(header_), false), context(context_), output_header(header_), to_read_header(output_header)
{
if (file_infos.items_size())
{
/// Initialize files
Poco::URI file_uri(file_infos.items().Get(0).uri_file());
read_buffer_builder = ReadBufferBuilderFactory::instance().createBuilder(file_uri.getScheme(), context);
for (const auto & item : file_infos.items())
files.emplace_back(FormatFileUtil::createFile(context, read_buffer_builder, item));

- /// Decide which tuple type column in output_header should skip flatten.
- file_schema = files[0]->getSchema();
- for (size_t i = 0; i < output_header.columns(); ++i)
- {
- const auto & col = output_header.getByPosition(i);
-
- /// Find the same column in the file schema, if the two columns have the same tuple element size, then it should be skipped flatten.
- for (const auto & pair : file_schema)
- {
- if (boost::iequals(pair.name, col.name))
- {
- auto type_in_file = DB::removeNullable(pair.type);
- auto type_in_header = DB::removeNullable(col.type);
-
- const auto * tuple_type_in_file = typeid_cast<const DB::DataTypeTuple *>(type_in_file.get());
- const auto * tuple_type_in_header = typeid_cast<const DB::DataTypeTuple *>(type_in_header.get());
- if (tuple_type_in_file && tuple_type_in_header && tuple_type_in_file->haveExplicitNames() && tuple_type_in_header->haveExplicitNames()
- && tuple_type_in_file->getElements().size() == tuple_type_in_header->getElements().size())
- columns_to_skip_flatten.insert(i);
- }
- }
- }
- }
-
- /**
- * We may query part fields of a struct column. For example, we have a column c in type
- * struct{x:int, y:int, z:int}, and just want fields c.x and c.y. In the substrait plan, we get
- * a column c described in type struct{x:int, y:int} which is not matched with the original
- * struct type and cause some exceptions. To solve this, we flatten all struct columns into
- * independent field columns recursively, and fold the field columns back into struct columns
- * at the end.
- */
- flatten_output_header = BlockUtil::flattenBlock(output_header, BlockUtil::FLAT_STRUCT, true, columns_to_skip_flatten);
- to_read_header = flatten_output_header;
- if (file_infos.items_size())
- {
- /// file partition keys are read from the file path
+ /// File partition keys are read from the file path
auto partition_keys = files[0]->getFilePartitionKeys();
for (const auto & key : partition_keys)
{
@@ -139,16 +102,6 @@ void SubstraitFileSource::setKeyCondition(const DB::ActionsDAG::NodeRawConstPtrs & nodes, DB::ContextPtr context_)
DB::NameSet{});
}

- std::vector<String> SubstraitFileSource::getPartitionKeys() const
- {
- return files.size() > 0 ? files[0]->getFilePartitionKeys() : std::vector<String>();
- }
-
- DB::String SubstraitFileSource::getFileFormat() const
- {
- return files.size() > 0 ? files[0]->getFileFormat() : "Unknown";
- }

DB::Chunk SubstraitFileSource::generate()
{
while (true)
@@ -161,19 +114,7 @@ DB::Chunk SubstraitFileSource::generate()

DB::Chunk chunk;
if (file_reader->pull(chunk))
- {
- if (output_header.columns())
- {
- auto block = foldFlattenColumns(chunk.detachColumns(), output_header, columns_to_skip_flatten);
- auto columns = block.getColumns();
- return DB::Chunk(columns, block.rows());
- }
- else
- {
- // The count(*)/count(1) case
- return chunk;
- }
- }
+ return chunk;

/// Try to read from the next file
file_reader.reset();
@@ -198,86 +139,26 @@ bool SubstraitFileSource::tryPrepareReader()
return true;
}

- if (!to_read_header.columns())
+ if (!to_read_header)
{
auto total_rows = current_file->getTotalRows();
- if (total_rows)
-     file_reader = std::make_unique<ConstColumnsFileReader>(current_file, context, flatten_output_header, *total_rows);
+ if (total_rows.has_value())
+     file_reader = std::make_unique<ConstColumnsFileReader>(current_file, context, output_header, *total_rows);
else
{
/// For text/json format file, we can't get total rows from file metadata.
/// So we add a dummy column to indicate the number of rows.
- auto dummy_header = BlockUtil::buildRowCountHeader();
- auto flatten_output_header_contains_dummy = flatten_output_header;
- flatten_output_header_contains_dummy.insertUnique(dummy_header.getByPosition(0));
- file_reader = std::make_unique<NormalFileReader>(current_file, context, dummy_header, flatten_output_header_contains_dummy);
+ file_reader
+     = std::make_unique<NormalFileReader>(current_file, context, getRealHeader(to_read_header), getRealHeader(output_header));
}
}
else
- file_reader = std::make_unique<NormalFileReader>(current_file, context, to_read_header, flatten_output_header);
+ file_reader = std::make_unique<NormalFileReader>(current_file, context, to_read_header, output_header);

file_reader->applyKeyCondition(key_condition);
return true;
}
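After this PR the reader choice in `tryPrepareReader` reduces to three cases: nothing to read and a known row count (pure count(*)/count(1) over formats with row-count metadata), nothing to read and an unknown count (text/json, where a virtual row-count column is read instead), and the normal pruned read. A condensed model of that decision follows; `Block`, the reader structs, and `makeReader` are toy stand-ins for the ClickHouse types, not the actual API.

```cpp
#include <cstddef>
#include <memory>
#include <optional>

// Toy stand-ins mirroring the names in this diff; none of this is the
// real ClickHouse/Gluten API.
struct Block
{
    std::size_t columns = 0;
    explicit operator bool() const { return columns != 0; }
};

static Block buildRowCountHeader() { return Block{1}; } // one virtual row-count column

struct FileReader { virtual ~FileReader() = default; };

struct ConstColumnsFileReader : FileReader
{
    Block header;
    std::size_t rows;
    ConstColumnsFileReader(Block h, std::size_t r) : header(h), rows(r) {}
};

struct NormalFileReader : FileReader
{
    Block to_read, output;
    NormalFileReader(Block r, Block o) : to_read(r), output(o) {}
};

static Block getRealHeader(const Block & header) { return header ? header : buildRowCountHeader(); }

// Mirrors the post-PR tryPrepareReader decision:
//  - nothing to read, row count known   -> emit constant columns only
//  - nothing to read, row count unknown -> normal read over a row-count header
//  - otherwise                          -> normal read with the pruned header
std::unique_ptr<FileReader> makeReader(Block to_read_header, Block output_header, std::optional<std::size_t> total_rows)
{
    if (!to_read_header)
    {
        if (total_rows.has_value())
            return std::make_unique<ConstColumnsFileReader>(output_header, *total_rows);
        return std::make_unique<NormalFileReader>(getRealHeader(to_read_header), getRealHeader(output_header));
    }
    return std::make_unique<NormalFileReader>(to_read_header, output_header);
}

int main()
{
    auto count_star = makeReader(Block{0}, Block{0}, std::nullopt); // count(*) over text/json
    return count_star ? 0 : 1;
}
```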

- DB::Block SubstraitFileSource::foldFlattenColumns(
-     const DB::Columns & cols, const DB::Block & header, const std::unordered_set<size_t> & columns_to_skip_flatten)
- {
- DB::ColumnsWithTypeAndName result_cols;
-
- size_t pos = 0;
- for (size_t i = 0; i < header.columns(); ++i)
- {
- const auto & named_col = header.getByPosition(i);
-
- DB::ColumnWithTypeAndName result_col;
- if (columns_to_skip_flatten.contains(i)) [[unlikely]]
- {
- result_col.name = named_col.name;
- result_col.type = named_col.type;
- result_col.column = cols[pos];
- ++pos;
- }
- else
- result_col = foldFlattenColumn(named_col.type, named_col.name, pos, cols);
-
- result_cols.emplace_back(std::move(result_col));
- }
-
- return DB::Block(std::move(result_cols));
- }
-
- DB::ColumnWithTypeAndName
- SubstraitFileSource::foldFlattenColumn(DB::DataTypePtr col_type, const std::string & col_name, size_t & pos, const DB::Columns & cols)
- {
- DB::DataTypePtr nested_type = DB::removeNullable(col_type);
- const DB::DataTypeTuple * type_tuple = typeid_cast<const DB::DataTypeTuple *>(nested_type.get());
- if (type_tuple && type_tuple->haveExplicitNames())
- {
- const auto & field_types = type_tuple->getElements();
- const auto & field_names = type_tuple->getElementNames();
-
- size_t fields_num = field_names.size();
- DB::Columns tuple_cols;
- for (size_t i = 0; i < fields_num; ++i)
- {
- auto named_col = foldFlattenColumn(field_types[i], field_names[i], pos, cols);
- tuple_cols.push_back(named_col.column);
- }
- auto tuple_col = DB::ColumnTuple::create(std::move(tuple_cols));
-
- // The original type col_type may be wrapped by nullable, so add a cast here.
- DB::ColumnWithTypeAndName ret_col(std::move(tuple_col), nested_type, col_name);
- ret_col.column = DB::castColumn(ret_col, col_type);
- ret_col.type = col_type;
- return ret_col;
- }
-
- size_t curr_pos = pos;
- ++pos;
- return DB::ColumnWithTypeAndName(cols[curr_pos], col_type, col_name);
- }

DB::ColumnPtr FileReaderWrapper::createConstColumn(DB::DataTypePtr data_type, const DB::Field & field, size_t rows)
{
auto nested_type = DB::removeNullable(data_type);
@@ -437,12 +318,12 @@ bool ConstColumnsFileReader::pull(DB::Chunk & chunk)
remained_rows -= block_size;
}
DB::Columns res_columns;
- size_t columns_num = header.columns();
- if (columns_num)
+ size_t col_num = header.columns();
+ if (col_num)
{
- res_columns.reserve(columns_num);
+ res_columns.reserve(col_num);
const auto & partition_values = file->getFilePartitionValues();
- for (size_t pos = 0; pos < columns_num; ++pos)
+ for (size_t pos = 0; pos < col_num; ++pos)
{
auto col_with_name_and_type = header.getByPosition(pos);
auto type = col_with_name_and_type.type;
@@ -477,20 +358,21 @@ NormalFileReader::NormalFileReader(

bool NormalFileReader::pull(DB::Chunk & chunk)
{
- DB::Chunk tmp_chunk;
- auto status = reader->pull(tmp_chunk);
+ DB::Chunk raw_chunk;
+ auto status = reader->pull(raw_chunk);
if (!status)
return false;

- size_t rows = tmp_chunk.getNumRows();
+ size_t rows = raw_chunk.getNumRows();
if (!rows)
return false;

- auto read_columns = tmp_chunk.detachColumns();
+ auto read_columns = raw_chunk.detachColumns();
auto columns_with_name_and_type = output_header.getColumnsWithTypeAndName();
auto partition_values = file->getFilePartitionValues();

DB::Columns res_columns;
+ res_columns.reserve(columns_with_name_and_type.size());
for (auto & column : columns_with_name_and_type)
{
if (to_read_header.has(column.name))
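The tail of `NormalFileReader::pull` (truncated above) assembles the output block: columns named in `to_read_header` come straight from the format reader, and every other output column must be a partition key, materialized as a constant column with the chunk's row count. A self-contained model of that assembly follows; the `Column` struct and `assembleOutput` are toys standing in for `DB::Columns` and the real loop.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Toy model of the column assembly at the end of NormalFileReader::pull.
// A "column" here is just a name plus string cells.
struct Column
{
    std::string name;
    std::vector<std::string> values;
};

std::vector<Column> assembleOutput(
    const std::vector<std::string> & output_names,  // output_header
    const std::vector<std::string> & to_read_names, // to_read_header (pruned)
    std::vector<Column> read_columns,               // what the format reader produced
    const std::map<std::string, std::string> & partition_values,
    std::size_t rows)
{
    std::vector<Column> result;
    result.reserve(output_names.size());
    std::size_t next_read = 0;
    for (const auto & name : output_names)
    {
        if (std::find(to_read_names.begin(), to_read_names.end(), name) != to_read_names.end())
        {
            result.push_back(std::move(read_columns[next_read++])); // read from the file
        }
        else
        {
            // Not read from the file: must be a partition key, expanded
            // into a constant column of the chunk's row count.
            result.push_back(Column{name, std::vector<std::string>(rows, partition_values.at(name))});
        }
    }
    return result;
}

int main()
{
    auto out = assembleOutput({"x", "dt"}, {"x"}, {{"x", {"1", "2"}}}, {{"dt", "2023-11-23"}}, 2);
    assert(out[1].values == std::vector<std::string>(2, "2023-11-23"));
    return 0;
}
```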
26 changes: 2 additions & 24 deletions cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.h
@@ -104,41 +104,19 @@ class SubstraitFileSource : public DB::SourceWithKeyCondition

void setKeyCondition(const DB::ActionsDAG::NodeRawConstPtrs & nodes, DB::ContextPtr context_) override;

- std::vector<String> getPartitionKeys() const;
- DB::String getFileFormat() const;

protected:
DB::Chunk generate() override;

private:
DB::ContextPtr context;
- DB::Block output_header; /// Sample header before flatten, may contains partitions keys
- DB::Block flatten_output_header; // Sample header after flatten, include partition keys
- DB::Block to_read_header; // Sample header after flatten, not include partition keys
+ DB::Block output_header; /// Sample header; may contain partition keys
+ DB::Block to_read_header; /// Sample header without partition keys
FormatFiles files;
- DB::NamesAndTypesList file_schema; /// The column names and types in the file

- /// The columns to skip flatten based on output_header
- /// Notice that not all tuple type columns need to be flatten.
- /// E.g. if parquet file schema is `info struct<name string, age int>`, and output_header is `info Tuple(name String, age Int32)`
- /// then there is not need to flatten `info` column, because null value of `info` column will be represented as null value of `info.name` and `info.age`, which is obviously wrong.
- std::unordered_set<size_t> columns_to_skip_flatten;

UInt32 current_file_index = 0;
std::unique_ptr<FileReaderWrapper> file_reader;
ReadBufferBuilderPtr read_buffer_builder;

bool tryPrepareReader();

- // E.g we have flatten columns correspond to header {a:int, b.x.i: int, b.x.j: string, b.y: string}
- // but we want to fold all the flatten struct columns into one struct column,
- // {a:int, b: {x: {i: int, j: string}, y: string}}
- // Notice, don't support list with named struct. ClickHouse may take advantage of this to support
- // nested table, but not the case in spark.
- static DB::Block
- foldFlattenColumns(const DB::Columns & cols, const DB::Block & header, const std::unordered_set<size_t> & columns_to_skip_flatten);
-
- static DB::ColumnWithTypeAndName
- foldFlattenColumn(DB::DataTypePtr col_type, const std::string & col_name, size_t & pos, const DB::Columns & cols);
};
}
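With the flatten/fold machinery removed, the two remaining headers relate in a single way: `to_read_header` is `output_header` minus the partition-key columns, which are reconstructed from the file path rather than read. A minimal sketch of that derivation, assuming a name-only model of `DB::Block` (the real erasure happens in the `SubstraitFileSource` constructor shown above):

```cpp
#include <algorithm>
#include <string>
#include <vector>

// Name-only model of a header; the real code erases columns from a
// DB::Block in the SubstraitFileSource constructor.
using Header = std::vector<std::string>;

Header makeToReadHeader(Header output_header, const std::vector<std::string> & partition_keys)
{
    for (const auto & key : partition_keys)
        output_header.erase(
            std::remove(output_header.begin(), output_header.end(), key), output_header.end());
    return output_header; // only the columns that must be read from the file
}

int main()
{
    Header to_read = makeToReadHeader({"x", "y", "dt"}, {"dt"});
    return to_read == Header{"x", "y"} ? 0 : 1;
}
```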
cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSourceStep.cpp
@@ -46,17 +46,6 @@ namespace local_engine
SubstraitFileSourceStep::SubstraitFileSourceStep(DB::ContextPtr context_, DB::Pipe pipe_, const String &)
: SourceStepWithFilter(DB::DataStream{.header = pipe_.getHeader()}), pipe(std::move(pipe_)), context(context_)
{
- DB::Processors processors = pipe.getProcessors();
- for (size_t i = 0; i < processors.size(); ++i)
- {
- DB::ProcessorPtr processor = processors[i];
- const SubstraitFileSource * source = static_cast<const SubstraitFileSource *>(processor.get());
- if (source)
- {
- partition_keys = source->getPartitionKeys();
- file_format = source->getFileFormat();
- }
- }
}

void SubstraitFileSourceStep::initializePipeline(DB::QueryPipelineBuilder & pipeline, const DB::BuildQueryPipelineSettings &)
cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSourceStep.h
@@ -39,8 +39,6 @@ class SubstraitFileSourceStep : public DB::SourceStepWithFilter
private:
DB::Pipe pipe;
DB::ContextPtr context;
- std::vector<DB::String> partition_keys;
- DB::String file_format;
};

}
cpp-ch/local-engine/Storages/SubstraitSource/TextFormatFile.cpp
@@ -42,9 +42,7 @@ FormatFile::InputFormatPtr TextFormatFile::createInputFormat(const DB::Block & header)

/// Initialize format settings
DB::FormatSettings format_settings = DB::getFormatSettings(context);
- const auto & schema = file_info.schema();
- for (const auto & name : schema.names())
- format_settings.hive_text.input_field_names.push_back(name);
+ format_settings.hive_text.input_field_names = getSchema().getNames();

std::string text_field_delimiter = file_info.text().field_delimiter();
format_settings.hive_text.fields_delimiter = file_info.text().field_delimiter()[0];
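The replaced loop and the delimiter line do the same kind of plumbing: copy the schema's name list into the hive-text settings, and narrow the protobuf's string delimiter to the single char the parser expects. A toy version of that configuration; `HiveTextSettings` here only mimics the shape of ClickHouse's `FormatSettings.hive_text`, and it guards the empty-delimiter case, which the original indexing does not:

```cpp
#include <string>
#include <vector>

// Illustrative stand-ins; the field names mirror the diff, but this is
// not ClickHouse's FormatSettings.
struct HiveTextSettings
{
    std::vector<std::string> input_field_names;
    char fields_delimiter = '\t';
};

void configure(HiveTextSettings & settings, const std::vector<std::string> & schema_names, const std::string & field_delimiter)
{
    settings.input_field_names = schema_names; // was a per-name push_back loop
    if (!field_delimiter.empty())              // the protobuf stores the delimiter as a string
        settings.fields_delimiter = field_delimiter[0];
}

int main()
{
    HiveTextSettings settings;
    configure(settings, {"a", "b"}, "\x01");
    return (settings.fields_delimiter == '\x01' && settings.input_field_names.size() == 2) ? 0 : 1;
}
```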