diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index 6159221f1e6ded..c6eb2ec7eaba35 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -7242,7 +7242,15 @@ admin_stmt ::= :} | KW_ADMIN KW_SET KW_FRONTEND KW_CONFIG opt_key_value_map:configs {: - RESULT = new AdminSetConfigStmt(AdminSetConfigStmt.ConfigType.FRONTEND, configs); + RESULT = new AdminSetConfigStmt(AdminSetConfigStmt.ConfigType.FRONTEND, configs, false); + :} + | KW_ADMIN KW_SET KW_ALL KW_FRONTENDS KW_CONFIG opt_key_value_map:configs + {: + RESULT = new AdminSetConfigStmt(AdminSetConfigStmt.ConfigType.FRONTEND, configs, true); + :} + | KW_ADMIN KW_SET KW_FRONTEND KW_CONFIG opt_key_value_map:configs KW_ALL + {: + RESULT = new AdminSetConfigStmt(AdminSetConfigStmt.ConfigType.FRONTEND, configs, true); :} // deprecated | KW_ADMIN KW_SHOW KW_FRONTEND KW_CONFIG opt_wild_where diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminSetConfigStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminSetConfigStmt.java index 166f9a700964db..1d2e22ee878b13 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminSetConfigStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminSetConfigStmt.java @@ -25,6 +25,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.OriginStatement; import com.google.common.collect.Maps; @@ -38,22 +39,25 @@ public enum ConfigType { BACKEND } + private boolean applyToAll; private ConfigType type; private Map configs; private RedirectStatus redirectStatus = RedirectStatus.NO_FORWARD; - public AdminSetConfigStmt(ConfigType type, Map configs) { + public AdminSetConfigStmt(ConfigType type, Map configs, boolean applyToAll) { this.type = type; this.configs = configs; if (this.configs == null) { this.configs = Maps.newHashMap(); } + this.applyToAll = applyToAll; // we have to analyze configs here to determine whether to forward it to master for (String key : this.configs.keySet()) { if (ConfigBase.checkIsMasterOnly(key)) { redirectStatus = RedirectStatus.FORWARD_NO_SYNC; + this.applyToAll = false; } } } @@ -66,6 +70,10 @@ public Map getConfigs() { return configs; } + public boolean isApplyToAll() { + return applyToAll; + } + @Override public void analyze(Analyzer analyzer) throws AnalysisException, UserException { super.analyze(analyzer); @@ -87,4 +95,13 @@ public void analyze(Analyzer analyzer) throws AnalysisException, UserException { public RedirectStatus getRedirectStatus() { return redirectStatus; } + + public OriginStatement getLocalSetStmt() { + OriginStatement stmt = this.getOrigStmt(); + Object[] keyArr = configs.keySet().toArray(); + String sql = String.format("ADMIN SET FRONTEND CONFIG (\"%s\" = \"%s\");", + keyArr[0].toString(), configs.get(keyArr[0].toString())); + + return new OriginStatement(sql, stmt.idx); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 50bbe3aba72bdb..58802dbb21d6f9 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -206,6 +206,7 @@ import org.apache.doris.policy.PolicyMgr; import org.apache.doris.qe.AuditEventProcessor; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.FEOpExecutor; import org.apache.doris.qe.GlobalVariable; import org.apache.doris.qe.JournalObservable; import org.apache.doris.qe.VariableMgr; @@ -5070,7 +5071,7 @@ public void replayDropGlobalFunction(FunctionSearchDesc functionSearchDesc) { globalFunctionMgr.replayDropFunction(functionSearchDesc); } - public void setConfig(AdminSetConfigStmt stmt) throws DdlException { + public void setConfig(AdminSetConfigStmt stmt) throws Exception { Map configs = stmt.getConfigs(); Preconditions.checkState(configs.size() == 1); @@ -5081,6 +5082,22 @@ public void setConfig(AdminSetConfigStmt stmt) throws DdlException { throw new DdlException(e.getMessage()); } } + + if (stmt.isApplyToAll()) { + for (Frontend fe : Env.getCurrentEnv().getFrontends(null /* all */)) { + if (!fe.isAlive() || fe.getHost().equals(Env.getCurrentEnv().getSelfNode().getHost())) { + continue; + } + + TNetworkAddress feAddr = new TNetworkAddress(fe.getHost(), fe.getRpcPort()); + FEOpExecutor executor = new FEOpExecutor(feAddr, stmt.getLocalSetStmt(), ConnectContext.get(), false); + executor.execute(); + if (executor.getStatusCode() != TStatusCode.OK.getValue()) { + throw new DdlException(String.format("failed to apply to fe %s:%s, error message: %s", + fe.getHost(), fe.getRpcPort(), executor.getErrMsg())); + } + } + } } public void replayBackendReplicasInfo(BackendReplicasInfo backendReplicasInfo) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java new file mode 100644 index 00000000000000..f1fa13b258544e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java @@ -0,0 +1,203 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.qe; + +import org.apache.doris.analysis.LiteralExpr; +import org.apache.doris.catalog.Env; +import org.apache.doris.common.ClientPool; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.thrift.FrontendService; +import org.apache.doris.thrift.TExpr; +import org.apache.doris.thrift.TExprNode; +import org.apache.doris.thrift.TMasterOpRequest; +import org.apache.doris.thrift.TMasterOpResult; +import org.apache.doris.thrift.TNetworkAddress; + +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.thrift.TException; +import org.apache.thrift.transport.TTransportException; + +import java.util.Map; + +public class FEOpExecutor { + private static final Logger LOG = LogManager.getLogger(FEOpExecutor.class); + + private static final float RPC_TIMEOUT_COEFFICIENT = 1.2f; + + private final OriginStatement originStmt; + private final ConnectContext ctx; + private TMasterOpResult result; + private TNetworkAddress feAddr; + + // the total time of thrift connectTime, readTime and writeTime + private int thriftTimeoutMs; + + private boolean shouldNotRetry; + + public FEOpExecutor(TNetworkAddress feAddress, OriginStatement originStmt, ConnectContext ctx, boolean isQuery) { + this.feAddr = feAddress; + this.originStmt = originStmt; + this.ctx = ctx; + this.thriftTimeoutMs = (int) (ctx.getExecTimeout() * 1000 * RPC_TIMEOUT_COEFFICIENT); + // if isQuery=false, we shouldn't retry twice when catch exception because of Idempotency + this.shouldNotRetry = !isQuery; + } + + public void execute() throws Exception { + result = forward(feAddr, buildStmtForwardParams()); + } + + // Send request to specific fe + private TMasterOpResult forward(TNetworkAddress thriftAddress, TMasterOpRequest params) throws Exception { + if (!ctx.getEnv().isReady()) { + throw new Exception("Env is not ready"); + } + + FrontendService.Client client; + try { + client = ClientPool.frontendPool.borrowObject(thriftAddress, thriftTimeoutMs); + } catch (Exception e) { + // may throw NullPointerException. add err msg + throw new Exception("Failed to get fe client: " + thriftAddress.toString(), e); + } + final StringBuilder forwardMsg = new StringBuilder("forward to FE " + thriftAddress.toString()); + forwardMsg.append(", statement id: ").append(ctx.getStmtId()); + LOG.info(forwardMsg.toString()); + + boolean isReturnToPool = false; + try { + final TMasterOpResult result = client.forward(params); + isReturnToPool = true; + return result; + } catch (TTransportException e) { + // wrap the raw exception. + forwardMsg.append(" : failed"); + Exception exception = new ForwardToFEException(forwardMsg.toString(), e); + + boolean ok = ClientPool.frontendPool.reopen(client, thriftTimeoutMs); + if (!ok) { + throw exception; + } + if (shouldNotRetry || e.getType() == TTransportException.TIMED_OUT) { + throw exception; + } else { + LOG.warn(forwardMsg.append(" twice").toString(), e); + try { + TMasterOpResult result = client.forward(params); + isReturnToPool = true; + return result; + } catch (TException ex) { + throw exception; + } + } + } finally { + if (isReturnToPool) { + ClientPool.frontendPool.returnObject(thriftAddress, client); + } else { + ClientPool.frontendPool.invalidateObject(thriftAddress, client); + } + } + } + + private TMasterOpRequest buildStmtForwardParams() { + TMasterOpRequest params = new TMasterOpRequest(); + // node ident + params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost()); + params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort()); + params.setSql(originStmt.originStmt); + params.setStmtIdx(originStmt.idx); + params.setUser(ctx.getQualifiedUser()); + params.setDefaultCatalog(ctx.getDefaultCatalog()); + params.setDefaultDatabase(ctx.getDatabase()); + params.setDb(ctx.getDatabase()); + params.setUserIp(ctx.getRemoteIP()); + params.setStmtId(ctx.getStmtId()); + params.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift()); + + String cluster = ctx.getClusterName(); + if (!Strings.isNullOrEmpty(cluster)) { + params.setCluster(cluster); + } + + // query options + params.setQueryOptions(ctx.getSessionVariable().getQueryOptionVariables()); + // session variables + params.setSessionVariables(ctx.getSessionVariable().getForwardVariables()); + params.setUserVariables(getForwardUserVariables(ctx.getUserVars())); + if (null != ctx.queryId()) { + params.setQueryId(ctx.queryId()); + } + return params; + } + + public int getStatusCode() { + if (result == null || !result.isSetStatusCode()) { + return ErrorCode.ERR_UNKNOWN_ERROR.getCode(); + } + return result.getStatusCode(); + } + + public String getErrMsg() { + if (result == null) { + return ErrorCode.ERR_UNKNOWN_ERROR.getErrorMsg(); + } + if (!result.isSetErrMessage()) { + return ""; + } + return result.getErrMessage(); + } + + private Map getForwardUserVariables(Map userVariables) { + Map forwardVariables = Maps.newHashMap(); + for (Map.Entry entry : userVariables.entrySet()) { + LiteralExpr literalExpr = entry.getValue(); + TExpr tExpr = literalExpr.treeToThrift(); + TExprNode tExprNode = tExpr.nodes.get(0); + forwardVariables.put(entry.getKey(), tExprNode); + } + return forwardVariables; + } + + public static class ForwardToFEException extends RuntimeException { + + private static final Map TYPE_MSG_MAP = + ImmutableMap.builder() + .put(TTransportException.UNKNOWN, "Unknown exception") + .put(TTransportException.NOT_OPEN, "Connection is not open") + .put(TTransportException.ALREADY_OPEN, "Connection has already opened up") + .put(TTransportException.TIMED_OUT, "Connection timeout") + .put(TTransportException.END_OF_FILE, "EOF") + .put(TTransportException.CORRUPTED_DATA, "Corrupted data") + .build(); + + private final String msg; + + public ForwardToFEException(String msg, TTransportException exception) { + this.msg = msg + ", cause: " + TYPE_MSG_MAP.get(exception.getType()) + ", " + exception.getMessage(); + } + + @Override + public String getMessage() { + return msg; + } + } +}