diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index eef902dea825da..5c74164ae33c36 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -48,7 +48,6 @@ import org.apache.doris.common.io.Text; import org.apache.doris.common.util.DbUtil; import org.apache.doris.common.util.TimeUtils; -import org.apache.doris.load.GroupCommitManager.SchemaChangeStatus; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.task.AgentBatchTask; import org.apache.doris.task.AgentTask; @@ -602,8 +601,8 @@ protected void runRunningJob() throws AlterCancelException { private void waitWalFinished() { // wait wal done here - Env.getCurrentEnv().getGroupCommitManager().setStatus(tableId, SchemaChangeStatus.BLOCK); - LOG.info("block table {}", tableId); + Env.getCurrentEnv().getGroupCommitManager().blockTable(tableId); + LOG.info("block group commit for table={} when schema change", tableId); List aliveBeIds = Env.getCurrentSystemInfo().getAllBackendIds(true); long expireTime = System.currentTimeMillis() + Config.check_wal_queue_timeout_threshold; while (true) { @@ -611,21 +610,21 @@ private void waitWalFinished() { boolean walFinished = Env.getCurrentEnv().getGroupCommitManager() .isPreviousWalFinished(tableId, aliveBeIds); if (walFinished) { - LOG.info("all wal is finished"); + LOG.info("all wal is finished for table={}", tableId); break; } else if (System.currentTimeMillis() > expireTime) { - LOG.warn("waitWalFinished time out"); + LOG.warn("waitWalFinished time out for table={}", tableId); break; } else { try { Thread.sleep(100); } catch (InterruptedException ie) { - LOG.info("schema change job sleep wait for wal InterruptedException: ", ie); + LOG.warn("failed to wait for wal for table={} when schema change", tableId, ie); } } } - Env.getCurrentEnv().getGroupCommitManager().setStatus(tableId, SchemaChangeStatus.NORMAL); - LOG.info("release table {}", tableId); + Env.getCurrentEnv().getGroupCommitManager().unblockTable(tableId); + LOG.info("unblock group commit for table={} when schema change", tableId); } private void onFinished(OlapTable tbl) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java index 935dcf3629330f..86551ba0735b0f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java @@ -83,7 +83,8 @@ private void runAlterJobV2() { } List backendTabletIds = invertedIndex.getTabletIdsByBackendId(beId); - if (Config.drop_backend_after_decommission && checkTablets(beId, backendTabletIds) && checkWal(backend)) { + boolean hasWal = checkWal(backend); + if (Config.drop_backend_after_decommission && checkTablets(beId, backendTabletIds) && hasWal) { try { systemInfoService.dropBackend(beId); LOG.info("no available tablet on decommission backend {}, drop it", beId); @@ -94,8 +95,9 @@ private void runAlterJobV2() { continue; } - LOG.info("backend {} lefts {} replicas to decommission: {}", beId, backendTabletIds.size(), - backendTabletIds.subList(0, Math.min(10, backendTabletIds.size()))); + LOG.info("backend {} lefts {} replicas to decommission: {}{}", beId, backendTabletIds.size(), + backendTabletIds.subList(0, Math.min(10, backendTabletIds.size())), + hasWal ? "; and has unfinished WALs" : ""); } } @@ -197,8 +199,7 @@ private boolean checkTablets(Long beId, List backendTabletIds) { } private boolean checkWal(Backend backend) { - return Env.getCurrentEnv().getGroupCommitManager() - .getAllWalQueueSize(backend) == 0; + return Env.getCurrentEnv().getGroupCommitManager().getAllWalQueueSize(backend) == 0; } private List checkDecommission(DecommissionBackendClause decommissionBackendClause) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java index d69ca40cecad0d..96dbb2e0edfa93 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java @@ -1140,7 +1140,7 @@ public void analyzeGroupCommit(Analyzer analyzer) throws AnalysisException { return; } boolean partialUpdate = ConnectContext.get().getSessionVariable().isEnableUniqueKeyPartialUpdate(); - if (!partialUpdate && ConnectContext.get().getSessionVariable().isEnableInsertGroupCommit() + if (!isExplain() && !partialUpdate && ConnectContext.get().getSessionVariable().isEnableInsertGroupCommit() && ConnectContext.get().getSessionVariable().getSqlMode() != SqlModeHelper.MODE_NO_BACKSLASH_ESCAPES && targetTable instanceof OlapTable && ((OlapTable) targetTable).getTableProperty().getUseSchemaLightChange() diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/CheckWalSizeAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/CheckWalSizeAction.java index f7822580fb760b..fdc39e8badd0ff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/CheckWalSizeAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/CheckWalSizeAction.java @@ -84,8 +84,7 @@ public Object execute(HttpServletRequest request, HttpServletResponse response) List backends = getBackends(hostInfos); List backendsList = new ArrayList<>(); for (Backend backend : backends) { - long size = Env.getCurrentEnv().getGroupCommitManager() - .getAllWalQueueSize(backend); + long size = Env.getCurrentEnv().getGroupCommitManager().getAllWalQueueSize(backend); backendsList.add(backend.getHost() + ":" + backend.getHeartbeatPort() + ":" + size); } return ResponseEntityBuilder.ok(backendsList); diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java index a78a7e9fa58f0c..6952bd37b5c91b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java @@ -88,12 +88,11 @@ public Object streamLoad(HttpServletRequest request, @PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) { boolean groupCommit = false; String groupCommitStr = request.getHeader("group_commit"); - if (groupCommitStr != null && groupCommitStr.equals("async_mode")) { + if (groupCommitStr != null && groupCommitStr.equalsIgnoreCase("async_mode")) { groupCommit = true; try { - String[] pair = new String[] {db, table}; - if (isGroupCommitBlock(pair)) { - String msg = "insert table " + pair[1] + " is blocked on schema change"; + if (isGroupCommitBlock(db, table)) { + String msg = "insert table " + table + " is blocked on schema change"; return new RestBaseResult(msg); } } catch (Exception e) { @@ -122,19 +121,17 @@ public Object streamLoad(HttpServletRequest request, } } - @RequestMapping(path = "/api/_http_stream", - method = RequestMethod.PUT) - public Object streamLoadWithSql(HttpServletRequest request, - HttpServletResponse response) { + @RequestMapping(path = "/api/_http_stream", method = RequestMethod.PUT) + public Object streamLoadWithSql(HttpServletRequest request, HttpServletResponse response) { String sql = request.getHeader("sql"); LOG.info("streaming load sql={}", sql); boolean groupCommit = false; String groupCommitStr = request.getHeader("group_commit"); - if (groupCommitStr != null && groupCommitStr.equals("async_mode")) { + if (groupCommitStr != null && groupCommitStr.equalsIgnoreCase("async_mode")) { groupCommit = true; try { String[] pair = parseDbAndTb(sql); - if (isGroupCommitBlock(pair)) { + if (isGroupCommitBlock(pair[0], pair[1])) { String msg = "insert table " + pair[1] + " is blocked on schema change"; return new RestBaseResult(msg); } @@ -164,11 +161,11 @@ public Object streamLoadWithSql(HttpServletRequest request, } } - private boolean isGroupCommitBlock(String[] pair) throws TException { - String fullDbName = getFullDbName(pair[0]); + private boolean isGroupCommitBlock(String db, String table) throws TException { + String fullDbName = getFullDbName(db); Database dbObj = Env.getCurrentInternalCatalog() .getDbOrException(fullDbName, s -> new TException("database is invalid for dbName: " + s)); - Table tblObj = dbObj.getTableOrException(pair[1], s -> new TException("table is invalid: " + s)); + Table tblObj = dbObj.getTableOrException(table, s -> new TException("table is invalid: " + s)); return Env.getCurrentEnv().getGroupCommitManager().isBlock(tblObj.getId()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java index 3b9719b2594d72..12410945e9f87d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java @@ -17,7 +17,6 @@ package org.apache.doris.load; - import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest; @@ -30,31 +29,27 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import java.util.Set; import java.util.concurrent.Future; public class GroupCommitManager { - public enum SchemaChangeStatus { - BLOCK, NORMAL - } - private static final Logger LOG = LogManager.getLogger(GroupCommitManager.class); - private final Map statusMap = new ConcurrentHashMap<>(); + private Set blockedTableIds = new HashSet<>(); public boolean isBlock(long tableId) { - if (statusMap.containsKey(tableId)) { - return statusMap.get(tableId) == SchemaChangeStatus.BLOCK; - } - return false; + return blockedTableIds.contains(tableId); + } + + public void blockTable(long tableId) { + blockedTableIds.add(tableId); } - public void setStatus(long tableId, SchemaChangeStatus status) { - LOG.debug("Setting status for tableId {}: {}", tableId, status); - statusMap.put(tableId, status); + public void unblockTable(long tableId) { + blockedTableIds.remove(tableId); } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java index 8b9f6b18331971..b69ece3b9aed81 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java @@ -153,9 +153,6 @@ public PGroupCommitInsertResponse executeGroupCommitInsert(ConnectContext ctx, } } PGroupCommitInsertRequest request = PGroupCommitInsertRequest.newBuilder() - .setDbId(db.getId()) - .setTableId(table.getId()) - .setBaseSchemaVersion(table.getBaseSchemaVersion()) .setExecPlanFragmentRequest(InternalService.PExecPlanFragmentRequest.newBuilder() .setRequest(execPlanFragmentParamsBytes) .setCompact(false).setVersion(InternalService.PFragmentRequestVersion.VERSION_2).build()) diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 433144b304b975..cf45d0395229e7 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -766,11 +766,11 @@ message PGlobResponse { } message PGroupCommitInsertRequest { - optional int64 db_id = 1; - optional int64 table_id = 2; + optional int64 db_id = 1; // deprecated + optional int64 table_id = 2; // deprecated // Descriptors.TDescriptorTable // optional bytes desc_tbl = 3; - optional int64 base_schema_version = 4; + optional int64 base_schema_version = 4; // deprecated // TExecPlanFragmentParams -> TPlanFragment -> PlanNodes.TPlan // optional bytes plan_node = 5;