diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 3db787f821acfd..3421bea3876303 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -288,6 +288,7 @@ import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TStorageMedium; import org.apache.doris.transaction.DbUsedDataQuotaInfoCollector; +import org.apache.doris.transaction.GlobalExternalTransactionInfoMgr; import org.apache.doris.transaction.GlobalTransactionMgrIface; import org.apache.doris.transaction.PublishVersionDaemon; @@ -566,6 +567,8 @@ public class Env { private final SplitSourceManager splitSourceManager; + private final GlobalExternalTransactionInfoMgr globalExternalTransactionInfoMgr; + private final List forceSkipJournalIds = Arrays.asList(Config.force_skip_journal_ids); // if a config is relative to a daemon thread. record the relation here. we will proactively change interval of it. @@ -814,6 +817,7 @@ public Env(boolean isCheckpointCatalog) { this.dnsCache = new DNSCache(); this.sqlCacheManager = new NereidsSqlCacheManager(); this.splitSourceManager = new SplitSourceManager(); + this.globalExternalTransactionInfoMgr = new GlobalExternalTransactionInfoMgr(); } public static void destroyCheckpoint() { @@ -6535,6 +6539,10 @@ public SplitSourceManager getSplitSourceManager() { return splitSourceManager; } + public GlobalExternalTransactionInfoMgr getGlobalExternalTransactionInfoMgr() { + return globalExternalTransactionInfoMgr; + } + public StatisticsJobAppender getStatisticsJobAppender() { return statisticsJobAppender; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java index cafffab295eba1..de3fc5eb9530bb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java @@ -48,7 +48,9 @@ * The derived class should implement the abstract method for certain type of target table */ public abstract class AbstractInsertExecutor { + protected static final long INVALID_TXN_ID = -1L; private static final Logger LOG = LogManager.getLogger(AbstractInsertExecutor.class); + protected long jobId; protected final ConnectContext ctx; protected final Coordinator coordinator; @@ -62,6 +64,7 @@ public abstract class AbstractInsertExecutor { protected String errMsg = ""; protected Optional insertCtx; protected final boolean emptyInsert; + protected long txnId = INVALID_TXN_ID; /** * Constructor @@ -93,7 +96,9 @@ public String getLabelName() { return labelName; } - public abstract long getTxnId(); + public long getTxnId() { + return txnId; + } /** * begin transaction if necessary @@ -108,7 +113,7 @@ public String getLabelName() { /** * Do something before exec */ - protected abstract void beforeExec(); + protected abstract void beforeExec() throws UserException; /** * Do something after exec finished diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java index e456d171df5986..082f1bab7d66f8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java @@ -46,9 +46,7 @@ * Insert executor for base external table */ public abstract class BaseExternalTableInsertExecutor extends AbstractInsertExecutor { - protected static final long INVALID_TXN_ID = -1L; private static final Logger LOG = LogManager.getLogger(BaseExternalTableInsertExecutor.class); - protected long txnId = INVALID_TXN_ID; protected TransactionStatus txnStatus = TransactionStatus.ABORTED; protected final TransactionManager transactionManager; protected final String catalogName; @@ -70,16 +68,6 @@ public BaseExternalTableInsertExecutor(ConnectContext ctx, ExternalTable table, } } - @Override - public long getTxnId() { - return txnId; - } - - /** - * collect commit infos from BEs - */ - protected abstract void setCollectCommitInfoFunc(); - /** * At this time, FE has successfully collected all commit information from BEs. * Before commit this txn, commit information need to be analyzed and processed. @@ -94,7 +82,6 @@ public long getTxnId() { @Override public void beginTransaction() { txnId = transactionManager.begin(); - setCollectCommitInfoFunc(); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java index 10ff27add86708..99464ccfc01a90 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java @@ -49,13 +49,7 @@ public HiveInsertExecutor(ConnectContext ctx, HMSExternalTable table, } @Override - public void setCollectCommitInfoFunc() { - HMSTransaction transaction = (HMSTransaction) transactionManager.getTransaction(txnId); - coordinator.setHivePartitionUpdateFunc(transaction::updateHivePartitionUpdates); - } - - @Override - protected void beforeExec() { + protected void beforeExec() throws UserException { // check params HMSTransaction transaction = (HMSTransaction) transactionManager.getTransaction(txnId); Preconditions.checkArgument(insertCtx.isPresent(), "insert context must be present"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java index 86b1f1ef0b7e2d..fe8ff063571e1f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java @@ -47,13 +47,7 @@ public IcebergInsertExecutor(ConnectContext ctx, IcebergExternalTable table, } @Override - public void setCollectCommitInfoFunc() { - IcebergTransaction transaction = (IcebergTransaction) transactionManager.getTransaction(txnId); - coordinator.setIcebergCommitDataFunc(transaction::updateIcebergCommitData); - } - - @Override - protected void beforeExec() { + protected void beforeExec() throws UserException { String dbName = ((IcebergExternalTable) table).getDbName(); String tbName = table.getName(); SimpleTableInfo tableInfo = new SimpleTableInfo(dbName, tbName); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java index 38d0d8386307cf..74f75d2d7d5dd9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java @@ -237,6 +237,7 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor executor.setProfileType(ProfileType.LOAD); // We exposed @StmtExecutor#cancel as a unified entry point for statement interruption, // so we need to set this here + insertExecutor.getCoordinator().setTxnId(insertExecutor.getTxnId()); executor.setCoord(insertExecutor.getCoordinator()); return insertExecutor; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/JdbcInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/JdbcInsertExecutor.java index 928b17edf38933..fb41f71083a753 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/JdbcInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/JdbcInsertExecutor.java @@ -90,11 +90,6 @@ protected void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink p // do nothing } - @Override - protected void setCollectCommitInfoFunc() { - // do nothing - } - @Override protected void doBeforeCommit() throws UserException { // do nothing diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java index e38ee40bc9a7a5..658b154b017167 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java @@ -72,9 +72,7 @@ * Insert executor for olap table */ public class OlapInsertExecutor extends AbstractInsertExecutor { - protected static final long INVALID_TXN_ID = -1L; private static final Logger LOG = LogManager.getLogger(OlapInsertExecutor.class); - protected long txnId = INVALID_TXN_ID; protected TransactionStatus txnStatus = TransactionStatus.ABORTED; /** @@ -85,11 +83,6 @@ public OlapInsertExecutor(ConnectContext ctx, Table table, super(ctx, table, labelName, planner, insertCtx, emptyInsert); } - @Override - public long getTxnId() { - return txnId; - } - @Override public void beginTransaction() { if (isGroupCommitHttpStream()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 98eca5c0a8844e..fe3bd2bd686ff0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -36,6 +36,8 @@ import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.ExternalScanNode; import org.apache.doris.datasource.FileQueryScanNode; +import org.apache.doris.datasource.hive.HMSTransaction; +import org.apache.doris.datasource.iceberg.IcebergTransaction; import org.apache.doris.load.loadv2.LoadJob; import org.apache.doris.metric.MetricRepo; import org.apache.doris.mysql.MysqlCommand; @@ -93,8 +95,6 @@ import org.apache.doris.thrift.TFileScanRange; import org.apache.doris.thrift.TFileScanRangeParams; import org.apache.doris.thrift.TFragmentInstanceReport; -import org.apache.doris.thrift.THivePartitionUpdate; -import org.apache.doris.thrift.TIcebergCommitData; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPaloScanRange; import org.apache.doris.thrift.TPipelineFragmentParams; @@ -250,12 +250,6 @@ public class Coordinator implements CoordInterface { private final List commitInfos = Lists.newArrayList(); private final List errorTabletInfos = Lists.newArrayList(); - // Collect all hivePartitionUpdates obtained from be - Consumer> hivePartitionUpdateFunc; - - // Collect all icebergCommitData obtained from be - Consumer> icebergCommitDataFunc; - // Input parameter private long jobId = -1; // job which this task belongs to private TUniqueId queryId; @@ -486,6 +480,10 @@ public long getTxnId() { return txnId; } + public void setTxnId(long txnId) { + this.txnId = txnId; + } + public String getLabel() { return label; } @@ -2378,14 +2376,6 @@ private void updateScanRangeNumByScanRange(TScanRangeParams param) { // TODO: more ranges? } - public void setHivePartitionUpdateFunc(Consumer> hivePartitionUpdateFunc) { - this.hivePartitionUpdateFunc = hivePartitionUpdateFunc; - } - - public void setIcebergCommitDataFunc(Consumer> icebergCommitDataFunc) { - this.icebergCommitDataFunc = icebergCommitDataFunc; - } - // update job progress from BE public void updateFragmentExecStatus(TReportExecStatusParams params) { PipelineExecContext ctx = pipelineExecContexts.get(Pair.of(params.getFragmentId(), params.getBackendId())); @@ -2438,11 +2428,13 @@ public void updateFragmentExecStatus(TReportExecStatusParams params) { if (params.isSetErrorTabletInfos()) { updateErrorTabletInfos(params.getErrorTabletInfos()); } - if (params.isSetHivePartitionUpdates() && hivePartitionUpdateFunc != null) { - hivePartitionUpdateFunc.accept(params.getHivePartitionUpdates()); + if (params.isSetHivePartitionUpdates()) { + ((HMSTransaction) Env.getCurrentEnv().getGlobalExternalTransactionInfoMgr().getTxnById(txnId)) + .updateHivePartitionUpdates(params.getHivePartitionUpdates()); } - if (params.isSetIcebergCommitDatas() && icebergCommitDataFunc != null) { - icebergCommitDataFunc.accept(params.getIcebergCommitDatas()); + if (params.isSetIcebergCommitDatas()) { + ((IcebergTransaction) Env.getCurrentEnv().getGlobalExternalTransactionInfoMgr().getTxnById(txnId)) + .updateIcebergCommitData(params.getIcebergCommitDatas()); } if (ctx.done) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/AbstractExternalTransactionManager.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/AbstractExternalTransactionManager.java new file mode 100644 index 00000000000000..da80b8f77bd6f0 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/AbstractExternalTransactionManager.java @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.transaction; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.UserException; +import org.apache.doris.datasource.operations.ExternalMetadataOps; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public abstract class AbstractExternalTransactionManager implements TransactionManager { + private static final Logger LOG = LogManager.getLogger(AbstractExternalTransactionManager.class); + private final Map transactions = new ConcurrentHashMap<>(); + protected final ExternalMetadataOps ops; + + public AbstractExternalTransactionManager(ExternalMetadataOps ops) { + this.ops = ops; + } + + abstract T createTransaction(); + + @Override + public long begin() { + long id = Env.getCurrentEnv().getNextId(); + T transaction = createTransaction(); + transactions.put(id, transaction); + Env.getCurrentEnv().getGlobalExternalTransactionInfoMgr().putTxnById(id, transaction); + return id; + } + + @Override + public void commit(long id) throws UserException { + getTransactionWithException(id).commit(); + transactions.remove(id); + Env.getCurrentEnv().getGlobalExternalTransactionInfoMgr().removeTxnById(id); + } + + @Override + public void rollback(long id) { + try { + getTransactionWithException(id).rollback(); + } catch (TransactionNotFoundException e) { + LOG.warn(e.getMessage(), e); + } finally { + transactions.remove(id); + Env.getCurrentEnv().getGlobalExternalTransactionInfoMgr().removeTxnById(id); + } + } + + @Override + public Transaction getTransaction(long id) throws UserException { + return getTransactionWithException(id); + } + + private Transaction getTransactionWithException(long id) throws TransactionNotFoundException { + Transaction txn = transactions.get(id); + if (txn == null) { + throw new TransactionNotFoundException("Can't find transaction for " + id); + } + return txn; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalExternalTransactionInfoMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalExternalTransactionInfoMgr.java new file mode 100644 index 00000000000000..e516c648dff9b0 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalExternalTransactionInfoMgr.java @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.transaction; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class GlobalExternalTransactionInfoMgr { + public Map idToTxn = new ConcurrentHashMap<>(); + + public Transaction getTxnById(long txnId) { + if (idToTxn.containsKey(txnId)) { + return idToTxn.get(txnId); + } + throw new RuntimeException("Can't find txn for " + txnId); + } + + public void putTxnById(long txnId, Transaction txn) { + if (idToTxn.containsKey(txnId)) { + throw new RuntimeException("Duplicate txnId for " + txnId); + } + idToTxn.put(txnId, txn); + } + + public void removeTxnById(long txnId) { + idToTxn.remove(txnId); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/HiveTransactionManager.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/HiveTransactionManager.java index c48210ad452ad9..65f0c2bd5e3cb3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/HiveTransactionManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/HiveTransactionManager.java @@ -17,65 +17,26 @@ package org.apache.doris.transaction; -import org.apache.doris.catalog.Env; -import org.apache.doris.common.UserException; import org.apache.doris.datasource.hive.HMSTransaction; import org.apache.doris.datasource.hive.HiveMetadataOps; import org.apache.doris.fs.FileSystemProvider; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; -public class HiveTransactionManager implements TransactionManager { - - private final Map transactions = new ConcurrentHashMap<>(); - private final HiveMetadataOps ops; +public class HiveTransactionManager extends AbstractExternalTransactionManager { private final FileSystemProvider fileSystemProvider; - private final Executor fileSystemExecutor; public HiveTransactionManager(HiveMetadataOps ops, FileSystemProvider fileSystemProvider, Executor fileSystemExecutor) { - this.ops = ops; + super(ops); this.fileSystemProvider = fileSystemProvider; this.fileSystemExecutor = fileSystemExecutor; } @Override - public long begin() { - long id = Env.getCurrentEnv().getNextId(); - HMSTransaction hiveTransaction = new HMSTransaction(ops, fileSystemProvider, fileSystemExecutor); - transactions.put(id, hiveTransaction); - return id; - } - - @Override - public void commit(long id) throws UserException { - getTransactionWithException(id).commit(); - transactions.remove(id); - } - - @Override - public void rollback(long id) { - try { - getTransactionWithException(id).rollback(); - } finally { - transactions.remove(id); - } - } - - @Override - public HMSTransaction getTransaction(long id) { - return getTransactionWithException(id); - } - - public HMSTransaction getTransactionWithException(long id) { - HMSTransaction hiveTransaction = transactions.get(id); - if (hiveTransaction == null) { - throw new RuntimeException("Can't find transaction for " + id); - } - return hiveTransaction; + HMSTransaction createTransaction() { + return new HMSTransaction((HiveMetadataOps) ops, fileSystemProvider, fileSystemExecutor); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/IcebergTransactionManager.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/IcebergTransactionManager.java index f373c13368558f..8f4d25a19b3ac5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/IcebergTransactionManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/IcebergTransactionManager.java @@ -18,56 +18,17 @@ package org.apache.doris.transaction; -import org.apache.doris.catalog.Env; -import org.apache.doris.common.UserException; import org.apache.doris.datasource.iceberg.IcebergMetadataOps; import org.apache.doris.datasource.iceberg.IcebergTransaction; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -public class IcebergTransactionManager implements TransactionManager { - - private final Map transactions = new ConcurrentHashMap<>(); - private final IcebergMetadataOps ops; +public class IcebergTransactionManager extends AbstractExternalTransactionManager { public IcebergTransactionManager(IcebergMetadataOps ops) { - this.ops = ops; + super(ops); } @Override - public long begin() { - long id = Env.getCurrentEnv().getNextId(); - IcebergTransaction icebergTransaction = new IcebergTransaction(ops); - transactions.put(id, icebergTransaction); - return id; - } - - @Override - public void commit(long id) throws UserException { - getTransactionWithException(id).commit(); - transactions.remove(id); - } - - @Override - public void rollback(long id) { - try { - getTransactionWithException(id).rollback(); - } finally { - transactions.remove(id); - } - } - - @Override - public IcebergTransaction getTransaction(long id) { - return getTransactionWithException(id); - } - - public IcebergTransaction getTransactionWithException(long id) { - IcebergTransaction icebergTransaction = transactions.get(id); - if (icebergTransaction == null) { - throw new RuntimeException("Can't find transaction for " + id); - } - return icebergTransaction; + IcebergTransaction createTransaction() { + return new IcebergTransaction((IcebergMetadataOps) ops); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManager.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManager.java index ca9cbb917ec277..fbff324ae914b1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManager.java @@ -27,5 +27,5 @@ public interface TransactionManager { void rollback(long id); - Transaction getTransaction(long id); + Transaction getTransaction(long id) throws UserException; }