Skip to content

Commit

Permalink
BatchWrite: fix commit_ts_expired (#1661)
Browse files Browse the repository at this point in the history
  • Loading branch information
marsishandsome authored Oct 27, 2020
1 parent d23fcd4 commit 6dd2b93
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 39 deletions.
106 changes: 67 additions & 39 deletions core/src/main/scala/com/pingcap/tispark/write/TiBatchWrite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
package com.pingcap.tispark.write

import com.pingcap.tikv.exception.TiBatchWriteException
import com.pingcap.tikv.util.ConcreteBackOffer
import com.pingcap.tikv.{TTLManager, TiDBJDBCClient, _}
import com.pingcap.tikv.util.{BackOffFunction, BackOffer, ConcreteBackOffer}
import com.pingcap.tikv.{TTLManager, TiDBJDBCClient, TwoPhaseCommitter, _}
import com.pingcap.tispark.TiDBUtils
import com.pingcap.tispark.utils.TiUtil
import org.apache.spark.SparkConf
Expand Down Expand Up @@ -306,43 +306,7 @@ class TiBatchWrite(
logger.info("prewriteSecondaryKeys success")

// driver primary commit
val commitTs = tiSession.getTimestamp.getVersion
// check commitTS
if (commitTs <= startTs) {
throw new TiBatchWriteException(
s"invalid transaction tso with startTs=$startTs, commitTs=$commitTs")
}

// for test
if (options.sleepAfterPrewriteSecondaryKey > 0) {
logger.info(s"sleep ${options.sleepAfterPrewriteSecondaryKey} ms for test")
Thread.sleep(options.sleepAfterPrewriteSecondaryKey)
}

// check schema change
if (!useTableLock) {
tiBatchWriteTables.foreach(_.checkSchemaChange())
}

// for test
if (options.sleepAfterGetCommitTS > 0) {
logger.info(s"sleep ${options.sleepAfterGetCommitTS} ms for test")
Thread.sleep(options.sleepAfterGetCommitTS)
}

val commitPrimaryBackoff = ConcreteBackOffer.newCustomBackOff(PRIMARY_KEY_COMMIT_BACKOFF)

// check connection lost if using lock table
checkConnectionLost()

logger.info("start to commitPrimaryKey")
ti2PCClient.commitPrimaryKey(commitPrimaryBackoff, primaryKey.bytes, commitTs)
try {
ti2PCClient.close()
} catch {
case _: Throwable =>
}
logger.info("commitPrimaryKey success")
val commitTs = commitPrimaryKeyWithRetry(startTs, primaryKey, ti2PCClient)

// stop primary key ttl update
if (isTTLUpdate) {
Expand Down Expand Up @@ -394,6 +358,70 @@ class TiBatchWrite(
logger.info(s"batch write cost ${(endMS - startMS) / 1000} seconds")
}

private def commitPrimaryKeyWithRetry(
startTs: Long,
primaryKey: SerializableKey,
ti2PCClient: TwoPhaseCommitter): Long = {
val backoff = ConcreteBackOffer.newCustomBackOff(options.commitPrimaryKeyBackOfferMS)
while (true) {
try {
return commitPrimaryKey(startTs, primaryKey, ti2PCClient)
} catch {
case e: TiBatchWriteException =>
throw e
case e: Throwable =>
backoff.doBackOff(
BackOffFunction.BackOffFuncType.BoRegionMiss,
new TiBatchWriteException("commit primary key error", e))
}
}
0L
}

private def commitPrimaryKey(
startTs: Long,
primaryKey: SerializableKey,
ti2PCClient: TwoPhaseCommitter): Long = {
val commitTsAttempt = tiSession.getTimestamp.getVersion
// check commitTS
if (commitTsAttempt <= startTs) {
throw new TiBatchWriteException(
s"invalid transaction tso with startTs=$startTs, commitTsAttempt=$commitTsAttempt")
}

// for test
if (options.sleepAfterPrewriteSecondaryKey > 0) {
logger.info(s"sleep ${options.sleepAfterPrewriteSecondaryKey} ms for test")
Thread.sleep(options.sleepAfterPrewriteSecondaryKey)
}

// check schema change
if (!useTableLock) {
tiBatchWriteTables.foreach(_.checkSchemaChange())
}

// for test
if (options.sleepAfterGetCommitTS > 0) {
logger.info(s"sleep ${options.sleepAfterGetCommitTS} ms for test")
Thread.sleep(options.sleepAfterGetCommitTS)
}

val commitPrimaryBackoff = ConcreteBackOffer.newCustomBackOff(PRIMARY_KEY_COMMIT_BACKOFF)

// check connection lost if using lock table
checkConnectionLost()

logger.info(s"start to commitPrimaryKey, commitTsAttempt=$commitTsAttempt")
ti2PCClient.commitPrimaryKey(commitPrimaryBackoff, primaryKey.bytes, commitTsAttempt)
try {
ti2PCClient.close()
} catch {
case _: Throwable =>
}
logger.info("commitPrimaryKey success")
commitTsAttempt
}

private def getRegionSplitPoints(
rdd: RDD[(SerializableKey, Array[Byte])]): List[SerializableKey] = {
val count = rdd.count()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ class TiDBOptions(@transient val parameters: CaseInsensitiveMap[String]) extends
val retryCommitSecondaryKey: Boolean =
getOrDefault(TIDB_RETRY_COMMIT_SECONDARY_KEY, "true").toBoolean
val prewriteMaxRetryTimes: Int = getOrDefault(TIDB_PREWRITE_MAX_RETRY_TIMES, "64").toInt
val commitPrimaryKeyBackOfferMS: Int =
getOrDefault(TIDB_COMMIT_PRIMARY_KEY_BACKOFFER_MS, "120000").toInt

// region split
val enableRegionSplit: Boolean = getOrDefault(TIDB_ENABLE_REGION_SPLIT, "true").toBoolean
Expand Down Expand Up @@ -199,6 +201,7 @@ object TiDBOptions {
val TIDB_WRITE_THREAD_PER_TASK: String = newOption("writeThreadPerTask")
val TIDB_RETRY_COMMIT_SECONDARY_KEY: String = newOption("retryCommitSecondaryKey")
val TIDB_PREWRITE_MAX_RETRY_TIMES: String = newOption("prewriteMaxRetryTimes")
val TIDB_COMMIT_PRIMARY_KEY_BACKOFFER_MS: String = newOption("commitPrimaryKeyBackOfferMS")

// region split
val TIDB_ENABLE_REGION_SPLIT: String = newOption("enableRegionSplit")
Expand Down

0 comments on commit 6dd2b93

Please sign in to comment.