Skip to content

Commit

Permalink
[Improvement](set) enable admin_set_frontend_config can apply to all …
Browse files Browse the repository at this point in the history
…fe (#34751)
  • Loading branch information
Yulei-Yang authored May 14, 2024
1 parent f1eaf16 commit 17b1df5
Show file tree
Hide file tree
Showing 4 changed files with 248 additions and 3 deletions.
10 changes: 9 additions & 1 deletion fe/fe-core/src/main/cup/sql_parser.cup
Original file line number Diff line number Diff line change
Expand Up @@ -7242,7 +7242,15 @@ admin_stmt ::=
:}
| KW_ADMIN KW_SET KW_FRONTEND KW_CONFIG opt_key_value_map:configs
{:
RESULT = new AdminSetConfigStmt(AdminSetConfigStmt.ConfigType.FRONTEND, configs);
RESULT = new AdminSetConfigStmt(AdminSetConfigStmt.ConfigType.FRONTEND, configs, false);
:}
| KW_ADMIN KW_SET KW_ALL KW_FRONTENDS KW_CONFIG opt_key_value_map:configs
{:
RESULT = new AdminSetConfigStmt(AdminSetConfigStmt.ConfigType.FRONTEND, configs, true);
:}
| KW_ADMIN KW_SET KW_FRONTEND KW_CONFIG opt_key_value_map:configs KW_ALL
{:
RESULT = new AdminSetConfigStmt(AdminSetConfigStmt.ConfigType.FRONTEND, configs, true);
:}
// deprecated
| KW_ADMIN KW_SHOW KW_FRONTEND KW_CONFIG opt_wild_where
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.doris.common.UserException;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;

import com.google.common.collect.Maps;

Expand All @@ -38,22 +39,25 @@ public enum ConfigType {
BACKEND
}

private boolean applyToAll;
private ConfigType type;
private Map<String, String> configs;

private RedirectStatus redirectStatus = RedirectStatus.NO_FORWARD;

public AdminSetConfigStmt(ConfigType type, Map<String, String> configs) {
public AdminSetConfigStmt(ConfigType type, Map<String, String> configs, boolean applyToAll) {
this.type = type;
this.configs = configs;
if (this.configs == null) {
this.configs = Maps.newHashMap();
}
this.applyToAll = applyToAll;

// we have to analyze configs here to determine whether to forward it to master
for (String key : this.configs.keySet()) {
if (ConfigBase.checkIsMasterOnly(key)) {
redirectStatus = RedirectStatus.FORWARD_NO_SYNC;
this.applyToAll = false;
}
}
}
Expand All @@ -66,6 +70,10 @@ public Map<String, String> getConfigs() {
return configs;
}

public boolean isApplyToAll() {
return applyToAll;
}

@Override
public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
super.analyze(analyzer);
Expand All @@ -87,4 +95,13 @@ public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
public RedirectStatus getRedirectStatus() {
return redirectStatus;
}

public OriginStatement getLocalSetStmt() {
OriginStatement stmt = this.getOrigStmt();
Object[] keyArr = configs.keySet().toArray();
String sql = String.format("ADMIN SET FRONTEND CONFIG (\"%s\" = \"%s\");",
keyArr[0].toString(), configs.get(keyArr[0].toString()));

return new OriginStatement(sql, stmt.idx);
}
}
19 changes: 18 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@
import org.apache.doris.policy.PolicyMgr;
import org.apache.doris.qe.AuditEventProcessor;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.FEOpExecutor;
import org.apache.doris.qe.GlobalVariable;
import org.apache.doris.qe.JournalObservable;
import org.apache.doris.qe.VariableMgr;
Expand Down Expand Up @@ -5070,7 +5071,7 @@ public void replayDropGlobalFunction(FunctionSearchDesc functionSearchDesc) {
globalFunctionMgr.replayDropFunction(functionSearchDesc);
}

public void setConfig(AdminSetConfigStmt stmt) throws DdlException {
public void setConfig(AdminSetConfigStmt stmt) throws Exception {
Map<String, String> configs = stmt.getConfigs();
Preconditions.checkState(configs.size() == 1);

Expand All @@ -5081,6 +5082,22 @@ public void setConfig(AdminSetConfigStmt stmt) throws DdlException {
throw new DdlException(e.getMessage());
}
}

if (stmt.isApplyToAll()) {
for (Frontend fe : Env.getCurrentEnv().getFrontends(null /* all */)) {
if (!fe.isAlive() || fe.getHost().equals(Env.getCurrentEnv().getSelfNode().getHost())) {
continue;
}

TNetworkAddress feAddr = new TNetworkAddress(fe.getHost(), fe.getRpcPort());
FEOpExecutor executor = new FEOpExecutor(feAddr, stmt.getLocalSetStmt(), ConnectContext.get(), false);
executor.execute();
if (executor.getStatusCode() != TStatusCode.OK.getValue()) {
throw new DdlException(String.format("failed to apply to fe %s:%s, error message: %s",
fe.getHost(), fe.getRpcPort(), executor.getErrMsg()));
}
}
}
}

public void replayBackendReplicasInfo(BackendReplicasInfo backendReplicasInfo) {
Expand Down
203 changes: 203 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.qe;

import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.TExpr;
import org.apache.doris.thrift.TExprNode;
import org.apache.doris.thrift.TMasterOpRequest;
import org.apache.doris.thrift.TMasterOpResult;
import org.apache.doris.thrift.TNetworkAddress;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;

import java.util.Map;

public class FEOpExecutor {
private static final Logger LOG = LogManager.getLogger(FEOpExecutor.class);

private static final float RPC_TIMEOUT_COEFFICIENT = 1.2f;

private final OriginStatement originStmt;
private final ConnectContext ctx;
private TMasterOpResult result;
private TNetworkAddress feAddr;

// the total time of thrift connectTime, readTime and writeTime
private int thriftTimeoutMs;

private boolean shouldNotRetry;

public FEOpExecutor(TNetworkAddress feAddress, OriginStatement originStmt, ConnectContext ctx, boolean isQuery) {
this.feAddr = feAddress;
this.originStmt = originStmt;
this.ctx = ctx;
this.thriftTimeoutMs = (int) (ctx.getExecTimeout() * 1000 * RPC_TIMEOUT_COEFFICIENT);
// if isQuery=false, we shouldn't retry twice when catch exception because of Idempotency
this.shouldNotRetry = !isQuery;
}

public void execute() throws Exception {
result = forward(feAddr, buildStmtForwardParams());
}

// Send request to specific fe
private TMasterOpResult forward(TNetworkAddress thriftAddress, TMasterOpRequest params) throws Exception {
if (!ctx.getEnv().isReady()) {
throw new Exception("Env is not ready");
}

FrontendService.Client client;
try {
client = ClientPool.frontendPool.borrowObject(thriftAddress, thriftTimeoutMs);
} catch (Exception e) {
// may throw NullPointerException. add err msg
throw new Exception("Failed to get fe client: " + thriftAddress.toString(), e);
}
final StringBuilder forwardMsg = new StringBuilder("forward to FE " + thriftAddress.toString());
forwardMsg.append(", statement id: ").append(ctx.getStmtId());
LOG.info(forwardMsg.toString());

boolean isReturnToPool = false;
try {
final TMasterOpResult result = client.forward(params);
isReturnToPool = true;
return result;
} catch (TTransportException e) {
// wrap the raw exception.
forwardMsg.append(" : failed");
Exception exception = new ForwardToFEException(forwardMsg.toString(), e);

boolean ok = ClientPool.frontendPool.reopen(client, thriftTimeoutMs);
if (!ok) {
throw exception;
}
if (shouldNotRetry || e.getType() == TTransportException.TIMED_OUT) {
throw exception;
} else {
LOG.warn(forwardMsg.append(" twice").toString(), e);
try {
TMasterOpResult result = client.forward(params);
isReturnToPool = true;
return result;
} catch (TException ex) {
throw exception;
}
}
} finally {
if (isReturnToPool) {
ClientPool.frontendPool.returnObject(thriftAddress, client);
} else {
ClientPool.frontendPool.invalidateObject(thriftAddress, client);
}
}
}

private TMasterOpRequest buildStmtForwardParams() {
TMasterOpRequest params = new TMasterOpRequest();
// node ident
params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort());
params.setSql(originStmt.originStmt);
params.setStmtIdx(originStmt.idx);
params.setUser(ctx.getQualifiedUser());
params.setDefaultCatalog(ctx.getDefaultCatalog());
params.setDefaultDatabase(ctx.getDatabase());
params.setDb(ctx.getDatabase());
params.setUserIp(ctx.getRemoteIP());
params.setStmtId(ctx.getStmtId());
params.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift());

String cluster = ctx.getClusterName();
if (!Strings.isNullOrEmpty(cluster)) {
params.setCluster(cluster);
}

// query options
params.setQueryOptions(ctx.getSessionVariable().getQueryOptionVariables());
// session variables
params.setSessionVariables(ctx.getSessionVariable().getForwardVariables());
params.setUserVariables(getForwardUserVariables(ctx.getUserVars()));
if (null != ctx.queryId()) {
params.setQueryId(ctx.queryId());
}
return params;
}

public int getStatusCode() {
if (result == null || !result.isSetStatusCode()) {
return ErrorCode.ERR_UNKNOWN_ERROR.getCode();
}
return result.getStatusCode();
}

public String getErrMsg() {
if (result == null) {
return ErrorCode.ERR_UNKNOWN_ERROR.getErrorMsg();
}
if (!result.isSetErrMessage()) {
return "";
}
return result.getErrMessage();
}

private Map<String, TExprNode> getForwardUserVariables(Map<String, LiteralExpr> userVariables) {
Map<String, TExprNode> forwardVariables = Maps.newHashMap();
for (Map.Entry<String, LiteralExpr> entry : userVariables.entrySet()) {
LiteralExpr literalExpr = entry.getValue();
TExpr tExpr = literalExpr.treeToThrift();
TExprNode tExprNode = tExpr.nodes.get(0);
forwardVariables.put(entry.getKey(), tExprNode);
}
return forwardVariables;
}

public static class ForwardToFEException extends RuntimeException {

private static final Map<Integer, String> TYPE_MSG_MAP =
ImmutableMap.<Integer, String>builder()
.put(TTransportException.UNKNOWN, "Unknown exception")
.put(TTransportException.NOT_OPEN, "Connection is not open")
.put(TTransportException.ALREADY_OPEN, "Connection has already opened up")
.put(TTransportException.TIMED_OUT, "Connection timeout")
.put(TTransportException.END_OF_FILE, "EOF")
.put(TTransportException.CORRUPTED_DATA, "Corrupted data")
.build();

private final String msg;

public ForwardToFEException(String msg, TTransportException exception) {
this.msg = msg + ", cause: " + TYPE_MSG_MAP.get(exception.getType()) + ", " + exception.getMessage();
}

@Override
public String getMessage() {
return msg;
}
}
}

0 comments on commit 17b1df5

Please sign in to comment.