Skip to content

Commit

Permalink
[fix](group commit) Fix some group commit case (#30132)
Browse files Browse the repository at this point in the history
  • Loading branch information
mymeiyi authored Jan 21, 2024
1 parent 7823686 commit 4a86690
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.DbUtil;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.load.GroupCommitManager.SchemaChangeStatus;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTask;
Expand Down Expand Up @@ -602,30 +601,30 @@ protected void runRunningJob() throws AlterCancelException {

/**
 * Blocks group commit for this job's table, then polls until the group commit manager
 * reports that the previous WALs of the table are finished on the backends returned by
 * {@code getAllBackendIds(true)}, or until {@code Config.check_wal_queue_timeout_threshold}
 * milliseconds have elapsed, whichever comes first.
 *
 * <p>The table is unblocked in a {@code finally} block so that an unexpected exception
 * while waiting can never leave group commit blocked for the table.
 *
 * <p>If the waiting thread is interrupted, the interrupt flag is restored and the wait
 * ends early (same outcome as the timeout path) instead of spinning on a sleep that
 * would keep throwing.
 */
private void waitWalFinished() {
    // Stop new group commit loads on the table before checking the WAL queues.
    Env.getCurrentEnv().getGroupCommitManager().blockTable(tableId);
    LOG.info("block group commit for table={} when schema change", tableId);
    try {
        List<Long> aliveBeIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
        long expireTime = System.currentTimeMillis() + Config.check_wal_queue_timeout_threshold;
        while (true) {
            LOG.info("wait for wal queue size to be empty");
            boolean walFinished = Env.getCurrentEnv().getGroupCommitManager()
                    .isPreviousWalFinished(tableId, aliveBeIds);
            if (walFinished) {
                LOG.info("all wal is finished for table={}", tableId);
                break;
            }
            if (System.currentTimeMillis() > expireTime) {
                LOG.warn("waitWalFinished time out for table={}", tableId);
                break;
            }
            try {
                Thread.sleep(100);
            } catch (InterruptedException ie) {
                LOG.warn("failed to wait for wal for table={} when schema change", tableId, ie);
                // Preserve the interrupt for callers and stop polling.
                Thread.currentThread().interrupt();
                break;
            }
        }
    } finally {
        // Always release the table, even if the wait loop failed.
        Env.getCurrentEnv().getGroupCommitManager().unblockTable(tableId);
        LOG.info("unblock group commit for table={} when schema change", tableId);
    }
}

private void onFinished(OlapTable tbl) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ private void runAlterJobV2() {
}

List<Long> backendTabletIds = invertedIndex.getTabletIdsByBackendId(beId);
if (Config.drop_backend_after_decommission && checkTablets(beId, backendTabletIds) && checkWal(backend)) {
boolean hasWal = checkWal(backend);
if (Config.drop_backend_after_decommission && checkTablets(beId, backendTabletIds) && hasWal) {
try {
systemInfoService.dropBackend(beId);
LOG.info("no available tablet on decommission backend {}, drop it", beId);
Expand All @@ -94,8 +95,9 @@ private void runAlterJobV2() {
continue;
}

LOG.info("backend {} lefts {} replicas to decommission: {}", beId, backendTabletIds.size(),
backendTabletIds.subList(0, Math.min(10, backendTabletIds.size())));
LOG.info("backend {} lefts {} replicas to decommission: {}{}", beId, backendTabletIds.size(),
backendTabletIds.subList(0, Math.min(10, backendTabletIds.size())),
hasWal ? "; and has unfinished WALs" : "");
}
}

Expand Down Expand Up @@ -197,8 +199,7 @@ private boolean checkTablets(Long beId, List<Long> backendTabletIds) {
}

/**
 * Reports whether the given backend has no WAL left in its queues.
 *
 * <p>NOTE(review): a {@code true} result means the WAL queue is EMPTY. The decommission
 * loop in this file stores the result in a local named {@code hasWal} and appends
 * "; and has unfinished WALs" to its log line when that local is true, which reads
 * inverted relative to this return value; confirm at the call site.
 *
 * @param backend the backend whose total WAL queue size is queried
 * @return {@code true} if the group commit manager reports a total WAL queue size of 0
 *         for the backend, {@code false} otherwise
 */
private boolean checkWal(Backend backend) {
    return Env.getCurrentEnv().getGroupCommitManager().getAllWalQueueSize(backend) == 0;
}

private List<Backend> checkDecommission(DecommissionBackendClause decommissionBackendClause)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1140,7 +1140,7 @@ public void analyzeGroupCommit(Analyzer analyzer) throws AnalysisException {
return;
}
boolean partialUpdate = ConnectContext.get().getSessionVariable().isEnableUniqueKeyPartialUpdate();
if (!partialUpdate && ConnectContext.get().getSessionVariable().isEnableInsertGroupCommit()
if (!isExplain() && !partialUpdate && ConnectContext.get().getSessionVariable().isEnableInsertGroupCommit()
&& ConnectContext.get().getSessionVariable().getSqlMode() != SqlModeHelper.MODE_NO_BACKSLASH_ESCAPES
&& targetTable instanceof OlapTable
&& ((OlapTable) targetTable).getTableProperty().getUseSchemaLightChange()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ public Object execute(HttpServletRequest request, HttpServletResponse response)
List<Backend> backends = getBackends(hostInfos);
List<String> backendsList = new ArrayList<>();
for (Backend backend : backends) {
long size = Env.getCurrentEnv().getGroupCommitManager()
.getAllWalQueueSize(backend);
long size = Env.getCurrentEnv().getGroupCommitManager().getAllWalQueueSize(backend);
backendsList.add(backend.getHost() + ":" + backend.getHeartbeatPort() + ":" + size);
}
return ResponseEntityBuilder.ok(backendsList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,11 @@ public Object streamLoad(HttpServletRequest request,
@PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) {
boolean groupCommit = false;
String groupCommitStr = request.getHeader("group_commit");
if (groupCommitStr != null && groupCommitStr.equals("async_mode")) {
if (groupCommitStr != null && groupCommitStr.equalsIgnoreCase("async_mode")) {
groupCommit = true;
try {
String[] pair = new String[] {db, table};
if (isGroupCommitBlock(pair)) {
String msg = "insert table " + pair[1] + " is blocked on schema change";
if (isGroupCommitBlock(db, table)) {
String msg = "insert table " + table + " is blocked on schema change";
return new RestBaseResult(msg);
}
} catch (Exception e) {
Expand Down Expand Up @@ -122,19 +121,17 @@ public Object streamLoad(HttpServletRequest request,
}
}

@RequestMapping(path = "/api/_http_stream",
method = RequestMethod.PUT)
public Object streamLoadWithSql(HttpServletRequest request,
HttpServletResponse response) {
@RequestMapping(path = "/api/_http_stream", method = RequestMethod.PUT)
public Object streamLoadWithSql(HttpServletRequest request, HttpServletResponse response) {
String sql = request.getHeader("sql");
LOG.info("streaming load sql={}", sql);
boolean groupCommit = false;
String groupCommitStr = request.getHeader("group_commit");
if (groupCommitStr != null && groupCommitStr.equals("async_mode")) {
if (groupCommitStr != null && groupCommitStr.equalsIgnoreCase("async_mode")) {
groupCommit = true;
try {
String[] pair = parseDbAndTb(sql);
if (isGroupCommitBlock(pair)) {
if (isGroupCommitBlock(pair[0], pair[1])) {
String msg = "insert table " + pair[1] + " is blocked on schema change";
return new RestBaseResult(msg);
}
Expand Down Expand Up @@ -164,11 +161,11 @@ public Object streamLoadWithSql(HttpServletRequest request,
}
}

private boolean isGroupCommitBlock(String[] pair) throws TException {
String fullDbName = getFullDbName(pair[0]);
private boolean isGroupCommitBlock(String db, String table) throws TException {
String fullDbName = getFullDbName(db);
Database dbObj = Env.getCurrentInternalCatalog()
.getDbOrException(fullDbName, s -> new TException("database is invalid for dbName: " + s));
Table tblObj = dbObj.getTableOrException(pair[1], s -> new TException("table is invalid: " + s));
Table tblObj = dbObj.getTableOrException(table, s -> new TException("table is invalid: " + s));
return Env.getCurrentEnv().getGroupCommitManager().isBlock(tblObj.getId());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.doris.load;


import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest;
Expand All @@ -30,31 +29,27 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Set;
import java.util.concurrent.Future;

public class GroupCommitManager {

public enum SchemaChangeStatus {
BLOCK, NORMAL
}

private static final Logger LOG = LogManager.getLogger(GroupCommitManager.class);

private final Map<Long, SchemaChangeStatus> statusMap = new ConcurrentHashMap<>();
private Set<Long> blockedTableIds = new HashSet<>();

public boolean isBlock(long tableId) {
if (statusMap.containsKey(tableId)) {
return statusMap.get(tableId) == SchemaChangeStatus.BLOCK;
}
return false;
return blockedTableIds.contains(tableId);
}

public void blockTable(long tableId) {
blockedTableIds.add(tableId);
}

public void setStatus(long tableId, SchemaChangeStatus status) {
LOG.debug("Setting status for tableId {}: {}", tableId, status);
statusMap.put(tableId, status);
public void unblockTable(long tableId) {
blockedTableIds.remove(tableId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,6 @@ public PGroupCommitInsertResponse executeGroupCommitInsert(ConnectContext ctx,
}
}
PGroupCommitInsertRequest request = PGroupCommitInsertRequest.newBuilder()
.setDbId(db.getId())
.setTableId(table.getId())
.setBaseSchemaVersion(table.getBaseSchemaVersion())
.setExecPlanFragmentRequest(InternalService.PExecPlanFragmentRequest.newBuilder()
.setRequest(execPlanFragmentParamsBytes)
.setCompact(false).setVersion(InternalService.PFragmentRequestVersion.VERSION_2).build())
Expand Down
6 changes: 3 additions & 3 deletions gensrc/proto/internal_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -766,11 +766,11 @@ message PGlobResponse {
}

message PGroupCommitInsertRequest {
optional int64 db_id = 1;
optional int64 table_id = 2;
optional int64 db_id = 1; // deprecated
optional int64 table_id = 2; // deprecated
// Descriptors.TDescriptorTable
// optional bytes desc_tbl = 3;
optional int64 base_schema_version = 4;
optional int64 base_schema_version = 4; // deprecated

// TExecPlanFragmentParams -> TPlanFragment -> PlanNodes.TPlan
// optional bytes plan_node = 5;
Expand Down

0 comments on commit 4a86690

Please sign in to comment.