[bugfix](external) Memory leak problem for external table with insert operation (#40440)

## Proposed changes

Get the corresponding transaction through `txnId`, and then update it with the
file list returned from the BE.
wuwenchi committed Oct 8, 2024
1 parent c21b9f5 commit a8b7884
Showing 14 changed files with 163 additions and 148 deletions.
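For context, a minimal illustration of why the old callback wiring could leak and how the id-based lookup in this commit avoids it. This is a simplified sketch, not the Doris classes themselves: `Txn` stands in for `HMSTransaction`/`IcebergTransaction`, and `REGISTRY` mirrors the new `GlobalExternalTransactionInfoMgr` shown in the diff below; all other names here are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Simplified sketch; Txn stands in for HMSTransaction / IcebergTransaction.
class Txn {
    final List<String> commitData = new ArrayList<>();
}

class OldStyleCoordinator {
    // Before this fix: the coordinator held a Consumer that captured the
    // transaction object, so a coordinator that outlived the query kept the
    // transaction and all of its accumulated commit data reachable.
    Consumer<List<String>> commitDataFunc; // e.g. set to txn.commitData::addAll
}

class NewStyleCoordinator {
    // After this fix: only the numeric txn id is stored. The transaction is
    // resolved from a global registry on each BE report, and is removed from
    // that registry at commit/rollback, so nothing pins it afterwards.
    static final Map<Long, Txn> REGISTRY = new ConcurrentHashMap<>();
    long txnId;

    void onBeReport(List<String> updates) {
        Txn txn = REGISTRY.get(txnId); // looked up on demand, never captured
        if (txn != null) {
            txn.commitData.addAll(updates);
        }
    }
}
```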
8 changes: 8 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -288,6 +288,7 @@
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.transaction.DbUsedDataQuotaInfoCollector;
import org.apache.doris.transaction.GlobalExternalTransactionInfoMgr;
import org.apache.doris.transaction.GlobalTransactionMgrIface;
import org.apache.doris.transaction.PublishVersionDaemon;

@@ -566,6 +567,8 @@ public class Env {

    private final SplitSourceManager splitSourceManager;

    private final GlobalExternalTransactionInfoMgr globalExternalTransactionInfoMgr;

    private final List<String> forceSkipJournalIds = Arrays.asList(Config.force_skip_journal_ids);

    // if a config is relative to a daemon thread. record the relation here. we will proactively change interval of it.
@@ -814,6 +817,7 @@ public Env(boolean isCheckpointCatalog) {
        this.dnsCache = new DNSCache();
        this.sqlCacheManager = new NereidsSqlCacheManager();
        this.splitSourceManager = new SplitSourceManager();
        this.globalExternalTransactionInfoMgr = new GlobalExternalTransactionInfoMgr();
    }

    public static void destroyCheckpoint() {
@@ -6535,6 +6539,10 @@ public SplitSourceManager getSplitSourceManager() {
        return splitSourceManager;
    }

    public GlobalExternalTransactionInfoMgr getGlobalExternalTransactionInfoMgr() {
        return globalExternalTransactionInfoMgr;
    }

    public StatisticsJobAppender getStatisticsJobAppender() {
        return statisticsJobAppender;
    }
AbstractInsertExecutor.java
@@ -48,7 +48,9 @@
 * The derived class should implement the abstract method for certain type of target table
 */
public abstract class AbstractInsertExecutor {
    protected static final long INVALID_TXN_ID = -1L;
    private static final Logger LOG = LogManager.getLogger(AbstractInsertExecutor.class);

    protected long jobId;
    protected final ConnectContext ctx;
    protected final Coordinator coordinator;
@@ -62,6 +64,7 @@ public abstract class AbstractInsertExecutor {
    protected String errMsg = "";
    protected Optional<InsertCommandContext> insertCtx;
    protected final boolean emptyInsert;
    protected long txnId = INVALID_TXN_ID;

    /**
     * Constructor
@@ -93,7 +96,9 @@ public String getLabelName() {
        return labelName;
    }

    public abstract long getTxnId();
    public long getTxnId() {
        return txnId;
    }

    /**
     * begin transaction if necessary
@@ -108,7 +113,7 @@ public String getLabelName() {
    /**
     * Do something before exec
     */
    protected abstract void beforeExec();
    protected abstract void beforeExec() throws UserException;

    /**
     * Do something after exec finished

BaseExternalTableInsertExecutor.java
@@ -46,9 +46,7 @@
 * Insert executor for base external table
 */
public abstract class BaseExternalTableInsertExecutor extends AbstractInsertExecutor {
    protected static final long INVALID_TXN_ID = -1L;
    private static final Logger LOG = LogManager.getLogger(BaseExternalTableInsertExecutor.class);
    protected long txnId = INVALID_TXN_ID;
    protected TransactionStatus txnStatus = TransactionStatus.ABORTED;
    protected final TransactionManager transactionManager;
    protected final String catalogName;
@@ -70,16 +68,6 @@ public BaseExternalTableInsertExecutor(ConnectContext ctx, ExternalTable table,
        }
    }

    @Override
    public long getTxnId() {
        return txnId;
    }

    /**
     * collect commit infos from BEs
     */
    protected abstract void setCollectCommitInfoFunc();

    /**
     * At this time, FE has successfully collected all commit information from BEs.
     * Before commit this txn, commit information need to be analyzed and processed.
@@ -94,7 +82,6 @@ public long getTxnId() {
    @Override
    public void beginTransaction() {
        txnId = transactionManager.begin();
        setCollectCommitInfoFunc();
    }

    @Override

HiveInsertExecutor.java
@@ -49,13 +49,7 @@ public HiveInsertExecutor(ConnectContext ctx, HMSExternalTable table,
    }

    @Override
    public void setCollectCommitInfoFunc() {
        HMSTransaction transaction = (HMSTransaction) transactionManager.getTransaction(txnId);
        coordinator.setHivePartitionUpdateFunc(transaction::updateHivePartitionUpdates);
    }

    @Override
    protected void beforeExec() {
    protected void beforeExec() throws UserException {
        // check params
        HMSTransaction transaction = (HMSTransaction) transactionManager.getTransaction(txnId);
        Preconditions.checkArgument(insertCtx.isPresent(), "insert context must be present");
IcebergInsertExecutor.java
@@ -47,13 +47,7 @@ public IcebergInsertExecutor(ConnectContext ctx, IcebergExternalTable table,
    }

    @Override
    public void setCollectCommitInfoFunc() {
        IcebergTransaction transaction = (IcebergTransaction) transactionManager.getTransaction(txnId);
        coordinator.setIcebergCommitDataFunc(transaction::updateIcebergCommitData);
    }

    @Override
    protected void beforeExec() {
    protected void beforeExec() throws UserException {
        String dbName = ((IcebergExternalTable) table).getDbName();
        String tbName = table.getName();
        SimpleTableInfo tableInfo = new SimpleTableInfo(dbName, tbName);
@@ -237,6 +237,7 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor
        executor.setProfileType(ProfileType.LOAD);
        // We exposed @StmtExecutor#cancel as a unified entry point for statement interruption,
        // so we need to set this here
        insertExecutor.getCoordinator().setTxnId(insertExecutor.getTxnId());
        executor.setCoord(insertExecutor.getCoordinator());
        return insertExecutor;
    }
@@ -90,11 +90,6 @@ protected void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink p
        // do nothing
    }

    @Override
    protected void setCollectCommitInfoFunc() {
        // do nothing
    }

    @Override
    protected void doBeforeCommit() throws UserException {
        // do nothing
OlapInsertExecutor.java
@@ -72,9 +72,7 @@
 * Insert executor for olap table
 */
public class OlapInsertExecutor extends AbstractInsertExecutor {
    protected static final long INVALID_TXN_ID = -1L;
    private static final Logger LOG = LogManager.getLogger(OlapInsertExecutor.class);
    protected long txnId = INVALID_TXN_ID;
    protected TransactionStatus txnStatus = TransactionStatus.ABORTED;

    /**
@@ -85,11 +83,6 @@ public OlapInsertExecutor(ConnectContext ctx, Table table,
        super(ctx, table, labelName, planner, insertCtx, emptyInsert);
    }

    @Override
    public long getTxnId() {
        return txnId;
    }

    @Override
    public void beginTransaction() {
        if (isGroupCommitHttpStream()) {
32 changes: 12 additions & 20 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -36,6 +36,8 @@
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.ExternalScanNode;
import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.hive.HMSTransaction;
import org.apache.doris.datasource.iceberg.IcebergTransaction;
import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mysql.MysqlCommand;
@@ -93,8 +95,6 @@
import org.apache.doris.thrift.TFileScanRange;
import org.apache.doris.thrift.TFileScanRangeParams;
import org.apache.doris.thrift.TFragmentInstanceReport;
import org.apache.doris.thrift.THivePartitionUpdate;
import org.apache.doris.thrift.TIcebergCommitData;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPaloScanRange;
import org.apache.doris.thrift.TPipelineFragmentParams;
@@ -250,12 +250,6 @@ public class Coordinator implements CoordInterface {
    private final List<TTabletCommitInfo> commitInfos = Lists.newArrayList();
    private final List<TErrorTabletInfo> errorTabletInfos = Lists.newArrayList();

    // Collect all hivePartitionUpdates obtained from be
    Consumer<List<THivePartitionUpdate>> hivePartitionUpdateFunc;

    // Collect all icebergCommitData obtained from be
    Consumer<List<TIcebergCommitData>> icebergCommitDataFunc;

    // Input parameter
    private long jobId = -1; // job which this task belongs to
    private TUniqueId queryId;
@@ -486,6 +480,10 @@ public long getTxnId() {
        return txnId;
    }

    public void setTxnId(long txnId) {
        this.txnId = txnId;
    }

    public String getLabel() {
        return label;
    }
@@ -2378,14 +2376,6 @@ private void updateScanRangeNumByScanRange(TScanRangeParams param) {
        // TODO: more ranges?
    }

    public void setHivePartitionUpdateFunc(Consumer<List<THivePartitionUpdate>> hivePartitionUpdateFunc) {
        this.hivePartitionUpdateFunc = hivePartitionUpdateFunc;
    }

    public void setIcebergCommitDataFunc(Consumer<List<TIcebergCommitData>> icebergCommitDataFunc) {
        this.icebergCommitDataFunc = icebergCommitDataFunc;
    }

    // update job progress from BE
    public void updateFragmentExecStatus(TReportExecStatusParams params) {
        PipelineExecContext ctx = pipelineExecContexts.get(Pair.of(params.getFragmentId(), params.getBackendId()));
@@ -2438,11 +2428,13 @@ public void updateFragmentExecStatus(TReportExecStatusParams params) {
        if (params.isSetErrorTabletInfos()) {
            updateErrorTabletInfos(params.getErrorTabletInfos());
        }
        if (params.isSetHivePartitionUpdates() && hivePartitionUpdateFunc != null) {
            hivePartitionUpdateFunc.accept(params.getHivePartitionUpdates());
        if (params.isSetHivePartitionUpdates()) {
            ((HMSTransaction) Env.getCurrentEnv().getGlobalExternalTransactionInfoMgr().getTxnById(txnId))
                    .updateHivePartitionUpdates(params.getHivePartitionUpdates());
        }
        if (params.isSetIcebergCommitDatas() && icebergCommitDataFunc != null) {
            icebergCommitDataFunc.accept(params.getIcebergCommitDatas());
        if (params.isSetIcebergCommitDatas()) {
            ((IcebergTransaction) Env.getCurrentEnv().getGlobalExternalTransactionInfoMgr().getTxnById(txnId))
                    .updateIcebergCommitData(params.getIcebergCommitDatas());
        }

        if (ctx.done) {
AbstractExternalTransactionManager.java
@@ -0,0 +1,81 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.transaction;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.operations.ExternalMetadataOps;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public abstract class AbstractExternalTransactionManager<T extends Transaction> implements TransactionManager {
    private static final Logger LOG = LogManager.getLogger(AbstractExternalTransactionManager.class);
    private final Map<Long, T> transactions = new ConcurrentHashMap<>();
    protected final ExternalMetadataOps ops;

    public AbstractExternalTransactionManager(ExternalMetadataOps ops) {
        this.ops = ops;
    }

    abstract T createTransaction();

    @Override
    public long begin() {
        long id = Env.getCurrentEnv().getNextId();
        T transaction = createTransaction();
        transactions.put(id, transaction);
        Env.getCurrentEnv().getGlobalExternalTransactionInfoMgr().putTxnById(id, transaction);
        return id;
    }

    @Override
    public void commit(long id) throws UserException {
        getTransactionWithException(id).commit();
        transactions.remove(id);
        Env.getCurrentEnv().getGlobalExternalTransactionInfoMgr().removeTxnById(id);
    }

    @Override
    public void rollback(long id) {
        try {
            getTransactionWithException(id).rollback();
        } catch (TransactionNotFoundException e) {
            LOG.warn(e.getMessage(), e);
        } finally {
            transactions.remove(id);
            Env.getCurrentEnv().getGlobalExternalTransactionInfoMgr().removeTxnById(id);
        }
    }

    @Override
    public Transaction getTransaction(long id) throws UserException {
        return getTransactionWithException(id);
    }

    private Transaction getTransactionWithException(long id) throws TransactionNotFoundException {
        Transaction txn = transactions.get(id);
        if (txn == null) {
            throw new TransactionNotFoundException("Can't find transaction for " + id);
        }
        return txn;
    }
}
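A concrete manager only has to supply `createTransaction()`; registration in both the per-manager map and the global registry is handled by `begin()` above. Below is a hypothetical minimal subclass, for illustration only: the `Demo*` names are not part of this commit, and it assumes the `Transaction` interface declares the `commit()`/`rollback()` methods invoked by the manager above.

```java
package org.apache.doris.transaction;

import org.apache.doris.common.UserException;
import org.apache.doris.datasource.operations.ExternalMetadataOps;

// Hypothetical subclass for illustration; DemoTransaction and
// DemoTransactionManager are not part of this commit.
class DemoTransaction implements Transaction {
    @Override
    public void commit() throws UserException {
        // publish table metadata here
    }

    @Override
    public void rollback() {
        // discard staged files here
    }
}

class DemoTransactionManager extends AbstractExternalTransactionManager<DemoTransaction> {
    DemoTransactionManager(ExternalMetadataOps ops) {
        super(ops);
    }

    @Override
    DemoTransaction createTransaction() {
        // begin() stores the new instance in this manager's own map and in the
        // global registry, under the same id from Env.getNextId().
        return new DemoTransaction();
    }
}
```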
GlobalExternalTransactionInfoMgr.java
@@ -0,0 +1,43 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.transaction;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class GlobalExternalTransactionInfoMgr {
    public Map<Long, Transaction> idToTxn = new ConcurrentHashMap<>();

    public Transaction getTxnById(long txnId) {
        if (idToTxn.containsKey(txnId)) {
            return idToTxn.get(txnId);
        }
        throw new RuntimeException("Can't find txn for " + txnId);
    }

    public void putTxnById(long txnId, Transaction txn) {
        if (idToTxn.containsKey(txnId)) {
            throw new RuntimeException("Duplicate txnId for " + txnId);
        }
        idToTxn.put(txnId, txn);
    }

    public void removeTxnById(long txnId) {
        idToTxn.remove(txnId);
    }
}
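The registry's contract as implemented above: `putTxnById` rejects duplicate ids, `getTxnById` throws on a missing id, and `removeTxnById` is a plain map remove, so a second call is harmless. A short usage sketch follows; the no-op anonymous transaction is illustrative only and assumes `Transaction` declares just the `commit()`/`rollback()` methods seen in the manager above.

```java
package org.apache.doris.transaction;

// Usage sketch of the registry lifecycle; not part of this commit.
class GlobalRegistrySketch {
    public static void main(String[] args) {
        GlobalExternalTransactionInfoMgr mgr = new GlobalExternalTransactionInfoMgr();
        Transaction txn = new Transaction() {
            @Override
            public void commit() { /* no-op stub */ }

            @Override
            public void rollback() { /* no-op stub */ }
        };

        long txnId = 42L; // the real flow uses Env.getCurrentEnv().getNextId()
        mgr.putTxnById(txnId, txn);  // done by begin() in the manager
        mgr.getTxnById(txnId);       // done by the Coordinator per BE report
        mgr.removeTxnById(txnId);    // done at commit()/rollback(): the leak fix
        mgr.removeTxnById(txnId);    // plain remove, so repeating it is harmless
        // mgr.getTxnById(txnId) would now throw RuntimeException("Can't find txn for 42")
    }
}
```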
(3 of the 14 changed files are not shown above.)