diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java index 030aced411f3e6..2865f86c8f6079 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java @@ -70,7 +70,7 @@ public class LoadAction extends RestBaseController { @RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY + "}/_load", method = RequestMethod.PUT) public Object load(HttpServletRequest request, HttpServletResponse response, - @PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) { + @PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) { if (needRedirect(request.getScheme())) { return redirectToHttps(request); } @@ -87,20 +87,29 @@ public Object load(HttpServletRequest request, HttpServletResponse response, @RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY + "}/_stream_load", method = RequestMethod.PUT) public Object streamLoad(HttpServletRequest request, - HttpServletResponse response, - @PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) { + HttpServletResponse response, + @PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) { + LOG.info("streamload action, db: {}, tbl: {}, headers: {}", db, table, getAllHeaders(request)); boolean groupCommit = false; String groupCommitStr = request.getHeader("group_commit"); - if (groupCommitStr != null && groupCommitStr.equalsIgnoreCase("async_mode")) { - groupCommit = true; - try { - if (isGroupCommitBlock(db, table)) { - String msg = "insert table " + table + " is blocked on schema change"; - return new RestBaseResult(msg); + if (groupCommitStr != null) { + if (!groupCommitStr.equalsIgnoreCase("async_mode") && !groupCommitStr.equalsIgnoreCase("sync_mode") + && !groupCommitStr.equalsIgnoreCase("off_mode")) { + return new RestBaseResult("Header `group_commit` can only be `sync_mode`, `async_mode` or `off_mode`."); + } + if (!groupCommitStr.equalsIgnoreCase("off_mode")) { + groupCommit = true; + if (groupCommitStr.equalsIgnoreCase("async_mode")) { + try { + if (isGroupCommitBlock(db, table)) { + String msg = "insert table " + table + GroupCommitPlanner.SCHEMA_CHANGE; + return new RestBaseResult(msg); + } + } catch (Exception e) { + LOG.info("exception:" + e); + return new RestBaseResult(e.getMessage()); + } } - } catch (Exception e) { - LOG.info("exception:" + e); - return new RestBaseResult(e.getMessage()); } } if (needRedirect(request.getScheme())) { @@ -131,21 +140,32 @@ public Object streamLoadWithSql(HttpServletRequest request, HttpServletResponse boolean groupCommit = false; long tableId = -1; String groupCommitStr = request.getHeader("group_commit"); - if (groupCommitStr != null && groupCommitStr.equalsIgnoreCase("async_mode")) { - groupCommit = true; - try { - String[] pair = parseDbAndTb(sql); - Database db = Env.getCurrentInternalCatalog() - .getDbOrException(pair[0], s -> new TException("database is invalid for dbName: " + s)); - Table tbl = db.getTableOrException(pair[1], s -> new TException("table is invalid: " + s)); - tableId = tbl.getId(); - if (isGroupCommitBlock(pair[0], pair[1])) { - String msg = "insert table " + pair[1] + " is blocked on schema change"; - return new RestBaseResult(msg); + if (groupCommitStr != null) { + if (!groupCommitStr.equalsIgnoreCase("async_mode") && !groupCommitStr.equalsIgnoreCase("sync_mode") + && !groupCommitStr.equalsIgnoreCase("off_mode")) { + return new RestBaseResult("Header `group_commit` can only be `sync_mode`, `async_mode` or `off_mode`."); + } + if (!groupCommitStr.equalsIgnoreCase("off_mode")) { + try { + groupCommit = true; + String[] pair = parseDbAndTb(sql); + Database db = Env.getCurrentInternalCatalog() + .getDbOrException(pair[0], s -> new TException("database is invalid for dbName: " + s)); + Table tbl = db.getTableOrException(pair[1], s -> new TException("table is invalid: " + s)); + tableId = tbl.getId(); + + // async mode needs to write WAL, we need to block load during waiting WAL. + if (groupCommitStr.equalsIgnoreCase("async_mode")) { + if (isGroupCommitBlock(pair[0], pair[1])) { + String msg = "insert table " + pair[1] + GroupCommitPlanner.SCHEMA_CHANGE; + return new RestBaseResult(msg); + } + + } + } catch (Exception e) { + LOG.info("exception:" + e); + return new RestBaseResult(e.getMessage()); } - } catch (Exception e) { - LOG.info("exception:" + e); - return new RestBaseResult(e.getMessage()); } } executeCheckPassword(request, response); @@ -207,8 +227,9 @@ private String[] parseDbAndTb(String sql) throws Exception { @RequestMapping(path = "/api/{" + DB_KEY + "}/_stream_load_2pc", method = RequestMethod.PUT) public Object streamLoad2PC(HttpServletRequest request, - HttpServletResponse response, - @PathVariable(value = DB_KEY) String db) { + HttpServletResponse response, + @PathVariable(value = DB_KEY) String db) { + LOG.info("streamload action 2PC, db: {}, headers: {}", db, getAllHeaders(request)); if (needRedirect(request.getScheme())) { return redirectToHttps(request); } @@ -219,9 +240,10 @@ public Object streamLoad2PC(HttpServletRequest request, @RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY + "}/_stream_load_2pc", method = RequestMethod.PUT) public Object streamLoad2PC_table(HttpServletRequest request, - HttpServletResponse response, - @PathVariable(value = DB_KEY) String db, - @PathVariable(value = TABLE_KEY) String table) { + HttpServletResponse response, + @PathVariable(value = DB_KEY) String db, + @PathVariable(value = TABLE_KEY) String table) { + LOG.info("streamload action 2PC, db: {}, tbl: {}, headers: {}", db, table, getAllHeaders(request)); if (needRedirect(request.getScheme())) { return redirectToHttps(request); } @@ -361,21 +383,7 @@ private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit, HttpServ throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy); } if (groupCommit) { - ConnectContext ctx = new ConnectContext(); - ctx.setEnv(Env.getCurrentEnv()); - ctx.setThreadLocalInfo(); - ctx.setRemoteIP(request.getRemoteAddr()); - // We set this variable to fulfill required field 'user' in - // TMasterOpRequest(FrontendService.thrift) - ctx.setQualifiedUser(Auth.ADMIN_USER); - ctx.setThreadLocalInfo(); - - try { - backend = Env.getCurrentEnv().getGroupCommitManager() - .selectBackendForGroupCommit(tableId, ctx, false); - } catch (DdlException e) { - throw new RuntimeException(e); - } + backend = selectBackendForGroupCommit("", request, tableId); } else { backend = Env.getCurrentSystemInfo().getBackend(backendIds.get(0)); } @@ -402,7 +410,7 @@ private boolean checkClusterToken(String token) { // temporarily addressing the users' needs for audit logs. // So this function is not widely tested under general scenario private Object executeWithClusterToken(HttpServletRequest request, String db, - String table, boolean isStreamLoad) { + String table, boolean isStreamLoad) { try { ConnectContext ctx = new ConnectContext(); ctx.setEnv(Env.getCurrentEnv()); @@ -473,4 +481,36 @@ private Object executeWithClusterToken(HttpServletRequest request, String db, ConnectContext.remove(); } } + + private String getAllHeaders(HttpServletRequest request) { + StringBuilder headers = new StringBuilder(); + Enumeration headerNames = request.getHeaderNames(); + while (headerNames.hasMoreElements()) { + String headerName = headerNames.nextElement(); + String headerValue = request.getHeader(headerName); + headers.append(headerName).append(":").append(headerValue).append(", "); + } + return headers.toString(); + } + + private Backend selectBackendForGroupCommit(String clusterName, HttpServletRequest req, long tableId) + throws LoadException { + ConnectContext ctx = new ConnectContext(); + ctx.setEnv(Env.getCurrentEnv()); + ctx.setThreadLocalInfo(); + ctx.setRemoteIP(req.getRemoteAddr()); + // We set this variable to fulfill required field 'user' in + // TMasterOpRequest(FrontendService.thrift) + ctx.setQualifiedUser(Auth.ADMIN_USER); + ctx.setThreadLocalInfo(); + + Backend backend = null; + try { + backend = Env.getCurrentEnv().getGroupCommitManager() + .selectBackendForGroupCommit(tableId, ctx, isCloud); + } catch (DdlException e) { + throw new LoadException(e.getMessage(), e); + } + return backend; + } }