Skip to content

Commit

Permalink
[enhancement](stmt-forward) record query result for proxy query to av…
Browse files Browse the repository at this point in the history
…oid EOF
  • Loading branch information
TangSiyang2001 committed Jan 30, 2024
1 parent 3dade1c commit a8856f9
Show file tree
Hide file tree
Showing 11 changed files with 161 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.mysql;

import com.google.common.collect.Lists;

import java.nio.ByteBuffer;
import java.util.List;

/**
* An interceptor for proxy query, keeping all packets to be sent to master mysql channel.
*/
public class ProxyMysqlChannel extends MysqlChannel {

private final List<ByteBuffer> proxyResultBuffer = Lists.newArrayList();

@Override
public void sendOnePacket(ByteBuffer packet) {
proxyResultBuffer.add(packet);
}

public List<ByteBuffer> getProxyResultBufferList() {
return proxyResultBuffer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.doris.mysql.MysqlChannel;
import org.apache.doris.mysql.MysqlCommand;
import org.apache.doris.mysql.MysqlSslContext;
import org.apache.doris.mysql.ProxyMysqlChannel;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.stats.StatsErrorEstimator;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
Expand Down Expand Up @@ -329,14 +330,20 @@ public void init() {
}

public ConnectContext() {
this((StreamConnection) null);
this(null);
}

public ConnectContext(StreamConnection connection) {
this(connection, false);
}

public ConnectContext(StreamConnection connection, boolean isProxy) {
connectType = ConnectType.MYSQL;
serverCapability = MysqlCapability.DEFAULT_CAPABILITY;
if (connection != null) {
mysqlChannel = new MysqlChannel(connection, this);
} else if (isProxy) {
mysqlChannel = new ProxyMysqlChannel();
} else {
mysqlChannel = new DummyMysqlChannel();
}
Expand Down
14 changes: 11 additions & 3 deletions fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,11 @@ public void finalizeCommand() throws IOException {
&& ctx.getState().getStateType() != QueryState.MysqlStateType.ERR) {
ShowResultSet resultSet = executor.getShowResultSet();
if (resultSet == null) {
packet = executor.getOutputPacket();
if (executor.sendProxyQueryResult()) {
packet = getResultPacket();
} else {
packet = executor.getOutputPacket();
}
} else {
executor.sendResultSet(resultSet);
packet = getResultPacket();
Expand Down Expand Up @@ -584,8 +588,12 @@ public TMasterOpResult proxyExecute(TMasterOpRequest request) {
result.setStatusCode(ctx.getState().getErrorCode().getCode());
result.setErrMessage(ctx.getState().getErrorMessage());
}
if (executor != null && executor.getProxyResultSet() != null) {
result.setResultSet(executor.getProxyResultSet().tothrift());
if (executor != null) {
if (executor.getProxyShowResultSet() != null) {
result.setResultSet(executor.getProxyShowResultSet().tothrift());
} else if (!executor.getQueryResultBufList().isEmpty()) {
result.setQueryResultBufList(executor.getQueryResultBufList());
}
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.apache.thrift.transport.TTransportException;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class MasterOpExecutor {
Expand Down Expand Up @@ -144,7 +146,7 @@ private TMasterOpResult forward(TMasterOpRequest params) throws Exception {

private TMasterOpRequest buildStmtForwardParams() {
TMasterOpRequest params = new TMasterOpRequest();
//node ident
// node ident
params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort());
params.setSql(originStmt.originStmt);
Expand All @@ -170,7 +172,7 @@ private TMasterOpRequest buildStmtForwardParams() {

private TMasterOpRequest buildSyncJournalParmas() {
final TMasterOpRequest params = new TMasterOpRequest();
//node ident
// node ident
params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort());
params.setSyncJournalOnly(true);
Expand Down Expand Up @@ -235,6 +237,10 @@ public ShowResultSet getProxyResultSet() {
}
}

public List<ByteBuffer> getQueryResultBufList() {
return result.isSetQueryResultBufList() ? result.getQueryResultBufList() : Collections.emptyList();
}

public void setResult(TMasterOpResult result) {
this.result = result;
}
Expand Down
31 changes: 25 additions & 6 deletions fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@
import org.apache.doris.mysql.MysqlCommand;
import org.apache.doris.mysql.MysqlEofPacket;
import org.apache.doris.mysql.MysqlSerializer;
import org.apache.doris.mysql.ProxyMysqlChannel;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.StatementContext;
Expand Down Expand Up @@ -224,7 +225,7 @@ public class StmtExecutor {
private RedirectStatus redirectStatus = null;
private Planner planner;
private boolean isProxy;
private ShowResultSet proxyResultSet = null;
private ShowResultSet proxyShowResultSet = null;
private Data.PQueryStatistics.Builder statisticsForAuditLog;
private boolean isCached;
private String stmtName;
Expand Down Expand Up @@ -382,8 +383,8 @@ public ByteBuffer getOutputPacket() {
}
}

public ShowResultSet getProxyResultSet() {
return proxyResultSet;
public ShowResultSet getProxyShowResultSet() {
return proxyShowResultSet;
}

public ShowResultSet getShowResultSet() {
Expand Down Expand Up @@ -2356,7 +2357,7 @@ private void handleShow() throws IOException, AnalysisException, DdlException {
return;
}
if (isProxy) {
proxyResultSet = resultSet;
proxyShowResultSet = resultSet;
return;
}

Expand Down Expand Up @@ -2851,8 +2852,8 @@ public void setProfileType(ProfileType profileType) {
}


public void setProxyResultSet(ShowResultSet proxyResultSet) {
this.proxyResultSet = proxyResultSet;
public void setProxyShowResultSet(ShowResultSet proxyShowResultSet) {
this.proxyShowResultSet = proxyShowResultSet;
}

public ConnectContext getContext() {
Expand All @@ -2869,6 +2870,24 @@ public String getOriginStmtInString() {
}
return "";
}

public List<ByteBuffer> getQueryResultBufList() {
return ((ProxyMysqlChannel) context.getMysqlChannel()).getProxyResultBufferList();
}

public boolean sendProxyQueryResult() throws IOException {
if (masterOpExecutor == null) {
return false;
}
List<ByteBuffer> queryResultBufList = masterOpExecutor.getQueryResultBufList();
if (queryResultBufList.isEmpty()) {
return false;
}
for (ByteBuffer byteBuffer : queryResultBufList) {
context.getMysqlChannel().sendOnePacket(byteBuffer);
}
return true;
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -950,7 +950,7 @@ public TMasterOpResult forward(TMasterOpRequest params) throws TException {

// add this log so that we can track this stmt
LOG.debug("receive forwarded stmt {} from FE: {}", params.getStmtId(), params.getClientNodeHost());
ConnectContext context = new ConnectContext();
ConnectContext context = new ConnectContext(null, true);
// Set current connected FE to the client address, so that we can know where this request come from.
context.setCurrentConnectedFEIp(params.getClientNodeHost());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ private void sendJobId(List<AnalysisInfo> analysisInfos, boolean proxy) {
if (!proxy) {
ConnectContext.get().getExecutor().sendResultSet(commonResultSet);
} else {
ConnectContext.get().getExecutor().setProxyResultSet(commonResultSet);
ConnectContext.get().getExecutor().setProxyShowResultSet(commonResultSet);
}
} catch (Throwable t) {
LOG.warn("Failed to send job id to user", t);
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,7 @@ struct TMasterOpResult {
5: optional string status;
6: optional i32 statusCode;
7: optional string errMessage;
8: optional list<binary> queryResultBufList;
}

struct TUpdateExportTaskStatusRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,15 +221,19 @@ class Suite implements GroovyInterceptable {

def user = context.config.jdbcUser
def password = context.config.jdbcPassword
def masterFe = cluster.getMasterFe()
for (def i=0; masterFe == null && i<30; i++) {
masterFe = cluster.getMasterFe()
Frontend fe = null
for (def i=0; fe == null && i<30; i++) {
if (options.connectToFollower) {
fe = cluster.getOneFollowerFe()
} else {
fe = cluster.getMasterFe()
}
Thread.sleep(1000)
}
assertNotNull(masterFe)
assertNotNull(fe)
def url = String.format(
"jdbc:mysql://%s:%s/?useLocalSessionState=false&allowLoadLocalInfile=false",
masterFe.host, masterFe.queryPort)
fe.host, fe.queryPort)
def conn = DriverManager.getConnection(url, user, password)
def sql = "CREATE DATABASE IF NOT EXISTS " + context.dbName
logger.info("try create database if not exists {}", context.dbName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class ClusterOptions {
int beNum = 3
List<String> feConfigs = ['heartbeat_interval_second=5']
List<String> beConfigs = []
boolean connectToFollower = false

// each be disks, a disks format is: disk_type=disk_num[,disk_capacity]
// here disk_type=HDD or SSD, disk capacity is in gb unit.
Expand Down Expand Up @@ -242,6 +243,10 @@ class SuiteCluster {
return getFrontends().stream().filter(fe -> fe.isMaster).findFirst().orElse(null)
}

Frontend getOneFollowerFe() {
return getFrontends().stream().filter(fe -> !fe.isMaster).findFirst().orElse(null)
}

Frontend getFeByIndex(int index) {
return getFrontends().stream().filter(fe -> fe.index == index).findFirst().orElse(null)
}
Expand Down
52 changes: 52 additions & 0 deletions regression-test/suites/query_p0/test_forward_qeury.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.


import org.apache.doris.regression.suite.ClusterOptions
import org.apache.doris.regression.util.NodeType

suite("test_forward_query") {
def options = new ClusterOptions()
options.enableDebugPoints()
options.setFeNum(2)
options.connectToFollower = true

docker(options) {
def tbl = "test_forward_query"
sql """ DROP TABLE IF EXISTS ${tbl} """
sql """
CREATE TABLE ${tbl}
(
k1 int
)
DISTRIBUTED BY HASH(`k1`) BUCKETS 1
PROPERTIES (
"replication_num" = "1"
);
"""

sql """ INSERT INTO ${tbl} VALUES(1);"""

cluster.injectDebugPoints(NodeType.FE, ['StmtExecutor.forward_all_queries' : [forwardAllQueries:true]])

try {
sql """ SELECT * FROM ${tbl} """
} catch (Exception ignored) {
assertTrue(false)
}
}
}

0 comments on commit a8856f9

Please sign in to comment.