diff --git a/cpp-ch/local-engine/Parser/TypeParser.h b/cpp-ch/local-engine/Parser/TypeParser.h index 58bf369a499b..7793ae198b86 100644 --- a/cpp-ch/local-engine/Parser/TypeParser.h +++ b/cpp-ch/local-engine/Parser/TypeParser.h @@ -44,7 +44,7 @@ class TypeParser static DB::Block buildBlockFromNamedStruct(const substrait::NamedStruct & struct_); - /// Build block from substrait NamedStruct without following DFS rules. + /// Build block from substrait NamedStruct without DFS rules, different from buildBlockFromNamedStruct static DB::Block buildBlockFromNamedStructWithoutDFS(const substrait::NamedStruct & struct_); static bool isTypeMatched(const substrait::Type & substrait_type, const DB::DataTypePtr & ch_type); diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.h b/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.h index 2f8b4dc16a71..034d17249a95 100644 --- a/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.h +++ b/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.h @@ -57,7 +57,6 @@ class FormatFile /// Create a new input format for reading this file virtual InputFormatPtr createInputFormat(const DB::Block & header) = 0; - /// Get schema which describes the columns of this file virtual DB::NamesAndTypesList getSchema() const { const auto & schema = file_info.schema(); diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp index 3f947573f9de..bf7acbadb17f 100644 --- a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp +++ b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp @@ -61,59 +61,22 @@ namespace local_engine // build blocks with a const virtual column to indicate how many rows is in it. static DB::Block getRealHeader(const DB::Block & header) { - if (header.columns()) - return header; - return BlockUtil::buildRowCountHeader(); + return header ? header : BlockUtil::buildRowCountHeader(); } SubstraitFileSource::SubstraitFileSource( DB::ContextPtr context_, const DB::Block & header_, const substrait::ReadRel::LocalFiles & file_infos) - : DB::SourceWithKeyCondition(getRealHeader(header_), false), context(context_), output_header(header_) + : DB::SourceWithKeyCondition(getRealHeader(header_), false), context(context_), output_header(header_), to_read_header(output_header) { if (file_infos.items_size()) { + /// Initialize files Poco::URI file_uri(file_infos.items().Get(0).uri_file()); read_buffer_builder = ReadBufferBuilderFactory::instance().createBuilder(file_uri.getScheme(), context); for (const auto & item : file_infos.items()) files.emplace_back(FormatFileUtil::createFile(context, read_buffer_builder, item)); - /// Decide which tuple type column in output_header should skip flatten. - file_schema = files[0]->getSchema(); - for (size_t i = 0; i < output_header.columns(); ++i) - { - const auto & col = output_header.getByPosition(i); - - /// Find the same column in the file schema, if the two columns have the same tuple element size, then it should be skipped flatten. - for (const auto & pair : file_schema) - { - if (boost::iequals(pair.name, col.name)) - { - auto type_in_file = DB::removeNullable(pair.type); - auto type_in_header = DB::removeNullable(col.type); - - const auto * tuple_type_in_file = typeid_cast(type_in_file.get()); - const auto * tuple_type_in_header = typeid_cast(type_in_header.get()); - if (tuple_type_in_file && tuple_type_in_header && tuple_type_in_file->haveExplicitNames() && tuple_type_in_header->haveExplicitNames() - && tuple_type_in_file->getElements().size() == tuple_type_in_header->getElements().size()) - columns_to_skip_flatten.insert(i); - } - } - } - } - - /** - * We may query part fields of a struct column. For example, we have a column c in type - * struct{x:int, y:int, z:int}, and just want fields c.x and c.y. In the substrait plan, we get - * a column c described in type struct{x:int, y:int} which is not matched with the original - * struct type and cause some exceptions. To solve this, we flatten all struct columns into - * independent field columns recursively, and fold the field columns back into struct columns - * at the end. - */ - flatten_output_header = BlockUtil::flattenBlock(output_header, BlockUtil::FLAT_STRUCT, true, columns_to_skip_flatten); - to_read_header = flatten_output_header; - if (file_infos.items_size()) - { - /// file partition keys are read from the file path + /// File partition keys are read from the file path auto partition_keys = files[0]->getFilePartitionKeys(); for (const auto & key : partition_keys) { @@ -139,16 +102,6 @@ void SubstraitFileSource::setKeyCondition(const DB::ActionsDAG::NodeRawConstPtrs DB::NameSet{}); } -std::vector SubstraitFileSource::getPartitionKeys() const -{ - return files.size() > 0 ? files[0]->getFilePartitionKeys() : std::vector(); -} - -DB::String SubstraitFileSource::getFileFormat() const -{ - return files.size() > 0 ? files[0]->getFileFormat() : "Unknown"; -} - DB::Chunk SubstraitFileSource::generate() { while (true) @@ -161,19 +114,7 @@ DB::Chunk SubstraitFileSource::generate() DB::Chunk chunk; if (file_reader->pull(chunk)) - { - if (output_header.columns()) - { - auto block = foldFlattenColumns(chunk.detachColumns(), output_header, columns_to_skip_flatten); - auto columns = block.getColumns(); - return DB::Chunk(columns, block.rows()); - } - else - { - // The count(*)/count(1) case - return chunk; - } - } + return chunk; /// try to read from next file file_reader.reset(); @@ -198,86 +139,26 @@ bool SubstraitFileSource::tryPrepareReader() return true; } - if (!to_read_header.columns()) + if (!to_read_header) { auto total_rows = current_file->getTotalRows(); - if (total_rows) - file_reader = std::make_unique(current_file, context, flatten_output_header, *total_rows); + if (total_rows.has_value()) + file_reader = std::make_unique(current_file, context, output_header, *total_rows); else { /// For text/json format file, we can't get total rows from file metadata. /// So we add a dummy column to indicate the number of rows. - auto dummy_header = BlockUtil::buildRowCountHeader(); - auto flatten_output_header_contains_dummy = flatten_output_header; - flatten_output_header_contains_dummy.insertUnique(dummy_header.getByPosition(0)); - file_reader = std::make_unique(current_file, context, dummy_header, flatten_output_header_contains_dummy); + file_reader + = std::make_unique(current_file, context, getRealHeader(to_read_header), getRealHeader(output_header)); } } else - file_reader = std::make_unique(current_file, context, to_read_header, flatten_output_header); + file_reader = std::make_unique(current_file, context, to_read_header, output_header); file_reader->applyKeyCondition(key_condition); return true; } -DB::Block SubstraitFileSource::foldFlattenColumns( - const DB::Columns & cols, const DB::Block & header, const std::unordered_set & columns_to_skip_flatten) -{ - DB::ColumnsWithTypeAndName result_cols; - - size_t pos = 0; - for (size_t i = 0; i < header.columns(); ++i) - { - const auto & named_col = header.getByPosition(i); - - DB::ColumnWithTypeAndName result_col; - if (columns_to_skip_flatten.contains(i)) [[unlikely]] - { - result_col.name = named_col.name; - result_col.type = named_col.type; - result_col.column = cols[pos]; - ++pos; - } - else - result_col = foldFlattenColumn(named_col.type, named_col.name, pos, cols); - - result_cols.emplace_back(std::move(result_col)); - } - - return DB::Block(std::move(result_cols)); -} - -DB::ColumnWithTypeAndName -SubstraitFileSource::foldFlattenColumn(DB::DataTypePtr col_type, const std::string & col_name, size_t & pos, const DB::Columns & cols) -{ - DB::DataTypePtr nested_type = DB::removeNullable(col_type); - const DB::DataTypeTuple * type_tuple = typeid_cast(nested_type.get()); - if (type_tuple && type_tuple->haveExplicitNames()) - { - const auto & field_types = type_tuple->getElements(); - const auto & field_names = type_tuple->getElementNames(); - - size_t fields_num = field_names.size(); - DB::Columns tuple_cols; - for (size_t i = 0; i < fields_num; ++i) - { - auto named_col = foldFlattenColumn(field_types[i], field_names[i], pos, cols); - tuple_cols.push_back(named_col.column); - } - auto tuple_col = DB::ColumnTuple::create(std::move(tuple_cols)); - - // The original type col_type may be wrapped by nullable, so add a cast here. - DB::ColumnWithTypeAndName ret_col(std::move(tuple_col), nested_type, col_name); - ret_col.column = DB::castColumn(ret_col, col_type); - ret_col.type = col_type; - return ret_col; - } - - size_t curr_pos = pos; - ++pos; - return DB::ColumnWithTypeAndName(cols[curr_pos], col_type, col_name); -} - DB::ColumnPtr FileReaderWrapper::createConstColumn(DB::DataTypePtr data_type, const DB::Field & field, size_t rows) { auto nested_type = DB::removeNullable(data_type); @@ -437,12 +318,12 @@ bool ConstColumnsFileReader::pull(DB::Chunk & chunk) remained_rows -= block_size; } DB::Columns res_columns; - size_t columns_num = header.columns(); - if (columns_num) + size_t col_num = header.columns(); + if (col_num) { - res_columns.reserve(columns_num); + res_columns.reserve(col_num); const auto & partition_values = file->getFilePartitionValues(); - for (size_t pos = 0; pos < columns_num; ++pos) + for (size_t pos = 0; pos < col_num; ++pos) { auto col_with_name_and_type = header.getByPosition(pos); auto type = col_with_name_and_type.type; @@ -477,20 +358,21 @@ NormalFileReader::NormalFileReader( bool NormalFileReader::pull(DB::Chunk & chunk) { - DB::Chunk tmp_chunk; - auto status = reader->pull(tmp_chunk); + DB::Chunk raw_chunk; + auto status = reader->pull(raw_chunk); if (!status) return false; - size_t rows = tmp_chunk.getNumRows(); + size_t rows = raw_chunk.getNumRows(); if (!rows) return false; - auto read_columns = tmp_chunk.detachColumns(); + auto read_columns = raw_chunk.detachColumns(); auto columns_with_name_and_type = output_header.getColumnsWithTypeAndName(); auto partition_values = file->getFilePartitionValues(); DB::Columns res_columns; + res_columns.reserve(columns_with_name_and_type.size()); for (auto & column : columns_with_name_and_type) { if (to_read_header.has(column.name)) diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.h b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.h index 5d0a2d69e9a4..08929cc5dc4a 100644 --- a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.h +++ b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.h @@ -104,41 +104,19 @@ class SubstraitFileSource : public DB::SourceWithKeyCondition void setKeyCondition(const DB::ActionsDAG::NodeRawConstPtrs & nodes, DB::ContextPtr context_) override; - std::vector getPartitionKeys() const; - DB::String getFileFormat() const; - protected: DB::Chunk generate() override; private: DB::ContextPtr context; - DB::Block output_header; /// Sample header before flatten, may contains partitions keys - DB::Block flatten_output_header; // Sample header after flatten, include partition keys - DB::Block to_read_header; // Sample header after flatten, not include partition keys + DB::Block output_header; /// Sample header may contains partitions keys + DB::Block to_read_header; // Sample header not include partition keys FormatFiles files; - DB::NamesAndTypesList file_schema; /// The column names and types in the file - - /// The columns to skip flatten based on output_header - /// Notice that not all tuple type columns need to be flatten. - /// E.g. if parquet file schema is `info struct`, and output_header is `info Tuple(name String, age Int32)` - /// then there is not need to flatten `info` column, because null value of `info` column will be represented as null value of `info.name` and `info.age`, which is obviously wrong. - std::unordered_set columns_to_skip_flatten; UInt32 current_file_index = 0; std::unique_ptr file_reader; ReadBufferBuilderPtr read_buffer_builder; bool tryPrepareReader(); - - // E.g we have flatten columns correspond to header {a:int, b.x.i: int, b.x.j: string, b.y: string} - // but we want to fold all the flatten struct columns into one struct column, - // {a:int, b: {x: {i: int, j: string}, y: string}} - // Notice, don't support list with named struct. ClickHouse may take advantage of this to support - // nested table, but not the case in spark. - static DB::Block - foldFlattenColumns(const DB::Columns & cols, const DB::Block & header, const std::unordered_set & columns_to_skip_flatten); - - static DB::ColumnWithTypeAndName - foldFlattenColumn(DB::DataTypePtr col_type, const std::string & col_name, size_t & pos, const DB::Columns & cols); }; } diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSourceStep.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSourceStep.cpp index 0a731ef756da..0b3478e049af 100644 --- a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSourceStep.cpp +++ b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSourceStep.cpp @@ -46,17 +46,6 @@ namespace local_engine SubstraitFileSourceStep::SubstraitFileSourceStep(DB::ContextPtr context_, DB::Pipe pipe_, const String &) : SourceStepWithFilter(DB::DataStream{.header = pipe_.getHeader()}), pipe(std::move(pipe_)), context(context_) { - DB::Processors processors = pipe.getProcessors(); - for (size_t i = 0; i < processors.size(); ++i) - { - DB::ProcessorPtr processor = processors[i]; - const SubstraitFileSource * source = static_cast(processor.get()); - if (source) - { - partition_keys = source->getPartitionKeys(); - file_format = source->getFileFormat(); - } - } } void SubstraitFileSourceStep::initializePipeline(DB::QueryPipelineBuilder & pipeline, const DB::BuildQueryPipelineSettings &) diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSourceStep.h b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSourceStep.h index b74599fe53a0..3a8143b68e24 100644 --- a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSourceStep.h +++ b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSourceStep.h @@ -39,8 +39,6 @@ class SubstraitFileSourceStep : public DB::SourceStepWithFilter private: DB::Pipe pipe; DB::ContextPtr context; - std::vector partition_keys; - DB::String file_format; }; } diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/TextFormatFile.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/TextFormatFile.cpp index 811ed753dd0d..5cae962a7f2c 100644 --- a/cpp-ch/local-engine/Storages/SubstraitSource/TextFormatFile.cpp +++ b/cpp-ch/local-engine/Storages/SubstraitSource/TextFormatFile.cpp @@ -42,9 +42,7 @@ FormatFile::InputFormatPtr TextFormatFile::createInputFormat(const DB::Block & h /// Initialize format settings DB::FormatSettings format_settings = DB::getFormatSettings(context); - const auto & schema = file_info.schema(); - for (const auto & name : schema.names()) - format_settings.hive_text.input_field_names.push_back(name); + format_settings.hive_text.input_field_names = getSchema().getNames(); std::string text_field_delimiter = file_info.text().field_delimiter(); format_settings.hive_text.fields_delimiter = file_info.text().field_delimiter()[0];