[fix](cloud) MS limit max aborted txn num for the same txn label (#40414)
SWJTU-ZhangLei authored Sep 7, 2024
1 parent c45ac4f commit 390ef1d
Showing 7 changed files with 291 additions and 19 deletions.
3 changes: 3 additions & 0 deletions cloud/src/common/config.h
@@ -215,4 +215,7 @@ CONF_Int32(txn_lazy_max_rowsets_per_batch, "1000");

// max TabletIndexPB num for batch get
CONF_Int32(max_tablet_index_num_per_batch, "1000");

// Max aborted txn num for the same label name
CONF_mInt64(max_num_aborted_txn, "100");
} // namespace doris::cloud::config
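
The lowercase "m" in CONF_mInt64 marks the option as mutable, so the cap can be tuned on a running meta service without a restart. A minimal sketch of how the new guard consumes it (the helper name is hypothetical, not part of this patch):

    #include "common/config.h"

    // Hypothetical helper mirroring the guard added to begin_txn() below:
    // a label is rejected once it has accumulated max_num_aborted_txn
    // txn ids, all of which (verified by the caller) ended in abort.
    bool exceeds_aborted_txn_limit(const TxnLabelPB& label_pb) {
        // Mutable config: re-read on each call, tunable at runtime.
        return label_pb.txn_ids().size() >= config::max_num_aborted_txn;
    }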
16 changes: 14 additions & 2 deletions cloud/src/meta-service/meta_service_txn.cpp
@@ -204,7 +204,8 @@ void MetaServiceImpl::begin_txn(::google::protobuf::RpcController* controller,
// 2. if there is a PREPARE transaction, check if this is a retry request.
// 3. if there is a non-aborted transaction, throw label already used exception.

- for (auto& cur_txn_id : label_pb.txn_ids()) {
+ for (auto it = label_pb.txn_ids().rbegin(); it != label_pb.txn_ids().rend(); ++it) {
+     int64_t cur_txn_id = *it;
const std::string cur_info_key = txn_info_key({instance_id, db_id, cur_txn_id});
std::string cur_info_val;
err = txn->get(cur_info_key, &cur_info_val);
@@ -235,8 +236,19 @@ void MetaServiceImpl::begin_txn(::google::protobuf::RpcController* controller,
}

VLOG_DEBUG << "cur_txn_info=" << cur_txn_info.ShortDebugString();
+ LOG(INFO) << " size=" << label_pb.txn_ids().size()
+           << " status=" << cur_txn_info.status() << " txn_id=" << txn_id
+           << " label=" << label;
  if (cur_txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) {
-     continue;
+     if (label_pb.txn_ids().size() >= config::max_num_aborted_txn) {
+         code = MetaServiceCode::INVALID_ARGUMENT;
+         ss << "too many aborted txn for label=" << label << " txn_id=" << txn_id
+            << ", please check your data quality";
+         msg = ss.str();
+         LOG(WARNING) << msg << " label_pb=" << label_pb.ShortDebugString();
+         return;
+     }
+     break;
  }

if (cur_txn_info.status() == TxnStatusPB::TXN_STATUS_PREPARED ||
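
Two details of the rewritten loop are easy to miss: txn ids are appended to a label in chronological order, so the reverse iteration examines the most recent txn first, and the cap is enforced only when that most recent txn is itself aborted. A condensed sketch of the decision flow, with load_txn_info and reject as hypothetical stand-ins for the surrounding key/value plumbing:

    // Newest-first scan over the label's txn history (sketch).
    for (auto it = label_pb.txn_ids().rbegin(); it != label_pb.txn_ids().rend(); ++it) {
        TxnInfoPB info = load_txn_info(*it);  // hypothetical: txn_info_key() + txn->get()
        if (info.status() == TxnStatusPB::TXN_STATUS_ABORTED) {
            if (label_pb.txn_ids().size() >= config::max_num_aborted_txn) {
                reject("too many aborted txn for label");  // hypothetical: INVALID_ARGUMENT
                return;
            }
            break;  // under the cap: the label may be reused for a new txn
        }
        // Non-aborted txn: falls through to the existing PREPARED-retry
        // and label-already-used checks, which decide the outcome.
        break;
    }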
67 changes: 67 additions & 0 deletions cloud/test/meta_service_test.cpp
@@ -1326,6 +1326,73 @@ TEST(MetaServiceTest, BeginTxnTest) {
ASSERT_GT(res.txn_id(), txn_id);
}
}

{
// test that reusing a label fails once max_num_aborted_txn aborted txns accumulate

std::string cloud_unique_id = "test_cloud_unique_id";
int64_t db_id = 124343989;
int64_t table_id = 12897811;
int64_t txn_id = -1;
std::string label = "test_max_num_aborted_txn_label";
for (int i = 0; i < config::max_num_aborted_txn; i++) {
{
brpc::Controller cntl;
BeginTxnRequest req;
req.set_cloud_unique_id(cloud_unique_id);
TxnInfoPB txn_info;
txn_info.set_db_id(db_id);
txn_info.set_label(label);
txn_info.add_table_ids(table_id);
txn_info.set_timeout_ms(36000);
UniqueIdPB unique_id_pb;
unique_id_pb.set_hi(100);
unique_id_pb.set_lo(10);
txn_info.mutable_request_id()->CopyFrom(unique_id_pb);
req.mutable_txn_info()->CopyFrom(txn_info);
BeginTxnResponse res;
meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
txn_id = res.txn_id();
}
// abort txn
{
brpc::Controller cntl;
AbortTxnRequest req;
req.set_cloud_unique_id(cloud_unique_id);
ASSERT_GT(txn_id, 0);
req.set_txn_id(txn_id);
req.set_reason("test");
AbortTxnResponse res;
meta_service->abort_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
ASSERT_EQ(res.txn_info().status(), TxnStatusPB::TXN_STATUS_ABORTED);
}
}
{
brpc::Controller cntl;
BeginTxnRequest req;
req.set_cloud_unique_id(cloud_unique_id);
TxnInfoPB txn_info;
txn_info.set_db_id(db_id);
txn_info.set_label(label);
txn_info.add_table_ids(table_id);
UniqueIdPB unique_id_pb;
unique_id_pb.set_hi(100);
unique_id_pb.set_lo(10);
txn_info.mutable_request_id()->CopyFrom(unique_id_pb);
txn_info.set_timeout_ms(36000);
req.mutable_txn_info()->CopyFrom(txn_info);
BeginTxnResponse res;
meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT);
ASSERT_TRUE(res.status().msg().find("too many aborted txn for label") !=
std::string::npos);
}
}
}

TEST(MetaServiceTest, PrecommitTest1) {
fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -851,25 +851,24 @@ private void cleanLabelInternal(long dbId, String label, boolean isReplay) {
}
} else {
List<LoadJob> jobs = labelToJob.get(label);
- if (jobs == null) {
-     // no job for this label, just return
-     return;
- }
- Iterator<LoadJob> iter = jobs.iterator();
- while (iter.hasNext()) {
-     LoadJob job = iter.next();
-     if (!job.isCompleted()) {
-         continue;
-     }
-     if (job instanceof BulkLoadJob) {
-         ((BulkLoadJob) job).recycleProgress();
-     }
-     iter.remove();
-     idToLoadJob.remove(job.getId());
-     ++counter;
- }
- if (jobs.isEmpty()) {
-     labelToJob.remove(label);
- }
+ if (jobs != null) {
+     // A stream load leaves no LoadJob, so jobs may be null; fall through
+     // instead of returning so the label itself still gets cleaned.
+     Iterator<LoadJob> iter = jobs.iterator();
+     while (iter.hasNext()) {
+         LoadJob job = iter.next();
+         if (!job.isCompleted()) {
+             continue;
+         }
+         if (job instanceof BulkLoadJob) {
+             ((BulkLoadJob) job).recycleProgress();
+         }
+         iter.remove();
+         idToLoadJob.remove(job.getId());
+         ++counter;
+     }
+     if (jobs.isEmpty()) {
+         labelToJob.remove(label);
+     }
+ }
}
}
regression-test/data/load_p0/stream_load/test_stream_load_with_data_quality.csv
@@ -0,0 +1,2 @@
1, NULL, "xxx", 1
2, NULL, "yyy", 2
regression-test/data/load_p0/stream_load/test_stream_load_with_data_quality2.csv
@@ -0,0 +1,2 @@
1, 1, "xxx", 1
2, 1, "yyy", 2
regression-test/suites/load_p0/stream_load/test_stream_load_with_data_quality.groovy
@@ -0,0 +1,187 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import java.util.Random;

suite("test_stream_load_with_data_quality", "p0") {
if (!isCloudMode()) {
return;
}

def tableName = "test_stream_load_with_data_quality"
sql "DROP TABLE IF EXISTS ${tableName}"

sql """
CREATE TABLE ${tableName}
(
siteid INT DEFAULT '10',
citycode SMALLINT NOT NULL,
username VARCHAR(32) DEFAULT '',
pv BIGINT SUM DEFAULT '0'
)
AGGREGATE KEY(siteid, citycode, username)
DISTRIBUTED BY HASH(siteid) BUCKETS 1;
"""

String label = UUID.randomUUID().toString().replaceAll("-", "")

// meta-service max_num_aborted_txn is 100
for (int i = 0; i < 100; i++) {
streamLoad {
set 'label', "${label}"
set 'column_separator', ','
table "${tableName}"
time 10000
file 'test_stream_load_with_data_quality.csv'
check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("fail", json.Status.toLowerCase())
assertTrue(json.Message.contains("too many filtered rows"))
assertEquals(2, json.NumberTotalRows)
assertEquals(2, json.NumberFilteredRows)
}
}
}

streamLoad {
set 'label', "${label}"
set 'column_separator', ','
table "${tableName}"
time 10000
file 'test_stream_load_with_data_quality.csv'
check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("fail", json.Status.toLowerCase())
assertTrue(json.Message.contains("too many aborted txn"))
}
}

String dbName = "regression_test_load_p0_stream_load"
test {
sql "clean label ${label} from ${dbName};"
}

streamLoad {
set 'label', "${label}"
set 'column_separator', ','
table "${tableName}"
time 10000
file 'test_stream_load_with_data_quality2.csv'
check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
}
}

test {
sql "clean label ${label} from ${dbName};"
}

// meta-service max_num_aborted_txn is 100
for (int i = 0; i < 99; i++) {
streamLoad {
set 'label', "${label}"
set 'column_separator', ','
table "${tableName}"
time 10000
file 'test_stream_load_with_data_quality.csv'
check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("fail", json.Status.toLowerCase())
assertTrue(json.Message.contains("too many filtered rows"))
assertEquals(2, json.NumberTotalRows)
assertEquals(2, json.NumberFilteredRows)
}
}
}

streamLoad {
set 'label', "${label}"
set 'column_separator', ','
table "${tableName}"
time 10000
file 'test_stream_load_with_data_quality2.csv'
check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
}
}

streamLoad {
set 'label', "${label}"
set 'column_separator', ','
table "${tableName}"
time 10000
file 'test_stream_load_with_data_quality2.csv'
check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("label already exists", json.Status.toLowerCase())
assertTrue(json.Message.contains("has already been used"))
}
}

test {
sql "clean label ${label} from ${dbName};"
}

streamLoad {
set 'label', "${label}"
set 'column_separator', ','
table "${tableName}"
time 10000
file 'test_stream_load_with_data_quality2.csv'
check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
}
}

test {
sql "clean label ${label} from ${dbName};"
}

sql "DROP TABLE IF EXISTS ${tableName}"
}
