Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](merge-cloud) fix inconsistency between cloud mode and local mode for 2PC #33917

Merged
merged 1 commit into from
Apr 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cloud/src/common/bvars.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ BvarLatencyRecorderWithTag g_bvar_ms_get_delete_bitmap_update_lock("ms",
"get_delete_bitmap_update_lock");
BvarLatencyRecorderWithTag g_bvar_ms_get_instance("ms", "get_instance");
BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach("ms", "get_rl_task_commit_attach");
BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id("ms", "get_txn_id");

BvarLatencyRecorderWithTag g_bvar_ms_start_tablet_job("ms", "start_tablet_job");
BvarLatencyRecorderWithTag g_bvar_ms_finish_tablet_job("ms", "finish_tablet_job");
Expand Down
1 change: 1 addition & 0 deletions cloud/src/common/bvars.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ extern BvarLatencyRecorderWithTag g_bvar_ms_get_cluster_status;
extern BvarLatencyRecorderWithTag g_bvar_ms_set_cluster_status;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_instance;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id;

// txn_kv's bvars
extern bvar::LatencyRecorder g_bvar_txn_kv_get;
Expand Down
8 changes: 8 additions & 0 deletions cloud/src/meta-service/meta_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,9 @@ class MetaServiceImpl : public cloud::MetaService {
GetRLTaskCommitAttachResponse* response,
::google::protobuf::Closure* done) override;

void get_txn_id(::google::protobuf::RpcController* controller, const GetTxnIdRequest* request,
GetTxnIdResponse* response, ::google::protobuf::Closure* done) override;

// ATTN: If you add a new method, please also add the corresponding implementation in `MetaServiceProxy`.

std::pair<MetaServiceCode, std::string> get_instance_info(const std::string& instance_id,
Expand Down Expand Up @@ -585,6 +588,11 @@ class MetaServiceProxy final : public MetaService {
done);
}

void get_txn_id(::google::protobuf::RpcController* controller, const GetTxnIdRequest* request,
GetTxnIdResponse* response, ::google::protobuf::Closure* done) override {
call_impl(&cloud::MetaService::get_txn_id, controller, request, response, done);
}

private:
template <typename Request, typename Response>
using MetaServiceMethod = void (cloud::MetaService::*)(::google::protobuf::RpcController*,
Expand Down
122 changes: 122 additions & 0 deletions cloud/src/meta-service/meta_service_txn.cpp
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pls add UT to cover all the newly added code: line/branch coverage 90+%

Original file line number Diff line number Diff line change
Expand Up @@ -1969,4 +1969,126 @@ void MetaServiceImpl::clean_txn_label(::google::protobuf::RpcController* control
code = MetaServiceCode::OK;
}

// get txn id by label
// 1. When the requested status is not empty, return the txnid
// corresponding to the status. There may be multiple
// requested status, just match one.
// 2. When the requested status is empty, return the latest txnid.
void MetaServiceImpl::get_txn_id(::google::protobuf::RpcController* controller,
liaoxin01 marked this conversation as resolved.
Show resolved Hide resolved
const GetTxnIdRequest* request, GetTxnIdResponse* response,
::google::protobuf::Closure* done) {
RPC_PREPROCESS(get_txn_id);
if (!request->has_db_id()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "missing db id";
LOG(WARNING) << msg;
return;
}

std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
if (instance_id.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
ss << "cannot find instance_id with cloud_unique_id="
<< (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id);
msg = ss.str();
LOG(WARNING) << msg;
return;
}
RPC_RATE_LIMIT(get_txn_id)
const int64_t db_id = request->db_id();
std::string label = request->label();
const std::string label_key = txn_label_key({instance_id, db_id, label});
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
LOG(WARNING) << "failed to create txn. err=" << err << " db_id=" << db_id
<< " label=" << label;
code = cast_as<ErrCategory::CREATE>(err);
ss << "txn_kv_->create_txn() failed, err=" << err << " label=" << label
<< " db_id=" << db_id;
msg = ss.str();
return;
}

std::string label_val;
err = txn->get(label_key, &label_val);
if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
code = cast_as<ErrCategory::READ>(err);
ss << "txn->get failed(), err=" << err << " label=" << label;
msg = ss.str();
return;
}

if (label_val.size() <= VERSION_STAMP_LEN) {
code = MetaServiceCode::TXN_ID_NOT_FOUND;
ss << "transaction not found, label=" << label;
return;
}

TxnLabelPB label_pb;
//label_val.size() > VERSION_STAMP_LEN means label has previous txn ids.
if (!label_pb.ParseFromArray(label_val.data(), label_val.size() - VERSION_STAMP_LEN)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "label_pb->ParseFromString() failed, label=" << label;
msg = ss.str();
return;
}
if (label_pb.txn_ids_size() == 0) {
code = MetaServiceCode::TXN_ID_NOT_FOUND;
ss << "transaction not found, label=" << label;
msg = ss.str();
return;
liaoxin01 marked this conversation as resolved.
Show resolved Hide resolved
}

// find the latest txn
if (request->txn_status_size() == 0) {
response->set_txn_id(*label_pb.txn_ids().rbegin());
return;
}

for (auto& cur_txn_id : label_pb.txn_ids()) {
const std::string cur_info_key = txn_info_key({instance_id, db_id, cur_txn_id});
std::string cur_info_val;
err = txn->get(cur_info_key, &cur_info_val);
if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
code = cast_as<ErrCategory::READ>(err);
ss << "txn->get() failed, cur_txn_id=" << cur_txn_id << " label=" << label
<< " err=" << err;
msg = ss.str();
return;
}

if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
//label_to_idx and txn info inconsistency.
code = MetaServiceCode::TXN_ID_NOT_FOUND;
ss << "txn->get() failed, cur_txn_id=" << cur_txn_id << " label=" << label
<< " err=" << err;
msg = ss.str();
return;
}

TxnInfoPB cur_txn_info;
if (!cur_txn_info.ParseFromString(cur_info_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
ss << "cur_txn_info->ParseFromString() failed, cur_txn_id=" << cur_txn_id
<< " label=" << label << " err=" << err;
msg = ss.str();
return;
}

VLOG_DEBUG << "cur_txn_info=" << cur_txn_info.ShortDebugString();
for (auto txn_status : request->txn_status()) {
if (cur_txn_info.status() == txn_status) {
response->set_txn_id(cur_txn_id);
return;
}
}
}
code = MetaServiceCode::TXN_ID_NOT_FOUND;
ss << "transaction not found, label=" << label;
msg = ss.str();
return;
liaoxin01 marked this conversation as resolved.
Show resolved Hide resolved
}

} // namespace doris::cloud
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ public Cloud.GetTxnResponse getTxn(Cloud.GetTxnRequest request) {
return blockingStub.getTxn(request);
}

public Cloud.GetTxnIdResponse getTxnId(Cloud.GetTxnIdRequest request) {
return blockingStub.getTxnId(request);
}

public Cloud.GetCurrentMaxTxnResponse getCurrentMaxTxnId(Cloud.GetCurrentMaxTxnRequest request) {
return blockingStub.getCurrentMaxTxnId(request);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,16 @@ public Cloud.GetTxnResponse getTxn(Cloud.GetTxnRequest request)
}
}

public Cloud.GetTxnIdResponse getTxnId(Cloud.GetTxnIdRequest request)
throws RpcException {
try {
final MetaServiceClient client = getProxy();
return client.getTxnId(request);
} catch (Exception e) {
throw new RpcException("", e.getMessage(), e);
}
}

public Cloud.GetCurrentMaxTxnResponse getCurrentMaxTxnId(Cloud.GetCurrentMaxTxnRequest request)
throws RpcException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import org.apache.doris.cloud.proto.Cloud.GetCurrentMaxTxnResponse;
import org.apache.doris.cloud.proto.Cloud.GetDeleteBitmapUpdateLockRequest;
import org.apache.doris.cloud.proto.Cloud.GetDeleteBitmapUpdateLockResponse;
import org.apache.doris.cloud.proto.Cloud.GetTxnIdRequest;
import org.apache.doris.cloud.proto.Cloud.GetTxnIdResponse;
import org.apache.doris.cloud.proto.Cloud.GetTxnRequest;
import org.apache.doris.cloud.proto.Cloud.GetTxnResponse;
import org.apache.doris.cloud.proto.Cloud.LoadJobSourceTypePB;
Expand All @@ -51,6 +53,7 @@
import org.apache.doris.cloud.proto.Cloud.PrecommitTxnResponse;
import org.apache.doris.cloud.proto.Cloud.TableStatsPB;
import org.apache.doris.cloud.proto.Cloud.TxnInfoPB;
import org.apache.doris.cloud.proto.Cloud.TxnStatusPB;
import org.apache.doris.cloud.proto.Cloud.UniqueIdPB;
import org.apache.doris.cloud.rpc.MetaServiceProxy;
import org.apache.doris.common.AnalysisException;
Expand Down Expand Up @@ -90,6 +93,7 @@
import org.apache.doris.transaction.TabletCommitInfo;
import org.apache.doris.transaction.TransactionCommitFailedException;
import org.apache.doris.transaction.TransactionIdGenerator;
import org.apache.doris.transaction.TransactionNotFoundException;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
import org.apache.doris.transaction.TransactionState.TxnCoordinator;
Expand Down Expand Up @@ -248,7 +252,7 @@ public long beginTransaction(long dbId, List<Long> tableIdList, String label, TU
throw new DuplicatedRequestException(DebugUtil.printId(requestId),
beginTxnResponse.getDupTxnId(), beginTxnResponse.getStatus().getMsg());
case TXN_LABEL_ALREADY_USED:
throw new LabelAlreadyUsedException(label);
throw new LabelAlreadyUsedException(beginTxnResponse.getStatus().getMsg(), false);
default:
if (MetricRepo.isInit) {
MetricRepo.COUNTER_TXN_REJECT.increase(1L);
Expand Down Expand Up @@ -483,6 +487,9 @@ private void commitTransaction(long dbId, List<Table> tableList, long transactio
internalMsgBuilder.append(commitTxnResponse.getStatus().getCode());
throw new UserException("internal error, " + internalMsgBuilder.toString());
}
if (is2PC && commitTxnResponse.getStatus().getCode() == MetaServiceCode.TXN_ALREADY_VISIBLE) {
throw new UserException(commitTxnResponse.getStatus().getMsg());
}

TransactionState txnState = TxnUtil.transactionStateFromPb(commitTxnResponse.getTxnInfo());
TxnStateChangeCallback cb = callbackFactory.getCallback(txnState.getCallbackId());
Expand Down Expand Up @@ -1103,7 +1110,39 @@ public TransactionState getTransactionState(long dbId, long transactionId) {
@Override
public Long getTransactionIdByLabel(Long dbId, String label, List<TransactionStatus> statusList)
throws UserException {
throw new UserException(NOT_SUPPORTED_MSG);
LOG.info("try to get transaction id by label, dbId:{}, label:{}", dbId, label);
GetTxnIdRequest.Builder builder = GetTxnIdRequest.newBuilder();
builder.setDbId(dbId);
builder.setLabel(label);
builder.setCloudUniqueId(Config.cloud_unique_id);
for (TransactionStatus status : statusList) {
if (status == TransactionStatus.PREPARE) {
builder.addTxnStatus(TxnStatusPB.TXN_STATUS_PREPARED);
} else if (status == TransactionStatus.PRECOMMITTED) {
builder.addTxnStatus(TxnStatusPB.TXN_STATUS_PRECOMMITTED);
} else if (status == TransactionStatus.COMMITTED) {
builder.addTxnStatus(TxnStatusPB.TXN_STATUS_COMMITTED);
}
}

final GetTxnIdRequest getTxnIdRequest = builder.build();
GetTxnIdResponse getTxnIdResponse = null;
try {
LOG.info("getTxnRequest:{}", getTxnIdRequest);
getTxnIdResponse = MetaServiceProxy
.getInstance().getTxnId(getTxnIdRequest);
LOG.info("getTxnIdReponse: {}", getTxnIdResponse);
} catch (RpcException e) {
LOG.info("getTransactionId exception: {}", e.getMessage());
throw new TransactionNotFoundException("transaction not found, label=" + label);
}

if (getTxnIdResponse.getStatus().getCode() != MetaServiceCode.OK) {
LOG.info("getTransactionState exception: {}, {}", getTxnIdResponse.getStatus().getCode(),
getTxnIdResponse.getStatus().getMsg());
throw new TransactionNotFoundException("transaction not found, label=" + label);
}
return getTxnIdResponse.getTxnId();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ public class LabelAlreadyUsedException extends DdlException {
private String jobStatus;

public LabelAlreadyUsedException(String label) {
super("Label [" + label + "] has already been used.");
this(label, true);
}

public LabelAlreadyUsedException(String msg, boolean isLabel) {
super(isLabel ? "Label [" + msg + "] has already been used." : msg);
}

public LabelAlreadyUsedException(TransactionState txn) {
Expand Down
13 changes: 13 additions & 0 deletions gensrc/proto/cloud.proto
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,18 @@ message GetTxnResponse {
optional TxnInfoPB txn_info = 2;
}

message GetTxnIdRequest {
optional string cloud_unique_id = 1; // For auth
optional int64 db_id = 2;
optional string label = 3;
repeated TxnStatusPB txn_status = 4;
}

message GetTxnIdResponse {
optional MetaServiceResponseStatus status = 1;
optional int64 txn_id = 2;
liaoxin01 marked this conversation as resolved.
Show resolved Hide resolved
}

message GetCurrentMaxTxnRequest {
optional string cloud_unique_id = 1; // For auth
}
Expand Down Expand Up @@ -1345,6 +1357,7 @@ service MetaService {
rpc get_current_max_txn_id(GetCurrentMaxTxnRequest) returns (GetCurrentMaxTxnResponse);
rpc check_txn_conflict(CheckTxnConflictRequest) returns (CheckTxnConflictResponse);
rpc clean_txn_label(CleanTxnLabelRequest) returns (CleanTxnLabelResponse);
rpc get_txn_id(GetTxnIdRequest) returns (GetTxnIdResponse);

rpc get_version(GetVersionRequest) returns (GetVersionResponse);
rpc create_tablets(CreateTabletsRequest) returns (CreateTabletsResponse);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql_2pc_commit --
1 -50 1 2 1 \N
2 -50 1 44 1 \N

Loading
Loading