diff --git a/backends-velox/src/main/scala/io/glutenproject/execution/HashAggregateExecTransformer.scala b/backends-velox/src/main/scala/io/glutenproject/execution/HashAggregateExecTransformer.scala
index 1b8e10893a89..2d12eae0d41f 100644
--- a/backends-velox/src/main/scala/io/glutenproject/execution/HashAggregateExecTransformer.scala
+++ b/backends-velox/src/main/scala/io/glutenproject/execution/HashAggregateExecTransformer.scala
@@ -16,7 +16,6 @@
  */
 package io.glutenproject.execution
 
-import io.glutenproject.execution.VeloxAggregateFunctionsBuilder._
 import io.glutenproject.expression._
 import io.glutenproject.expression.ConverterUtils.FunctionConfig
 import io.glutenproject.substrait.`type`.{TypeBuilder, TypeNode}
@@ -24,7 +23,7 @@ import io.glutenproject.substrait.{AggregationParams, SubstraitContext}
 import io.glutenproject.substrait.expression.{AggregateFunctionNode, ExpressionBuilder, ExpressionNode, ScalarFunctionNode}
 import io.glutenproject.substrait.extensions.ExtensionBuilder
 import io.glutenproject.substrait.rel.{RelBuilder, RelNode}
-import io.glutenproject.utils.GlutenDecimalUtil
+import io.glutenproject.utils.VeloxIntermediateData
 
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
@@ -80,27 +79,18 @@ case class HashAggregateExecTransformer(
    * @return
    *   extracting needed or not.
    */
-  def extractStructNeeded(): Boolean = {
-    for (expr <- aggregateExpressions) {
-      val aggregateFunction = expr.aggregateFunction
-      aggregateFunction match {
-        case _: Average | _: First | _: Last | _: StddevSamp | _: StddevPop | _: VarianceSamp |
-            _: VariancePop | _: Corr | _: CovPopulation | _: CovSample | _: MaxMinBy =>
-          expr.mode match {
-            case Partial | PartialMerge =>
-              return true
-            case _ =>
-          }
-        case sum: Sum if sum.dataType.isInstanceOf[DecimalType] =>
-          expr.mode match {
-            case Partial | PartialMerge =>
-              return true
-            case _ =>
-          }
-        case _ =>
-      }
+  private def extractStructNeeded(): Boolean = {
+    aggregateExpressions.exists {
+      expr =>
+        expr.aggregateFunction match {
+          case aggFunc if aggFunc.aggBufferAttributes.size > 1 =>
+            expr.mode match {
+              case Partial | PartialMerge => true
+              case _ => false
+            }
+          case _ => false
+        }
     }
-    false
   }
 
   /**
@@ -133,56 +123,29 @@ case class HashAggregateExecTransformer(
           case _ =>
             throw new UnsupportedOperationException(s"${expr.mode} not supported.")
         }
+        val aggFunc = expr.aggregateFunction
         expr.aggregateFunction match {
-          case _: Average | _: First | _: Last | _: MaxMinBy =>
-            // Select first and second aggregate buffer from Velox Struct.
-            expressionNodes.add(ExpressionBuilder.makeSelection(colIdx, 0))
-            expressionNodes.add(ExpressionBuilder.makeSelection(colIdx, 1))
-            colIdx += 1
-          case _: StddevSamp | _: StddevPop | _: VarianceSamp | _: VariancePop =>
-            // Select count from Velox struct with count casted from LongType into DoubleType.
-            expressionNodes.add(
-              ExpressionBuilder
-                .makeCast(
-                  ConverterUtils.getTypeNode(DoubleType, nullable = false),
-                  ExpressionBuilder.makeSelection(colIdx, 0),
-                  SQLConf.get.ansiEnabled))
-            // Select avg from Velox Struct.
-            expressionNodes.add(ExpressionBuilder.makeSelection(colIdx, 1))
-            // Select m2 from Velox Struct.
-            expressionNodes.add(ExpressionBuilder.makeSelection(colIdx, 2))
-            colIdx += 1
-          case sum: Sum if sum.dataType.isInstanceOf[DecimalType] =>
-            // Select sum from Velox Struct.
-            expressionNodes.add(ExpressionBuilder.makeSelection(colIdx, 0))
-            // Select isEmpty from Velox Struct.
-            expressionNodes.add(ExpressionBuilder.makeSelection(colIdx, 1))
-            colIdx += 1
-          case _: Corr =>
-            // Select count from Velox struct with count casted from LongType into DoubleType.
-            expressionNodes.add(
-              ExpressionBuilder
-                .makeCast(
-                  ConverterUtils.getTypeNode(DoubleType, nullable = false),
-                  ExpressionBuilder.makeSelection(colIdx, 1),
-                  SQLConf.get.ansiEnabled))
-            expressionNodes.add(ExpressionBuilder.makeSelection(colIdx, 4))
-            expressionNodes.add(ExpressionBuilder.makeSelection(colIdx, 5))
-            expressionNodes.add(ExpressionBuilder.makeSelection(colIdx, 0))
-            expressionNodes.add(ExpressionBuilder.makeSelection(colIdx, 2))
-            expressionNodes.add(ExpressionBuilder.makeSelection(colIdx, 3))
-            colIdx += 1
-          case _: CovPopulation | _: CovSample =>
-            // Select count from Velox struct with count casted from LongType into DoubleType.
-            expressionNodes.add(
-              ExpressionBuilder
-                .makeCast(
-                  ConverterUtils.getTypeNode(DoubleType, nullable = false),
-                  ExpressionBuilder.makeSelection(colIdx, 1),
-                  SQLConf.get.ansiEnabled))
-            expressionNodes.add(ExpressionBuilder.makeSelection(colIdx, 2))
-            expressionNodes.add(ExpressionBuilder.makeSelection(colIdx, 3))
-            expressionNodes.add(ExpressionBuilder.makeSelection(colIdx, 0))
+          case _ @VeloxIntermediateData.Type(veloxTypes: Seq[DataType]) =>
+            val (sparkOrders, sparkTypes) =
+              aggFunc.aggBufferAttributes.map(attr => (attr.name, attr.dataType)).unzip
+            val veloxOrders = VeloxIntermediateData.veloxIntermediateDataOrder(aggFunc)
+            val adjustedOrders = sparkOrders.map(veloxOrders.indexOf(_))
+            sparkTypes.zipWithIndex.foreach {
+              case (sparkType, idx) =>
+                val veloxType = veloxTypes(adjustedOrders(idx))
+                if (veloxType != sparkType) {
+                  // Velox and Spark types differ; add a cast expression.
+                  expressionNodes.add(
+                    ExpressionBuilder
+                      .makeCast(
+                        ConverterUtils.getTypeNode(sparkType, nullable = false),
+                        ExpressionBuilder.makeSelection(colIdx, adjustedOrders(idx)),
+                        SQLConf.get.ansiEnabled))
+                } else {
+                  // Velox and Spark have the same type.
+                  expressionNodes.add(ExpressionBuilder.makeSelection(colIdx, adjustedOrders(idx)))
+                }
+            }
             colIdx += 1
           case _ =>
             expressionNodes.add(ExpressionBuilder.makeSelection(colIdx))
@@ -209,43 +172,6 @@ case class HashAggregateExecTransformer(
     }
   }
 
-  /**
-   * Return the intermediate type node of a partial aggregation in Velox.
-   * @param aggregateFunction
-   *   The aggregation function.
-   * @return
-   *   The type of partial outputs.
-   */
-  private def getIntermediateTypeNode(aggregateFunction: AggregateFunction): TypeNode = {
-    val structTypeNodes = aggregateFunction match {
-      case avg: Average =>
-        ConverterUtils.getTypeNode(GlutenDecimalUtil.getAvgSumDataType(avg), nullable = true) ::
-          ConverterUtils.getTypeNode(LongType, nullable = true) :: Nil
-      case first: First =>
-        ConverterUtils.getTypeNode(first.dataType, nullable = true) ::
-          ConverterUtils.getTypeNode(BooleanType, nullable = true) :: Nil
-      case last: Last =>
-        ConverterUtils.getTypeNode(last.dataType, nullable = true) ::
-          ConverterUtils.getTypeNode(BooleanType, nullable = true) :: Nil
-      case maxMinBy: MaxMinBy =>
-        ConverterUtils.getTypeNode(maxMinBy.valueExpr.dataType, nullable = true) ::
-          ConverterUtils.getTypeNode(maxMinBy.orderingExpr.dataType, nullable = true) :: Nil
-      case _: StddevSamp | _: StddevPop | _: VarianceSamp | _: VariancePop =>
-        // Use struct type to represent Velox Row(BIGINT, DOUBLE, DOUBLE).
-        veloxVarianceIntermediateTypes.map(ConverterUtils.getTypeNode(_, nullable = false))
-      case _: Corr =>
-        veloxCorrIntermediateTypes.map(ConverterUtils.getTypeNode(_, nullable = false))
-      case _: CovPopulation | _: CovSample =>
-        veloxCovarIntermediateTypes.map(ConverterUtils.getTypeNode(_, nullable = false))
-      case sum: Sum if sum.dataType.isInstanceOf[DecimalType] =>
-        ConverterUtils.getTypeNode(sum.dataType, nullable = true) ::
-          ConverterUtils.getTypeNode(BooleanType, nullable = false) :: Nil
-      case other =>
-        throw new UnsupportedOperationException(s"$other is not supported.")
-    }
-    TypeBuilder.makeStruct(false, structTypeNodes.asJava)
-  }
-
   override protected def modeToKeyWord(aggregateMode: AggregateMode): String = {
     super.modeToKeyWord(if (mixedPartialAndMerge) Partial else aggregateMode)
   }
@@ -268,7 +194,8 @@ case class HashAggregateExecTransformer(
             VeloxAggregateFunctionsBuilder.create(args, aggregateFunction),
             childrenNodeList,
             modeKeyWord,
-            getIntermediateTypeNode(aggregateFunction))
+            VeloxIntermediateData.getIntermediateTypeNode(aggregateFunction)
+          )
           aggregateNodeList.add(partialNode)
         case PartialMerge =>
           val aggFunctionNode = ExpressionBuilder.makeAggregateFunction(
             VeloxAggregateFunctionsBuilder
              .create(args, aggregateFunction, mixedPartialAndMerge),
             childrenNodeList,
             modeKeyWord,
-            getIntermediateTypeNode(aggregateFunction)
+            VeloxIntermediateData.getIntermediateTypeNode(aggregateFunction)
           )
           aggregateNodeList.add(aggFunctionNode)
         case Final =>
@@ -356,7 +283,7 @@ case class HashAggregateExecTransformer(
               _: VariancePop | _: Corr | _: CovPopulation | _: CovSample | _: MaxMinBy =>
           expression.mode match {
             case Partial | PartialMerge =>
-              typeNodeList.add(getIntermediateTypeNode(aggregateFunction))
+              typeNodeList.add(VeloxIntermediateData.getIntermediateTypeNode(aggregateFunction))
             case Final =>
               typeNodeList.add(
                 ConverterUtils
@@ -367,7 +294,7 @@ case class HashAggregateExecTransformer(
       case sum: Sum if sum.dataType.isInstanceOf[DecimalType] =>
         expression.mode match {
           case Partial | PartialMerge =>
-            typeNodeList.add(getIntermediateTypeNode(aggregateFunction))
+            typeNodeList.add(VeloxIntermediateData.getIntermediateTypeNode(aggregateFunction))
           case Final =>
             typeNodeList.add(
               ConverterUtils
@@ -547,7 +474,7 @@ case class HashAggregateExecTransformer(
         // Spark's Corr order is [n, xAvg, yAvg, ck, xMk, yMk]
         val sparkCorrOutputAttr = aggregateFunction.inputAggBufferAttributes.map(_.name)
         val veloxInputOrder =
-          VeloxAggregateFunctionsBuilder.veloxCorrIntermediateDataOrder.map(
+          VeloxIntermediateData.veloxCorrIntermediateDataOrder.map(
             name => sparkCorrOutputAttr.indexOf(name))
         for (order <- veloxInputOrder) {
           val attr = functionInputAttributes(order)
@@ -590,7 +517,7 @@ case class HashAggregateExecTransformer(
         // Spark's Covar order is [n, xAvg, yAvg, ck]
         val sparkCorrOutputAttr = aggregateFunction.inputAggBufferAttributes.map(_.name)
         val veloxInputOrder =
-          VeloxAggregateFunctionsBuilder.veloxCovarIntermediateDataOrder.map(
+          VeloxIntermediateData.veloxCovarIntermediateDataOrder.map(
            name => sparkCorrOutputAttr.indexOf(name))
         for (order <- veloxInputOrder) {
           val attr = functionInputAttributes(order)
@@ -810,47 +737,6 @@ case class HashAggregateExecTransformer(
 
 /** An aggregation function builder specifically used by Velox backend. */
 object VeloxAggregateFunctionsBuilder {
-  val veloxCorrIntermediateDataOrder: Seq[String] = Seq("ck", "n", "xMk", "yMk", "xAvg", "yAvg")
-  val veloxCovarIntermediateDataOrder: Seq[String] = Seq("ck", "n", "xAvg", "yAvg")
-
-  val veloxVarianceIntermediateTypes: Seq[DataType] = Seq(LongType, DoubleType, DoubleType)
-  val veloxCovarIntermediateTypes: Seq[DataType] = Seq(DoubleType, LongType, DoubleType, DoubleType)
-  val veloxCorrIntermediateTypes: Seq[DataType] =
-    Seq(DoubleType, LongType, DoubleType, DoubleType, DoubleType, DoubleType)
-
-  /**
-   * Get the compatible input types for a Velox aggregate function.
-   * @param aggregateFunc:
-   *   the input aggreagate function.
-   * @param forMergeCompanion:
-   *   whether this is a special case to solve mixed aggregation phases.
-   * @return
-   *   the input types of a Velox aggregate function.
-   */
-  private def getInputTypes(
-      aggregateFunc: AggregateFunction,
-      forMergeCompanion: Boolean): Seq[DataType] = {
-    if (!forMergeCompanion) {
-      return aggregateFunc.children.map(_.dataType)
-    }
-    aggregateFunc match {
-      case _: StddevSamp | _: StddevPop | _: VarianceSamp | _: VariancePop =>
-        Seq(StructType(veloxVarianceIntermediateTypes.map(StructField("", _)).toArray))
-      case _: CovPopulation | _: CovSample =>
-        Seq(StructType(veloxCovarIntermediateTypes.map(StructField("", _)).toArray))
-      case _: Corr =>
-        Seq(StructType(veloxCorrIntermediateTypes.map(StructField("", _)).toArray))
-      case aggFunc if aggFunc.aggBufferAttributes.size > 1 =>
-        Seq(
-          StructType(
-            aggregateFunc.aggBufferAttributes
-              .map(attribute => StructField("", attribute.dataType))
-              .toArray))
-      case _ =>
-        aggregateFunc.aggBufferAttributes.map(_.dataType)
-    }
-  }
-
   /**
    * Create an scalar function for the input aggregate function.
    * @param args:
@@ -887,7 +773,7 @@ object VeloxAggregateFunctionsBuilder {
       functionMap,
       ConverterUtils.makeFuncName(
         substraitAggFuncName,
-        getInputTypes(aggregateFunc, forMergeCompanion),
+        VeloxIntermediateData.getInputTypes(aggregateFunc, forMergeCompanion),
         FunctionConfig.REQ))
   }
 }
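Review note: the rewritten Partial/PartialMerge extraction above replaces per-function hard-coded struct selections with a generic reordering driven by `veloxIntermediateDataOrder`. Below is a minimal, self-contained sketch (illustration only, not Gluten code; the object name and printed values are ours) showing how `adjustedOrders` reproduces exactly the Corr selections that the removed branch hard-coded:

```scala
// Standalone sketch of the reordering scheme used above.
// Spark stores Corr's buffer as [n, xAvg, yAvg, ck, xMk, yMk]; Velox emits the
// struct as [ck, n, xMk, yMk, xAvg, yAvg]. For each Spark buffer slot we look up
// the matching field index in the Velox struct.
object ReorderSketch extends App {
  val sparkOrders = Seq("n", "xAvg", "yAvg", "ck", "xMk", "yMk")
  val veloxOrders = Seq("ck", "n", "xMk", "yMk", "xAvg", "yAvg")
  // adjustedOrders(i) is the Velox struct index holding the i-th Spark buffer column.
  val adjustedOrders = sparkOrders.map(veloxOrders.indexOf(_))
  println(adjustedOrders) // List(1, 4, 5, 0, 2, 3) — the selections removed above
}
```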
diff --git a/backends-velox/src/main/scala/io/glutenproject/utils/VeloxIntermediateData.scala b/backends-velox/src/main/scala/io/glutenproject/utils/VeloxIntermediateData.scala
new file mode 100644
index 000000000000..7c4d5ecc00a7
--- /dev/null
+++ b/backends-velox/src/main/scala/io/glutenproject/utils/VeloxIntermediateData.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.glutenproject.utils
+
+import io.glutenproject.expression.ConverterUtils
+import io.glutenproject.substrait.`type`.{TypeBuilder, TypeNode}
+
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.types._
+
+import scala.collection.JavaConverters._
+
+object VeloxIntermediateData {
+  // Agg functions with inconsistent ordering of intermediate data between Velox and Spark.
+  // Corr
+  val veloxCorrIntermediateDataOrder: Seq[String] = Seq("ck", "n", "xMk", "yMk", "xAvg", "yAvg")
+  // CovPopulation, CovSample
+  val veloxCovarIntermediateDataOrder: Seq[String] = Seq("ck", "n", "xAvg", "yAvg")
+
+  // Agg functions with inconsistent types of intermediate data between Velox and Spark.
+  // StddevSamp, StddevPop, VarianceSamp, VariancePop
+  val veloxVarianceIntermediateTypes: Seq[DataType] = Seq(LongType, DoubleType, DoubleType)
+  // CovPopulation, CovSample
+  val veloxCovarIntermediateTypes: Seq[DataType] = Seq(DoubleType, LongType, DoubleType, DoubleType)
+  // Corr
+  val veloxCorrIntermediateTypes: Seq[DataType] =
+    Seq(DoubleType, LongType, DoubleType, DoubleType, DoubleType, DoubleType)
+
+  /**
+   * Return the intermediate column order of a Velox aggregation function, with special matching
+   * for the aggregation functions whose intermediate column order is inconsistent with Spark's.
+   * @param aggFunc
+   *   Spark aggregation function
+   * @return
+   *   the intermediate column order of the Velox aggregation function
+   */
+  def veloxIntermediateDataOrder(aggFunc: AggregateFunction): Seq[String] = {
+    aggFunc match {
+      case _: Corr =>
+        veloxCorrIntermediateDataOrder
+      case _: CovPopulation | _: CovSample =>
+        veloxCovarIntermediateDataOrder
+      case _ =>
+        aggFunc.aggBufferAttributes.map(_.name)
+    }
+  }
+
+  /**
+   * Get the compatible input types for a Velox aggregate function.
+   *
+   * @param aggregateFunc
+   *   The input aggregate function.
+   * @param forMergeCompanion
+   *   Whether this is a special case to solve mixed aggregation phases.
+   * @return
+   *   The input types of a Velox aggregate function.
+   */
+  def getInputTypes(aggregateFunc: AggregateFunction, forMergeCompanion: Boolean): Seq[DataType] = {
+    if (!forMergeCompanion) {
+      return aggregateFunc.children.map(_.dataType)
+    }
+    aggregateFunc match {
+      case _ @Type(veloxDataTypes: Seq[DataType]) =>
+        Seq(StructType(veloxDataTypes.map(StructField("", _)).toArray))
+      case _ =>
+        // Do not use StructType for single-column agg intermediate data.
+        aggregateFunc.aggBufferAttributes.map(_.dataType)
+    }
+  }
+
+  /**
+   * Return the intermediate type node of a partial aggregation in Velox.
+   *
+   * @param aggFunc
+   *   Spark aggregation function.
+   * @return
+   *   The type of partial outputs.
+   */
+  def getIntermediateTypeNode(aggFunc: AggregateFunction): TypeNode = {
+    val structTypeNodes =
+      aggFunc match {
+        case _ @Type(dataTypes: Seq[DataType]) =>
+          dataTypes.map(ConverterUtils.getTypeNode(_, nullable = false))
+        case _ =>
+          throw new UnsupportedOperationException("Cannot get Velox intermediate types.")
+      }
+    TypeBuilder.makeStruct(false, structTypeNodes.asJava)
+  }
+
+  object Type {
+
+    /**
+     * Return the intermediate types of a Velox aggregation function, with special matching for
+     * the aggregation functions whose intermediate results are inconsistent with Spark's. Only
+     * returns a result when the intermediate result has multiple columns.
+     * @param aggFunc
+     *   Spark aggregation function
+     * @return
+     *   the intermediate types of the Velox aggregation function
+     */
+    def unapply(aggFunc: AggregateFunction): Option[Seq[DataType]] = {
+      aggFunc match {
+        case _: Corr =>
+          Some(veloxCorrIntermediateTypes)
+        case _: Covariance =>
+          Some(veloxCovarIntermediateTypes)
+        case _: StddevSamp | _: StddevPop | _: VarianceSamp | _: VariancePop =>
+          Some(veloxVarianceIntermediateTypes)
+        case _ if aggFunc.aggBufferAttributes.size > 1 =>
+          Some(aggFunc.aggBufferAttributes.map(_.dataType))
+        case _ => None
+      }
+    }
+  }
+}
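Review note: the `Type` extractor is the single dispatch point that the transformer now matches against. A hedged sketch of the intended usage pattern follows; `describeIntermediate` is a hypothetical helper, not part of this patch:

```scala
// Sketch: pattern-matching an aggregate function against VeloxIntermediateData.Type.
// A multi-column intermediate result yields the Velox-side types; single-column
// aggregates fall through to the default case.
import io.glutenproject.utils.VeloxIntermediateData
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
import org.apache.spark.sql.types.DataType

def describeIntermediate(aggFunc: AggregateFunction): String = aggFunc match {
  case VeloxIntermediateData.Type(veloxTypes: Seq[DataType]) =>
    s"row(${veloxTypes.map(_.simpleString).mkString(", ")})"
  case _ =>
    aggFunc.aggBufferAttributes.map(_.dataType.simpleString).mkString(", ")
}
```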
diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h
index 32f7b1b364ba..f188e24203d1 100644
--- a/cpp/core/compute/Runtime.h
+++ b/cpp/core/compute/Runtime.h
@@ -63,26 +63,9 @@ class Runtime : public std::enable_shared_from_this<Runtime> {
   Runtime(const std::unordered_map<std::string, std::string>& confMap) : confMap_(confMap) {}
   virtual ~Runtime() = default;
 
-  void parsePlan(const uint8_t* data, int32_t size) {
-    parsePlan(data, size, {-1, -1, -1});
-  }
-
   /// Parse and cache the plan.
   /// Return true if parsed successfully.
-  void parsePlan(const uint8_t* data, int32_t size, SparkTaskInfo taskInfo) {
-    taskInfo_ = taskInfo;
-#ifdef GLUTEN_PRINT_DEBUG
-    try {
-      auto jsonPlan = substraitFromPbToJson("Plan", data, size);
-      DEBUG_OUT << std::string(50, '#') << " received substrait::Plan:" << std::endl;
-      DEBUG_OUT << "Task stageId: " << taskInfo_.stageId << ", partitionId: " << taskInfo_.partitionId
-                << ", taskId: " << taskInfo_.taskId << "; " << jsonPlan << std::endl;
-    } catch (const std::exception& e) {
-      std::cerr << "Error converting Substrait plan to JSON: " << e.what() << std::endl;
-    }
-#endif
-    GLUTEN_CHECK(parseProtobuf(data, size, &substraitPlan_) == true, "Parse substrait plan failed");
-  }
+  virtual void parsePlan(const uint8_t* data, int32_t size, SparkTaskInfo taskInfo) = 0;
 
   virtual std::string planString(bool details, const std::unordered_map<std::string, std::string>& sessionConf) = 0;
 
diff --git a/cpp/core/config/GlutenConfig.cc b/cpp/core/config/GlutenConfig.cc
index 233053c817e7..21f8705e0cf5 100644
--- a/cpp/core/config/GlutenConfig.cc
+++ b/cpp/core/config/GlutenConfig.cc
@@ -34,4 +34,13 @@ std::unordered_map<std::string, std::string> parseConfMap(JNIEnv* env, jbyteArra
 
   return sparkConfs;
 }
+
+std::string printConfig(const std::unordered_map<std::string, std::string>& conf) {
+  std::ostringstream oss;
+  oss << std::endl;
+  for (auto& [k, v] : conf) {
+    oss << " [" << k << ", " << v << "]\n";
+  }
+  return oss.str();
+}
 } // namespace gluten
diff --git a/cpp/core/config/GlutenConfig.h b/cpp/core/config/GlutenConfig.h
index 810d64e5306f..32fd958c0d7a 100644
--- a/cpp/core/config/GlutenConfig.h
+++ b/cpp/core/config/GlutenConfig.h
@@ -24,6 +24,7 @@
 namespace gluten {
 
 // store configurations that are general to all backend types
+const std::string kDebugModeEnabled = "spark.gluten.sql.debug";
 
 const std::string kGlutenSaveDir = "spark.gluten.saveDir";
 
@@ -56,4 +57,6 @@ const std::string kQatBackendName = "qat";
 const std::string kIaaBackendName = "iaa";
 
 std::unordered_map<std::string, std::string> parseConfMap(JNIEnv* env, jbyteArray configArray);
+
+std::string printConfig(const std::unordered_map<std::string, std::string>& conf);
 } // namespace gluten
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index b785645478d9..eefe8b867a72 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -341,7 +341,7 @@ JNIEXPORT jstring JNICALL Java_io_glutenproject_vectorized_PlanEvaluatorJniWrapp
   auto planData = reinterpret_cast<const uint8_t*>(env->GetByteArrayElements(planArray, 0));
   auto planSize = env->GetArrayLength(planArray);
   auto ctx = gluten::getRuntime(env, wrapper);
-  ctx->parsePlan(planData, planSize);
+  ctx->parsePlan(planData, planSize, {});
   auto& conf = ctx->getConfMap();
   auto planString = ctx->planString(details, conf);
   return env->NewStringUTF(planString.c_str());
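Review note: `printConfig` renders each entry as a ` [key, value]` line preceded by a newline, so it can be appended directly to a log message. For illustration only, a hypothetical Scala analogue of the C++ helper added in GlutenConfig.cc:

```scala
// Hypothetical Scala analogue of gluten::printConfig (illustration, not in this patch).
// Produces e.g. "\n [spark.gluten.sql.debug, true]\n" for a one-entry map.
def printConfig(conf: Map[String, String]): String =
  conf.map { case (k, v) => s" [$k, $v]" }.mkString("\n", "\n", "\n")
```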
diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc b/cpp/velox/benchmarks/GenericBenchmark.cc
index 02f6a98d34e0..c3736134afac 100644
--- a/cpp/velox/benchmarks/GenericBenchmark.cc
+++ b/cpp/velox/benchmarks/GenericBenchmark.cc
@@ -123,7 +123,7 @@ auto BM_Generic = [](::benchmark::State& state,
     setCpu(state.thread_index());
   }
   auto memoryManager = getDefaultMemoryManager();
-  auto runtime = Runtime::create(kVeloxRuntimeKind);
+  auto runtime = Runtime::create(kVeloxRuntimeKind, conf);
   const auto& filePath = getExampleFilePath(substraitJsonFile);
   auto plan = getPlanFromFile(filePath);
   auto startTime = std::chrono::steady_clock::now();
@@ -146,7 +146,7 @@ auto BM_Generic = [](::benchmark::State& state,
       });
     }
 
-    runtime->parsePlan(reinterpret_cast<const uint8_t*>(plan.data()), plan.size());
+    runtime->parsePlan(reinterpret_cast<const uint8_t*>(plan.data()), plan.size(), {});
     auto resultIter = runtime->createResultIterator(memoryManager.get(), "/tmp/test-spill", std::move(inputIters), conf);
     auto veloxPlan = dynamic_cast<VeloxRuntime*>(runtime)->getVeloxPlan();
@@ -233,6 +233,7 @@ int main(int argc, char** argv) {
 
   std::unordered_map<std::string, std::string> conf;
   conf.insert({gluten::kSparkBatchSize, FLAGS_batch_size});
+  conf.insert({kDebugModeEnabled, "true"});
   initVeloxBackend(conf);
 
   try {
diff --git a/cpp/velox/benchmarks/QueryBenchmark.cc b/cpp/velox/benchmarks/QueryBenchmark.cc
index 9c29edd815ca..7070267036d5 100644
--- a/cpp/velox/benchmarks/QueryBenchmark.cc
+++ b/cpp/velox/benchmarks/QueryBenchmark.cc
@@ -90,7 +90,7 @@ auto BM = [](::benchmark::State& state,
     state.PauseTiming();
     state.ResumeTiming();
 
-    runtime->parsePlan(reinterpret_cast<const uint8_t*>(plan.data()), plan.size());
+    runtime->parsePlan(reinterpret_cast<const uint8_t*>(plan.data()), plan.size(), {});
     std::shared_ptr<const facebook::velox::core::PlanNode> veloxPlan;
     auto resultIter = getResultIterator(memoryManager.get(), runtime, scanInfos, veloxPlan);
     auto outputSchema = toArrowSchema(veloxPlan->outputType(), defaultLeafVeloxMemoryPool().get());
diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc
index 684a75523f9a..ad7f8fc36bc8 100644
--- a/cpp/velox/compute/VeloxBackend.cc
+++ b/cpp/velox/compute/VeloxBackend.cc
@@ -34,6 +34,7 @@
 #ifdef ENABLE_GCS
 #include
 #endif
+#include "config/GlutenConfig.h"
 #include "jni/JniFileSystem.h"
 #include "udf/UdfLoader.h"
 #include "utils/ConfigExtractor.h"
@@ -115,16 +116,6 @@ const bool kVeloxFileHandleCacheEnabledDefault = false;
 
 namespace gluten {
 
-void VeloxBackend::printConf(const facebook::velox::Config& conf) {
-  std::ostringstream oss;
-  oss << "STARTUP: VeloxBackend conf = {\n";
-  for (auto& [k, v] : conf.valuesCopy()) {
-    oss << " {" << k << ", " << v << "}\n";
-  }
-  oss << "}\n";
-  LOG(INFO) << oss.str();
-}
-
 void VeloxBackend::init(const std::unordered_map<std::string, std::string>& conf) {
   // Init glog and log level.
   auto veloxmemcfg = std::make_shared(conf);
@@ -257,10 +248,6 @@ void VeloxBackend::init(const std::unordered_map<std::string, std::string>& conf
   initCache(veloxcfg);
   initIOExecutor(veloxcfg);
 
-#ifdef GLUTEN_PRINT_DEBUG
-  printConf(*veloxcfg);
-#endif
-
   veloxmemcfg->setValue(
       velox::connector::hive::HiveConfig::kEnableFileHandleCache,
       veloxcfg->get(kVeloxFileHandleCacheEnabled, kVeloxFileHandleCacheEnabledDefault) ? "true" : "false");
@@ -279,6 +266,10 @@ void VeloxBackend::init(const std::unordered_map<std::string, std::string>& conf
   velox::exec::Operator::registerOperator(std::make_unique());
 
   initUdf(veloxcfg);
+
+  if (veloxcfg->get(kDebugModeEnabled, false)) {
+    LOG(INFO) << "VeloxBackend config:" << printConfig(veloxcfg->valuesCopy());
+  }
 }
 
 facebook::velox::cache::AsyncDataCache* VeloxBackend::getAsyncDataCache() const {
"true" : "false"); @@ -279,6 +266,10 @@ void VeloxBackend::init(const std::unordered_map& conf velox::exec::Operator::registerOperator(std::make_unique()); initUdf(veloxcfg); + + if (veloxcfg->get(kDebugModeEnabled, false)) { + LOG(INFO) << "VeloxBackend config:" << printConfig(veloxcfg->valuesCopy()); + } } facebook::velox::cache::AsyncDataCache* VeloxBackend::getAsyncDataCache() const { diff --git a/cpp/velox/compute/VeloxBackend.h b/cpp/velox/compute/VeloxBackend.h index 36a7a75ff7d9..19aa17eca6b5 100644 --- a/cpp/velox/compute/VeloxBackend.h +++ b/cpp/velox/compute/VeloxBackend.h @@ -63,8 +63,6 @@ class VeloxBackend { void initJolFilesystem(const facebook::velox::Config* conf); - void printConf(const facebook::velox::Config& conf); - std::string getCacheFilePrefix() { return "cache." + boost::lexical_cast(boost::uuids::random_generator()()) + "."; } diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc index 00524edc956b..31410e6f753c 100644 --- a/cpp/velox/compute/VeloxRuntime.cc +++ b/cpp/velox/compute/VeloxRuntime.cc @@ -16,37 +16,39 @@ */ #include "VeloxRuntime.h" + #include -#include "arrow/c/bridge.h" +#include "VeloxBackend.h" #include "compute/ResultIterator.h" #include "compute/Runtime.h" #include "compute/VeloxPlanConverter.h" #include "config/GlutenConfig.h" #include "operators/serializer/VeloxRowToColumnarConverter.h" +#include "shuffle/VeloxShuffleReader.h" #include "shuffle/VeloxShuffleWriter.h" +#include "utils/ConfigExtractor.h" using namespace facebook; namespace gluten { -namespace { +VeloxRuntime::VeloxRuntime(const std::unordered_map& confMap) : Runtime(confMap) {} -#ifdef GLUTEN_PRINT_DEBUG -void printSessionConf(const std::unordered_map& conf) { - std::ostringstream oss; - oss << "session conf = {\n"; - for (auto& [k, v] : conf) { - oss << " {" << k << " = " << v << "}\n"; +void VeloxRuntime::parsePlan(const uint8_t* data, int32_t size, SparkTaskInfo taskInfo) { + taskInfo_ = taskInfo; + if (getConfigValue(confMap_, kDebugModeEnabled, "false") == "true") { + try { + auto jsonPlan = substraitFromPbToJson("Plan", data, size); + LOG(INFO) << std::string(50, '#') << " received substrait::Plan:"; + LOG(INFO) << taskInfo_ << std::endl << jsonPlan; + } catch (const std::exception& e) { + LOG(WARNING) << "Error converting Substrait plan to JSON: " << e.what(); + } } - oss << "}\n"; - LOG(INFO) << oss.str(); -} -#endif -} // namespace - -VeloxRuntime::VeloxRuntime(const std::unordered_map& confMap) : Runtime(confMap) {} + GLUTEN_CHECK(parseProtobuf(data, size, &substraitPlan_) == true, "Parse substrait plan failed"); +} void VeloxRuntime::getInfoAndIds( const std::unordered_map>& splitInfoMap, @@ -82,9 +84,9 @@ std::shared_ptr VeloxRuntime::createResultIterator( const std::string& spillDir, const std::vector>& inputs, const std::unordered_map& sessionConf) { -#ifdef GLUTEN_PRINT_DEBUG - printSessionConf(sessionConf); -#endif + if (getConfigValue(confMap_, kDebugModeEnabled, "false") == "true") { + LOG(INFO) << "VeloxRuntime session config:" << printConfig(confMap_); + } VeloxPlanConverter veloxPlanConverter(inputs, getLeafVeloxPool(memoryManager).get(), sessionConf); veloxPlan_ = veloxPlanConverter.toVeloxPlan(substraitPlan_); diff --git a/cpp/velox/compute/VeloxRuntime.h b/cpp/velox/compute/VeloxRuntime.h index 48309436c2c4..2d5d727624f2 100644 --- a/cpp/velox/compute/VeloxRuntime.h +++ b/cpp/velox/compute/VeloxRuntime.h @@ -25,8 +25,6 @@ #include "operators/writer/VeloxParquetDatasource.h" #include "shuffle/ShuffleReader.h" #include 
"shuffle/ShuffleWriter.h" -#include "shuffle/VeloxShuffleReader.h" -#include "utils/ResourceMap.h" namespace gluten { @@ -37,6 +35,8 @@ class VeloxRuntime final : public Runtime { public: explicit VeloxRuntime(const std::unordered_map& confMap); + void parsePlan(const uint8_t* data, int32_t size, SparkTaskInfo taskInfo) override; + static std::shared_ptr getAggregateVeloxPool(MemoryManager* memoryManager) { return toVeloxMemoryManager(memoryManager)->getAggregateMemoryPool(); } diff --git a/cpp/velox/tests/RuntimeTest.cc b/cpp/velox/tests/RuntimeTest.cc index f5447daade4b..c6af63b8e0c2 100644 --- a/cpp/velox/tests/RuntimeTest.cc +++ b/cpp/velox/tests/RuntimeTest.cc @@ -25,6 +25,8 @@ class DummyRuntime final : public Runtime { public: DummyRuntime(const std::unordered_map& conf) : Runtime(conf) {} + void parsePlan(const uint8_t* data, int32_t size, SparkTaskInfo taskInfo) override {} + std::shared_ptr createResultIterator( MemoryManager* memoryManager, const std::string& spillDir, diff --git a/gluten-core/src/main/scala/io/glutenproject/utils/GlutenDecimalUtil.scala b/gluten-core/src/main/scala/io/glutenproject/utils/GlutenDecimalUtil.scala deleted file mode 100644 index e35473835d78..000000000000 --- a/gluten-core/src/main/scala/io/glutenproject/utils/GlutenDecimalUtil.scala +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
diff --git a/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala b/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala
index 90a840ffd94c..6a7c84148f63 100644
--- a/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala
+++ b/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala
@@ -362,6 +362,8 @@ object GlutenConfig {
   // Pass through to native conf
   val GLUTEN_SAVE_DIR = "spark.gluten.saveDir"
 
+  val GLUTEN_DEBUG_MODE = "spark.gluten.sql.debug"
+
   // Added back to Spark Conf during executor initialization
   val GLUTEN_OFFHEAP_SIZE_IN_BYTES_KEY = "spark.gluten.memory.offHeap.size.in.bytes"
   val GLUTEN_CONSERVATIVE_OFFHEAP_SIZE_IN_BYTES_KEY =
@@ -426,6 +428,7 @@ object GlutenConfig {
       conf: scala.collection.Map[String, String]): util.Map[String, String] = {
     val nativeConfMap = new util.HashMap[String, String]()
     val keys = ImmutableList.of(
+      GLUTEN_DEBUG_MODE,
       GLUTEN_SAVE_DIR,
       GLUTEN_TASK_OFFHEAP_SIZE_IN_BYTES_KEY,
       GLUTEN_MAX_BATCH_SIZE_KEY,
@@ -507,6 +510,7 @@ object GlutenConfig {
     keyWithDefault.forEach(e => nativeConfMap.put(e._1, conf.getOrElse(e._1, e._2)))
 
     val keys = ImmutableList.of(
+      GLUTEN_DEBUG_MODE,
       // datasource config
       SPARK_SQL_PARQUET_COMPRESSION_CODEC,
       // datasource config end
@@ -1162,7 +1166,7 @@ object GlutenConfig {
       .createWithDefault("DEBUG")
 
   val DEBUG_LEVEL_ENABLED =
-    buildConf("spark.gluten.sql.debug")
+    buildConf(GLUTEN_DEBUG_MODE)
       .internal()
       .booleanConf
       .createWithDefault(false)
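Review note: with GLUTEN_DEBUG_MODE whitelisted into both native conf maps above, the flag reaches VeloxBackend at startup and VeloxRuntime per session, replacing the old compile-time GLUTEN_PRINT_DEBUG switch. A minimal sketch of enabling it from the Spark side (standard Spark API; the app name is hypothetical):

```scala
// Sketch: turning on Gluten's debug mode so the native side logs the backend
// config, the session config, and the received Substrait plan as JSON.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("gluten-debug-example") // hypothetical app name
  .config("spark.gluten.sql.debug", "true") // GLUTEN_DEBUG_MODE
  .getOrCreate()
```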