[GLUTEN-6470][CH]Fix Task not serializable error when inserting mergetree data (#6473)

* [GLUTEN-6470][CH]Fix Task not serializable error when inserting mergetree data

When inserting mergetree data, a `Task not serializable` error occurs in some cases.

Root cause:
In Delta, the options of `DeltaOptions` is a `CaseInsensitiveMap`; calling `filterKeys()` on a `CaseInsensitiveMap` can yield a map that is not serializable, which leads to this error.

Close #6470.

* fix ut

* add ut

zzcclp authored Jul 17, 2024
1 parent 6b4e5a8 commit 48788c4
Showing 7 changed files with 143 additions and 18 deletions.

@@ -116,11 +116,13 @@ class ClickhouseOptimisticTransaction(
     var options = writeOptions match {
       case None => Map.empty[String, String]
       case Some(writeOptions) =>
-        writeOptions.options.filterKeys {
-          key =>
-            key.equalsIgnoreCase(DeltaOptions.MAX_RECORDS_PER_FILE) ||
-            key.equalsIgnoreCase(DeltaOptions.COMPRESSION)
-        }.toMap
+        writeOptions.options
+          .filterKeys {
+            key =>
+              key.equalsIgnoreCase(DeltaOptions.MAX_RECORDS_PER_FILE) ||
+              key.equalsIgnoreCase(DeltaOptions.COMPRESSION)
+          }
+          .map(identity)
     }

     spark.conf.getAll.foreach(

@@ -116,11 +116,13 @@ class ClickhouseOptimisticTransaction(
     var options = writeOptions match {
       case None => Map.empty[String, String]
       case Some(writeOptions) =>
-        writeOptions.options.filterKeys {
-          key =>
-            key.equalsIgnoreCase(DeltaOptions.MAX_RECORDS_PER_FILE) ||
-            key.equalsIgnoreCase(DeltaOptions.COMPRESSION)
-        }.toMap
+        writeOptions.options
+          .filterKeys {
+            key =>
+              key.equalsIgnoreCase(DeltaOptions.MAX_RECORDS_PER_FILE) ||
+              key.equalsIgnoreCase(DeltaOptions.COMPRESSION)
+          }
+          .map(identity)
     }

     spark.conf.getAll.foreach(
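
The change above (applied to both copies of `ClickhouseOptimisticTransaction`) addresses the root cause from the commit message: on Scala 2.12, `filterKeys` returns a lazy view that is not `java.io.Serializable`, and calling `.toMap` on an `immutable.Map` returns the receiver unchanged, so the non-serializable view ended up captured by the write task. Replacing `.toMap` with `.map(identity)` copies the entries into a fresh, serializable map. Below is a minimal sketch of the pitfall; the `FilterKeysSerializationSketch` object, its `isJavaSerializable` helper, and the plain `Map` standing in for Spark's `CaseInsensitiveMap` are illustrative only, not part of the change.

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object FilterKeysSerializationSketch {

  // Returns true if Java serialization of `obj` succeeds.
  private def isJavaSerializable(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: java.io.NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    // A plain Map stands in for DeltaOptions' CaseInsensitiveMap.
    val options = Map("maxRecordsPerFile" -> "10000", "compression" -> "snappy")

    // Before the fix: filterKeys returns a lazy, non-serializable view, and
    // .toMap on an immutable.Map returns that view unchanged (Scala 2.12).
    val viewThenToMap = options.filterKeys(_.equalsIgnoreCase("compression")).toMap

    // After the fix: .map(identity) copies the entries into a fresh immutable Map.
    val materialized = options.filterKeys(_.equalsIgnoreCase("compression")).map(identity)

    println(isJavaSerializable(viewThenToMap)) // false on Scala 2.12
    println(isJavaSerializable(materialized))  // true
  }
}

Spark serializes the write-path closures that capture these options, which is why the lazy view only surfaced as a Task not serializable failure at insert time.
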
@@ -84,11 +84,11 @@ class CHListenerApi extends ListenerApi with Logging {
       s".max_bytes_before_external_sort"
     if (conf.getLong(externalSortKey, -1) < 0) {
       if (conf.getBoolean("spark.memory.offHeap.enabled", false)) {
-        val memSize = JavaUtils.byteStringAsBytes(conf.get("spark.memory.offHeap.size")).toInt
-        if (memSize > 0) {
-          val cores = conf.getInt("spark.executor.cores", 1)
-          val sortMemLimit = ((memSize / cores) * 0.8).toInt
-          logInfo(s"max memory for sorting: $sortMemLimit")
+        val memSize = JavaUtils.byteStringAsBytes(conf.get("spark.memory.offHeap.size"))
+        if (memSize > 0L) {
+          val cores = conf.getInt("spark.executor.cores", 1).toLong
+          val sortMemLimit = ((memSize / cores) * 0.8).toLong
+          logDebug(s"max memory for sorting: $sortMemLimit")
           conf.set(externalSortKey, sortMemLimit.toString)
         }
       }
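
The `CHListenerApi` change above keeps `memSize` as a `Long`: with the 4G off-heap size used by the new test, `byteStringAsBytes` returns 4294967296, which the old `.toInt` truncated to 0, so the external-sort limit was never set. A small sketch of the resulting computation; the `ExternalSortLimitSketch` object is hypothetical, and the values (`spark.memory.offHeap.size=4G`, one executor core) are taken from the new suite's configuration and the default of `spark.executor.cores`.

import org.apache.spark.network.util.JavaUtils

object ExternalSortLimitSketch {
  def main(args: Array[String]): Unit = {
    val memSize = JavaUtils.byteStringAsBytes("4G") // 4294967296 bytes
    // Old code: memSize.toInt is 0 here (2^32 truncates to zero), so the limit was skipped.
    val cores = 1L // spark.executor.cores default
    val sortMemLimit = ((memSize / cores) * 0.8).toLong
    println(sortMemLimit) // 3435973836, the value asserted in the new test suite
  }
}
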
@@ -231,6 +231,7 @@ object MergeTreePartsPartitionsUtil extends Logging {
             size * part.size / part.marks)
         }
       }
+      .sortBy(_.bytesOnDisk)(implicitly[Ordering[Long]].reverse)

     var currentSize = 0L
     val currentFiles = new ArrayBuffer[MergeTreePartSplit]
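
The added `sortBy` orders the mergetree parts by `bytesOnDisk`, largest first, before they are packed into partition splits. A tiny sketch of the ordering idiom; the `Part` case class here is a hypothetical stand-in for the real part metadata.

object SortPartsSketch {
  // Hypothetical stand-in for the part metadata, which carries a bytesOnDisk field.
  case class Part(name: String, bytesOnDisk: Long)

  def main(args: Array[String]): Unit = {
    val parts = Seq(Part("p1", 10L), Part("p2", 300L), Part("p3", 42L))
    // Ordering[Long].reverse sorts descending, so the largest parts are assigned first.
    val sorted = parts.sortBy(_.bytesOnDisk)(implicitly[Ordering[Long]].reverse)
    println(sorted.map(_.name)) // List(p2, p3, p1)
  }
}
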
@@ -282,8 +282,8 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite
       .format("clickhouse")
       .load(dataPath)
       .where("l_shipdate = date'1998-09-02'")
-      .collect()
-    assertResult(110501)(result.apply(0).get(0))
+      .count()
+    assertResult(183L)(result)
  }

  test("test mergetree path based insert overwrite partitioned table with small table, static") {

@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.execution

import org.apache.gluten.backendsapi.clickhouse.CHBackendSettings

import org.apache.spark.SparkConf
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper

// Some sqls' line length exceeds 100
// scalastyle:off line.size.limit

class GlutenClickHouseMergeTreeWriteTaskNotSerializableSuite
  extends GlutenClickHouseTPCHAbstractSuite
  with AdaptiveSparkPlanHelper {

  override protected val needCopyParquetToTablePath = true

  override protected val tablesPath: String = basePath + "/tpch-data"
  override protected val tpchQueries: String = rootPath + "queries/tpch-queries-ch"
  override protected val queriesResults: String = rootPath + "mergetree-queries-output"

  /** Run Gluten + ClickHouse Backend with SortShuffleManager */
  override protected def sparkConf: SparkConf = {
    super.sparkConf
      .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
      .set("spark.io.compression.codec", "LZ4")
      .set("spark.sql.shuffle.partitions", "5")
      .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
      .set("spark.sql.adaptive.enabled", "true")
      .set("spark.sql.files.maxPartitionBytes", "20000000")
      .set("spark.memory.offHeap.size", "4G")
  }

  override protected def createTPCHNotNullTables(): Unit = {
    createNotNullTPCHTablesInParquet(tablesPath)
  }

test("GLUTEN-6470: Fix Task not serializable error when inserting mergetree data") {

val externalSortKey = s"${CHBackendSettings.getBackendConfigPrefix}.runtime_settings" +
s".max_bytes_before_external_sort"
assertResult(3435973836L)(spark.conf.get(externalSortKey).toLong)

spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_task_not_serializable;
|""".stripMargin)

spark.sql(s"""
|CREATE TABLE IF NOT EXISTS lineitem_task_not_serializable
|(
| l_orderkey bigint,
| l_partkey bigint,
| l_suppkey bigint,
| l_linenumber bigint,
| l_quantity double,
| l_extendedprice double,
| l_discount double,
| l_tax double,
| l_returnflag string,
| l_linestatus string,
| l_shipdate date,
| l_commitdate date,
| l_receiptdate date,
| l_shipinstruct string,
| l_shipmode string,
| l_comment string
|)
|USING clickhouse
|LOCATION '$basePath/lineitem_task_not_serializable'
|""".stripMargin)

spark.sql(s"""
| insert into table lineitem_task_not_serializable
| select * from lineitem
|""".stripMargin)

val sqlStr =
s"""
|SELECT
| l_returnflag,
| l_linestatus,
| sum(l_quantity) AS sum_qty,
| sum(l_extendedprice) AS sum_base_price,
| sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
| sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
| avg(l_quantity) AS avg_qty,
| avg(l_extendedprice) AS avg_price,
| avg(l_discount) AS avg_disc,
| count(*) AS count_order
|FROM
| lineitem_task_not_serializable
|WHERE
| l_shipdate <= date'1998-09-02' - interval 1 day
|GROUP BY
| l_returnflag,
| l_linestatus
|ORDER BY
| l_returnflag,
| l_linestatus;
|
|""".stripMargin
runTPCHQueryBySQL(1, sqlStr)(_ => {})
}
}
// scalastyle:on line.size.limit

@@ -39,7 +39,7 @@ abstract class Affinity(val manager: AffinityManager) extends LogLevelUtil with
       filePaths: Array[String],
       preferredLocations: Array[String]): Array[String] = {
     if (shouldUseSoftAffinity(filePaths, preferredLocations)) {
-      internalGetHostLocations(filePaths.min)
+      internalGetHostLocations(filePaths(0))
     } else {
       preferredLocations
     }
