diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/ProxyMysqlChannel.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/ProxyMysqlChannel.java new file mode 100644 index 000000000000000..ed491df4b87e8b6 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/ProxyMysqlChannel.java @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mysql; + +import com.google.common.collect.Lists; + +import java.nio.ByteBuffer; +import java.util.List; + +/** + * An interceptor for proxy query, keeping all packets to be sent to master mysql channel. + */ +public class ProxyMysqlChannel extends MysqlChannel { + + private final List proxyResultBuffer = Lists.newArrayList(); + + @Override + public void sendOnePacket(ByteBuffer packet) { + proxyResultBuffer.add(packet); + } + + public List getProxyResultBufferList() { + return proxyResultBuffer; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index b5794464d9802ae..af92a284f0244a7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -46,6 +46,7 @@ import org.apache.doris.mysql.MysqlChannel; import org.apache.doris.mysql.MysqlCommand; import org.apache.doris.mysql.MysqlSslContext; +import org.apache.doris.mysql.ProxyMysqlChannel; import org.apache.doris.nereids.StatementContext; import org.apache.doris.nereids.stats.StatsErrorEstimator; import org.apache.doris.nereids.trees.expressions.literal.Literal; @@ -329,14 +330,20 @@ public void init() { } public ConnectContext() { - this((StreamConnection) null); + this(null); } public ConnectContext(StreamConnection connection) { + this(connection, false); + } + + public ConnectContext(StreamConnection connection, boolean isProxy) { connectType = ConnectType.MYSQL; serverCapability = MysqlCapability.DEFAULT_CAPABILITY; if (connection != null) { mysqlChannel = new MysqlChannel(connection, this); + } else if (isProxy) { + mysqlChannel = new ProxyMysqlChannel(); } else { mysqlChannel = new DummyMysqlChannel(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java index b297d192c422c14..ad17d33daca553c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -435,7 +435,11 @@ public void finalizeCommand() throws IOException { && ctx.getState().getStateType() != QueryState.MysqlStateType.ERR) { ShowResultSet resultSet = executor.getShowResultSet(); if (resultSet == null) { - packet = executor.getOutputPacket(); + if (executor.sendProxyQueryResult()) { + packet = getResultPacket(); + } else { + packet = executor.getOutputPacket(); + } } else { executor.sendResultSet(resultSet); packet = getResultPacket(); @@ -584,8 +588,12 @@ public TMasterOpResult proxyExecute(TMasterOpRequest request) { result.setStatusCode(ctx.getState().getErrorCode().getCode()); result.setErrMessage(ctx.getState().getErrorMessage()); } - if (executor != null && executor.getProxyResultSet() != null) { - result.setResultSet(executor.getProxyResultSet().tothrift()); + if (executor != null) { + if (executor.getProxyShowResultSet() != null) { + result.setResultSet(executor.getProxyShowResultSet().tothrift()); + } else if (!executor.getProxyQueryResultBufList().isEmpty()) { + result.setQueryResultBufList(executor.getProxyQueryResultBufList()); + } } return result; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java index 7ed7061fbf51d72..40c126b732d03e0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java @@ -35,6 +35,8 @@ import org.apache.thrift.transport.TTransportException; import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; import java.util.Map; public class MasterOpExecutor { @@ -144,7 +146,7 @@ private TMasterOpResult forward(TMasterOpRequest params) throws Exception { private TMasterOpRequest buildStmtForwardParams() { TMasterOpRequest params = new TMasterOpRequest(); - //node ident + // node ident params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost()); params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort()); params.setSql(originStmt.originStmt); @@ -170,7 +172,7 @@ private TMasterOpRequest buildStmtForwardParams() { private TMasterOpRequest buildSyncJournalParmas() { final TMasterOpRequest params = new TMasterOpRequest(); - //node ident + // node ident params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost()); params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort()); params.setSyncJournalOnly(true); @@ -235,6 +237,10 @@ public ShowResultSet getProxyResultSet() { } } + public List getQueryResultBufList() { + return result.isSetQueryResultBufList() ? result.getQueryResultBufList() : Collections.emptyList(); + } + public void setResult(TMasterOpResult result) { this.result = result; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 94a9fa5712ce490..15ea1847fabb226 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -100,6 +100,8 @@ import org.apache.doris.common.profile.Profile; import org.apache.doris.common.profile.SummaryProfile; import org.apache.doris.common.profile.SummaryProfile.SummaryBuilder; +import org.apache.doris.common.util.DebugPointUtil; +import org.apache.doris.common.util.DebugPointUtil.DebugPoint; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.MetaLockUtils; import org.apache.doris.common.util.ProfileManager.ProfileType; @@ -115,6 +117,7 @@ import org.apache.doris.mysql.MysqlCommand; import org.apache.doris.mysql.MysqlEofPacket; import org.apache.doris.mysql.MysqlSerializer; +import org.apache.doris.mysql.ProxyMysqlChannel; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.nereids.NereidsPlanner; import org.apache.doris.nereids.StatementContext; @@ -224,7 +227,7 @@ public class StmtExecutor { private RedirectStatus redirectStatus = null; private Planner planner; private boolean isProxy; - private ShowResultSet proxyResultSet = null; + private ShowResultSet proxyShowResultSet = null; private Data.PQueryStatistics.Builder statisticsForAuditLog; private boolean isCached; private String stmtName; @@ -363,7 +366,7 @@ public boolean isForwardToMaster() { // this is a query stmt, but this non-master FE can not read, forward it to master if (isQuery() && !Env.getCurrentEnv().isMaster() - && !Env.getCurrentEnv().canRead()) { + && (!Env.getCurrentEnv().canRead() || debugForwardAllQueries())) { return true; } @@ -374,6 +377,11 @@ public boolean isForwardToMaster() { } } + private boolean debugForwardAllQueries() { + DebugPoint debugPoint = DebugPointUtil.getDebugPoint("StmtExecutor.forward_all_queries"); + return debugPoint != null && debugPoint.param("forwardAllQueries", true); + } + public ByteBuffer getOutputPacket() { if (masterOpExecutor == null) { return null; @@ -382,8 +390,8 @@ public ByteBuffer getOutputPacket() { } } - public ShowResultSet getProxyResultSet() { - return proxyResultSet; + public ShowResultSet getProxyShowResultSet() { + return proxyShowResultSet; } public ShowResultSet getShowResultSet() { @@ -2356,7 +2364,7 @@ private void handleShow() throws IOException, AnalysisException, DdlException { return; } if (isProxy) { - proxyResultSet = resultSet; + proxyShowResultSet = resultSet; return; } @@ -2851,8 +2859,8 @@ public void setProfileType(ProfileType profileType) { } - public void setProxyResultSet(ShowResultSet proxyResultSet) { - this.proxyResultSet = proxyResultSet; + public void setProxyShowResultSet(ShowResultSet proxyShowResultSet) { + this.proxyShowResultSet = proxyShowResultSet; } public ConnectContext getContext() { @@ -2869,6 +2877,24 @@ public String getOriginStmtInString() { } return ""; } + + public List getProxyQueryResultBufList() { + return ((ProxyMysqlChannel) context.getMysqlChannel()).getProxyResultBufferList(); + } + + public boolean sendProxyQueryResult() throws IOException { + if (masterOpExecutor == null) { + return false; + } + List queryResultBufList = masterOpExecutor.getQueryResultBufList(); + if (queryResultBufList.isEmpty()) { + return false; + } + for (ByteBuffer byteBuffer : queryResultBufList) { + context.getMysqlChannel().sendOnePacket(byteBuffer); + } + return true; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 18623c33f7a8bad..444a0024594b775 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -950,7 +950,7 @@ public TMasterOpResult forward(TMasterOpRequest params) throws TException { // add this log so that we can track this stmt LOG.debug("receive forwarded stmt {} from FE: {}", params.getStmtId(), params.getClientNodeHost()); - ConnectContext context = new ConnectContext(); + ConnectContext context = new ConnectContext(null, true); // Set current connected FE to the client address, so that we can know where this request come from. context.setCurrentConnectedFEIp(params.getClientNodeHost()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index fe64fb14142fd5e..a8f6db6eb9ad64f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -281,7 +281,7 @@ private void sendJobId(List analysisInfos, boolean proxy) { if (!proxy) { ConnectContext.get().getExecutor().sendResultSet(commonResultSet); } else { - ConnectContext.get().getExecutor().setProxyResultSet(commonResultSet); + ConnectContext.get().getExecutor().setProxyShowResultSet(commonResultSet); } } catch (Throwable t) { LOG.warn("Failed to send job id to user", t); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index d96ee346148d300..03926990c575ef5 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -551,6 +551,7 @@ struct TMasterOpResult { 5: optional string status; 6: optional i32 statusCode; 7: optional string errMessage; + 8: optional list queryResultBufList; } struct TUpdateExportTaskStatusRequest { diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy index 9e238570acd8c61..04e377d35091860 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy @@ -221,15 +221,19 @@ class Suite implements GroovyInterceptable { def user = context.config.jdbcUser def password = context.config.jdbcPassword - def masterFe = cluster.getMasterFe() - for (def i=0; masterFe == null && i<30; i++) { - masterFe = cluster.getMasterFe() + Frontend fe = null + for (def i=0; fe == null && i<30; i++) { + if (options.connectToFollower) { + fe = cluster.getOneFollowerFe() + } else { + fe = cluster.getMasterFe() + } Thread.sleep(1000) } - assertNotNull(masterFe) + assertNotNull(fe) def url = String.format( "jdbc:mysql://%s:%s/?useLocalSessionState=false&allowLoadLocalInfile=false", - masterFe.host, masterFe.queryPort) + fe.host, fe.queryPort) def conn = DriverManager.getConnection(url, user, password) def sql = "CREATE DATABASE IF NOT EXISTS " + context.dbName logger.info("try create database if not exists {}", context.dbName) diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy index 47b642af4853ce6..6d4ae4be0ad8b80 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy @@ -36,6 +36,7 @@ class ClusterOptions { int beNum = 3 List feConfigs = ['heartbeat_interval_second=5'] List beConfigs = [] + boolean connectToFollower = false // each be disks, a disks format is: disk_type=disk_num[,disk_capacity] // here disk_type=HDD or SSD, disk capacity is in gb unit. @@ -242,6 +243,10 @@ class SuiteCluster { return getFrontends().stream().filter(fe -> fe.isMaster).findFirst().orElse(null) } + Frontend getOneFollowerFe() { + return getFrontends().stream().filter(fe -> !fe.isMaster).findFirst().orElse(null) + } + Frontend getFeByIndex(int index) { return getFrontends().stream().filter(fe -> fe.index == index).findFirst().orElse(null) } diff --git a/regression-test/suites/query_p0/test_forward_qeury.groovy b/regression-test/suites/query_p0/test_forward_qeury.groovy new file mode 100644 index 000000000000000..8dbef459d2dd75a --- /dev/null +++ b/regression-test/suites/query_p0/test_forward_qeury.groovy @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +import org.apache.doris.regression.suite.ClusterOptions +import org.apache.doris.regression.util.NodeType + +suite("test_forward_query") { + def options = new ClusterOptions() + options.enableDebugPoints() + options.setFeNum(2) + options.connectToFollower = true + + docker(options) { + def tbl = "test_forward_query" + sql """ DROP TABLE IF EXISTS ${tbl} """ + sql """ + CREATE TABLE ${tbl} + ( + k1 int + ) + DISTRIBUTED BY HASH(`k1`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ); + """ + + sql """ INSERT INTO ${tbl} VALUES(1);""" + + cluster.injectDebugPoints(NodeType.FE, ['StmtExecutor.forward_all_queries' : [forwardAllQueries:true]]) + + try { + sql """ SELECT * FROM ${tbl} """ + } catch (Exception ignored) { + assertTrue(false) + } + } +} \ No newline at end of file