diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java index 030aced411f3e6..c3a77a13b3dc36 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java @@ -70,7 +70,7 @@ public class LoadAction extends RestBaseController { @RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY + "}/_load", method = RequestMethod.PUT) public Object load(HttpServletRequest request, HttpServletResponse response, - @PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) { + @PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) { if (needRedirect(request.getScheme())) { return redirectToHttps(request); } @@ -87,20 +87,28 @@ public Object load(HttpServletRequest request, HttpServletResponse response, @RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY + "}/_stream_load", method = RequestMethod.PUT) public Object streamLoad(HttpServletRequest request, - HttpServletResponse response, - @PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) { + HttpServletResponse response, + @PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) { boolean groupCommit = false; String groupCommitStr = request.getHeader("group_commit"); - if (groupCommitStr != null && groupCommitStr.equalsIgnoreCase("async_mode")) { - groupCommit = true; - try { - if (isGroupCommitBlock(db, table)) { - String msg = "insert table " + table + " is blocked on schema change"; - return new RestBaseResult(msg); + if (groupCommitStr != null) { + if (!groupCommitStr.equalsIgnoreCase("async_mode") && !groupCommitStr.equalsIgnoreCase("sync_mode") + && !groupCommitStr.equalsIgnoreCase("off_mode")) { + return new RestBaseResult("Header `group_commit` can only be `sync_mode`, `async_mode` or `off_mode`."); + } + if (!groupCommitStr.equalsIgnoreCase("off_mode")) { + groupCommit = true; + if (groupCommitStr.equalsIgnoreCase("async_mode")) { + try { + if (isGroupCommitBlock(db, table)) { + String msg = "insert table " + table + " is blocked on schema change"; + return new RestBaseResult(msg); + } + } catch (Exception e) { + LOG.info("exception:" + e); + return new RestBaseResult(e.getMessage()); + } } - } catch (Exception e) { - LOG.info("exception:" + e); - return new RestBaseResult(e.getMessage()); } } if (needRedirect(request.getScheme())) { @@ -131,21 +139,32 @@ public Object streamLoadWithSql(HttpServletRequest request, HttpServletResponse boolean groupCommit = false; long tableId = -1; String groupCommitStr = request.getHeader("group_commit"); - if (groupCommitStr != null && groupCommitStr.equalsIgnoreCase("async_mode")) { - groupCommit = true; - try { - String[] pair = parseDbAndTb(sql); - Database db = Env.getCurrentInternalCatalog() - .getDbOrException(pair[0], s -> new TException("database is invalid for dbName: " + s)); - Table tbl = db.getTableOrException(pair[1], s -> new TException("table is invalid: " + s)); - tableId = tbl.getId(); - if (isGroupCommitBlock(pair[0], pair[1])) { - String msg = "insert table " + pair[1] + " is blocked on schema change"; - return new RestBaseResult(msg); + if (groupCommitStr != null) { + if (!groupCommitStr.equalsIgnoreCase("async_mode") && !groupCommitStr.equalsIgnoreCase("sync_mode") + && !groupCommitStr.equalsIgnoreCase("off_mode")) { + return new RestBaseResult("Header `group_commit` can only be `sync_mode`, `async_mode` or `off_mode`."); + } + if (!groupCommitStr.equalsIgnoreCase("off_mode")) { + try { + groupCommit = true; + String[] pair = parseDbAndTb(sql); + Database db = Env.getCurrentInternalCatalog() + .getDbOrException(pair[0], s -> new TException("database is invalid for dbName: " + s)); + Table tbl = db.getTableOrException(pair[1], s -> new TException("table is invalid: " + s)); + tableId = tbl.getId(); + + // async mode needs to write WAL, we need to block load during waiting WAL. + if (groupCommitStr.equalsIgnoreCase("async_mode")) { + if (isGroupCommitBlock(pair[0], pair[1])) { + String msg = "insert table " + pair[1] + " is blocked on schema change"; + return new RestBaseResult(msg); + } + + } + } catch (Exception e) { + LOG.info("exception:" + e); + return new RestBaseResult(e.getMessage()); } - } catch (Exception e) { - LOG.info("exception:" + e); - return new RestBaseResult(e.getMessage()); } } executeCheckPassword(request, response); @@ -207,8 +226,8 @@ private String[] parseDbAndTb(String sql) throws Exception { @RequestMapping(path = "/api/{" + DB_KEY + "}/_stream_load_2pc", method = RequestMethod.PUT) public Object streamLoad2PC(HttpServletRequest request, - HttpServletResponse response, - @PathVariable(value = DB_KEY) String db) { + HttpServletResponse response, + @PathVariable(value = DB_KEY) String db) { if (needRedirect(request.getScheme())) { return redirectToHttps(request); } @@ -219,9 +238,9 @@ public Object streamLoad2PC(HttpServletRequest request, @RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY + "}/_stream_load_2pc", method = RequestMethod.PUT) public Object streamLoad2PC_table(HttpServletRequest request, - HttpServletResponse response, - @PathVariable(value = DB_KEY) String db, - @PathVariable(value = TABLE_KEY) String table) { + HttpServletResponse response, + @PathVariable(value = DB_KEY) String db, + @PathVariable(value = TABLE_KEY) String table) { if (needRedirect(request.getScheme())) { return redirectToHttps(request); } @@ -361,21 +380,7 @@ private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit, HttpServ throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy); } if (groupCommit) { - ConnectContext ctx = new ConnectContext(); - ctx.setEnv(Env.getCurrentEnv()); - ctx.setThreadLocalInfo(); - ctx.setRemoteIP(request.getRemoteAddr()); - // We set this variable to fulfill required field 'user' in - // TMasterOpRequest(FrontendService.thrift) - ctx.setQualifiedUser(Auth.ADMIN_USER); - ctx.setThreadLocalInfo(); - - try { - backend = Env.getCurrentEnv().getGroupCommitManager() - .selectBackendForGroupCommit(tableId, ctx, false); - } catch (DdlException e) { - throw new RuntimeException(e); - } + backend = selectBackendForGroupCommit(request, tableId); } else { backend = Env.getCurrentSystemInfo().getBackend(backendIds.get(0)); } @@ -402,7 +407,7 @@ private boolean checkClusterToken(String token) { // temporarily addressing the users' needs for audit logs. // So this function is not widely tested under general scenario private Object executeWithClusterToken(HttpServletRequest request, String db, - String table, boolean isStreamLoad) { + String table, boolean isStreamLoad) { try { ConnectContext ctx = new ConnectContext(); ctx.setEnv(Env.getCurrentEnv()); @@ -473,4 +478,25 @@ private Object executeWithClusterToken(HttpServletRequest request, String db, ConnectContext.remove(); } } + + private Backend selectBackendForGroupCommit(HttpServletRequest req, long tableId) + throws LoadException { + ConnectContext ctx = new ConnectContext(); + ctx.setEnv(Env.getCurrentEnv()); + ctx.setThreadLocalInfo(); + ctx.setRemoteIP(req.getRemoteAddr()); + // We set this variable to fulfill required field 'user' in + // TMasterOpRequest(FrontendService.thrift) + ctx.setQualifiedUser(Auth.ADMIN_USER); + ctx.setThreadLocalInfo(); + + Backend backend = null; + try { + backend = Env.getCurrentEnv().getGroupCommitManager() + .selectBackendForGroupCommit(tableId, ctx); + } catch (DdlException e) { + throw new LoadException(e.getMessage(), e); + } + return backend; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java index f8c37d647abe53..656a9a8a0ae7f4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java @@ -181,7 +181,7 @@ private long getWalQueueSize(Backend backend, PGetWalQueueSizeRequest request) { return size; } - public Backend selectBackendForGroupCommit(long tableId, ConnectContext context, boolean isCloud) + public Backend selectBackendForGroupCommit(long tableId, ConnectContext context) throws LoadException, DdlException { // If a group commit request is sent to the follower FE, we will send this request to the master FE. master FE // can select a BE and return this BE id to follower FE. diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java index 0b051aeb88806b..9b1044b2f7ef88 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java @@ -134,7 +134,7 @@ public GroupCommitPlanner(Database db, OlapTable table, List targetColum public PGroupCommitInsertResponse executeGroupCommitInsert(ConnectContext ctx, List rows) throws DdlException, RpcException, ExecutionException, InterruptedException { - backend = ctx.getInsertGroupCommit(this.table.getId()); + selectBackends(ctx); if (backend == null || !backend.isAlive() || backend.isDecommissioned()) { List allBackendIds = Env.getCurrentSystemInfo().getAllBackendIds(true); if (allBackendIds.isEmpty()) { @@ -162,7 +162,7 @@ public PGroupCommitInsertResponse executeGroupCommitInsert(ConnectContext ctx, .setRequest(execPlanFragmentParamsBytes) .setCompact(false).setVersion(InternalService.PFragmentRequestVersion.VERSION_2).build()) .setLoadId(Types.PUniqueId.newBuilder().setHi(loadId.hi).setLo(loadId.lo) - .build()).addAllData(rows) + .build()).addAllData(rows) .build(); Future future = BackendServiceProxy.getInstance() .groupCommitInsert(new TNetworkAddress(backend.getHost(), backend.getBrpcPort()), request); @@ -206,7 +206,7 @@ private static void processExprVal(Expr expr, InternalService.PDataRow.Builder r protected void selectBackends(ConnectContext ctx) throws DdlException { try { backend = Env.getCurrentEnv().getGroupCommitManager() - .selectBackendForGroupCommit(this.table.getId(), ctx, false); + .selectBackendForGroupCommit(this.table.getId(), ctx); } catch (LoadException e) { throw new DdlException("No suitable backend"); }