Skip to content

Commit

Permalink
[fix](mysql)Support COM_CHANGE_USER and other mysql command. (#40932) (
Browse files Browse the repository at this point in the history
…#41261)

backport: #40932
  • Loading branch information
Jibing-Li authored Sep 25, 2024
1 parent 2d1dac6 commit 8cdb27a
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public enum MysqlCommand {
COM_STMT_SEND_LONG_DATA("COM_STMT_SEND_LONG_DATA", 24),
COM_STMT_CLOSE("COM_STMT_CLOSE", 25),
COM_STMT_RESET("COM_STMT_RESET", 26),
COM_SET_OPTION("COM_RESET_CONNECTION", 27),
COM_STMT_FETCH("COM_RESET_CONNECTION", 28),
COM_DAEMON("COM_DAEMON", 29),
COM_RESET_CONNECTION("COM_RESET_CONNECTION", 31);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class MysqlHandshakePacket extends MysqlPacket {
private static final MysqlCapability SSL_CAPABILITY = MysqlCapability.SSL_CAPABILITY;
// status flags not supported in palo
private static final int STATUS_FLAGS = 0;
private static final String AUTH_PLUGIN_NAME = "mysql_native_password";
public static final String AUTH_PLUGIN_NAME = "mysql_native_password";

// connection id used in KILL statement.
private int connectionId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ public static boolean negotiate(ConnectContext context) throws IOException {
serializer.reset();
MysqlHandshakePacket handshakePacket = new MysqlHandshakePacket(context.getConnectionId());
handshakePacket.writeTo(serializer);
context.setMysqlHandshakePacket(handshakePacket);
try {
channel.sendAndFlush(serializer.toByteBuffer());
} catch (IOException e) {
Expand Down
11 changes: 11 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.doris.mysql.MysqlCapability;
import org.apache.doris.mysql.MysqlChannel;
import org.apache.doris.mysql.MysqlCommand;
import org.apache.doris.mysql.MysqlHandshakePacket;
import org.apache.doris.mysql.MysqlSslContext;
import org.apache.doris.mysql.ProxyMysqlChannel;
import org.apache.doris.nereids.StatementContext;
Expand Down Expand Up @@ -200,6 +201,8 @@ public class ConnectContext {
// In this case, `skipAuth` needs to be set to `true` to skip the permission check of `AlterTable`
private boolean skipAuth = false;

private MysqlHandshakePacket mysqlHandshakePacket;

public void setUserQueryTimeout(int queryTimeout) {
if (queryTimeout > 0) {
sessionVariable.setQueryTimeoutS(queryTimeout);
Expand Down Expand Up @@ -952,5 +955,13 @@ public boolean isSkipAuth() {
public void setSkipAuth(boolean skipAuth) {
this.skipAuth = skipAuth;
}

public void setMysqlHandshakePacket(MysqlHandshakePacket mysqlHandshakePacket) {
this.mysqlHandshakePacket = mysqlHandshakePacket;
}

public byte[] getAuthPluginData() {
return mysqlHandshakePacket == null ? null : mysqlHandshakePacket.getAuthPluginData();
}
}

117 changes: 116 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.doris.catalog.TableIf;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.AuthenticationException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
Expand All @@ -44,25 +45,31 @@
import org.apache.doris.common.util.SqlUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mysql.MysqlChannel;
import org.apache.doris.mysql.MysqlCommand;
import org.apache.doris.mysql.MysqlHandshakePacket;
import org.apache.doris.mysql.MysqlPacket;
import org.apache.doris.mysql.MysqlProto;
import org.apache.doris.mysql.MysqlSerializer;
import org.apache.doris.mysql.MysqlServerStatusFlag;
import org.apache.doris.mysql.privilege.Auth;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.minidump.MinidumpUtils;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.stats.StatsErrorEstimator;
import org.apache.doris.proto.Data;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TExprNode;
import org.apache.doris.thrift.TMasterOpRequest;
import org.apache.doris.thrift.TMasterOpResult;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
Expand Down Expand Up @@ -174,10 +181,19 @@ private void handlePing() {
ctx.getState().setOk();
}

private void handleStmtReset() {
// Do nothing for now.
protected void handleStatistics() {
ctx.getState().setOk();
}

// Do nothing for now.
protected void handleDebug() {
ctx.getState().setOk();
}

protected void handleStmtReset() {
}

private void handleStmtClose() {
packetBuf = packetBuf.order(ByteOrder.LITTLE_ENDIAN);
int stmtId = packetBuf.getInt();
Expand Down Expand Up @@ -517,6 +533,15 @@ private void dispatch() throws IOException {
case COM_PING:
handlePing();
break;
case COM_STATISTICS:
handleStatistics();
break;
case COM_DEBUG:
handleDebug();
break;
case COM_CHANGE_USER:
handleChangeUser();
break;
case COM_STMT_RESET:
handleStmtReset();
break;
Expand Down Expand Up @@ -746,6 +771,96 @@ public TMasterOpResult proxyExecute(TMasterOpRequest request) throws TException
return result;
}

private void handleChangeUser() throws IOException {
// Random bytes generated when creating connection.
byte[] authPluginData = ctx.getAuthPluginData();
Preconditions.checkNotNull(authPluginData, "Auth plugin data is null.");
String userName = new String(MysqlProto.readNulTerminateString(packetBuf));
userName = ClusterNamespace.getFullName(SystemInfoService.DEFAULT_CLUSTER, userName);
int passwordLen = MysqlProto.readInt1(packetBuf);
byte[] password = MysqlProto.readFixedString(packetBuf, passwordLen);
String db = new String(MysqlProto.readNulTerminateString(packetBuf));
// Read the character set.
MysqlProto.readInt2(packetBuf);
String authPluginName = new String(MysqlProto.readNulTerminateString(packetBuf));

// Send Protocol::AuthSwitchRequest to client if auth plugin name is not mysql_native_password
if (!MysqlHandshakePacket.AUTH_PLUGIN_NAME.equals(authPluginName)) {
MysqlChannel channel = ctx.mysqlChannel;
MysqlSerializer serializer = MysqlSerializer.newInstance();
serializer.writeInt1((byte) 0xfe);
serializer.writeNulTerminateString(MysqlHandshakePacket.AUTH_PLUGIN_NAME);
serializer.writeBytes(authPluginData);
serializer.writeInt1(0);
channel.sendAndFlush(serializer.toByteBuffer());
// Server receive auth switch response packet from client.
ByteBuffer authSwitchResponse = channel.fetchOnePacket();
int length = authSwitchResponse.limit();
password = new byte[length];
System.arraycopy(authSwitchResponse.array(), 0, password, 0, length);
}

// For safety, not allowed to change to root or admin.
if (Auth.ROOT_USER.equals(userName) || Auth.ADMIN_USER.equals(userName)) {
ctx.getState().setError(ErrorCode.ERR_ACCESS_DENIED_ERROR, "Change to root or admin is forbidden");
return;
}

// Check password.
List<UserIdentity> currentUserIdentity = Lists.newArrayList();
try {
Env.getCurrentEnv().getAuth()
.checkPassword(userName, ctx.remoteIP, password, authPluginData, currentUserIdentity);
} catch (AuthenticationException e) {
ctx.getState().setError(ErrorCode.ERR_ACCESS_DENIED_ERROR, "Authentication failed.");
return;
}
ctx.setCurrentUserIdentity(currentUserIdentity.get(0));
ctx.setQualifiedUser(userName);

// Change default db if set.
if (Strings.isNullOrEmpty(db)) {
ctx.changeDefaultCatalog(InternalCatalog.INTERNAL_CATALOG_NAME);
} else {
String catalogName = null;
String dbName = null;
String[] dbNames = db.split("\\.");
if (dbNames.length == 1) {
dbName = db;
} else if (dbNames.length == 2) {
catalogName = dbNames[0];
dbName = dbNames[1];
} else if (dbNames.length > 2) {
ctx.getState().setError(ErrorCode.ERR_BAD_DB_ERROR, "Only one dot can be in the name: " + db);
return;
}
dbName = ClusterNamespace.getFullName(ctx.getClusterName(), dbName);

// check catalog and db exists
if (catalogName != null) {
CatalogIf catalogIf = ctx.getEnv().getCatalogMgr().getCatalog(catalogName);
if (catalogIf == null) {
ctx.getState().setError(ErrorCode.ERR_BAD_DB_ERROR, "No match catalog in doris: " + db);
return;
}
if (catalogIf.getDbNullable(dbName) == null) {
ctx.getState().setError(ErrorCode.ERR_BAD_DB_ERROR, "No match database in doris: " + db);
return;
}
}
try {
if (catalogName != null) {
ctx.getEnv().changeCatalog(ctx, catalogName);
}
Env.getCurrentEnv().changeDb(ctx, dbName);
} catch (DdlException e) {
ctx.getState().setError(e.getMysqlErrorCode(), e.getMessage());
return;
}
}
ctx.getState().setOk();
}

// Process a MySQL request
public void processOnce() throws IOException {
// set status of query to OK.
Expand Down

0 comments on commit 8cdb27a

Please sign in to comment.