[VL] Port #6746, #6627, #6318, #6397, #6326, #6363 to branch-1.2 (#6773)
* [GLUTEN-6612] Fix ParquetFileFormat issue caused by the setting of local property isNativeApplicable (#6627)

* [CORE] Fix schema mismatch between ReadRelNode and LocalFilesNode (#6746)

Co-authored-by: 蒋添 <[email protected]>

* [UT] Test input_file_name, input_file_block_start & input_file_block_length when scan falls back (#6318)

* [VL] Fix E function fallback issue (#6397)

* [VL] Add Scala 2.13 support (#6326)

* [VL] Add Scala 2.13 support

* Fix scalaStyle issues

* Fix Scala Style issues

* Add Spark 3.5.1 and Scala 2.13 test in workflow

* Add run-spark-test-spark35-scala213 job

* Add Spark 3.5.1 and Scala 2.13 test in workflow

* Fix test failures

* Fix test failures

* ScalaStyle fix

* Fix SoftAffinitySuite

* Fix ArrowUtil error

* Fix backend-velox scala issues

* Fix ColumnarArrowEvalPythonExec issues

* Fix ColumnarArrowEvalPythonExec issues

* Fix TestOperator.scala for style issues

* Fix TestOperator.scala for style issues

* Fix issues in DeltaRewriteTransformerRules.scala

* DeltaRewriteTransformerRules fix

* Fix style issues

* Fix issues

* Fix issues

* Fix issues

* Fix issues

* Fix issues

* Fix issues

* Fix issues

---------

Co-authored-by: Hongze Zhang <[email protected]>

* [VL] Fix Alinux3 arrow build issue (#6363)

* Update velox docker and port PR #6363 for get_velox.sh update

---------

Co-authored-by: PHILO-HE <[email protected]>
Co-authored-by: jiangjiangtian <[email protected]>
Co-authored-by: 蒋添 <[email protected]>
Co-authored-by: 高阳阳 <[email protected]>
Co-authored-by: Preetesh2110 <[email protected]>
Co-authored-by: Hongze Zhang <[email protected]>
Co-authored-by: Joey <[email protected]>
8 people authored Aug 12, 2024
1 parent c9f3d89 commit 415c722
Showing 47 changed files with 346 additions and 277 deletions.
70 changes: 69 additions & 1 deletion .github/workflows/velox_docker.yml
@@ -52,7 +52,7 @@ concurrency:
jobs:
build-native-lib-centos-7:
runs-on: ubuntu-20.04
container: apache/gluten:gluten-vcpkg-builder_2024_05_29 # centos7 with dependencies installed
container: apache/gluten:gluten-vcpkg-builder_2024_08_05 # centos7 with dependencies installed
steps:
- uses: actions/checkout@v2
- name: Generate cache key
@@ -1097,6 +1097,74 @@ jobs:
name: golden-files-spark35
path: /tmp/tpch-approved-plan/**

run-spark-test-spark35-scala213:
needs: build-native-lib-centos-8
runs-on: ubuntu-20.04
container: ghcr.io/facebookincubator/velox-dev:centos8
env:
CCACHE_DIR: "${{ github.workspace }}/.ccache"
steps:
- uses: actions/checkout@v2
- name: Download All Artifacts
uses: actions/download-artifact@v2
with:
name: velox-native-lib-centos-8-${{github.sha}}
path: ./cpp/build/releases
- name: Download UDF Example Lib
uses: actions/download-artifact@v2
with:
name: udf-example-lib-centos-8-${{github.sha}}
path: ./cpp/build/velox/udf/examples/
- name: Download Arrow Jars
uses: actions/download-artifact@v2
with:
name: arrow-jars-centos-8-${{github.sha}}
path: /root/.m2/repository/org/apache/arrow/
- name: Update mirror list
run: |
sed -i -e "s|mirrorlist=|#mirrorlist=|g" /etc/yum.repos.d/CentOS-* || true
sed -i -e "s|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g" /etc/yum.repos.d/CentOS-* || true
- name: Setup build dependency
run: |
yum install sudo patch java-1.8.0-openjdk-devel wget -y
wget https://downloads.apache.org/maven/maven-3/3.8.8/binaries/apache-maven-3.8.8-bin.tar.gz
tar -xvf apache-maven-3.8.8-bin.tar.gz
mv apache-maven-3.8.8 /usr/lib/maven
echo "PATH=${PATH}:/usr/lib/maven/bin" >> $GITHUB_ENV
- name: Get Ccache
uses: actions/cache/restore@v3
with:
path: '${{ env.CCACHE_DIR }}'
key: ccache-centos-release-default
- name: Ensure Cache Dirs Exists
working-directory: ${{ github.workspace }}
run: |
mkdir -p '${{ env.CCACHE_DIR }}'
- name: Prepare spark.test.home for Spark 3.5.1 (other tests)
run: |
cd $GITHUB_WORKSPACE/ && \
wget https://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz && \
tar --strip-components=1 -xf spark-3.5.1-bin-hadoop3.tgz spark-3.5.1-bin-hadoop3/jars/ && \
rm -rf spark-3.5.1-bin-hadoop3.tgz && \
mkdir -p $GITHUB_WORKSPACE//shims/spark35/spark_home/assembly/target/scala-2.13 && \
mv jars $GITHUB_WORKSPACE//shims/spark35/spark_home/assembly/target/scala-2.13 && \
cd $GITHUB_WORKSPACE// && \
wget https://github.com/apache/spark/archive/refs/tags/v3.5.1.tar.gz && \
tar --strip-components=1 -xf v3.5.1.tar.gz spark-3.5.1/sql/core/src/test/resources/ && \
mkdir -p shims/spark35/spark_home/ && \
mv sql shims/spark35/spark_home/ && \
dnf module -y install python39 && \
alternatives --set python3 /usr/bin/python3.9 && \
pip3 install setuptools && \
pip3 install pyspark==3.5.1 cython && \
pip3 install pandas pyarrow
- name: Build and Run unit test for Spark 3.5.1 with scala-2.13 (other tests)
run: |
cd $GITHUB_WORKSPACE/
export SPARK_SCALA_VERSION=2.13
$MVN_CMD clean install -Pspark-3.5 -Pscala-2.13 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark35/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \
$MVN_CMD test -Pspark-3.5 -Pscala-2.13 -Pbackends-velox -Piceberg -Pdelta -DtagsToExclude=None -DtagsToInclude=org.apache.gluten.tags.UDFTest
run-spark-test-spark35-slow:
needs: build-native-lib-centos-8
runs-on: ubuntu-20.04
2 changes: 1 addition & 1 deletion .scalafmt.conf
@@ -1,7 +1,7 @@
runner.dialect = scala212

# Version is required to make sure IntelliJ picks the right version
version = 3.5.9
version = 3.8.3
preset = default

# Max column
@@ -175,7 +175,7 @@ class ClickhouseOptimisticTransaction(
// 1. insert FakeRowAdaptor
// 2. DeltaInvariantCheckerExec transform
// 3. DeltaTaskStatisticsTracker collect null count / min values / max values
// 4. set the parameters 'staticPartitionWriteOnly', 'isNativeAppliable',
// 4. set the parameters 'staticPartitionWriteOnly', 'isNativeApplicable',
// 'nativeFormat' in the LocalProperty of the sparkcontext
super.writeFiles(inputData, writeOptions, additionalConstraints)
}
@@ -175,7 +175,7 @@ class ClickhouseOptimisticTransaction(
// 1. insert FakeRowAdaptor
// 2. DeltaInvariantCheckerExec transform
// 3. DeltaTaskStatisticsTracker collect null count / min values / max values
// 4. set the parameters 'staticPartitionWriteOnly', 'isNativeAppliable',
// 4. set the parameters 'staticPartitionWriteOnly', 'isNativeApplicable',
// 'nativeFormat' in the LocalProperty of the sparkcontext
super.writeFiles(inputData, writeOptions, additionalConstraints)
}
@@ -185,7 +185,7 @@ class ClickhouseOptimisticTransaction(
// 1. insert FakeRowAdaptor
// 2. DeltaInvariantCheckerExec transform
// 3. DeltaTaskStatisticsTracker collect null count / min values / max values
// 4. set the parameters 'staticPartitionWriteOnly', 'isNativeAppliable',
// 4. set the parameters 'staticPartitionWriteOnly', 'isNativeApplicable',
// 'nativeFormat' in the LocalProperty of the sparkcontext
super.writeFiles(inputData, writeOptions, additionalConstraints)
}
@@ -370,8 +370,9 @@ case class CHHashAggregateExecTransformer(
// Use approxPercentile.nullable as the nullable of the struct type
// to make sure it returns null when input is empty
fields = fields :+ (approxPercentile.child.dataType, approxPercentile.nullable)
fields = fields :+ (approxPercentile.percentageExpression.dataType,
approxPercentile.percentageExpression.nullable)
fields = fields :+ (
approxPercentile.percentageExpression.dataType,
approxPercentile.percentageExpression.nullable)
(makeStructType(fields), attr.nullable)
case _ =>
(makeStructTypeSingleOne(attr.dataType, attr.nullable), attr.nullable)
@@ -391,13 +391,13 @@ class GlutenParquetFilterSuite
'p_size.int >= 1,
'p_partkey.long.isNotNull,
('p_brand.string === "Brand#12" &&
('p_container.string in ("SM CASE", "SM BOX", "SM PACK", "SM PKG")) &&
('p_container.string.in("SM CASE", "SM BOX", "SM PACK", "SM PKG")) &&
'p_size.int <= 5) ||
('p_brand.string === "Brand#23" &&
('p_container.string in ("MED BAG", "MED BOX", "MED PKG", "MED PACK")) &&
('p_container.string.in("MED BAG", "MED BOX", "MED PKG", "MED PACK")) &&
'p_size.int <= 10) ||
('p_brand.string === "Brand#34" &&
('p_container.string in ("LG CASE", "LG BOX", "LG PACK", "LG PKG")) &&
('p_container.string.in("LG CASE", "LG BOX", "LG PACK", "LG PKG")) &&
'p_size.int <= 15)
)
),
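The switch from `in (...)` to `.in(...)` above is part of the Scala 2.13 work. A small, self-contained sketch of the underlying issue (the `Col` class below is a hypothetical stand-in, not the Spark test DSL): an infix call whose argument list has more than one value can be reported as multiarg-infix syntax by newer Scala compilers and linters, while the dotted form parses cleanly on both 2.12 and 2.13.

```scala
// Hypothetical stand-in for the test DSL; illustrates the dotted call style only.
final case class Col(name: String) {
  def in(values: String*): String =
    s"$name IN (${values.map(v => s"'$v'").mkString(", ")})"
}

object DottedCallDemo extends App {
  val container = Col("p_container")
  // Preferred: explicit dotted call, identical behavior on Scala 2.12 and 2.13.
  println(container.in("SM CASE", "SM BOX", "SM PACK", "SM PKG"))
  // `container in ("SM CASE", "SM BOX", ...)` may be flagged as multiarg infix syntax.
}
```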
@@ -97,7 +97,7 @@ object CHStorageJoinBenchmark extends SqlBasedBenchmark with CHSqlBasedBenchmark
_numRows += batch.numRows
}
Iterator((_numRows, blockNativeWriter.collectAsByteArray()))
// Iterator((_numRows, new Array[Byte](0)))
// Iterator((_numRows, new Array[Byte](0)))
}
.collect
val count0 = countsAndBytes.map(_._1).sum
@@ -27,7 +27,7 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat.{DwrfReadFormat, OrcReadFormat, ParquetReadFormat}

import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.{Alias, CumeDist, DenseRank, Descending, Expression, Lag, Lead, Literal, MakeYMInterval, NamedExpression, NthValue, NTile, PercentRank, Rand, RangeFrame, Rank, RowNumber, SortOrder, SparkPartitionID, SpecialFrameBoundary, SpecifiedWindowFrame, Uuid}
import org.apache.spark.sql.catalyst.expressions.{Alias, CumeDist, DenseRank, Descending, EulerNumber, Expression, Lag, Lead, Literal, MakeYMInterval, NamedExpression, NthValue, NTile, PercentRank, Rand, RangeFrame, Rank, RowNumber, SortOrder, SparkPartitionID, SpecialFrameBoundary, SpecifiedWindowFrame, Uuid}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, ApproximatePercentile, Count, Sum}
import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
@@ -430,7 +430,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
expr match {
// Block directly falling back the below functions by FallbackEmptySchemaRelation.
case alias: Alias => checkExpr(alias.child)
case _: Rand | _: Uuid | _: MakeYMInterval | _: SparkPartitionID => true
case _: Rand | _: Uuid | _: MakeYMInterval | _: SparkPartitionID | _: EulerNumber => true
case _ => false
}
}
@@ -228,7 +228,7 @@ object PullOutGenerateProjectHelper extends PullOutProjectHelper {
}
}

newProjections += Alias(CreateArray(fieldArray), generatePreAliasName)()
newProjections += Alias(CreateArray(fieldArray.toSeq), generatePreAliasName)()
}

// Plug in a Project between Generate and its child.
@@ -396,7 +396,8 @@ abstract class HashAggregateExecTransformer(
childNodes.add(expressionNode)
}
}
exprNodes.add(getRowConstructNode(args, childNodes, newInputAttributes, aggFunc))
exprNodes.add(
getRowConstructNode(args, childNodes, newInputAttributes.toSeq, aggFunc))
case other =>
throw new GlutenNotSupportException(s"$other is not supported.")
}
@@ -44,7 +44,7 @@ import java.io.{DataInputStream, DataOutputStream}
import java.net.Socket
import java.util.concurrent.atomic.AtomicBoolean

import scala.collection.{mutable, Seq}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

class ColumnarArrowPythonRunner(
@@ -54,7 +54,7 @@ class ColumnarArrowPythonRunner(
schema: StructType,
timeZoneId: String,
conf: Map[String, String])
extends BasePythonRunnerShim(funcs, evalType, argOffsets) {
extends BasePythonRunnerShim(funcs.toSeq, evalType, argOffsets) {

override val simplifiedTraceback: Boolean = SQLConf.get.pysparkSimplifiedTraceback

@@ -239,7 +239,7 @@ case class ColumnarArrowEvalPythonExec(
val arrowSafeTypeCheck = Seq(
SQLConf.PANDAS_ARROW_SAFE_TYPE_CONVERSION.key ->
conf.arrowSafeTypeConversion.toString)
Map(timeZoneConf ++ pandasColsByName ++ arrowSafeTypeCheck: _*)
Map(timeZoneConf.toSeq ++ pandasColsByName.toSeq ++ arrowSafeTypeCheck: _*)
}

private val pythonRunnerConf = getPythonRunnerConfMap(conf)
@@ -280,7 +280,7 @@ case class ColumnarArrowEvalPythonExec(
case children =>
// There should not be any other UDFs, or the children can't be evaluated directly.
assert(children.forall(_.find(_.isInstanceOf[PythonUDF]).isEmpty))
(ChainedPythonFunctions(Seq(udf.func)), udf.children)
(ChainedPythonFunctions(Seq(udf.func).toSeq), udf.children)
}
}

@@ -410,7 +410,7 @@ object PullOutArrowEvalPythonPreProjectHelper extends PullOutProjectHelper {
val (chained, children) = collectFunctions(u)
(ChainedPythonFunctions(chained.funcs ++ Seq(udf.func)), children)
case children =>
(ChainedPythonFunctions(Seq(udf.func)), udf.children)
(ChainedPythonFunctions(Seq(udf.func).toSeq), udf.children)
}
}
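Many of the edits in this file (`funcs.toSeq`, `Seq(udf.func).toSeq`, dropping `import scala.collection.Seq`) come from the same Scala 2.13 change: the default `Seq` is now `scala.collection.immutable.Seq`, so mutable collections no longer satisfy `Seq` parameters without an explicit conversion. A minimal sketch, independent of the Gluten code:

```scala
// Cross-building 2.12/2.13: pass immutable views of mutable buffers explicitly.
import scala.collection.mutable.ArrayBuffer

object SeqMigrationDemo extends App {
  def describe(items: Seq[String]): String = items.mkString(", ")

  val buffer = ArrayBuffer("a", "b", "c")
  // `describe(buffer)` compiles on 2.12 (Seq = collection.Seq) but not on 2.13,
  // where Seq means immutable.Seq; `.toSeq` works on both.
  println(describe(buffer.toSeq))
}
```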

@@ -116,12 +116,12 @@ case class UDFExpression(
object UDFResolver extends Logging {
private val UDFNames = mutable.HashSet[String]()
// (udf_name, arg1, arg2, ...) => return type
private val UDFMap = mutable.HashMap[String, mutable.MutableList[UDFSignature]]()
private val UDFMap = mutable.HashMap[String, mutable.ListBuffer[UDFSignature]]()

private val UDAFNames = mutable.HashSet[String]()
// (udaf_name, arg1, arg2, ...) => return type, intermediate attributes
private val UDAFMap =
mutable.HashMap[String, mutable.MutableList[UDAFSignature]]()
mutable.HashMap[String, mutable.ListBuffer[UDAFSignature]]()

private val LIB_EXTENSION = ".so"

@@ -145,7 +145,7 @@ object UDFResolver extends Logging {
variableArity: Boolean): Unit = {
assert(argTypes.dataType.isInstanceOf[StructType])
val v =
UDFMap.getOrElseUpdate(name, mutable.MutableList[UDFSignature]())
UDFMap.getOrElseUpdate(name, mutable.ListBuffer[UDFSignature]())
v += UDFSignature(
returnType,
argTypes.dataType.asInstanceOf[StructType].fields.map(_.dataType),
@@ -189,7 +189,7 @@ }
}

val v =
UDAFMap.getOrElseUpdate(name, mutable.MutableList[UDAFSignature]())
UDAFMap.getOrElseUpdate(name, mutable.ListBuffer[UDAFSignature]())
v += UDAFSignature(
returnType,
argTypes.dataType.asInstanceOf[StructType].fields.map(_.dataType),
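`mutable.MutableList` no longer exists in Scala 2.13, which is why the registry fields and the `getOrElseUpdate` calls above move to `mutable.ListBuffer`. A condensed sketch of the same registration pattern (the `Signature` type and names here are illustrative, not Gluten's):

```scala
import scala.collection.mutable

object SignatureRegistryDemo extends App {
  final case class Signature(returnType: String, argTypes: Seq[String])

  // ListBuffer is the usual 2.13-compatible replacement for an append-only MutableList.
  private val registry = mutable.HashMap[String, mutable.ListBuffer[Signature]]()

  def register(name: String, sig: Signature): Unit =
    registry.getOrElseUpdate(name, mutable.ListBuffer[Signature]()) += sig

  register("my_udf", Signature("bigint", Seq("bigint", "bigint")))
  println(registry("my_udf").toList) // List(Signature(bigint,List(bigint, bigint)))
}
```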
@@ -672,6 +672,16 @@ class ScalarFunctionsValidateSuite extends FunctionsValidateTest {
}
}

test("Test E function") {
runQueryAndCompare("""SELECT E() from lineitem limit 100""".stripMargin) {
checkGlutenOperatorMatch[ProjectExecTransformer]
}
runQueryAndCompare("""SELECT E(), l_orderkey
| from lineitem limit 100""".stripMargin) {
checkGlutenOperatorMatch[ProjectExecTransformer]
}
}

test("Test spark_partition_id function") {
runQueryAndCompare("""SELECT spark_partition_id(), l_orderkey
| from lineitem limit 100""".stripMargin) {
@@ -1458,9 +1458,11 @@ class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPla
path =>
(0 to 3).toDF("x").write.parquet(path.getCanonicalPath)
spark.read.parquet(path.getCanonicalPath).createOrReplaceTempView("view")
runQueryAndCompare(
"SELECT x FROM view WHERE cast(x as timestamp) " +
"IN ('1970-01-01 08:00:00.001','1970-01-01 08:00:00.2')")(_)
runQueryAndCompare(s"""
|SELECT x FROM view
|WHERE cast(x as timestamp)
|IN ('1970-01-01 08:00:00.001','1970-01-01 08:00:00.2')
|""".stripMargin)(_ => ())
}
}

1 change: 0 additions & 1 deletion ep/build-velox/src/get_velox.sh
@@ -200,7 +200,6 @@ function process_setup_alinux3 {
sed -i 's/python39 python39-devel python39-pip //g' scripts/setup-centos8.sh
sed -i "s/.*pip.* install/#&/" scripts/setup-centos8.sh
sed -i 's/ADDITIONAL_FLAGS=""/ADDITIONAL_FLAGS="-Wno-stringop-overflow"/g' scripts/setup-helper-functions.sh
sed -i "s/\${CMAKE_INSTALL_LIBDIR}/lib64/" third_party/CMakeLists.txt
}

function process_setup_tencentos32 {
@@ -104,7 +104,7 @@ private NamedStruct buildNamedStruct() {
for (StructField field : fileSchema.fields()) {
structBuilder.addTypes(
ConverterUtils.getTypeNode(field.dataType(), field.nullable()).toProtobuf());
namedStructBuilder.addNames(field.name());
namedStructBuilder.addNames(ConverterUtils.normalizeColName(field.name()));
}
namedStructBuilder.setStruct(structBuilder.build());
}
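This one-line change addresses the #6746 schema mismatch: the names emitted here must match the normalized names used when building the read relation. Purely as an illustration — the `normalize` function below is a stand-in, not the actual `ConverterUtils.normalizeColName` logic:

```scala
// Illustration only: both sides of the Substrait plan must normalize names identically.
object SchemaNameDemo extends App {
  def normalize(name: String): String = name.toLowerCase // stand-in normalization

  val readRelNames   = Seq("L_ORDERKEY", "L_PARTKEY").map(normalize)
  val localFileNames = Seq("l_orderkey", "l_partkey").map(normalize)
  assert(readRelNames == localFileNames) // schemas line up after normalization
}
```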
@@ -682,6 +682,8 @@ object ExpressionConverter extends SQLConfHelper with Logging {
t.children.map(replaceWithExpressionTransformerInternal(_, attributeSeq, expressionsMap)),
t
)
case e: EulerNumber =>
LiteralTransformer(Literal(Math.E))
case expr =>
GenericExpressionTransformer(
substraitExprName,
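The new `EulerNumber` case is the core of the #6397 fix: `E()` is a constant expression, so the converter can emit a plain literal instead of handing the backend a function it cannot execute. A brief sketch of the same substitution, outside the converter:

```scala
import org.apache.spark.sql.catalyst.expressions.{EulerNumber, Expression, Literal}

object EulerNumberDemo extends App {
  // Replace the constant expression with the value Spark itself would compute.
  def rewrite(expr: Expression): Expression = expr match {
    case _: EulerNumber => Literal(Math.E)
    case other => other
  }

  println(rewrite(EulerNumber())) // 2.718281828459045
}
```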
@@ -163,27 +163,25 @@ object GlutenWriterColumnarRules {
BackendsApiManager.getSettings.enableNativeWriteFiles() =>
injectFakeRowAdaptor(rc, rc.child)
case rc @ DataWritingCommandExec(cmd, child) =>
// These properties can be set by the same thread in last query submission.
session.sparkContext.setLocalProperty("isNativeApplicable", null)
session.sparkContext.setLocalProperty("nativeFormat", null)
session.sparkContext.setLocalProperty("staticPartitionWriteOnly", null)
if (BackendsApiManager.getSettings.supportNativeWrite(child.output.toStructType.fields)) {
val format = getNativeFormat(cmd)
session.sparkContext.setLocalProperty(
"staticPartitionWriteOnly",
BackendsApiManager.getSettings.staticPartitionWriteOnly().toString)
// FIXME: We should only use context property if having no other approaches.
// Should see if there is another way to pass these options.
session.sparkContext.setLocalProperty("isNativeAppliable", format.isDefined.toString)
session.sparkContext.setLocalProperty("isNativeApplicable", format.isDefined.toString)
session.sparkContext.setLocalProperty("nativeFormat", format.getOrElse(""))
if (format.isDefined) {
injectFakeRowAdaptor(rc, child)
} else {
rc.withNewChildren(rc.children.map(apply))
}
} else {
session.sparkContext.setLocalProperty(
"staticPartitionWriteOnly",
BackendsApiManager.getSettings.staticPartitionWriteOnly().toString)
session.sparkContext.setLocalProperty("isNativeAppliable", "false")
session.sparkContext.setLocalProperty("nativeFormat", "")

rc.withNewChildren(rc.children.map(apply))
}
case plan: SparkPlan => plan.withNewChildren(plan.children.map(apply))
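The rewritten rule above both corrects the property name ('isNativeAppliable' → 'isNativeApplicable') and resets values that an earlier query on the same thread may have left behind. A minimal sketch of that local-property pattern, assuming nothing about Gluten beyond the property names shown in the diff:

```scala
import org.apache.spark.sql.SparkSession

object LocalPropertyDemo extends App {
  val spark = SparkSession.builder().master("local[1]").appName("demo").getOrCreate()
  val sc = spark.sparkContext

  // Local properties are per-thread and survive across queries, so clear stale values first.
  sc.setLocalProperty("isNativeApplicable", null)
  sc.setLocalProperty("nativeFormat", null)
  sc.setLocalProperty("staticPartitionWriteOnly", null)

  // Then set them for the current write; "parquet" is just an example value here.
  sc.setLocalProperty("isNativeApplicable", "true")
  sc.setLocalProperty("nativeFormat", "parquet")
  assert(sc.getLocalProperty("isNativeApplicable") == "true")

  spark.stop()
}
```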