Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CH] Ignore unstabe uts and add more message when failed. #7821

Merged
merged 3 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1037,12 +1037,13 @@ class GlutenClickHouseFileFormatSuite
)
}

test("read data from orc file format") {
test("read data from orc file format - except date32") {
val filePath = s"$orcDataPath/all_data_types_with_non_primitive_type.snappy.orc"
val orcFileFormat = "orc"
val sql =
s"""
| select *
| select string_field, int_field, long_field, float_field, double_field, short_field,
| byte_field, boolean_field, decimal_field
| from $orcFileFormat.`$filePath`
| where long_field > 30
|""".stripMargin
Expand Down
50 changes: 50 additions & 0 deletions cpp-ch/local-engine/Common/DebugUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,59 @@
#include <Formats/FormatSettings.h>
#include <Functions/FunctionHelpers.h>
#include <IO/WriteBufferFromString.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <google/protobuf/json/json.h>
#include <google/protobuf/util/json_util.h>
#include <google/protobuf/wrappers.pb.h>
#include <Common/CHUtil.h>
#include <Common/logger_useful.h>

namespace pb_util = google::protobuf::util;

namespace debug
{

void dumpPlan(DB::QueryPlan & plan, bool force, LoggerPtr logger)
{
if (!logger)
{
logger = getLogger("SerializedPlanParser");
if (!logger)
return;
}

if (!force && !logger->debug())
return;

auto out = local_engine::PlanUtil::explainPlan(plan);
if (force) // force
LOG_ERROR(logger, "clickhouse plan:\n{}", out);
else
LOG_DEBUG(logger, "clickhouse plan:\n{}", out);
}

void dumpMessage(const google::protobuf::Message & message, const char * type, bool force, LoggerPtr logger)
{
if (!logger)
{
logger = getLogger("SubstraitPlan");
if (!logger)
return;
}

if (!force && !logger->debug())
return;
pb_util::JsonOptions options;
std::string json;
if (auto s = google::protobuf::json::MessageToJsonString(message, &json, options); !s.ok())
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Can not convert {} to Json", type);

if (force) // force
LOG_ERROR(logger, "{}:\n{}", type, json);
else
LOG_DEBUG(logger, "{}:\n{}", type, json);
}

void headBlock(const DB::Block & block, size_t count)
{
std::cout << "============Block============" << std::endl;
Expand Down
12 changes: 12 additions & 0 deletions cpp-ch/local-engine/Common/DebugUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,20 @@

#include <Core/Block.h>

namespace google::protobuf
{
class Message;
}
namespace DB
{
class QueryPlan;
}
namespace debug
{

void dumpPlan(DB::QueryPlan & plan, bool force = false, LoggerPtr = nullptr);
void dumpMessage(const google::protobuf::Message & message, const char * type, bool force = false, LoggerPtr = nullptr);

void headBlock(const DB::Block & block, size_t count = 10);
String printBlock(const DB::Block & block, size_t count = 10);

Expand Down
3 changes: 2 additions & 1 deletion cpp-ch/local-engine/Common/GlutenConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <config.pb.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/CHUtil.h>
#include <Common/DebugUtils.h>
#include <Common/logger_useful.h>

namespace local_engine
Expand All @@ -45,7 +46,7 @@ std::map<std::string, std::string> SparkConfigs::load(std::string_view plan, boo
auto configMaps = local_engine::BinaryToMessage<gluten::ConfigMap>(plan);

if (!processStart)
logDebugMessage(configMaps, "Update Config Map Plan");
debug::dumpMessage(configMaps, "Update Config Map Plan");

for (const auto & pair : configMaps.configs())
configs.emplace(pair.first, pair.second);
Expand Down
6 changes: 3 additions & 3 deletions cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
#include <Storages/SubstraitSource/SubstraitFileSourceStep.h>
#include <google/protobuf/wrappers.pb.h>
#include <Common/BlockTypeUtils.h>

#include <Common/DebugUtils.h>

namespace DB
{
Expand Down Expand Up @@ -77,7 +77,7 @@ DB::QueryPlanPtr ReadRelParser::parse(DB::QueryPlanPtr query_plan, const substra
else
{
extension_table = BinaryToMessage<substrait::ReadRel::ExtensionTable>(split_info);
logDebugMessage(extension_table, "extension_table");
debug::dumpMessage(extension_table, "extension_table");
}
MergeTreeRelParser mergeTreeParser(parser_context, getContext());
query_plan = mergeTreeParser.parseReadRel(std::make_unique<DB::QueryPlan>(), read, extension_table);
Expand Down Expand Up @@ -131,7 +131,7 @@ QueryPlanStepPtr ReadRelParser::parseReadRelWithLocalFile(const substrait::ReadR
else
{
local_files = BinaryToMessage<substrait::ReadRel::LocalFiles>(split_info);
logDebugMessage(local_files, "local_files");
debug::dumpMessage(local_files, "local_files");
}
auto source = std::make_shared<SubstraitFileSource>(getContext(), header, local_files);
auto source_pipe = Pipe(source);
Expand Down
40 changes: 13 additions & 27 deletions cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,35 +20,20 @@
#include <string>
#include <string_view>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <Columns/ColumnSet.h>
#include <Core/Block.h>
#include <Core/ColumnWithTypeAndName.h>
#include <Core/Field.h>
#include <Core/Names.h>
#include <Core/NamesAndTypes.h>
#include <Core/Settings.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeNothing.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeSet.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/IDataType.h>
#include <DataTypes/Serializations/ISerialization.h>
#include <DataTypes/getLeastSupertype.h>
#include <Functions/FunctionFactory.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/Context.h>
#include <Interpreters/PreparedSets.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/QueryPriorities.h>
#include <Join/StorageJoinFromReadBuffer.h>
#include <Operator/BlocksBufferPoolTransform.h>
#include <Parser/ExpressionParser.h>
#include <Parser/FunctionParser.h>
Expand All @@ -73,6 +58,7 @@
#include <google/protobuf/wrappers.pb.h>
#include <Common/BlockTypeUtils.h>
#include <Common/CHUtil.h>
#include <Common/DebugUtils.h>
#include <Common/Exception.h>
#include <Common/GlutenConfig.h>
#include <Common/JNIUtils.h>
Expand Down Expand Up @@ -121,13 +107,17 @@ void adjustOutput(const DB::QueryPlanPtr & query_plan, const substrait::PlanRel
{
ActionsDAG actions_dag{blockToNameAndTypeList(query_plan->getCurrentHeader())};
NamesWithAliases aliases;
auto cols = query_plan->getCurrentHeader().getNamesAndTypesList();
const auto cols = query_plan->getCurrentHeader().getNamesAndTypesList();
if (cols.getNames().size() != static_cast<size_t>(root_rel.root().names_size()))
{
debug::dumpPlan(*query_plan, true);
debug::dumpMessage(root_rel, "substrait::PlanRel", true);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Missmatch result columns size. plan column size {}, subtrait plan size {}.",
"Missmatch result columns size. plan column size {}, subtrait plan name size {}.",
cols.getNames().size(),
root_rel.root().names_size());
}
for (int i = 0; i < static_cast<int>(cols.getNames().size()); i++)
aliases.emplace_back(NameWithAlias(cols.getNames()[i], root_rel.root().names(i)));
actions_dag.project(aliases);
Expand All @@ -144,13 +134,14 @@ void adjustOutput(const DB::QueryPlanPtr & query_plan, const substrait::PlanRel
const auto & original_cols = original_header.getColumnsWithTypeAndName();
if (static_cast<size_t>(output_schema.types_size()) != original_cols.size())
{
debug::dumpPlan(*query_plan, true);
debug::dumpMessage(root_rel, "substrait::PlanRel", true);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Mismatch output schema. plan column size {} [header: '{}'], subtrait plan size {}[schema: {}].",
"Missmatch result columns size. plan column size {}, subtrait plan output schema size {}, subtrait plan name size {}.",
original_cols.size(),
original_header.dumpStructure(),
output_schema.types_size(),
dumpMessage(output_schema));
root_rel.root().names_size());
}
bool need_final_project = false;
ColumnsWithTypeAndName final_cols;
Expand Down Expand Up @@ -192,7 +183,7 @@ void adjustOutput(const DB::QueryPlanPtr & query_plan, const substrait::PlanRel

QueryPlanPtr SerializedPlanParser::parse(const substrait::Plan & plan)
{
logDebugMessage(plan, "substrait plan");
debug::dumpMessage(plan, "substrait::Plan");
//parseExtensions(plan.extensions());
if (plan.relations_size() != 1)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "too many relations found");
Expand All @@ -213,12 +204,7 @@ QueryPlanPtr SerializedPlanParser::parse(const substrait::Plan & plan)
PlanUtil::checkOuputType(*query_plan);
#endif

if (auto * logger = &Poco::Logger::get("SerializedPlanParser"); logger->debug())
{
auto out = PlanUtil::explainPlan(*query_plan);
LOG_DEBUG(logger, "clickhouse plan:\n{}", out);
}

debug::dumpPlan(*query_plan);
return query_plan;
}

Expand Down
57 changes: 0 additions & 57 deletions cpp-ch/local-engine/Parser/SubstraitParserUtils.cpp

This file was deleted.

12 changes: 7 additions & 5 deletions cpp-ch/local-engine/Parser/SubstraitParserUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include <string>
#include <google/protobuf/util/json_util.h>
#include <google/protobuf/wrappers.pb.h>
#include <Common/Exception.h>

namespace DB::ErrorCodes
Expand Down Expand Up @@ -67,9 +68,10 @@ Message BinaryToMessage(const std::string_view binary)
return message;
}

void logDebugMessage(const google::protobuf::Message & message, const char * type);

std::string dumpMessage(const google::protobuf::Message & message);

std::string toString(const google::protobuf::Any & any);
inline std::string toString(const google::protobuf::Any & any)
{
google::protobuf::StringValue sv;
sv.ParseFromString(any.value());
return sv.value();
}
} // namespace local_engine
6 changes: 3 additions & 3 deletions cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeMeta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
#include <Storages/MergeTree/StorageMergeTreeFactory.h>
#include <google/protobuf/util/json_util.h>
#include <rapidjson/document.h>
#include <Poco/StringTokenizer.h>

#include <write_optimization.pb.h>
#include <Poco/StringTokenizer.h>
#include <Common/DebugUtils.h>

using namespace DB;
using namespace local_engine;
Expand Down Expand Up @@ -228,7 +228,7 @@ MergeTreeTableInstance::MergeTreeTableInstance(const google::protobuf::Any & any
MergeTreeTableInstance::MergeTreeTableInstance(const substrait::ReadRel::ExtensionTable & extension_table)
: MergeTreeTableInstance(extension_table.detail())
{
logDebugMessage(extension_table, "merge_tree_table");
debug::dumpMessage(extension_table, "merge_tree_table");
}

SparkStorageMergeTreePtr MergeTreeTableInstance::restoreStorage(const ContextMutablePtr & context) const
Expand Down
Loading