diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp index ab0b5934b50998..1aa436bb603d87 100644 --- a/cloud/src/common/bvars.cpp +++ b/cloud/src/common/bvars.cpp @@ -70,6 +70,7 @@ BvarLatencyRecorderWithTag g_bvar_ms_get_delete_bitmap_update_lock("ms", "get_delete_bitmap_update_lock"); BvarLatencyRecorderWithTag g_bvar_ms_get_instance("ms", "get_instance"); BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach("ms", "get_rl_task_commit_attach"); +BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id("ms", "get_txn_id"); BvarLatencyRecorderWithTag g_bvar_ms_start_tablet_job("ms", "start_tablet_job"); BvarLatencyRecorderWithTag g_bvar_ms_finish_tablet_job("ms", "finish_tablet_job"); diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h index dbdbfa834e9812..b55e1051cd94f3 100644 --- a/cloud/src/common/bvars.h +++ b/cloud/src/common/bvars.h @@ -171,6 +171,7 @@ extern BvarLatencyRecorderWithTag g_bvar_ms_get_cluster_status; extern BvarLatencyRecorderWithTag g_bvar_ms_set_cluster_status; extern BvarLatencyRecorderWithTag g_bvar_ms_get_instance; extern BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach; +extern BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id; // txn_kv's bvars extern bvar::LatencyRecorder g_bvar_txn_kv_get; diff --git a/cloud/src/meta-service/meta_service.h b/cloud/src/meta-service/meta_service.h index 4b7ae82979537d..4dc4113f341f45 100644 --- a/cloud/src/meta-service/meta_service.h +++ b/cloud/src/meta-service/meta_service.h @@ -254,6 +254,9 @@ class MetaServiceImpl : public cloud::MetaService { GetRLTaskCommitAttachResponse* response, ::google::protobuf::Closure* done) override; + void get_txn_id(::google::protobuf::RpcController* controller, const GetTxnIdRequest* request, + GetTxnIdResponse* response, ::google::protobuf::Closure* done) override; + // ATTN: If you add a new method, please also add the corresponding implementation in `MetaServiceProxy`. 
std::pair get_instance_info(const std::string& instance_id, @@ -585,6 +588,11 @@ class MetaServiceProxy final : public MetaService { done); } + void get_txn_id(::google::protobuf::RpcController* controller, const GetTxnIdRequest* request, + GetTxnIdResponse* response, ::google::protobuf::Closure* done) override { + call_impl(&cloud::MetaService::get_txn_id, controller, request, response, done); + } + private: template using MetaServiceMethod = void (cloud::MetaService::*)(::google::protobuf::RpcController*, diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp index 139206ede2088b..0afdc31d10a095 100644 --- a/cloud/src/meta-service/meta_service_txn.cpp +++ b/cloud/src/meta-service/meta_service_txn.cpp @@ -1969,4 +1969,126 @@ void MetaServiceImpl::clean_txn_label(::google::protobuf::RpcController* control code = MetaServiceCode::OK; } +// get txn id by label +// 1. When the requested status is not empty, return the txnid +// corresponding to the status. There may be multiple +// requested status, just match one. +// 2. When the requested status is empty, return the latest txnid. +void MetaServiceImpl::get_txn_id(::google::protobuf::RpcController* controller, + const GetTxnIdRequest* request, GetTxnIdResponse* response, + ::google::protobuf::Closure* done) { + RPC_PREPROCESS(get_txn_id); + if (!request->has_db_id()) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "missing db id"; + LOG(WARNING) << msg; + return; + } + + std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : ""; + instance_id = get_instance_id(resource_mgr_, cloud_unique_id); + if (instance_id.empty()) { + code = MetaServiceCode::INVALID_ARGUMENT; + ss << "cannot find instance_id with cloud_unique_id=" + << (cloud_unique_id.empty() ? 
"(empty)" : cloud_unique_id); + msg = ss.str(); + LOG(WARNING) << msg; + return; + } + RPC_RATE_LIMIT(get_txn_id) + const int64_t db_id = request->db_id(); + std::string label = request->label(); + const std::string label_key = txn_label_key({instance_id, db_id, label}); + std::unique_ptr txn; + TxnErrorCode err = txn_kv_->create_txn(&txn); + if (err != TxnErrorCode::TXN_OK) { + LOG(WARNING) << "failed to create txn. err=" << err << " db_id=" << db_id + << " label=" << label; + code = cast_as(err); + ss << "txn_kv_->create_txn() failed, err=" << err << " label=" << label + << " db_id=" << db_id; + msg = ss.str(); + return; + } + + std::string label_val; + err = txn->get(label_key, &label_val); + if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { + code = cast_as(err); + ss << "txn->get failed(), err=" << err << " label=" << label; + msg = ss.str(); + return; + } + + if (label_val.size() <= VERSION_STAMP_LEN) { + code = MetaServiceCode::TXN_ID_NOT_FOUND; + ss << "transaction not found, label=" << label; + return; + } + + TxnLabelPB label_pb; + //label_val.size() > VERSION_STAMP_LEN means label has previous txn ids. 
+ if (!label_pb.ParseFromArray(label_val.data(), label_val.size() - VERSION_STAMP_LEN)) { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + ss << "label_pb->ParseFromString() failed, label=" << label; + msg = ss.str(); + return; + } + if (label_pb.txn_ids_size() == 0) { + code = MetaServiceCode::TXN_ID_NOT_FOUND; + ss << "transaction not found, label=" << label; + msg = ss.str(); + return; + } + + // find the latest txn + if (request->txn_status_size() == 0) { + response->set_txn_id(*label_pb.txn_ids().rbegin()); + return; + } + + for (auto& cur_txn_id : label_pb.txn_ids()) { + const std::string cur_info_key = txn_info_key({instance_id, db_id, cur_txn_id}); + std::string cur_info_val; + err = txn->get(cur_info_key, &cur_info_val); + if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { + code = cast_as(err); + ss << "txn->get() failed, cur_txn_id=" << cur_txn_id << " label=" << label + << " err=" << err; + msg = ss.str(); + return; + } + + if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { + //label_to_idx and txn info inconsistency. 
+ code = MetaServiceCode::TXN_ID_NOT_FOUND; + ss << "txn->get() failed, cur_txn_id=" << cur_txn_id << " label=" << label + << " err=" << err; + msg = ss.str(); + return; + } + + TxnInfoPB cur_txn_info; + if (!cur_txn_info.ParseFromString(cur_info_val)) { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + ss << "cur_txn_info->ParseFromString() failed, cur_txn_id=" << cur_txn_id + << " label=" << label << " err=" << err; + msg = ss.str(); + return; + } + + VLOG_DEBUG << "cur_txn_info=" << cur_txn_info.ShortDebugString(); + for (auto txn_status : request->txn_status()) { + if (cur_txn_info.status() == txn_status) { + response->set_txn_id(cur_txn_id); + return; + } + } + } + code = MetaServiceCode::TXN_ID_NOT_FOUND; + ss << "transaction not found, label=" << label; + msg = ss.str(); + return; +} + } // namespace doris::cloud diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java index 847c90a73fd86d..7463a684680646 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java @@ -192,6 +192,10 @@ public Cloud.GetTxnResponse getTxn(Cloud.GetTxnRequest request) { return blockingStub.getTxn(request); } + public Cloud.GetTxnIdResponse getTxnId(Cloud.GetTxnIdRequest request) { + return blockingStub.getTxnId(request); + } + public Cloud.GetCurrentMaxTxnResponse getCurrentMaxTxnId(Cloud.GetCurrentMaxTxnRequest request) { return blockingStub.getCurrentMaxTxnId(request); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java index 052abbcf4ef082..117cfd71bd0ab6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java @@ -226,6 +226,16 @@ public 
Cloud.GetTxnResponse getTxn(Cloud.GetTxnRequest request) } } + public Cloud.GetTxnIdResponse getTxnId(Cloud.GetTxnIdRequest request) + throws RpcException { + try { + final MetaServiceClient client = getProxy(); + return client.getTxnId(request); + } catch (Exception e) { + throw new RpcException("", e.getMessage(), e); + } + } + public Cloud.GetCurrentMaxTxnResponse getCurrentMaxTxnId(Cloud.GetCurrentMaxTxnRequest request) throws RpcException { try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index d325f1dac2ba88..35b3cd285298d3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -43,6 +43,8 @@ import org.apache.doris.cloud.proto.Cloud.GetCurrentMaxTxnResponse; import org.apache.doris.cloud.proto.Cloud.GetDeleteBitmapUpdateLockRequest; import org.apache.doris.cloud.proto.Cloud.GetDeleteBitmapUpdateLockResponse; +import org.apache.doris.cloud.proto.Cloud.GetTxnIdRequest; +import org.apache.doris.cloud.proto.Cloud.GetTxnIdResponse; import org.apache.doris.cloud.proto.Cloud.GetTxnRequest; import org.apache.doris.cloud.proto.Cloud.GetTxnResponse; import org.apache.doris.cloud.proto.Cloud.LoadJobSourceTypePB; @@ -51,6 +53,7 @@ import org.apache.doris.cloud.proto.Cloud.PrecommitTxnResponse; import org.apache.doris.cloud.proto.Cloud.TableStatsPB; import org.apache.doris.cloud.proto.Cloud.TxnInfoPB; +import org.apache.doris.cloud.proto.Cloud.TxnStatusPB; import org.apache.doris.cloud.proto.Cloud.UniqueIdPB; import org.apache.doris.cloud.rpc.MetaServiceProxy; import org.apache.doris.common.AnalysisException; @@ -90,6 +93,7 @@ import org.apache.doris.transaction.TabletCommitInfo; import org.apache.doris.transaction.TransactionCommitFailedException; import 
org.apache.doris.transaction.TransactionIdGenerator; +import org.apache.doris.transaction.TransactionNotFoundException; import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TransactionState.LoadJobSourceType; import org.apache.doris.transaction.TransactionState.TxnCoordinator; @@ -248,7 +252,7 @@ public long beginTransaction(long dbId, List tableIdList, String label, TU throw new DuplicatedRequestException(DebugUtil.printId(requestId), beginTxnResponse.getDupTxnId(), beginTxnResponse.getStatus().getMsg()); case TXN_LABEL_ALREADY_USED: - throw new LabelAlreadyUsedException(label); + throw new LabelAlreadyUsedException(beginTxnResponse.getStatus().getMsg(), false); default: if (MetricRepo.isInit) { MetricRepo.COUNTER_TXN_REJECT.increase(1L); @@ -483,6 +487,9 @@ private void commitTransaction(long dbId, List tableList, long transactio internalMsgBuilder.append(commitTxnResponse.getStatus().getCode()); throw new UserException("internal error, " + internalMsgBuilder.toString()); } + if (is2PC && commitTxnResponse.getStatus().getCode() == MetaServiceCode.TXN_ALREADY_VISIBLE) { + throw new UserException(commitTxnResponse.getStatus().getMsg()); + } TransactionState txnState = TxnUtil.transactionStateFromPb(commitTxnResponse.getTxnInfo()); TxnStateChangeCallback cb = callbackFactory.getCallback(txnState.getCallbackId()); @@ -1103,7 +1110,39 @@ public TransactionState getTransactionState(long dbId, long transactionId) { @Override public Long getTransactionIdByLabel(Long dbId, String label, List statusList) throws UserException { - throw new UserException(NOT_SUPPORTED_MSG); + LOG.info("try to get transaction id by label, dbId:{}, label:{}", dbId, label); + GetTxnIdRequest.Builder builder = GetTxnIdRequest.newBuilder(); + builder.setDbId(dbId); + builder.setLabel(label); + builder.setCloudUniqueId(Config.cloud_unique_id); + for (TransactionStatus status : statusList) { + if (status == TransactionStatus.PREPARE) { + 
builder.addTxnStatus(TxnStatusPB.TXN_STATUS_PREPARED); + } else if (status == TransactionStatus.PRECOMMITTED) { + builder.addTxnStatus(TxnStatusPB.TXN_STATUS_PRECOMMITTED); + } else if (status == TransactionStatus.COMMITTED) { + builder.addTxnStatus(TxnStatusPB.TXN_STATUS_COMMITTED); + } + } + + final GetTxnIdRequest getTxnIdRequest = builder.build(); + GetTxnIdResponse getTxnIdResponse = null; + try { + LOG.info("getTxnRequest:{}", getTxnIdRequest); + getTxnIdResponse = MetaServiceProxy + .getInstance().getTxnId(getTxnIdRequest); + LOG.info("getTxnIdReponse: {}", getTxnIdResponse); + } catch (RpcException e) { + LOG.info("getTransactionId exception: {}", e.getMessage()); + throw new TransactionNotFoundException("transaction not found, label=" + label); + } + + if (getTxnIdResponse.getStatus().getCode() != MetaServiceCode.OK) { + LOG.info("getTransactionState exception: {}, {}", getTxnIdResponse.getStatus().getCode(), + getTxnIdResponse.getStatus().getMsg()); + throw new TransactionNotFoundException("transaction not found, label=" + label); + } + return getTxnIdResponse.getTxnId(); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java b/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java index 7c3e7e31c72090..d739f2032f3eed 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java @@ -30,7 +30,11 @@ public class LabelAlreadyUsedException extends DdlException { private String jobStatus; public LabelAlreadyUsedException(String label) { - super("Label [" + label + "] has already been used."); + this(label, true); + } + + public LabelAlreadyUsedException(String msg, boolean isLabel) { + super(isLabel ? "Label [" + msg + "] has already been used." 
: msg); } public LabelAlreadyUsedException(TransactionState txn) { diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index 2bcc3dabfcd4e0..9a5705bc7816ea 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -687,6 +687,18 @@ message GetTxnResponse { optional TxnInfoPB txn_info = 2; } +message GetTxnIdRequest { + optional string cloud_unique_id = 1; // For auth + optional int64 db_id = 2; + optional string label = 3; + repeated TxnStatusPB txn_status = 4; +} + +message GetTxnIdResponse { + optional MetaServiceResponseStatus status = 1; + optional int64 txn_id = 2; +} + message GetCurrentMaxTxnRequest { optional string cloud_unique_id = 1; // For auth } @@ -1344,6 +1356,7 @@ service MetaService { rpc get_current_max_txn_id(GetCurrentMaxTxnRequest) returns (GetCurrentMaxTxnResponse); rpc check_txn_conflict(CheckTxnConflictRequest) returns (CheckTxnConflictResponse); rpc clean_txn_label(CleanTxnLabelRequest) returns (CleanTxnLabelResponse); + rpc get_txn_id(GetTxnIdRequest) returns (GetTxnIdResponse); rpc get_version(GetVersionRequest) returns (GetVersionResponse); rpc create_tablets(CreateTabletsRequest) returns (CreateTabletsResponse); diff --git a/regression-test/data/load_p0/stream_load/test_stream_load_2pc.out b/regression-test/data/load_p0/stream_load/test_stream_load_2pc.out new file mode 100644 index 00000000000000..3e449544055381 --- /dev/null +++ b/regression-test/data/load_p0/stream_load/test_stream_load_2pc.out @@ -0,0 +1,5 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !sql_2pc_commit -- +1 -50 1 2 1 \N +2 -50 1 44 1 \N + diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load_2pc.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load_2pc.groovy new file mode 100644 index 00000000000000..e05850f5e57d12 --- /dev/null +++ b/regression-test/suites/load_p0/stream_load/test_stream_load_2pc.groovy @@ -0,0 +1,150 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+import org.apache.http.client.methods.RequestBuilder
+import org.apache.http.impl.client.HttpClients
+import org.apache.http.util.EntityUtils
+
+import java.text.SimpleDateFormat
+
+suite("test_stream_load_2pc", "p0") {
+    def tableName = "test_2pc_table"
+    InetSocketAddress address = context.config.feHttpInetSocketAddress
+    String user = context.config.feHttpUser
+    String password = context.config.feHttpPassword
+    String db = context.config.getDbNameByFile(context.file)
+
+    def do_streamload_2pc_commit_by_label = { label ->
+        def command = "curl -X PUT --location-trusted -u ${context.config.feHttpUser}:${context.config.feHttpPassword}" +
+                " -H label:${label}" +
+                " -H txn_operation:commit" +
+                " http://${context.config.feHttpAddress}/api/${db}/${tableName}/_stream_load_2pc"
+        log.info("http_stream execute 2pc: ${command}")
+
+        def process = command.execute()
+        def out = process.text // drain stdout before waiting so a full pipe cannot block curl
+        def code = process.waitFor()
+        log.info("http_stream 2pc result: ${out}".toString())
+        def json2pc = parseJson(out)
+        return json2pc
+    }
+
+    def do_streamload_2pc_commit_by_txn_id = { txnId ->
+        def command = "curl -X PUT --location-trusted -u ${context.config.feHttpUser}:${context.config.feHttpPassword}" +
+                " -H txn_id:${txnId}" +
+                " -H txn_operation:commit" +
+                " http://${context.config.feHttpAddress}/api/${db}/${tableName}/_stream_load_2pc"
+        log.info("http_stream execute 2pc: ${command}")
+
+        def process = command.execute()
+        def out = process.text // drain stdout before waiting so a full pipe cannot block curl
+        def code = process.waitFor()
+        log.info("http_stream 2pc result: ${out}".toString())
+        def json2pc = parseJson(out)
+        return json2pc
+    }
+
+    try {
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `k1` bigint(20) NULL DEFAULT "1",
+                `k2` bigint(20) NULL ,
+                `v1` tinyint(4) NULL,
+                `v2` tinyint(4) NULL,
+                `v3` tinyint(4) NULL,
+                `v4` DATETIME NULL
+            ) ENGINE=OLAP
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+
+        def label = UUID.randomUUID().toString().replaceAll("-", "")
+        def txnId;
+        streamLoad {
+            table "${tableName}"
+
+            set 'label', "${label}"
+            set 'column_separator', '|'
+            set 'columns', 'k1, k2, v1, v2, v3'
+            set 'two_phase_commit', 'true'
+
+            file 'test_two_phase_commit.csv'
+
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                assertEquals(2, json.NumberTotalRows)
+                assertEquals(0, json.NumberFilteredRows)
+                assertEquals(0, json.NumberUnselectedRows)
+                txnId = json.TxnId
+            }
+        }
+
+        streamLoad {
+            table "${tableName}"
+
+            set 'label', "${label}"
+            set 'column_separator', '|'
+            set 'columns', 'k1, k2, v1, v2, v3'
+            set 'two_phase_commit', 'true'
+
+            file 'test_two_phase_commit.csv'
+
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("label already exists", json.Status.toLowerCase())
+                assertTrue(json.Message.contains("has already been used, relate to txn"))
+            }
+        }
+
+        def json2pc = do_streamload_2pc_commit_by_label.call(label)
+        assertEquals("success", json2pc.status.toLowerCase())
+
+        def count = 0
+        while (true) {
+            def res = sql "select count(*) from ${tableName}"
+            if (res[0][0] > 0) {
+                break
+            }
+            if (count >= 60) {
+                log.error("stream load commit can not visible for long time")
+                assertEquals(2, res[0][0])
+                break
+            }
+            sleep(1000)
+            count++
+        }
+
+        qt_sql_2pc_commit "select * from ${tableName} order by k1"
+
+        json2pc = do_streamload_2pc_commit_by_txn_id.call(txnId)
+        assertTrue(json2pc.msg.contains("is already visible, not pre-committed"))
+    } finally {
+        sql """ DROP TABLE IF EXISTS ${tableName} FORCE"""
+    }
+
+}
+