diff --git a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
index 0794b45158e6..4133b5c605b8 100644
--- a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
+++ b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
@@ -116,11 +116,13 @@ class ClickhouseOptimisticTransaction(
     var options = writeOptions match {
       case None => Map.empty[String, String]
       case Some(writeOptions) =>
-        writeOptions.options.filterKeys {
-          key =>
-            key.equalsIgnoreCase(DeltaOptions.MAX_RECORDS_PER_FILE) ||
-            key.equalsIgnoreCase(DeltaOptions.COMPRESSION)
-        }.toMap
+        writeOptions.options
+          .filterKeys {
+            key =>
+              key.equalsIgnoreCase(DeltaOptions.MAX_RECORDS_PER_FILE) ||
+              key.equalsIgnoreCase(DeltaOptions.COMPRESSION)
+          }
+          .map(identity)
     }
 
     spark.conf.getAll.foreach(
diff --git a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
index 0794b45158e6..4133b5c605b8 100644
--- a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
+++ b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
@@ -116,11 +116,13 @@ class ClickhouseOptimisticTransaction(
     var options = writeOptions match {
       case None => Map.empty[String, String]
       case Some(writeOptions) =>
-        writeOptions.options.filterKeys {
-          key =>
-            key.equalsIgnoreCase(DeltaOptions.MAX_RECORDS_PER_FILE) ||
-            key.equalsIgnoreCase(DeltaOptions.COMPRESSION)
-        }.toMap
+        writeOptions.options
+          .filterKeys {
+            key =>
+              key.equalsIgnoreCase(DeltaOptions.MAX_RECORDS_PER_FILE) ||
+              key.equalsIgnoreCase(DeltaOptions.COMPRESSION)
+          }
+          .map(identity)
     }
 
     spark.conf.getAll.foreach(
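The two Delta hunks above replace `.toMap` with `.map(identity)` because, on Scala 2.12, `Map.filterKeys` returns a lazy view that is not serializable, and `.toMap` on an immutable `Map` just returns the same instance, so the view ends up captured by the write task and raises "Task not serializable". `.map(identity)` forces a strict copy. Below is a minimal, self-contained sketch of that pitfall; it is plain Scala, not Gluten code, and the option keys are only examples.

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object FilterKeysSerializationSketch {
  // True if the value survives Java serialization, which Spark task closures require.
  private def isSerializable(value: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(value)
      true
    } catch {
      case _: java.io.NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    // Stand-ins for the Delta write options; the real keys come from DeltaOptions.
    val options = Map("compression" -> "snappy", "maxRecordsPerFile" -> "10000")

    // On Scala 2.12, filterKeys returns a lazy view, and .toMap on an immutable Map
    // returns the map itself, so the non-serializable view is what the task captures.
    val viaToMap = options.filterKeys(_.equalsIgnoreCase("compression")).toMap
    // .map(identity) rebuilds a strict immutable Map, which serializes fine.
    val viaMapIdentity = options.filterKeys(_.equalsIgnoreCase("compression")).map(identity)

    println(s"filterKeys(...).toMap serializable:         ${isSerializable(viaToMap)}")
    println(s"filterKeys(...).map(identity) serializable: ${isSerializable(viaMapIdentity)}")
  }
}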
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
index 43e0627dffef..0110d085b98c 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
@@ -84,11 +84,11 @@ class CHListenerApi extends ListenerApi with Logging {
       s".max_bytes_before_external_sort"
     if (conf.getLong(externalSortKey, -1) < 0) {
       if (conf.getBoolean("spark.memory.offHeap.enabled", false)) {
-        val memSize = JavaUtils.byteStringAsBytes(conf.get("spark.memory.offHeap.size")).toInt
-        if (memSize > 0) {
-          val cores = conf.getInt("spark.executor.cores", 1)
-          val sortMemLimit = ((memSize / cores) * 0.8).toInt
-          logInfo(s"max memory for sorting: $sortMemLimit")
+        val memSize = JavaUtils.byteStringAsBytes(conf.get("spark.memory.offHeap.size"))
+        if (memSize > 0L) {
+          val cores = conf.getInt("spark.executor.cores", 1).toLong
+          val sortMemLimit = ((memSize / cores) * 0.8).toLong
+          logDebug(s"max memory for sorting: $sortMemLimit")
           conf.set(externalSortKey, sortMemLimit.toString)
         }
       }
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala
index ac6ac959f97c..228dc9feb377 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala
@@ -231,6 +231,7 @@ object MergeTreePartsPartitionsUtil extends Logging {
               size * part.size / part.marks)
           }
       }
+      .sortBy(_.bytesOnDisk)(implicitly[Ordering[Long]].reverse)
 
     var currentSize = 0L
     val currentFiles = new ArrayBuffer[MergeTreePartSplit]
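The added `.sortBy(_.bytesOnDisk)(implicitly[Ordering[Long]].reverse)` orders part splits largest-first before the greedy packing loop that follows, the same largest-first idiom Spark uses when coalescing files into FilePartitions; packing big parts first lets small parts fill the gaps instead of each large part forcing a new, mostly empty split. The following is a simplified sketch of the effect, using a plain greedy packer over sizes rather than the actual MergeTreePartsPartitionsUtil logic.

import scala.collection.mutable.ArrayBuffer

object NextFitDecreasingSketch {
  /** Greedily packs sizes into bins of at most maxBytes, in the order given. */
  def pack(sizes: Seq[Long], maxBytes: Long): Seq[Seq[Long]] = {
    val bins = ArrayBuffer(ArrayBuffer.empty[Long])
    var current = 0L
    sizes.foreach { size =>
      if (current + size > maxBytes && bins.last.nonEmpty) {
        bins += ArrayBuffer.empty[Long]
        current = 0L
      }
      bins.last += size
      current += size
    }
    bins.map(_.toSeq).toSeq
  }

  def main(args: Array[String]): Unit = {
    // Interleaved small and large part sizes, with a 1000-byte target per split.
    val partSizes = Seq[Long](100, 950, 100, 950, 100, 950)
    val asIs = pack(partSizes, maxBytes = 1000)
    // Largest-first, mirroring .sortBy(_.bytesOnDisk)(implicitly[Ordering[Long]].reverse).
    val decreasing = pack(partSizes.sorted(Ordering[Long].reverse), maxBytes = 1000)
    println(s"as-is:      ${asIs.size} splits, sizes ${asIs.map(_.sum)}")
    println(s"decreasing: ${decreasing.size} splits, sizes ${decreasing.map(_.sum)}")
  }
}

With these sizes the as-is order yields six splits, while the decreasing order packs the same parts into four.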
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreePathBasedWriteSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
index 6c3d7dea0527..129f5405c28f 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
@@ -282,8 +282,8 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite
         .format("clickhouse")
         .load(dataPath)
         .where("l_shipdate = date'1998-09-02'")
-        .collect()
-      assertResult(110501)(result.apply(0).get(0))
+        .count()
+      assertResult(183L)(result)
     }
 
   test("test mergetree path based insert overwrite partitioned table with small table, static") {
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteTaskNotSerializableSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteTaskNotSerializableSuite.scala
new file mode 100644
index 000000000000..e8550fb32dd9
--- /dev/null
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteTaskNotSerializableSuite.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.backendsapi.clickhouse.CHBackendSettings
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+
+// Some sqls' line length exceeds 100
+// scalastyle:off line.size.limit
+
+class GlutenClickHouseMergeTreeWriteTaskNotSerializableSuite
+  extends GlutenClickHouseTPCHAbstractSuite
+  with AdaptiveSparkPlanHelper {
+
+  override protected val needCopyParquetToTablePath = true
+
+  override protected val tablesPath: String = basePath + "/tpch-data"
+  override protected val tpchQueries: String = rootPath + "queries/tpch-queries-ch"
+  override protected val queriesResults: String = rootPath + "mergetree-queries-output"
+
+  /** Run Gluten + ClickHouse Backend with SortShuffleManager */
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+      .set("spark.io.compression.codec", "LZ4")
+      .set("spark.sql.shuffle.partitions", "5")
+      .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
+      .set("spark.sql.adaptive.enabled", "true")
+      .set("spark.sql.files.maxPartitionBytes", "20000000")
+      .set("spark.memory.offHeap.size", "4G")
+  }
+
+  override protected def createTPCHNotNullTables(): Unit = {
+    createNotNullTPCHTablesInParquet(tablesPath)
+  }
+
+  test("GLUTEN-6470: Fix Task not serializable error when inserting mergetree data") {
+
+    val externalSortKey = s"${CHBackendSettings.getBackendConfigPrefix}.runtime_settings" +
+      s".max_bytes_before_external_sort"
+    assertResult(3435973836L)(spark.conf.get(externalSortKey).toLong)
+
+    spark.sql(s"""
+                 |DROP TABLE IF EXISTS lineitem_task_not_serializable;
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 |CREATE TABLE IF NOT EXISTS lineitem_task_not_serializable
+                 |(
+                 | l_orderkey bigint,
+                 | l_partkey bigint,
+                 | l_suppkey bigint,
+                 | l_linenumber bigint,
+                 | l_quantity double,
+                 | l_extendedprice double,
+                 | l_discount double,
+                 | l_tax double,
+                 | l_returnflag string,
+                 | l_linestatus string,
+                 | l_shipdate date,
+                 | l_commitdate date,
+                 | l_receiptdate date,
+                 | l_shipinstruct string,
+                 | l_shipmode string,
+                 | l_comment string
+                 |)
+                 |USING clickhouse
+                 |LOCATION '$basePath/lineitem_task_not_serializable'
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 | insert into table lineitem_task_not_serializable
+                 | select * from lineitem
+                 |""".stripMargin)
+
+    val sqlStr =
+      s"""
+         |SELECT
+         | l_returnflag,
+         | l_linestatus,
+         | sum(l_quantity) AS sum_qty,
+         | sum(l_extendedprice) AS sum_base_price,
+         | sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
+         | sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
+         | avg(l_quantity) AS avg_qty,
+         | avg(l_extendedprice) AS avg_price,
+         | avg(l_discount) AS avg_disc,
+         | count(*) AS count_order
+         |FROM
+         | lineitem_task_not_serializable
+         |WHERE
+         | l_shipdate <= date'1998-09-02' - interval 1 day
+         |GROUP BY
+         | l_returnflag,
+         | l_linestatus
+         |ORDER BY
+         | l_returnflag,
+         | l_linestatus;
+         |
+         |""".stripMargin
+    runTPCHQueryBySQL(1, sqlStr)(_ => {})
+  }
+}
+// scalastyle:off line.size.limit
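The `assertResult(3435973836L)` in the new suite is simply the CHListenerApi formula applied to this suite's `spark.memory.offHeap.size=4G` with `spark.executor.cores` left at its default of 1: (4 GiB / 1) * 0.8, computed in Long. With the previous Int-based code the same 4 GiB value wraps to 0, `memSize > 0` is false, and the setting is never written, which is what the assertion guards against. A small arithmetic check follows; it is plain Scala, with a literal standing in for what `JavaUtils.byteStringAsBytes("4G")` returns.

object ExternalSortLimitArithmetic {
  def main(args: Array[String]): Unit = {
    val offHeapBytes = 4L * 1024 * 1024 * 1024 // what "4G" parses to: 4294967296 bytes
    val cores = 1L                             // spark.executor.cores left at its default

    // Long arithmetic, as in the updated CHListenerApi code path.
    val sortMemLimit = ((offHeapBytes / cores) * 0.8).toLong
    println(sortMemLimit) // 3435973836, the value the new test asserts

    // The old Int-based path: 4294967296 wraps to 0, so `memSize > 0` was false
    // and max_bytes_before_external_sort was never configured.
    println(offHeapBytes.toInt) // 0
  }
}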
diff --git a/gluten-core/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala b/gluten-core/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala
index 86133542e269..cfdce536062e 100644
--- a/gluten-core/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/softaffinity/SoftAffinity.scala
@@ -39,7 +39,7 @@ abstract class Affinity(val manager: AffinityManager) extends LogLevelUtil with
       filePaths: Array[String],
       preferredLocations: Array[String]): Array[String] = {
     if (shouldUseSoftAffinity(filePaths, preferredLocations)) {
-      internalGetHostLocations(filePaths.min)
+      internalGetHostLocations(filePaths(0))
     } else {
       preferredLocations
     }
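`filePaths.min` scanned every path of the split just to pick a lexicographic representative; `filePaths(0)` keeps the lookup constant-time and, assuming the split reports its paths in a stable order, is just as good a key, since soft affinity only needs the same split to map to the same executors each time. The sketch below illustrates that idea with a fixed executor list and made-up names; it is not Gluten's actual AffinityManager.

object SoftAffinitySketch {
  // Illustrative executor list; Gluten's AffinityManager tracks live executors instead.
  private val executors = Array("host-a:exec-1", "host-b:exec-2", "host-c:exec-3")

  /** Picks preferred executors for a split by hashing one representative file path. */
  def locationsFor(filePaths: Array[String], replication: Int = 2): Array[String] =
    if (filePaths.isEmpty) {
      Array.empty[String]
    } else {
      // Any deterministic representative works; filePaths(0) avoids the extra pass
      // over the whole array that filePaths.min would take.
      val key = filePaths(0)
      val start = math.abs(key.hashCode % executors.length)
      Array.tabulate(replication)(i => executors((start + i) % executors.length))
    }

  def main(args: Array[String]): Unit = {
    val split = Array("/data/lineitem/part-000.mergetree", "/data/lineitem/part-001.mergetree")
    // The same split maps to the same executors on every call, which is all the
    // cache-affinity scheduling needs from the key.
    println(locationsFor(split).mkString(", "))
    println(locationsFor(split).mkString(", "))
  }
}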