Skip to content

Commit

Permalink
[fix](merge-cloud) fix inconsistency between cloud mode and local mod…
Browse files Browse the repository at this point in the history
…e for 2PC (#33917)
  • Loading branch information
liaoxin01 authored and dataroaring committed Apr 21, 2024
1 parent def7640 commit 127b877
Show file tree
Hide file tree
Showing 11 changed files with 360 additions and 3 deletions.
1 change: 1 addition & 0 deletions cloud/src/common/bvars.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ BvarLatencyRecorderWithTag g_bvar_ms_get_delete_bitmap_update_lock("ms",
"get_delete_bitmap_update_lock");
BvarLatencyRecorderWithTag g_bvar_ms_get_instance("ms", "get_instance");
BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach("ms", "get_rl_task_commit_attach");
BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id("ms", "get_txn_id");

BvarLatencyRecorderWithTag g_bvar_ms_start_tablet_job("ms", "start_tablet_job");
BvarLatencyRecorderWithTag g_bvar_ms_finish_tablet_job("ms", "finish_tablet_job");
Expand Down
1 change: 1 addition & 0 deletions cloud/src/common/bvars.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ extern BvarLatencyRecorderWithTag g_bvar_ms_get_cluster_status;
extern BvarLatencyRecorderWithTag g_bvar_ms_set_cluster_status;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_instance;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id;

// txn_kv's bvars
extern bvar::LatencyRecorder g_bvar_txn_kv_get;
Expand Down
8 changes: 8 additions & 0 deletions cloud/src/meta-service/meta_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,9 @@ class MetaServiceImpl : public cloud::MetaService {
GetRLTaskCommitAttachResponse* response,
::google::protobuf::Closure* done) override;

// RPC handler: looks up a transaction id by (db_id, label), optionally filtered by
// the txn statuses listed in the request (see GetTxnIdRequest / GetTxnIdResponse).
void get_txn_id(::google::protobuf::RpcController* controller, const GetTxnIdRequest* request,
GetTxnIdResponse* response, ::google::protobuf::Closure* done) override;

// ATTN: If you add a new method, please also add the corresponding implementation in `MetaServiceProxy`.

std::pair<MetaServiceCode, std::string> get_instance_info(const std::string& instance_id,
Expand Down Expand Up @@ -585,6 +588,11 @@ class MetaServiceProxy final : public MetaService {
done);
}

// Forwards the get_txn_id RPC to cloud::MetaService::get_txn_id via call_impl,
// passing all four RPC arguments through unchanged.
void get_txn_id(::google::protobuf::RpcController* controller, const GetTxnIdRequest* request,
GetTxnIdResponse* response, ::google::protobuf::Closure* done) override {
call_impl(&cloud::MetaService::get_txn_id, controller, request, response, done);
}

private:
template <typename Request, typename Response>
using MetaServiceMethod = void (cloud::MetaService::*)(::google::protobuf::RpcController*,
Expand Down
122 changes: 122 additions & 0 deletions cloud/src/meta-service/meta_service_txn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1969,4 +1969,126 @@ void MetaServiceImpl::clean_txn_label(::google::protobuf::RpcController* control
code = MetaServiceCode::OK;
}

// get txn id by label
// 1. When the requested status is not empty, return the txnid
//    corresponding to the status. There may be multiple
//    requested status, just match one.
// 2. When the requested status is empty, return the latest txnid.
//
// Request fields read here: db_id (required), cloud_unique_id, label, txn_status.
// On success the matching id is stored with response->set_txn_id(); on every
// failure path `code` and `msg` are filled in before returning.
//
// NOTE(review): `code`, `msg`, `ss` and `instance_id` are not declared in this
// function; presumably RPC_PREPROCESS introduces them and copies `code`/`msg`
// into the response status when the function returns -- confirm against the macro.
void MetaServiceImpl::get_txn_id(::google::protobuf::RpcController* controller,
                                 const GetTxnIdRequest* request, GetTxnIdResponse* response,
                                 ::google::protobuf::Closure* done) {
    RPC_PREPROCESS(get_txn_id);
    if (!request->has_db_id()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        msg = "missing db id";
        LOG(WARNING) << msg;
        return;
    }

    // Resolve the instance this request belongs to from the caller's cloud_unique_id.
    std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
    instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
    if (instance_id.empty()) {
        code = MetaServiceCode::INVALID_ARGUMENT;
        ss << "cannot find instance_id with cloud_unique_id="
           << (cloud_unique_id.empty() ? "(empty)" : cloud_unique_id);
        msg = ss.str();
        LOG(WARNING) << msg;
        return;
    }
    RPC_RATE_LIMIT(get_txn_id)
    const int64_t db_id = request->db_id();
    std::string label = request->label();
    const std::string label_key = txn_label_key({instance_id, db_id, label});
    std::unique_ptr<Transaction> txn;
    TxnErrorCode err = txn_kv_->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        LOG(WARNING) << "failed to create txn. err=" << err << " db_id=" << db_id
                     << " label=" << label;
        code = cast_as<ErrCategory::CREATE>(err);
        ss << "txn_kv_->create_txn() failed, err=" << err << " label=" << label
           << " db_id=" << db_id;
        msg = ss.str();
        return;
    }

    // A missing label key is not a read error: label_val stays empty and is
    // reported as "transaction not found" by the size check below.
    std::string label_val;
    err = txn->get(label_key, &label_val);
    if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
        code = cast_as<ErrCategory::READ>(err);
        ss << "txn->get failed(), err=" << err << " label=" << label;
        msg = ss.str();
        return;
    }

    // The trailing VERSION_STAMP_LEN bytes of the value are excluded from parsing,
    // so a value no longer than that carries no txn ids for this label.
    if (label_val.size() <= VERSION_STAMP_LEN) {
        code = MetaServiceCode::TXN_ID_NOT_FOUND;
        ss << "transaction not found, label=" << label;
        // Publish the message like every other error path; it was previously
        // built in `ss` but never assigned to `msg`.
        msg = ss.str();
        return;
    }

    TxnLabelPB label_pb;
    //label_val.size() > VERSION_STAMP_LEN means label has previous txn ids.
    if (!label_pb.ParseFromArray(label_val.data(), label_val.size() - VERSION_STAMP_LEN)) {
        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
        ss << "label_pb->ParseFromString() failed, label=" << label;
        msg = ss.str();
        return;
    }
    if (label_pb.txn_ids_size() == 0) {
        code = MetaServiceCode::TXN_ID_NOT_FOUND;
        ss << "transaction not found, label=" << label;
        msg = ss.str();
        return;
    }

    // find the latest txn
    // No status filter requested: answer with the last txn id recorded for the label.
    if (request->txn_status_size() == 0) {
        response->set_txn_id(*label_pb.txn_ids().rbegin());
        return;
    }

    // Status filter requested: walk the label's txn ids in stored order and return
    // the first one whose txn info status equals any requested status.
    for (auto& cur_txn_id : label_pb.txn_ids()) {
        const std::string cur_info_key = txn_info_key({instance_id, db_id, cur_txn_id});
        std::string cur_info_val;
        err = txn->get(cur_info_key, &cur_info_val);
        if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
            code = cast_as<ErrCategory::READ>(err);
            ss << "txn->get() failed, cur_txn_id=" << cur_txn_id << " label=" << label
               << " err=" << err;
            msg = ss.str();
            return;
        }

        if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
            //label_to_idx and txn info inconsistency.
            code = MetaServiceCode::TXN_ID_NOT_FOUND;
            ss << "txn->get() failed, cur_txn_id=" << cur_txn_id << " label=" << label
               << " err=" << err;
            msg = ss.str();
            return;
        }

        TxnInfoPB cur_txn_info;
        if (!cur_txn_info.ParseFromString(cur_info_val)) {
            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
            ss << "cur_txn_info->ParseFromString() failed, cur_txn_id=" << cur_txn_id
               << " label=" << label << " err=" << err;
            msg = ss.str();
            return;
        }

        VLOG_DEBUG << "cur_txn_info=" << cur_txn_info.ShortDebugString();
        for (auto txn_status : request->txn_status()) {
            if (cur_txn_info.status() == txn_status) {
                response->set_txn_id(cur_txn_id);
                return;
            }
        }
    }

    // None of the label's transactions is in a requested status.
    code = MetaServiceCode::TXN_ID_NOT_FOUND;
    ss << "transaction not found, label=" << label;
    msg = ss.str();
    return;
}

} // namespace doris::cloud
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ public Cloud.GetTxnResponse getTxn(Cloud.GetTxnRequest request) {
return blockingStub.getTxn(request);
}

/** Delegates the request to blockingStub.getTxnId and returns its response. */
public Cloud.GetTxnIdResponse getTxnId(Cloud.GetTxnIdRequest request) {
return blockingStub.getTxnId(request);
}

public Cloud.GetCurrentMaxTxnResponse getCurrentMaxTxnId(Cloud.GetCurrentMaxTxnRequest request) {
return blockingStub.getCurrentMaxTxnId(request);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,16 @@ public Cloud.GetTxnResponse getTxn(Cloud.GetTxnRequest request)
}
}

/**
 * Sends a getTxnId request using the client returned by getProxy().
 *
 * @param request the request, forwarded unchanged to the client
 * @return the response returned by the client
 * @throws RpcException wrapping any Exception raised while obtaining the client or making the call
 */
public Cloud.GetTxnIdResponse getTxnId(Cloud.GetTxnIdRequest request)
throws RpcException {
try {
final MetaServiceClient client = getProxy();
return client.getTxnId(request);
} catch (Exception e) {
// NOTE(review): the first RpcException argument is passed as an empty string; confirm what it denotes.
throw new RpcException("", e.getMessage(), e);
}
}

public Cloud.GetCurrentMaxTxnResponse getCurrentMaxTxnId(Cloud.GetCurrentMaxTxnRequest request)
throws RpcException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import org.apache.doris.cloud.proto.Cloud.GetCurrentMaxTxnResponse;
import org.apache.doris.cloud.proto.Cloud.GetDeleteBitmapUpdateLockRequest;
import org.apache.doris.cloud.proto.Cloud.GetDeleteBitmapUpdateLockResponse;
import org.apache.doris.cloud.proto.Cloud.GetTxnIdRequest;
import org.apache.doris.cloud.proto.Cloud.GetTxnIdResponse;
import org.apache.doris.cloud.proto.Cloud.GetTxnRequest;
import org.apache.doris.cloud.proto.Cloud.GetTxnResponse;
import org.apache.doris.cloud.proto.Cloud.LoadJobSourceTypePB;
Expand All @@ -51,6 +53,7 @@
import org.apache.doris.cloud.proto.Cloud.PrecommitTxnResponse;
import org.apache.doris.cloud.proto.Cloud.TableStatsPB;
import org.apache.doris.cloud.proto.Cloud.TxnInfoPB;
import org.apache.doris.cloud.proto.Cloud.TxnStatusPB;
import org.apache.doris.cloud.proto.Cloud.UniqueIdPB;
import org.apache.doris.cloud.rpc.MetaServiceProxy;
import org.apache.doris.common.AnalysisException;
Expand Down Expand Up @@ -90,6 +93,7 @@
import org.apache.doris.transaction.TabletCommitInfo;
import org.apache.doris.transaction.TransactionCommitFailedException;
import org.apache.doris.transaction.TransactionIdGenerator;
import org.apache.doris.transaction.TransactionNotFoundException;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
import org.apache.doris.transaction.TransactionState.TxnCoordinator;
Expand Down Expand Up @@ -248,7 +252,7 @@ public long beginTransaction(long dbId, List<Long> tableIdList, String label, TU
throw new DuplicatedRequestException(DebugUtil.printId(requestId),
beginTxnResponse.getDupTxnId(), beginTxnResponse.getStatus().getMsg());
case TXN_LABEL_ALREADY_USED:
throw new LabelAlreadyUsedException(label);
throw new LabelAlreadyUsedException(beginTxnResponse.getStatus().getMsg(), false);
default:
if (MetricRepo.isInit) {
MetricRepo.COUNTER_TXN_REJECT.increase(1L);
Expand Down Expand Up @@ -483,6 +487,9 @@ private void commitTransaction(long dbId, List<Table> tableList, long transactio
internalMsgBuilder.append(commitTxnResponse.getStatus().getCode());
throw new UserException("internal error, " + internalMsgBuilder.toString());
}
if (is2PC && commitTxnResponse.getStatus().getCode() == MetaServiceCode.TXN_ALREADY_VISIBLE) {
throw new UserException(commitTxnResponse.getStatus().getMsg());
}

TransactionState txnState = TxnUtil.transactionStateFromPb(commitTxnResponse.getTxnInfo());
TxnStateChangeCallback cb = callbackFactory.getCallback(txnState.getCallbackId());
Expand Down Expand Up @@ -1103,7 +1110,39 @@ public TransactionState getTransactionState(long dbId, long transactionId) {
/**
 * Looks up a transaction id by label through the meta service getTxnId RPC.
 *
 * @param dbId       database id sent as the request's db_id
 * @param label      label sent as the request's label
 * @param statusList statuses to filter on; only PREPARE, PRECOMMITTED and COMMITTED are
 *                   translated into the request, any other status is skipped
 * @return the txn id carried by the response
 * @throws UserException a TransactionNotFoundException when the RPC fails or the response
 *                       status code is not OK
 */
@Override
public Long getTransactionIdByLabel(Long dbId, String label, List<TransactionStatus> statusList)
        throws UserException {
    LOG.info("try to get transaction id by label, dbId:{}, label:{}", dbId, label);
    GetTxnIdRequest.Builder builder = GetTxnIdRequest.newBuilder();
    builder.setDbId(dbId);
    builder.setLabel(label);
    builder.setCloudUniqueId(Config.cloud_unique_id);
    // NOTE(review): statuses other than the three below are dropped. If none of the entries
    // in statusList is mapped, the request carries no status filter at all -- confirm that
    // this is the intended behavior for such callers.
    for (TransactionStatus status : statusList) {
        if (status == TransactionStatus.PREPARE) {
            builder.addTxnStatus(TxnStatusPB.TXN_STATUS_PREPARED);
        } else if (status == TransactionStatus.PRECOMMITTED) {
            builder.addTxnStatus(TxnStatusPB.TXN_STATUS_PRECOMMITTED);
        } else if (status == TransactionStatus.COMMITTED) {
            builder.addTxnStatus(TxnStatusPB.TXN_STATUS_COMMITTED);
        }
    }

    final GetTxnIdRequest getTxnIdRequest = builder.build();
    GetTxnIdResponse getTxnIdResponse = null;
    try {
        LOG.info("getTxnIdRequest:{}", getTxnIdRequest);
        getTxnIdResponse = MetaServiceProxy
                .getInstance().getTxnId(getTxnIdRequest);
        LOG.info("getTxnIdResponse: {}", getTxnIdResponse);
    } catch (RpcException e) {
        // The RPC failure is reported to callers as "not found"; the real cause is only logged.
        LOG.warn("getTransactionIdByLabel rpc exception: {}", e.getMessage());
        throw new TransactionNotFoundException("transaction not found, label=" + label);
    }

    if (getTxnIdResponse.getStatus().getCode() != MetaServiceCode.OK) {
        LOG.warn("getTransactionIdByLabel failed: {}, {}", getTxnIdResponse.getStatus().getCode(),
                getTxnIdResponse.getStatus().getMsg());
        throw new TransactionNotFoundException("transaction not found, label=" + label);
    }
    return getTxnIdResponse.getTxnId();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ public class LabelAlreadyUsedException extends DdlException {
private String jobStatus;

/**
 * Creates the exception with the message "Label [label] has already been used."
 * by delegating to the (String, boolean) constructor with isLabel = true.
 *
 * @param label the label that is already in use
 */
public LabelAlreadyUsedException(String label) {
    this(label, true);
}

/**
 * @param msg     a label when isLabel is true, otherwise the complete exception message
 * @param isLabel true to wrap msg as "Label [msg] has already been used."; false to use msg verbatim
 */
public LabelAlreadyUsedException(String msg, boolean isLabel) {
super(isLabel ? "Label [" + msg + "] has already been used." : msg);
}

public LabelAlreadyUsedException(TransactionState txn) {
Expand Down
13 changes: 13 additions & 0 deletions gensrc/proto/cloud.proto
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,18 @@ message GetTxnResponse {
optional TxnInfoPB txn_info = 2;
}

// Request of MetaService.get_txn_id: look up a transaction id by label.
message GetTxnIdRequest {
optional string cloud_unique_id = 1; // For auth
// Database the label belongs to; the handler rejects requests that do not set it.
optional int64 db_id = 2;
// Label whose transaction id is requested.
optional string label = 3;
// Status filter. When empty, the handler returns the latest txn id recorded for the
// label; otherwise it returns the first txn id whose status equals any listed status.
repeated TxnStatusPB txn_status = 4;
}

// Response of MetaService.get_txn_id.
message GetTxnIdResponse {
optional MetaServiceResponseStatus status = 1;
// The matching transaction id; the handler only sets it when a transaction was found.
optional int64 txn_id = 2;
}

message GetCurrentMaxTxnRequest {
optional string cloud_unique_id = 1; // For auth
}
Expand Down Expand Up @@ -1344,6 +1356,7 @@ service MetaService {
rpc get_current_max_txn_id(GetCurrentMaxTxnRequest) returns (GetCurrentMaxTxnResponse);
rpc check_txn_conflict(CheckTxnConflictRequest) returns (CheckTxnConflictResponse);
rpc clean_txn_label(CleanTxnLabelRequest) returns (CleanTxnLabelResponse);
rpc get_txn_id(GetTxnIdRequest) returns (GetTxnIdResponse);

rpc get_version(GetVersionRequest) returns (GetVersionResponse);
rpc create_tablets(CreateTabletsRequest) returns (CreateTabletsResponse);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql_2pc_commit --
1 -50 1 2 1 \N
2 -50 1 44 1 \N

Loading

0 comments on commit 127b877

Please sign in to comment.