diff --git a/core/src/main/scala/com/pingcap/tispark/write/TiBatchWrite.scala b/core/src/main/scala/com/pingcap/tispark/write/TiBatchWrite.scala index 69cad71b18..3cf2a56b8e 100644 --- a/core/src/main/scala/com/pingcap/tispark/write/TiBatchWrite.scala +++ b/core/src/main/scala/com/pingcap/tispark/write/TiBatchWrite.scala @@ -16,8 +16,8 @@ package com.pingcap.tispark.write import com.pingcap.tikv.exception.TiBatchWriteException -import com.pingcap.tikv.util.ConcreteBackOffer -import com.pingcap.tikv.{TTLManager, TiDBJDBCClient, _} +import com.pingcap.tikv.util.{BackOffFunction, BackOffer, ConcreteBackOffer} +import com.pingcap.tikv.{TTLManager, TiDBJDBCClient, TwoPhaseCommitter, _} import com.pingcap.tispark.TiDBUtils import com.pingcap.tispark.utils.TiUtil import org.apache.spark.SparkConf @@ -306,43 +306,7 @@ class TiBatchWrite( logger.info("prewriteSecondaryKeys success") // driver primary commit - val commitTs = tiSession.getTimestamp.getVersion - // check commitTS - if (commitTs <= startTs) { - throw new TiBatchWriteException( - s"invalid transaction tso with startTs=$startTs, commitTs=$commitTs") - } - - // for test - if (options.sleepAfterPrewriteSecondaryKey > 0) { - logger.info(s"sleep ${options.sleepAfterPrewriteSecondaryKey} ms for test") - Thread.sleep(options.sleepAfterPrewriteSecondaryKey) - } - - // check schema change - if (!useTableLock) { - tiBatchWriteTables.foreach(_.checkSchemaChange()) - } - - // for test - if (options.sleepAfterGetCommitTS > 0) { - logger.info(s"sleep ${options.sleepAfterGetCommitTS} ms for test") - Thread.sleep(options.sleepAfterGetCommitTS) - } - - val commitPrimaryBackoff = ConcreteBackOffer.newCustomBackOff(PRIMARY_KEY_COMMIT_BACKOFF) - - // check connection lost if using lock table - checkConnectionLost() - - logger.info("start to commitPrimaryKey") - ti2PCClient.commitPrimaryKey(commitPrimaryBackoff, primaryKey.bytes, commitTs) - try { - ti2PCClient.close() - } catch { - case _: Throwable => - } - logger.info("commitPrimaryKey success") + val commitTs = commitPrimaryKeyWithRetry(startTs, primaryKey, ti2PCClient) // stop primary key ttl update if (isTTLUpdate) { @@ -394,6 +358,70 @@ class TiBatchWrite( logger.info(s"batch write cost ${(endMS - startMS) / 1000} seconds") } + private def commitPrimaryKeyWithRetry( + startTs: Long, + primaryKey: SerializableKey, + ti2PCClient: TwoPhaseCommitter): Long = { + val backoff = ConcreteBackOffer.newCustomBackOff(options.commitPrimaryKeyBackOfferMS) + while (true) { + try { + return commitPrimaryKey(startTs, primaryKey, ti2PCClient) + } catch { + case e: TiBatchWriteException => + throw e + case e: Throwable => + backoff.doBackOff( + BackOffFunction.BackOffFuncType.BoRegionMiss, + new TiBatchWriteException("commit primary key error", e)) + } + } + 0L + } + + private def commitPrimaryKey( + startTs: Long, + primaryKey: SerializableKey, + ti2PCClient: TwoPhaseCommitter): Long = { + val commitTsAttempt = tiSession.getTimestamp.getVersion + // check commitTS + if (commitTsAttempt <= startTs) { + throw new TiBatchWriteException( + s"invalid transaction tso with startTs=$startTs, commitTsAttempt=$commitTsAttempt") + } + + // for test + if (options.sleepAfterPrewriteSecondaryKey > 0) { + logger.info(s"sleep ${options.sleepAfterPrewriteSecondaryKey} ms for test") + Thread.sleep(options.sleepAfterPrewriteSecondaryKey) + } + + // check schema change + if (!useTableLock) { + tiBatchWriteTables.foreach(_.checkSchemaChange()) + } + + // for test + if (options.sleepAfterGetCommitTS > 0) { + logger.info(s"sleep ${options.sleepAfterGetCommitTS} ms for test") + Thread.sleep(options.sleepAfterGetCommitTS) + } + + val commitPrimaryBackoff = ConcreteBackOffer.newCustomBackOff(PRIMARY_KEY_COMMIT_BACKOFF) + + // check connection lost if using lock table + checkConnectionLost() + + logger.info(s"start to commitPrimaryKey, commitTsAttempt=$commitTsAttempt") + ti2PCClient.commitPrimaryKey(commitPrimaryBackoff, primaryKey.bytes, commitTsAttempt) + try { + ti2PCClient.close() + } catch { + case _: Throwable => + } + logger.info("commitPrimaryKey success") + commitTsAttempt + } + private def getRegionSplitPoints( rdd: RDD[(SerializableKey, Array[Byte])]): List[SerializableKey] = { val count = rdd.count() diff --git a/core/src/main/scala/com/pingcap/tispark/write/TiDBOptions.scala b/core/src/main/scala/com/pingcap/tispark/write/TiDBOptions.scala index 3413da6671..f91759ffcf 100644 --- a/core/src/main/scala/com/pingcap/tispark/write/TiDBOptions.scala +++ b/core/src/main/scala/com/pingcap/tispark/write/TiDBOptions.scala @@ -84,6 +84,8 @@ class TiDBOptions(@transient val parameters: CaseInsensitiveMap[String]) extends val retryCommitSecondaryKey: Boolean = getOrDefault(TIDB_RETRY_COMMIT_SECONDARY_KEY, "true").toBoolean val prewriteMaxRetryTimes: Int = getOrDefault(TIDB_PREWRITE_MAX_RETRY_TIMES, "64").toInt + val commitPrimaryKeyBackOfferMS: Int = + getOrDefault(TIDB_COMMIT_PRIMARY_KEY_BACKOFFER_MS, "120000").toInt // region split val enableRegionSplit: Boolean = getOrDefault(TIDB_ENABLE_REGION_SPLIT, "true").toBoolean @@ -199,6 +201,7 @@ object TiDBOptions { val TIDB_WRITE_THREAD_PER_TASK: String = newOption("writeThreadPerTask") val TIDB_RETRY_COMMIT_SECONDARY_KEY: String = newOption("retryCommitSecondaryKey") val TIDB_PREWRITE_MAX_RETRY_TIMES: String = newOption("prewriteMaxRetryTimes") + val TIDB_COMMIT_PRIMARY_KEY_BACKOFFER_MS: String = newOption("commitPrimaryKeyBackOfferMS") // region split val TIDB_ENABLE_REGION_SPLIT: String = newOption("enableRegionSplit")