add inject points and skip rowsets case
bobhan1 committed Jul 30, 2024
1 parent 63f051d commit 7a220ae
Showing 3 changed files with 78 additions and 97 deletions.
6 changes: 6 additions & 0 deletions be/src/olap/txn_manager.cpp
@@ -402,6 +402,12 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id,
                            TTransactionId transaction_id, TTabletId tablet_id,
                            TabletUid tablet_uid, const Version& version,
                            TabletPublishStatistics* stats) {
+    DBUG_EXECUTE_IF("TxnManager::publish_txn.enable_spin_wait", {
+        auto block_tablet_id = dp->param<int64_t>("tablet_id");
+        if (tablet_id == block_tablet_id) {
+            DBUG_EXECUTE_IF("TxnManager::publish_txn.block_publish_txn", DBUG_BLOCK);
+        }
+    });
     auto tablet = _engine.tablet_manager()->get_tablet(tablet_id);
     if (tablet == nullptr) {
         return Status::OK();
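
Taken together, the two injection points above let a test stall the publish phase for one specific tablet: enable_spin_wait reads its tablet_id parameter, and only a transaction publishing that tablet reaches the nested block_publish_txn point, which blocks (DBUG_BLOCK) until the point is disabled. A minimal sketch of driving the pair from a regression test, mirroring the calls the suite below actually makes:

    // Stall publish for the tablet under test while other work (here, full compaction) runs.
    GetDebugPoint().enableDebugPointForAllBEs("TxnManager::publish_txn.enable_spin_wait", [tablet_id: "${tabletId}"])
    GetDebugPoint().enableDebugPointForAllBEs("TxnManager::publish_txn.block_publish_txn")

    // ... trigger compaction and inspect rowset metas here ...

    // Disabling the block point releases the waiting publish thread.
    GetDebugPoint().disableDebugPointForAllBEs("TxnManager::publish_txn.block_publish_txn")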
11 changes: 11 additions & 0 deletions
@@ -0,0 +1,11 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+1 1 1 1 1
+2 2 2 2 2
+3 3 3 3 3
+
+-- !sql --
+1 999 999 1 1
+2 888 888 2 2
+3 777 777 3 3
+
61 changes: 61 additions & 97 deletions
@@ -69,7 +69,7 @@ suite("test_partial_update_skip_compaction", "nonConcurrent") {
     }
     logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}");

-    def get_rs_metas = {
+    def check_rs_metas = { expected_rs_meta_size, check_func ->
         def metaUrl = sql_return_maparray("show tablets from ${table1};").get(0).MetaUrl
         def command = "curl ${metaUrl}"
         log.info("get_rs_metas execute command: ${command}")
@@ -78,115 +78,79 @@ suite("test_partial_update_skip_compaction", "nonConcurrent") {
         out = process.text
         def jsonMeta = parseJson(out)
         assertEquals(code, 0)
-        return jsonMeta.rs_metas
+
+        assertEquals(jsonMeta.rs_metas.size(), expected_rs_meta_size)
+        for (def meta : jsonMeta.rs_metas) {
+            int startVersion = meta.start_version.toInteger()
+            int endVersion = meta.end_version.toInteger()
+            int numSegments = meta.num_segments.toInteger()
+            int numRows = meta.num_rows.toInteger()
+            String overlapPb = meta.segments_overlap_pb.toString()
+            logger.info("[${startVersion}-${endVersion}] ${overlapPb} ${meta.num_segments} ${numRows} ${meta.rowset_id_v2}")
+            check_func(startVersion, endVersion, numSegments, numRows, overlapPb)
+        }
     }

-    def metas = get_rs_metas()
-    assertEquals(metas.size(), 4)
-    for (def meta : metas) {
-        int startVersion = meta.start_version.toInteger()
-        int endVersion = meta.end_version.toInteger()
-        int numSegments = meta.num_segments.toInteger()
+    check_rs_metas(4, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb ->
         if (startVersion == 0) {
             assertEquals(endVersion, 1)
             assertEquals(numSegments, 0)
         } else {
             assertEquals(startVersion, endVersion)
             assertEquals(numSegments, 1)
         }
-        logger.info("[${startVersion}-${endVersion}] ${meta.num_segments} ${meta.rowset_id_v2}")
-    }
+    })

     try {
         GetDebugPoint().clearDebugPointsForAllBEs()

-        // trigger full compaction on tablet
-        logger.info("trigger compaction on another BE ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}")
-        def (code, out, err) = be_run_full_compaction(tabletBackend.Host, tabletBackend.HttpPort, tabletId)
-        logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
-        assertEquals(code, 0)
-        def compactJson = parseJson(out.trim())
-        assertEquals("success", compactJson.status.toLowerCase())
+        // block the partial update before publish phase
+        GetDebugPoint().enableDebugPointForAllBEs("TxnManager::publish_txn.enable_spin_wait", [tablet_id: "${tabletId}"])
+        GetDebugPoint().enableDebugPointForAllBEs("TxnManager::publish_txn.block_publish_txn")

+        Thread.sleep(2000)
+        def t1 = Thread.start {
+            sql "set enable_unique_key_partial_update=true;"
+            sql "sync;"
+            sql "insert into ${table1}(k1,c1,c2) values(1,999,999),(2,888,888),(3,777,777);"
+        }

-        metas = get_rs_metas()
-        assertEquals(metas.size(), 1)
-        for (def meta : metas) {
-            logger.info("[${meta.start_version}-${meta.end_version}] ${meta.num_segments} ${meta.rowset_id_v2}")
-        }
+        // trigger full compaction on tablet
+        logger.info("trigger compaction on another BE ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}")
+        def (code, out, err) = be_run_full_compaction(tabletBackend.Host, tabletBackend.HttpPort, tabletId)
+        logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
+        assertEquals(code, 0)
+        def compactJson = parseJson(out.trim())
+        assertEquals("success", compactJson.status.toLowerCase())

+        // wait for full compaction to complete
+        Thread.sleep(1500)

+        check_rs_metas(1, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb ->
+            assertEquals(startVersion, 0)
+            assertEquals(endVersion, 4)
+            assertEquals(overlapPb, "NONOVERLAPPING")
+        })

+        GetDebugPoint().disableDebugPointForAllBEs("TxnManager::publish_txn.block_publish_txn")
+        t1.join()

+        order_qt_sql "select * from ${table1};"

+        check_rs_metas(2, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb ->
+            if (startVersion == 5) {
+                assertEquals(endVersion, 5)
+                // check that the partial update skips the alignment of rowsets produced by compaction
+                // and doesn't generate a new segment in the publish phase
+                assertEquals(numSegments, 1)
+            }
+        })

     } catch(Exception e) {
         logger.info(e.getMessage())
         throw e
     } finally {
         GetDebugPoint().clearDebugPointsForAllBEs()
     }

-    // def wait_for_publish = {txnId, waitSecond ->
-    //     String st = "PREPARE"
-    //     while (!st.equalsIgnoreCase("VISIBLE") && !st.equalsIgnoreCase("ABORTED") && waitSecond > 0) {
-    //         Thread.sleep(1000)
-    //         waitSecond -= 1
-    //         def result = sql_return_maparray "show transaction from ${dbName} where id = ${txnId}"
-    //         assertNotNull(result)
-    //         st = result[0].TransactionStatus
-    //     }
-    //     log.info("Stream load with txn ${txnId} is ${st}")
-    //     assertEquals(st, "VISIBLE")
-    // }
-
-    // String txnId1
-    // streamLoad {
-    //     table "${table1}"
-    //     set 'column_separator', ','
-    //     set 'format', 'csv'
-    //     set 'partial_columns', 'true'
-    //     set 'columns', 'k1,c1,c2'
-    //     set 'strict_mode', "false"
-    //     set 'two_phase_commit', 'true'
-    //     file 'data1.csv'
-    //     time 10000 // limit inflight 10s
-    //     check { result, exception, startTime, endTime ->
-    //         if (exception != null) {
-    //             throw exception
-    //         }
-    //         log.info("Stream load result: ${result}".toString())
-    //         def json = parseJson(result)
-    //         txnId1 = json.TxnId
-    //         assertEquals("success", json.Status.toLowerCase())
-    //     }
-    // }
-    // sql "sync;"
-    // order_qt_sql "select * from ${table1};"
-
-    // // another partial update that conflicts with the previous load and publishes successfully
-    // sql "set enable_unique_key_partial_update=true;"
-    // sql "sync;"
-    // sql "insert into ${table1}(k1,c3,c4) values(1, 99, 99),(2,88,88),(3,77,77);"
-    // sql "set enable_unique_key_partial_update=false;"
-    // sql "sync;"
-    // order_qt_sql "select * from ${table1};"
-
-    // // restart backend
-    // cluster.restartBackends()
-    // Thread.sleep(5000)
-
-    // // wait for be restart
-    // boolean ok = false
-    // int cnt = 0
-    // for (; cnt < 10; cnt++) {
-    //     def be = sql_return_maparray("show backends").get(0)
-    //     if (be.Alive.toBoolean()) {
-    //         ok = true
-    //         break;
-    //     }
-    //     logger.info("wait for BE restart...")
-    //     Thread.sleep(1000)
-    // }
-    // if (!ok) {
-    //     logger.info("BE failed to restart")
-    //     assertTrue(false)
-    // }
-
-    // Thread.sleep(5000)
-
-    // do_streamload_2pc_commit(txnId1)
-    // wait_for_publish(txnId1, 10)
-
-
-    // sql "sync;"
     order_qt_sql "select * from ${table1};"
     sql "DROP TABLE IF EXISTS ${table1};"

 }
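
The flow the suite verifies, end to end: the table starts with four rowset metas ([0-1] plus the single-version rowsets [2-2], [3-3], [4-4]); full compaction merges them into one NONOVERLAPPING rowset [0-4] while the partial update's publish is blocked; after the block is released, the partial update publishes rowset [5-5] with exactly one segment, confirming it skipped alignment against the compaction-produced rowsets. One fragile spot is the fixed Thread.sleep(1500) standing in for "compaction finished"; a polling wait is more deterministic. A sketch, assuming the be_get_compaction_status helper and its run_status JSON field commonly used in Doris regression suites (both are assumptions, not part of this commit):

    // Hypothetical polling wait: loop until the BE reports that no compaction
    // is running on the tablet, instead of sleeping a fixed 1500 ms.
    def wait_for_compaction_done = { host, httpPort, id ->
        for (int i = 0; i < 30; i++) {
            def (code, out, err) = be_get_compaction_status(host, httpPort, id)
            assertEquals(code, 0)
            def statusJson = parseJson(out.trim())
            assertEquals("success", statusJson.status.toLowerCase())
            if (!statusJson.run_status) {
                return // compaction finished
            }
            Thread.sleep(500)
        }
        throw new IllegalStateException("full compaction did not finish in time")
    }
    wait_for_compaction_done(tabletBackend.Host, tabletBackend.HttpPort, tabletId)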
