diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlCommand.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlCommand.java index f8a03029d5a383..b83be01405d87a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlCommand.java @@ -47,6 +47,8 @@ public enum MysqlCommand { COM_STMT_SEND_LONG_DATA("COM_STMT_SEND_LONG_DATA", 24), COM_STMT_CLOSE("COM_STMT_CLOSE", 25), COM_STMT_RESET("COM_STMT_RESET", 26), + COM_SET_OPTION("COM_RESET_CONNECTION", 27), + COM_STMT_FETCH("COM_RESET_CONNECTION", 28), COM_DAEMON("COM_DAEMON", 29), COM_RESET_CONNECTION("COM_RESET_CONNECTION", 31); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlHandshakePacket.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlHandshakePacket.java index c2ba21a23ee321..566f4ac3f7e0ca 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlHandshakePacket.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlHandshakePacket.java @@ -33,7 +33,7 @@ public class MysqlHandshakePacket extends MysqlPacket { private static final MysqlCapability SSL_CAPABILITY = MysqlCapability.SSL_CAPABILITY; // status flags not supported in palo private static final int STATUS_FLAGS = 0; - private static final String AUTH_PLUGIN_NAME = "mysql_native_password"; + public static final String AUTH_PLUGIN_NAME = "mysql_native_password"; // connection id used in KILL statement. private int connectionId; diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlProto.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlProto.java index 3d198adb6e5821..b442aaa3241024 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlProto.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlProto.java @@ -137,6 +137,7 @@ public static boolean negotiate(ConnectContext context) throws IOException { serializer.reset(); MysqlHandshakePacket handshakePacket = new MysqlHandshakePacket(context.getConnectionId()); handshakePacket.writeTo(serializer); + context.setMysqlHandshakePacket(handshakePacket); try { channel.sendAndFlush(serializer.toByteBuffer()); } catch (IOException e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index a10f8a49a39baa..07cb110c0ee302 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -44,6 +44,7 @@ import org.apache.doris.mysql.MysqlCapability; import org.apache.doris.mysql.MysqlChannel; import org.apache.doris.mysql.MysqlCommand; +import org.apache.doris.mysql.MysqlHandshakePacket; import org.apache.doris.mysql.MysqlSslContext; import org.apache.doris.mysql.ProxyMysqlChannel; import org.apache.doris.nereids.StatementContext; @@ -200,6 +201,8 @@ public class ConnectContext { // In this case, `skipAuth` needs to be set to `true` to skip the permission check of `AlterTable` private boolean skipAuth = false; + private MysqlHandshakePacket mysqlHandshakePacket; + public void setUserQueryTimeout(int queryTimeout) { if (queryTimeout > 0) { sessionVariable.setQueryTimeoutS(queryTimeout); @@ -952,5 +955,13 @@ public boolean isSkipAuth() { public void setSkipAuth(boolean skipAuth) { this.skipAuth = skipAuth; } + + public void setMysqlHandshakePacket(MysqlHandshakePacket mysqlHandshakePacket) { + this.mysqlHandshakePacket = mysqlHandshakePacket; + } + + public byte[] getAuthPluginData() { + return mysqlHandshakePacket == null ? null : mysqlHandshakePacket.getAuthPluginData(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java index 1ee08fb298e67f..c994d524c3e37a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -33,6 +33,7 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.AuthenticationException; import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; @@ -44,25 +45,31 @@ import org.apache.doris.common.util.SqlUtils; import org.apache.doris.common.util.Util; import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.metric.MetricRepo; import org.apache.doris.mysql.MysqlChannel; import org.apache.doris.mysql.MysqlCommand; +import org.apache.doris.mysql.MysqlHandshakePacket; import org.apache.doris.mysql.MysqlPacket; import org.apache.doris.mysql.MysqlProto; import org.apache.doris.mysql.MysqlSerializer; import org.apache.doris.mysql.MysqlServerStatusFlag; +import org.apache.doris.mysql.privilege.Auth; import org.apache.doris.nereids.glue.LogicalPlanAdapter; import org.apache.doris.nereids.minidump.MinidumpUtils; import org.apache.doris.nereids.parser.NereidsParser; import org.apache.doris.nereids.stats.StatsErrorEstimator; import org.apache.doris.proto.Data; import org.apache.doris.qe.QueryState.MysqlStateType; +import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TExprNode; import org.apache.doris.thrift.TMasterOpRequest; import org.apache.doris.thrift.TMasterOpResult; import org.apache.doris.thrift.TUniqueId; +import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanKind; @@ -174,10 +181,19 @@ private void handlePing() { ctx.getState().setOk(); } - private void handleStmtReset() { + // Do nothing for now. + protected void handleStatistics() { ctx.getState().setOk(); } + // Do nothing for now. + protected void handleDebug() { + ctx.getState().setOk(); + } + + protected void handleStmtReset() { + } + private void handleStmtClose() { packetBuf = packetBuf.order(ByteOrder.LITTLE_ENDIAN); int stmtId = packetBuf.getInt(); @@ -517,6 +533,15 @@ private void dispatch() throws IOException { case COM_PING: handlePing(); break; + case COM_STATISTICS: + handleStatistics(); + break; + case COM_DEBUG: + handleDebug(); + break; + case COM_CHANGE_USER: + handleChangeUser(); + break; case COM_STMT_RESET: handleStmtReset(); break; @@ -746,6 +771,96 @@ public TMasterOpResult proxyExecute(TMasterOpRequest request) throws TException return result; } + private void handleChangeUser() throws IOException { + // Random bytes generated when creating connection. + byte[] authPluginData = ctx.getAuthPluginData(); + Preconditions.checkNotNull(authPluginData, "Auth plugin data is null."); + String userName = new String(MysqlProto.readNulTerminateString(packetBuf)); + userName = ClusterNamespace.getFullName(SystemInfoService.DEFAULT_CLUSTER, userName); + int passwordLen = MysqlProto.readInt1(packetBuf); + byte[] password = MysqlProto.readFixedString(packetBuf, passwordLen); + String db = new String(MysqlProto.readNulTerminateString(packetBuf)); + // Read the character set. + MysqlProto.readInt2(packetBuf); + String authPluginName = new String(MysqlProto.readNulTerminateString(packetBuf)); + + // Send Protocol::AuthSwitchRequest to client if auth plugin name is not mysql_native_password + if (!MysqlHandshakePacket.AUTH_PLUGIN_NAME.equals(authPluginName)) { + MysqlChannel channel = ctx.mysqlChannel; + MysqlSerializer serializer = MysqlSerializer.newInstance(); + serializer.writeInt1((byte) 0xfe); + serializer.writeNulTerminateString(MysqlHandshakePacket.AUTH_PLUGIN_NAME); + serializer.writeBytes(authPluginData); + serializer.writeInt1(0); + channel.sendAndFlush(serializer.toByteBuffer()); + // Server receive auth switch response packet from client. + ByteBuffer authSwitchResponse = channel.fetchOnePacket(); + int length = authSwitchResponse.limit(); + password = new byte[length]; + System.arraycopy(authSwitchResponse.array(), 0, password, 0, length); + } + + // For safety, not allowed to change to root or admin. + if (Auth.ROOT_USER.equals(userName) || Auth.ADMIN_USER.equals(userName)) { + ctx.getState().setError(ErrorCode.ERR_ACCESS_DENIED_ERROR, "Change to root or admin is forbidden"); + return; + } + + // Check password. + List currentUserIdentity = Lists.newArrayList(); + try { + Env.getCurrentEnv().getAuth() + .checkPassword(userName, ctx.remoteIP, password, authPluginData, currentUserIdentity); + } catch (AuthenticationException e) { + ctx.getState().setError(ErrorCode.ERR_ACCESS_DENIED_ERROR, "Authentication failed."); + return; + } + ctx.setCurrentUserIdentity(currentUserIdentity.get(0)); + ctx.setQualifiedUser(userName); + + // Change default db if set. + if (Strings.isNullOrEmpty(db)) { + ctx.changeDefaultCatalog(InternalCatalog.INTERNAL_CATALOG_NAME); + } else { + String catalogName = null; + String dbName = null; + String[] dbNames = db.split("\\."); + if (dbNames.length == 1) { + dbName = db; + } else if (dbNames.length == 2) { + catalogName = dbNames[0]; + dbName = dbNames[1]; + } else if (dbNames.length > 2) { + ctx.getState().setError(ErrorCode.ERR_BAD_DB_ERROR, "Only one dot can be in the name: " + db); + return; + } + dbName = ClusterNamespace.getFullName(ctx.getClusterName(), dbName); + + // check catalog and db exists + if (catalogName != null) { + CatalogIf catalogIf = ctx.getEnv().getCatalogMgr().getCatalog(catalogName); + if (catalogIf == null) { + ctx.getState().setError(ErrorCode.ERR_BAD_DB_ERROR, "No match catalog in doris: " + db); + return; + } + if (catalogIf.getDbNullable(dbName) == null) { + ctx.getState().setError(ErrorCode.ERR_BAD_DB_ERROR, "No match database in doris: " + db); + return; + } + } + try { + if (catalogName != null) { + ctx.getEnv().changeCatalog(ctx, catalogName); + } + Env.getCurrentEnv().changeDb(ctx, dbName); + } catch (DdlException e) { + ctx.getState().setError(e.getMysqlErrorCode(), e.getMessage()); + return; + } + } + ctx.getState().setOk(); + } + // Process a MySQL request public void processOnce() throws IOException { // set status of query to OK.