Skip to content

Commit

Permalink
[cherry-pick](branch-2.1) Fix some group commit fault (#40319)
Browse files Browse the repository at this point in the history
## Proposed changes

Pick #39986 #40120

<!--Describe your changes.-->
  • Loading branch information
Yukang-Lian authored Sep 5, 2024
1 parent 40d10bd commit 52393f8
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 51 deletions.
120 changes: 73 additions & 47 deletions fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class LoadAction extends RestBaseController {

@RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY + "}/_load", method = RequestMethod.PUT)
public Object load(HttpServletRequest request, HttpServletResponse response,
@PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) {
@PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) {
if (needRedirect(request.getScheme())) {
return redirectToHttps(request);
}
Expand All @@ -87,20 +87,28 @@ public Object load(HttpServletRequest request, HttpServletResponse response,

@RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY + "}/_stream_load", method = RequestMethod.PUT)
public Object streamLoad(HttpServletRequest request,
HttpServletResponse response,
@PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) {
HttpServletResponse response,
@PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) {
boolean groupCommit = false;
String groupCommitStr = request.getHeader("group_commit");
if (groupCommitStr != null && groupCommitStr.equalsIgnoreCase("async_mode")) {
groupCommit = true;
try {
if (isGroupCommitBlock(db, table)) {
String msg = "insert table " + table + " is blocked on schema change";
return new RestBaseResult(msg);
if (groupCommitStr != null) {
if (!groupCommitStr.equalsIgnoreCase("async_mode") && !groupCommitStr.equalsIgnoreCase("sync_mode")
&& !groupCommitStr.equalsIgnoreCase("off_mode")) {
return new RestBaseResult("Header `group_commit` can only be `sync_mode`, `async_mode` or `off_mode`.");
}
if (!groupCommitStr.equalsIgnoreCase("off_mode")) {
groupCommit = true;
if (groupCommitStr.equalsIgnoreCase("async_mode")) {
try {
if (isGroupCommitBlock(db, table)) {
String msg = "insert table " + table + " is blocked on schema change";
return new RestBaseResult(msg);
}
} catch (Exception e) {
LOG.info("exception:" + e);
return new RestBaseResult(e.getMessage());
}
}
} catch (Exception e) {
LOG.info("exception:" + e);
return new RestBaseResult(e.getMessage());
}
}
if (needRedirect(request.getScheme())) {
Expand Down Expand Up @@ -131,21 +139,32 @@ public Object streamLoadWithSql(HttpServletRequest request, HttpServletResponse
boolean groupCommit = false;
long tableId = -1;
String groupCommitStr = request.getHeader("group_commit");
if (groupCommitStr != null && groupCommitStr.equalsIgnoreCase("async_mode")) {
groupCommit = true;
try {
String[] pair = parseDbAndTb(sql);
Database db = Env.getCurrentInternalCatalog()
.getDbOrException(pair[0], s -> new TException("database is invalid for dbName: " + s));
Table tbl = db.getTableOrException(pair[1], s -> new TException("table is invalid: " + s));
tableId = tbl.getId();
if (isGroupCommitBlock(pair[0], pair[1])) {
String msg = "insert table " + pair[1] + " is blocked on schema change";
return new RestBaseResult(msg);
if (groupCommitStr != null) {
if (!groupCommitStr.equalsIgnoreCase("async_mode") && !groupCommitStr.equalsIgnoreCase("sync_mode")
&& !groupCommitStr.equalsIgnoreCase("off_mode")) {
return new RestBaseResult("Header `group_commit` can only be `sync_mode`, `async_mode` or `off_mode`.");
}
if (!groupCommitStr.equalsIgnoreCase("off_mode")) {
try {
groupCommit = true;
String[] pair = parseDbAndTb(sql);
Database db = Env.getCurrentInternalCatalog()
.getDbOrException(pair[0], s -> new TException("database is invalid for dbName: " + s));
Table tbl = db.getTableOrException(pair[1], s -> new TException("table is invalid: " + s));
tableId = tbl.getId();

// async mode needs to write WAL, we need to block load during waiting WAL.
if (groupCommitStr.equalsIgnoreCase("async_mode")) {
if (isGroupCommitBlock(pair[0], pair[1])) {
String msg = "insert table " + pair[1] + " is blocked on schema change";
return new RestBaseResult(msg);
}

}
} catch (Exception e) {
LOG.info("exception:" + e);
return new RestBaseResult(e.getMessage());
}
} catch (Exception e) {
LOG.info("exception:" + e);
return new RestBaseResult(e.getMessage());
}
}
executeCheckPassword(request, response);
Expand Down Expand Up @@ -207,8 +226,8 @@ private String[] parseDbAndTb(String sql) throws Exception {

@RequestMapping(path = "/api/{" + DB_KEY + "}/_stream_load_2pc", method = RequestMethod.PUT)
public Object streamLoad2PC(HttpServletRequest request,
HttpServletResponse response,
@PathVariable(value = DB_KEY) String db) {
HttpServletResponse response,
@PathVariable(value = DB_KEY) String db) {
if (needRedirect(request.getScheme())) {
return redirectToHttps(request);
}
Expand All @@ -219,9 +238,9 @@ public Object streamLoad2PC(HttpServletRequest request,

@RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY + "}/_stream_load_2pc", method = RequestMethod.PUT)
public Object streamLoad2PC_table(HttpServletRequest request,
HttpServletResponse response,
@PathVariable(value = DB_KEY) String db,
@PathVariable(value = TABLE_KEY) String table) {
HttpServletResponse response,
@PathVariable(value = DB_KEY) String db,
@PathVariable(value = TABLE_KEY) String table) {
if (needRedirect(request.getScheme())) {
return redirectToHttps(request);
}
Expand Down Expand Up @@ -361,21 +380,7 @@ private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit, HttpServ
throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
}
if (groupCommit) {
ConnectContext ctx = new ConnectContext();
ctx.setEnv(Env.getCurrentEnv());
ctx.setThreadLocalInfo();
ctx.setRemoteIP(request.getRemoteAddr());
// We set this variable to fulfill required field 'user' in
// TMasterOpRequest(FrontendService.thrift)
ctx.setQualifiedUser(Auth.ADMIN_USER);
ctx.setThreadLocalInfo();

try {
backend = Env.getCurrentEnv().getGroupCommitManager()
.selectBackendForGroupCommit(tableId, ctx, false);
} catch (DdlException e) {
throw new RuntimeException(e);
}
backend = selectBackendForGroupCommit(request, tableId);
} else {
backend = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
}
Expand All @@ -402,7 +407,7 @@ private boolean checkClusterToken(String token) {
// temporarily addressing the users' needs for audit logs.
// So this function is not widely tested under general scenario
private Object executeWithClusterToken(HttpServletRequest request, String db,
String table, boolean isStreamLoad) {
String table, boolean isStreamLoad) {
try {
ConnectContext ctx = new ConnectContext();
ctx.setEnv(Env.getCurrentEnv());
Expand Down Expand Up @@ -473,4 +478,25 @@ private Object executeWithClusterToken(HttpServletRequest request, String db,
ConnectContext.remove();
}
}

private Backend selectBackendForGroupCommit(HttpServletRequest req, long tableId)
throws LoadException {
ConnectContext ctx = new ConnectContext();
ctx.setEnv(Env.getCurrentEnv());
ctx.setThreadLocalInfo();
ctx.setRemoteIP(req.getRemoteAddr());
// We set this variable to fulfill required field 'user' in
// TMasterOpRequest(FrontendService.thrift)
ctx.setQualifiedUser(Auth.ADMIN_USER);
ctx.setThreadLocalInfo();

Backend backend = null;
try {
backend = Env.getCurrentEnv().getGroupCommitManager()
.selectBackendForGroupCommit(tableId, ctx);
} catch (DdlException e) {
throw new LoadException(e.getMessage(), e);
}
return backend;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ private long getWalQueueSize(Backend backend, PGetWalQueueSizeRequest request) {
return size;
}

public Backend selectBackendForGroupCommit(long tableId, ConnectContext context, boolean isCloud)
public Backend selectBackendForGroupCommit(long tableId, ConnectContext context)
throws LoadException, DdlException {
// If a group commit request is sent to the follower FE, we will send this request to the master FE. master FE
// can select a BE and return this BE id to follower FE.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public GroupCommitPlanner(Database db, OlapTable table, List<String> targetColum
public PGroupCommitInsertResponse executeGroupCommitInsert(ConnectContext ctx,
List<InternalService.PDataRow> rows)
throws DdlException, RpcException, ExecutionException, InterruptedException {
backend = ctx.getInsertGroupCommit(this.table.getId());
selectBackends(ctx);
if (backend == null || !backend.isAlive() || backend.isDecommissioned()) {
List<Long> allBackendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
if (allBackendIds.isEmpty()) {
Expand Down Expand Up @@ -162,7 +162,7 @@ public PGroupCommitInsertResponse executeGroupCommitInsert(ConnectContext ctx,
.setRequest(execPlanFragmentParamsBytes)
.setCompact(false).setVersion(InternalService.PFragmentRequestVersion.VERSION_2).build())
.setLoadId(Types.PUniqueId.newBuilder().setHi(loadId.hi).setLo(loadId.lo)
.build()).addAllData(rows)
.build()).addAllData(rows)
.build();
Future<PGroupCommitInsertResponse> future = BackendServiceProxy.getInstance()
.groupCommitInsert(new TNetworkAddress(backend.getHost(), backend.getBrpcPort()), request);
Expand Down Expand Up @@ -206,7 +206,7 @@ private static void processExprVal(Expr expr, InternalService.PDataRow.Builder r
protected void selectBackends(ConnectContext ctx) throws DdlException {
try {
backend = Env.getCurrentEnv().getGroupCommitManager()
.selectBackendForGroupCommit(this.table.getId(), ctx, false);
.selectBackendForGroupCommit(this.table.getId(), ctx);
} catch (LoadException e) {
throw new DdlException("No suitable backend");
}
Expand Down

0 comments on commit 52393f8

Please sign in to comment.