[GLUTEN-6588][CH] Cast columns if necessary before finally writing to ORC/Parquet files during native inserting (#6691)

* [GLUTEN-6588][CH] Cast columns if necessary before finally writing to ORC/Parquet files during native inserting

* fix style

* fix style

* fix conflicts and remove examples

* fix style

* fix style
taiyang-li authored Aug 4, 2024
1 parent 944c926 commit 5d6c6f3
Showing 16 changed files with 107 additions and 83 deletions.
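The gist of the change: the JNI layer now hands the native writer the full write schema (column names and types, serialized as a Substrait NamedStruct) instead of bare column names, and the native writer casts each incoming block to that schema before passing it to the ORC/Parquet output format. A minimal sketch of the casting idea, assuming ClickHouse's DB::castColumn from Interpreters/castColumn.h; the helper name alignBlockToSchema is illustrative, not Gluten's:

#include <Core/Block.h>
#include <Interpreters/castColumn.h>

/// Sketch: coerce every column of `block` to the name and type recorded in
/// `preferred_schema`, i.e. the schema of the file being written.
static void alignBlockToSchema(DB::Block & block, const DB::Block & preferred_schema)
{
    for (size_t i = 0; i < block.columns(); ++i)
    {
        auto & column = block.getByPosition(i);
        const auto & preferred = preferred_schema.getByPosition(i);
        column.column = DB::castColumn(column, preferred.type); // returns the input unchanged if types already match
        column.name = preferred.name;
        column.type = preferred.type;
    }
}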
CHDatasourceJniWrapper.java
@@ -19,7 +19,7 @@
public class CHDatasourceJniWrapper {

public native long nativeInitFileWriterWrapper(
String filePath, String[] preferredColumnNames, String formatHint);
String filePath, byte[] preferredSchema, String formatHint);

public native long nativeInitMergeTreeWriterWrapper(
byte[] plan,
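On the native side this byte[] arrives as a jbyteArray holding protobuf wire data. A sketch of decoding it into a Substrait NamedStruct (illustrative only: the actual JNI entry point is not part of this excerpt, and the proto header path is an assumption):

#include <jni.h>
#include <substrait/type.pb.h>

// Sketch: decode the byte[] passed by CHDatasourceJniWrapper into a NamedStruct.
static substrait::NamedStruct parsePreferredSchema(JNIEnv * env, jbyteArray schema_bytes)
{
    const jsize len = env->GetArrayLength(schema_bytes);
    jbyte * bytes = env->GetByteArrayElements(schema_bytes, nullptr);
    substrait::NamedStruct named_struct;
    named_struct.ParseFromArray(bytes, static_cast<int>(len)); // protobuf wire format
    env->ReleaseByteArrayElements(schema_bytes, bytes, JNI_ABORT); // read-only: discard the copy
    return named_struct;
}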
CHFormatWriterInjects.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.datasources.v1

import org.apache.gluten.execution.datasource.GlutenRowSplitter
import org.apache.gluten.expression.ConverterUtils
import org.apache.gluten.memory.CHThreadGroup
import org.apache.gluten.vectorized.CHColumnVector

@@ -26,6 +27,7 @@ import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.orc.OrcUtils
import org.apache.spark.sql.types.StructType

import io.substrait.proto.{NamedStruct, Type}
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.mapreduce.TaskAttemptContext

@@ -39,10 +41,20 @@ trait CHFormatWriterInjects extends GlutenFormatWriterInjectsBase {
val originPath = path
val datasourceJniWrapper = new CHDatasourceJniWrapper();
CHThreadGroup.registerNewThreadGroup()

val namedStructBuilder = NamedStruct.newBuilder
val structBuilder = Type.Struct.newBuilder
for (field <- dataSchema.fields) {
namedStructBuilder.addNames(field.name)
structBuilder.addTypes(ConverterUtils.getTypeNode(field.dataType, field.nullable).toProtobuf)
}
namedStructBuilder.setStruct(structBuilder.build)
val namedStruct = namedStructBuilder.build

val instance =
datasourceJniWrapper.nativeInitFileWriterWrapper(
path,
dataSchema.fieldNames,
namedStruct.toByteArray,
getFormatName());

new OutputWriter {
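The NamedStruct built above carries parallel name and type lists that map one-to-one onto a ClickHouse header. A sketch of that conversion on the native side; parseType stands in for the local engine's substrait-to-ClickHouse type mapping (cf. local_engine::TypeParser), which this diff does not show:

#include <Core/Block.h>
#include <DataTypes/IDataType.h>
#include <substrait/type.pb.h>

DB::DataTypePtr parseType(const substrait::Type & type); // assumed helper

// Sketch: build an empty header block (names and types, no rows) from a NamedStruct.
DB::Block blockFromNamedStruct(const substrait::NamedStruct & named_struct)
{
    DB::ColumnsWithTypeAndName columns;
    columns.reserve(named_struct.names_size());
    for (int i = 0; i < named_struct.names_size(); ++i)
    {
        const DB::DataTypePtr type = parseType(named_struct.struct_().types(i));
        columns.emplace_back(type->createColumn(), type, named_struct.names(i));
    }
    return DB::Block(std::move(columns));
}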
GlutenClickHouseNativeWriteTableSuite.scala
@@ -913,12 +913,10 @@ class GlutenClickHouseNativeWriteTableSuite
(table_name, create_sql, insert_sql)
},
(table_name, _) =>
if (isSparkVersionGE("3.5")) {
compareResultsAgainstVanillaSpark(
s"select * from $table_name",
compareResult = true,
_ => {})
}
compareResultsAgainstVanillaSpark(
s"select * from $table_name",
compareResult = true,
_ => {})
)
}
}
5 changes: 3 additions & 2 deletions cpp-ch/local-engine/Common/CHUtil.cpp
@@ -16,11 +16,12 @@
*/

#include "CHUtil.h"

#include <filesystem>
#include <format>
#include <memory>
#include <optional>
#include <unistd.h>

#include <AggregateFunctions/Combinators/AggregateFunctionCombinatorFactory.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <Columns/ColumnArray.h>
@@ -1009,7 +1010,7 @@ void BackendInitializerUtil::init(const std::string_view plan)
});
}

void BackendInitializerUtil::updateConfig(const DB::ContextMutablePtr & context, const std::string_view plan)
void BackendInitializerUtil::updateConfig(const DB::ContextMutablePtr & context, std::string_view plan)
{
std::map<std::string, std::string> backend_conf_map = getBackendConfMap(plan);

3 changes: 1 addition & 2 deletions cpp-ch/local-engine/Common/CHUtil.h
@@ -161,8 +161,7 @@ class BackendInitializerUtil
/// 1. global level resources like global_context/shared_context, notice that they can only be initialized once in process lifetime
/// 2. session level resources like settings/configs, they can be initialized multiple times following the lifetime of executor/driver
static void init(const std::string_view plan);
static void updateConfig(const DB::ContextMutablePtr &, const std::string_view);

static void updateConfig(const DB::ContextMutablePtr &, std::string_view);

// use excel text parser
inline static const std::string USE_EXCEL_PARSER = "use_excel_serialization";
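A side note on the signature cleanup above: std::string_view is a cheap non-owning view, so it is taken by value, and the top-level const is dropped from the declaration because it is not part of the function's type. A quick standalone illustration:

#include <string_view>

// Top-level const on a by-value parameter does not change a function's type:
// these two lines declare the same function.
void updateConfigDemo(const std::string_view plan);
void updateConfigDemo(std::string_view plan); // same signature as above

void updateConfigDemo(std::string_view plan) // the definition may drop the const
{
    (void)plan;
}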
28 changes: 21 additions & 7 deletions cpp-ch/local-engine/Storages/Output/FileWriterWrappers.cpp
@@ -15,7 +15,6 @@
* limitations under the License.
*/
#include "FileWriterWrappers.h"
#include <algorithm>

namespace local_engine
{
@@ -28,7 +27,6 @@ NormalFileWriter::NormalFileWriter(const OutputFormatFilePtr & file_, const DB::
{
}


void NormalFileWriter::consume(DB::Block & block)
{
if (!writer) [[unlikely]]
@@ -39,6 +37,22 @@ void NormalFileWriter::consume(DB::Block & block)
writer = std::make_unique<DB::PushingPipelineExecutor>(*pipeline);
}

/// If the input block's types differ from the preferred schema, cast the block to the preferred schema before writing.
/// Note that preferred_schema is the actual file schema, i.e. the data schema of the table being inserted into.
/// Refer to issue: https://github.com/apache/incubator-gluten/issues/6588
size_t index = 0;
const auto & preferred_schema = file->getPreferredSchema();
for (auto & column : block)
{
if (column.name.starts_with("__bucket_value__"))
continue;

const auto & preferred_column = preferred_schema.getByPosition(index++);
column.column = DB::castColumn(column, preferred_column.type);
column.name = preferred_column.name;
column.type = preferred_column.type;
}

/// Although gluten appends a MaterializingTransform to the end of the pipeline before native insert in most cases, there are some cases in which it won't be appended.
/// e.g. https://github.com/oap-project/gluten/issues/2900
/// So we materialize here again to make sure all blocks passed to the native writer are materialized.
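For a concrete shape of the bug this loop fixes (a hypothetical repro in the spirit of issue #6588): if the plan yields an Int32 column while the target table column is Int64, the block previously reached the output format with the wrong physical type; with the cast it is widened first. The __bucket_value__ column is skipped since it appears to be a synthetic column used for bucketed writes rather than part of the file schema. A standalone illustration, assuming ClickHouse's type and cast APIs:

#include <Core/ColumnWithTypeAndName.h>
#include <Core/Field.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/castColumn.h>

int main()
{
    // The plan produced an Int32 column with a generated name.
    const auto src_type = std::make_shared<DB::DataTypeInt32>();
    DB::ColumnWithTypeAndName src{
        src_type->createColumnConst(3, DB::Field(42))->convertToFullColumnIfConst(), src_type, "x0"};

    // The file schema wants Int64: castColumn widens the values.
    const auto dst_type = std::make_shared<DB::DataTypeInt64>();
    DB::ColumnPtr widened = DB::castColumn(src, dst_type);
    return widened->size() == 3 ? 0 : 1;
}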
@@ -54,8 +68,8 @@ void NormalFileWriter::close()
writer->finish();
}

OutputFormatFilePtr create_output_format_file(
const DB::ContextPtr & context, const std::string & file_uri, const DB::Names & preferred_column_names, const std::string & format_hint)
OutputFormatFilePtr createOutputFormatFile(
const DB::ContextPtr & context, const std::string & file_uri, const DB::Block & preferred_schema, const std::string & format_hint)
{
// the passed in file_uri is exactly what is expected to see in the output folder
// e.g /xxx/中文/timestamp_field=2023-07-13 03%3A00%3A17.622/abc.parquet
@@ -64,13 +78,13 @@
Poco::URI::encode(file_uri, "", encoded); // encode the space and % seen in the file_uri
Poco::URI poco_uri(encoded);
auto write_buffer_builder = WriteBufferBuilderFactory::instance().createBuilder(poco_uri.getScheme(), context);
return OutputFormatFileUtil::createFile(context, write_buffer_builder, encoded, preferred_column_names, format_hint);
return OutputFormatFileUtil::createFile(context, write_buffer_builder, encoded, preferred_schema, format_hint);
}

std::unique_ptr<FileWriterWrapper> createFileWriterWrapper(
const DB::ContextPtr & context, const std::string & file_uri, const DB::Names & preferred_column_names, const std::string & format_hint)
const DB::ContextPtr & context, const std::string & file_uri, const DB::Block & preferred_schema, const std::string & format_hint)
{
return std::make_unique<NormalFileWriter>(create_output_format_file(context, file_uri, preferred_column_names, format_hint), context);
return std::make_unique<NormalFileWriter>(createOutputFormatFile(context, file_uri, preferred_schema, format_hint), context);
}

}
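A hedged usage sketch of the renamed factory, assuming an already-initialized DB::ContextPtr and the cpp-ch include layout (the file path and values here are made up):

#include <cstdint>
#include <DataTypes/DataTypesNumber.h>
#include <Storages/Output/FileWriterWrappers.h>

void writeOneBlock(const DB::ContextPtr & context)
{
    // Preferred schema: a single Int64 column named "id"
    // (in production this comes from the deserialized NamedStruct).
    const auto id_type = std::make_shared<DB::DataTypeInt64>();
    DB::Block schema{{id_type->createColumn(), id_type, "id"}};

    auto writer = local_engine::createFileWriterWrapper(context, "/tmp/part-00000.parquet", schema, "parquet");

    auto column = id_type->createColumn();
    column->insert(DB::Field(int64_t{42}));
    DB::Block block{{std::move(column), id_type, "id"}};

    writer->consume(block); // casts to `schema` if needed, then feeds the output format
    writer->close();
}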
12 changes: 6 additions & 6 deletions cpp-ch/local-engine/Storages/Output/FileWriterWrappers.h
@@ -41,6 +41,7 @@ class FileWriterWrapper
public:
explicit FileWriterWrapper(const OutputFormatFilePtr & file_) : file(file_) { }
virtual ~FileWriterWrapper() = default;

virtual void consume(DB::Block & block) = 0;
virtual void close() = 0;

@@ -53,10 +54,9 @@ using FileWriterWrapperPtr = std::shared_ptr<FileWriterWrapper>;
class NormalFileWriter : public FileWriterWrapper
{
public:
//TODO: EmptyFileReader and ConstColumnsFileReader ?
//TODO: to support complex types
NormalFileWriter(const OutputFormatFilePtr & file_, const DB::ContextPtr & context_);
~NormalFileWriter() override = default;

void consume(DB::Block & block) override;
void close() override;

@@ -71,13 +71,13 @@ class NormalFileWriter
std::unique_ptr<FileWriterWrapper> createFileWriterWrapper(
const DB::ContextPtr & context,
const std::string & file_uri,
const DB::Names & preferred_column_names,
const DB::Block & preferred_schema,
const std::string & format_hint);

OutputFormatFilePtr create_output_format_file(
OutputFormatFilePtr createOutputFormatFile(
const DB::ContextPtr & context,
const std::string & file_uri,
const DB::Names & preferred_column_names,
const DB::Block & preferred_schema,
const std::string & format_hint);

class WriteStats : public DB::ISimpleTransform
@@ -191,7 +191,7 @@ class SubstraitFileSink final : public SinkToStorage
: SinkToStorage(header)
, partition_id_(partition_id.empty() ? NO_PARTITION_ID : partition_id)
, relative_path_(relative)
, output_format_(create_output_format_file(context, makeFilename(base_path, partition_id, relative), header.getNames(), format_hint)
, output_format_(createOutputFormatFile(context, makeFilename(base_path, partition_id, relative), header, format_hint)
->createOutputFormat(header))
{
}
6 changes: 3 additions & 3 deletions cpp-ch/local-engine/Storages/Output/ORCOutputFormatFile.cpp
@@ -27,8 +27,8 @@ ORCOutputFormatFile::ORCOutputFormatFile(
DB::ContextPtr context_,
const std::string & file_uri_,
WriteBufferBuilderPtr write_buffer_builder_,
const std::vector<std::string> & preferred_column_names_)
: OutputFormatFile(context_, file_uri_, write_buffer_builder_, preferred_column_names_)
const DB::Block & preferred_schema_)
: OutputFormatFile(context_, file_uri_, write_buffer_builder_, preferred_schema_)
{
}

@@ -37,7 +37,7 @@ OutputFormatFile::OutputFormatPtr ORCOutputFormatFile::createOutputFormat(const
auto res = std::make_shared<OutputFormatFile::OutputFormat>();
res->write_buffer = write_buffer_builder->build(file_uri);

auto new_header = creatHeaderWithPreferredColumnNames(header);
auto new_header = creatHeaderWithPreferredSchema(header);
// TODO: align all spark orc config with ch orc config
auto format_settings = DB::getFormatSettings(context);
auto output_format = std::make_shared<DB::ORCBlockOutputFormat>(*(res->write_buffer), new_header, format_settings);
4 changes: 1 addition & 3 deletions cpp-ch/local-engine/Storages/Output/ORCOutputFormatFile.h
@@ -20,8 +20,6 @@
#include "config.h"

#if USE_ORC

# include <memory>
# include <IO/WriteBuffer.h>
# include <Storages/Output/OutputFormatFile.h>

@@ -34,7 +32,7 @@ class ORCOutputFormatFile : public OutputFormatFile
DB::ContextPtr context_,
const std::string & file_uri_,
WriteBufferBuilderPtr write_buffer_builder_,
const std::vector<std::string> & preferred_column_names_);
const DB::Block & preferred_schema_);
~ORCOutputFormatFile() override = default;

OutputFormatFile::OutputFormatPtr createOutputFormat(const DB::Block & header) override;
43 changes: 21 additions & 22 deletions cpp-ch/local-engine/Storages/Output/OutputFormatFile.cpp
@@ -38,49 +38,48 @@ OutputFormatFile::OutputFormatFile(
DB::ContextPtr context_,
const std::string & file_uri_,
WriteBufferBuilderPtr write_buffer_builder_,
const std::vector<std::string> & preferred_column_names_)
: context(context_), file_uri(file_uri_), write_buffer_builder(write_buffer_builder_), preferred_column_names(preferred_column_names_)
const DB::Block & preferred_schema_)
: context(context_), file_uri(file_uri_), write_buffer_builder(write_buffer_builder_), preferred_schema(preferred_schema_)
{
}

Block OutputFormatFile::creatHeaderWithPreferredColumnNames(const Block & header)
Block OutputFormatFile::creatHeaderWithPreferredSchema(const Block & header)
{
if (!preferred_column_names.empty())
if (!preferred_schema)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "preferred_schema is empty");

/// Create a new header with the preferred column name and type
DB::ColumnsWithTypeAndName columns;
columns.reserve(preferred_schema.columns());
size_t index = 0;
for (const auto & name_type : header.getNamesAndTypesList())
{
/// Create a new header with the preferred column name
DB::NamesAndTypesList names_types_list = header.getNamesAndTypesList();
DB::ColumnsWithTypeAndName cols;
size_t index = 0;
for (const auto & name_type : header.getNamesAndTypesList())
{
if (name_type.name.starts_with("__bucket_value__"))
continue;
if (name_type.name.starts_with("__bucket_value__"))
continue;

DB::ColumnWithTypeAndName col(name_type.type->createColumn(), name_type.type, preferred_column_names.at(index++));
cols.emplace_back(std::move(col));
}
assert(preferred_column_names.size() == index);
return {std::move(cols)};
const auto & preferred_column = preferred_schema.getByPosition(index++);
ColumnWithTypeAndName column(preferred_column.type->createColumn(), preferred_column.type, preferred_column.name);
columns.emplace_back(std::move(column));
}
else
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "preferred_column_names is empty");
assert(preferred_schema.columns() == index);
return {std::move(columns)};
}

OutputFormatFilePtr OutputFormatFileUtil::createFile(
DB::ContextPtr context,
local_engine::WriteBufferBuilderPtr write_buffer_builder,
const std::string & file_uri,
const std::vector<std::string> & preferred_column_names,
const DB::Block & preferred_schema,
const std::string & format_hint)
{
#if USE_PARQUET
if (boost::to_lower_copy(file_uri).ends_with(".parquet") || "parquet" == boost::to_lower_copy(format_hint))
return std::make_shared<ParquetOutputFormatFile>(context, file_uri, write_buffer_builder, preferred_column_names);
return std::make_shared<ParquetOutputFormatFile>(context, file_uri, write_buffer_builder, preferred_schema);
#endif

#if USE_ORC
if (boost::to_lower_copy(file_uri).ends_with(".orc") || "orc" == boost::to_lower_copy(format_hint))
return std::make_shared<ORCOutputFormatFile>(context, file_uri, write_buffer_builder, preferred_column_names);
return std::make_shared<ORCOutputFormatFile>(context, file_uri, write_buffer_builder, preferred_schema);
#endif


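The division of labor, as this diff reads: creatHeaderWithPreferredSchema rewrites the header the output format is constructed with, so the ORC/Parquet file metadata records the table's column names and types, while NormalFileWriter::consume casts the actual data to match. The emptiness check relies on DB::Block's explicit operator bool, which is true only when the block has columns. A tiny illustration of the positional rewrite, assuming ClickHouse's Block API:

#include <cassert>
#include <Core/Block.h>
#include <DataTypes/DataTypesNumber.h>

int main()
{
    const auto i32 = std::make_shared<DB::DataTypeInt32>();
    const auto i64 = std::make_shared<DB::DataTypeInt64>();

    DB::Block plan_header{{i32->createColumn(), i32, "x0"}}; // what the pipeline produces
    DB::Block preferred{{i64->createColumn(), i64, "id"}};   // what the file should contain

    // Positional rewrite, as creatHeaderWithPreferredSchema does per column:
    const auto & p = preferred.getByPosition(0);
    DB::Block new_header{{p.type->createColumn(), p.type, p.name}};

    assert(new_header.getByPosition(0).name == "id");
    assert(new_header.getByPosition(0).type->equals(*i64));
    return 0;
}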
10 changes: 6 additions & 4 deletions cpp-ch/local-engine/Storages/Output/OutputFormatFile.h
@@ -43,19 +43,21 @@ class OutputFormatFile
DB::ContextPtr context_,
const std::string & file_uri_,
WriteBufferBuilderPtr write_buffer_builder_,
const std::vector<std::string> & preferred_column_names_);
const DB::Block & preferred_schema_);

virtual ~OutputFormatFile() = default;

virtual OutputFormatPtr createOutputFormat(const DB::Block & header_) = 0;

virtual const DB::Block getPreferredSchema() const { return preferred_schema; }

protected:
DB::Block creatHeaderWithPreferredColumnNames(const DB::Block & header);
DB::Block creatHeaderWithPreferredSchema(const DB::Block & header);

DB::ContextPtr context;
std::string file_uri;
WriteBufferBuilderPtr write_buffer_builder;
std::vector<std::string> preferred_column_names;
DB::Block preferred_schema;
};
using OutputFormatFilePtr = std::shared_ptr<OutputFormatFile>;

@@ -66,7 +68,7 @@ class OutputFormatFileUtil
DB::ContextPtr context,
WriteBufferBuilderPtr write_buffer_builder_,
const std::string & file_uri_,
const std::vector<std::string> & preferred_column_names,
const DB::Block & preferred_schema_,
const std::string & format_hint = "");
};
}
cpp-ch/local-engine/Storages/Output/ParquetOutputFormatFile.cpp
@@ -17,10 +17,8 @@
#include "ParquetOutputFormatFile.h"

#if USE_PARQUET

# include <memory>
# include <string>
# include <utility>

# include <Formats/FormatFactory.h>
# include <Formats/FormatSettings.h>
@@ -35,8 +33,8 @@ ParquetOutputFormatFile::ParquetOutputFormatFile(
DB::ContextPtr context_,
const std::string & file_uri_,
const WriteBufferBuilderPtr & write_buffer_builder_,
const std::vector<std::string> & preferred_column_names_)
: OutputFormatFile(context_, file_uri_, write_buffer_builder_, preferred_column_names_)
const DB::Block & preferred_schema_)
: OutputFormatFile(context_, file_uri_, write_buffer_builder_, preferred_schema_)
{
}

@@ -45,7 +43,7 @@ OutputFormatFile::OutputFormatPtr ParquetOutputFormatFile::createOutputFormat(co
auto res = std::make_shared<OutputFormatFile::OutputFormat>();
res->write_buffer = write_buffer_builder->build(file_uri);

auto new_header = creatHeaderWithPreferredColumnNames(header);
auto new_header = creatHeaderWithPreferredSchema(header);
// TODO: align all spark parquet config with ch parquet config
auto format_settings = DB::getFormatSettings(context);
auto output_format = std::make_shared<DB::ParquetBlockOutputFormat>(*(res->write_buffer), new_header, format_settings);