From a0b4e8133862db11d9dac188757ab8e8b1e2d80b Mon Sep 17 00:00:00 2001 From: Yongqiang YANG Date: Wed, 27 Mar 2024 20:14:33 +0800 Subject: [PATCH] (cloud) make MySQL load work with cloud clusters and refactor stream load code --- be/src/http/action/stream_load.cpp | 4 + be/src/http/http_common.h | 1 + .../org/apache/doris/catalog/DatabaseIf.java | 4 +- .../cloud/planner/CloudStreamLoadPlanner.java | 12 + .../apache/doris/httpv2/rest/LoadAction.java | 19 +- .../apache/doris/load/StreamLoadHandler.java | 278 +++++++++++++ .../doris/load/loadv2/MysqlLoadManager.java | 22 +- .../doris/service/FrontendServiceImpl.java | 368 +++--------------- .../apache/doris/catalog/DatabaseTest.java | 6 +- 9 files changed, 360 insertions(+), 354 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 67a063e1c32b3c..62c1f3ddf0ecaf 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -628,6 +628,10 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, } } + if (!http_req->header(HTTP_CLOUD_CLUSTER).empty()) { + request.__set_cloud_cluster(http_req->header(HTTP_CLOUD_CLUSTER)); + } + #ifndef BE_TEST // plan this load TNetworkAddress master_addr = _exec_env->master_info()->network_address; diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h index 5a1550f48fcefc..57bbd0642e6e96 100644 --- a/be/src/http/http_common.h +++ b/be/src/http/http_common.h @@ -66,5 +66,6 @@ static const std::string HTTP_MEMTABLE_ON_SINKNODE = "memtable_on_sink_node"; static const std::string HTTP_WAL_ID_KY = "wal_id"; static const std::string HTTP_AUTH_CODE = "auth_code"; static const std::string HTTP_GROUP_COMMIT = "group_commit"; +static const std::string HTTP_CLOUD_CLUSTER = "cloud_cluster"; } // namespace doris diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseIf.java index 6c0d46ffe98824..30477da3e0c4c3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseIf.java @@ -149,12 +149,12 @@ default <E extends Exception> T getTableOrException(long tableId, Function<Long, E> f) default T getTableOrMetaException(String tableName) throws MetaNotFoundException { - return getTableOrException(tableName, t -> new MetaNotFoundException("unknown table, tableName=" + t, + return getTableOrException(tableName, t -> new MetaNotFoundException("table not found, tableName=" + t, ErrorCode.ERR_BAD_TABLE_ERROR)); } default T getTableOrMetaException(long tableId) throws MetaNotFoundException { - return getTableOrException(tableId, t -> new MetaNotFoundException("unknown table, tableId=" + t, + return getTableOrException(tableId, t -> new MetaNotFoundException("table not found, tableId=" + t, ErrorCode.ERR_BAD_TABLE_ERROR)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/planner/CloudStreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/planner/CloudStreamLoadPlanner.java index c7b498ae0fc5f4..d088275a171b70 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/planner/CloudStreamLoadPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/planner/CloudStreamLoadPlanner.java @@ -25,6 +25,7 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.task.LoadTaskInfo; import org.apache.doris.thrift.TExecPlanFragmentParams; +import org.apache.doris.thrift.TPipelineFragmentParams; import org.apache.doris.thrift.TUniqueId; import org.apache.logging.log4j.LogManager; @@ -51,6 +52,7 @@ private 
AutoCloseConnectContext buildConnectContext() throws UserException { } } + @Override public TExecPlanFragmentParams plan(TUniqueId loadId, int fragmentInstanceIdIndex) throws UserException { try (AutoCloseConnectContext r = buildConnectContext()) { return super.plan(loadId, fragmentInstanceIdIndex); @@ -58,4 +60,14 @@ public TExecPlanFragmentParams plan(TUniqueId loadId, int fragmentInstanceIdInde throw e; } } + + @Override + public TPipelineFragmentParams planForPipeline(TUniqueId loadId, int fragmentInstanceIdIndex) + throws UserException { + try (AutoCloseConnectContext r = buildConnectContext()) { + return super.planForPipeline(loadId, fragmentInstanceIdIndex); + } catch (UserException e) { + throw e; + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java index 2b37cf997d0d4b..387259c056f9ac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java @@ -20,7 +20,6 @@ import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Table; -import org.apache.doris.cloud.system.CloudSystemInfoService; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; @@ -30,6 +29,7 @@ import org.apache.doris.httpv2.entity.ResponseEntityBuilder; import org.apache.doris.httpv2.entity.RestBaseResult; import org.apache.doris.httpv2.exception.UnauthorizedException; +import org.apache.doris.load.StreamLoadHandler; import org.apache.doris.mysql.privilege.Auth; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; @@ -55,11 +55,8 @@ import org.springframework.web.servlet.view.RedirectView; import java.net.URI; -import java.security.SecureRandom; import java.util.List; -import java.util.Random; import java.util.Set; -import java.util.stream.Collectors; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -394,19 +391,7 @@ private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit) throws L private TNetworkAddress selectCloudRedirectBackend(String clusterName, String reqHostStr, boolean groupCommit) throws LoadException { - List<Backend> backends = ((CloudSystemInfoService) Env.getCurrentSystemInfo()) - .getBackendsByClusterName(clusterName) - .stream().filter(be -> be.isAlive() && (!groupCommit || groupCommit && !be.isDecommissioned())) - .collect(Collectors.toList()); - - if (backends.isEmpty()) { - LOG.warn("No available backend for stream load redirect, cluster name {}", clusterName); - throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", cluster: " + clusterName); - } - - Random rand = new SecureRandom(); - int randomIndex = rand.nextInt(backends.size()); - Backend backend = backends.get(randomIndex); + Backend backend = StreamLoadHandler.selectBackend(clusterName, groupCommit); Pair<String, Integer> publicHostPort = null; Pair<String, Integer> privateHostPort = null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java new file mode 100644 index 00000000000000..ca87a657904e2a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java @@ -0,0 +1,278 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load; + +import org.apache.doris.analysis.UserIdentity; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.TableIf.TableType; +import org.apache.doris.cloud.catalog.CloudEnv; +import org.apache.doris.cloud.planner.CloudStreamLoadPlanner; +import org.apache.doris.cloud.system.CloudSystemInfoService; +import org.apache.doris.cluster.ClusterNamespace; +import org.apache.doris.common.AuthenticationException; +import org.apache.doris.common.Config; +import org.apache.doris.common.LoadException; +import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.UserException; +import org.apache.doris.load.routineload.RoutineLoadJob; +import org.apache.doris.planner.StreamLoadPlanner; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.service.ExecuteEnv; +import org.apache.doris.system.Backend; +import org.apache.doris.system.SystemInfoService; +import org.apache.doris.task.StreamLoadTask; +import org.apache.doris.thrift.TExecPlanFragmentParams; +import org.apache.doris.thrift.TPipelineFragmentParams; +import org.apache.doris.thrift.TStreamLoadPutRequest; +import org.apache.doris.thrift.TStreamLoadPutResult; +import org.apache.doris.transaction.TransactionState; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.security.SecureRandom; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +public class StreamLoadHandler { + private static final Logger LOG = LogManager.getLogger(StreamLoadHandler.class); + + private TStreamLoadPutRequest request; + private Boolean isMultiTableRequest; + private AtomicInteger multiTableFragmentInstanceIdIndex; + private TStreamLoadPutResult result; + private String clientAddr; + private Database db; + private List<OlapTable> tables = Lists.newArrayList(); + private long timeoutMs; + private List fragmentParams = Lists.newArrayList(); + + public StreamLoadHandler(TStreamLoadPutRequest request, AtomicInteger indexId, + TStreamLoadPutResult result, String clientAddr) { + this.request = request; + this.timeoutMs = request.isSetThriftRpcTimeoutMs() ? 
request.getThriftRpcTimeoutMs() : 5000; + this.isMultiTableRequest = indexId != null; + this.multiTableFragmentInstanceIdIndex = indexId; + this.result = result; + this.clientAddr = clientAddr; + } + + public static Backend selectBackend(String clusterName, boolean groupCommit) throws LoadException { + List<Backend> backends = ((CloudSystemInfoService) Env.getCurrentSystemInfo()) + .getBackendsByClusterName(clusterName) + .stream().filter(be -> be.isAlive() && (!groupCommit || groupCommit && !be.isDecommissioned())) + .collect(Collectors.toList()); + + if (backends.isEmpty()) { + LOG.warn("No available backend for stream load redirect, cluster name {}", clusterName); + throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", cluster: " + clusterName); + } + + // TODO: add a more sophisticated algorithm to select backend + SecureRandom rand = new SecureRandom(); + int randomIndex = rand.nextInt(backends.size()); + Backend backend = backends.get(randomIndex); + return backend; + } + + public void setCloudCluster() throws UserException { + if (ConnectContext.get() != null) { + return; + } + + LOG.info("stream load put request: {}", request); + // create connect context + ConnectContext ctx = new ConnectContext(); + ctx.setEnv(Env.getCurrentEnv()); + ctx.setQueryId(request.getLoadId()); + ctx.setCurrentUserIdentity(UserIdentity.createAnalyzedUserIdentWithIp(request.getUser(), "%")); + ctx.setQualifiedUser(request.getUser()); + ctx.setBackendId(request.getBackendId()); + ctx.setThreadLocalInfo(); + + if (!Config.isCloudMode()) { + return; + } + + if (request.isSetAuthCode()) { + long backendId = request.getBackendId(); + Backend backend = Env.getCurrentSystemInfo().getBackend(backendId); + Preconditions.checkNotNull(backend); + ctx.setCloudCluster(backend.getCloudClusterName()); + ctx.setRemoteIP(clientAddr); + LOG.info("streamLoadPutImpl set context: cluster {}", ctx.getCloudCluster()); + } else { + ctx.setRemoteIP(request.getUserIp()); + String userName = ClusterNamespace.getNameFromFullName(request.getUser()); + if (!Strings.isNullOrEmpty(userName)) { + List<UserIdentity> currentUser = Lists.newArrayList(); + try { + Env.getCurrentEnv().getAuth().checkPlainPassword(userName, + request.getUserIp(), request.getPasswd(), currentUser); + } catch (AuthenticationException e) { + throw new UserException(e.formatErrMsg()); + } + Preconditions.checkState(currentUser.size() == 1); + ctx.setCurrentUserIdentity(currentUser.get(0)); + } + LOG.info("request user: {}, remote ip: {}, user ip: {}, passwd: {}, cluster: {}", + request.getUser(), clientAddr, request.getUserIp(), request.getPasswd(), + request.getCloudCluster()); + if (!Strings.isNullOrEmpty(request.getCloudCluster())) { + if (Strings.isNullOrEmpty(request.getUser())) { + // mysql load + ctx.setCloudCluster(request.getCloudCluster()); + } else { + // stream load + ((CloudEnv) Env.getCurrentEnv()).changeCloudCluster(request.getCloudCluster(), ctx); + } + } + } + } + + private void setDbAndTable() throws UserException, MetaNotFoundException { + Env env = Env.getCurrentEnv(); + String fullDbName = request.getDb(); + db = env.getInternalCatalog().getDbNullable(fullDbName); + if (db == null) { + String dbName = fullDbName; + if (Strings.isNullOrEmpty(request.getCluster())) { + dbName = request.getDb(); + } + throw new UserException("unknown database, database=" + dbName); + } + + List<String> tableNames = null; + if (isMultiTableRequest) { + tableNames = request.getTableNames(); + } else { + tableNames = Lists.newArrayList(); + tableNames.add(request.getTbl()); 
+ } + + for (String tableName : tableNames) { + Table table = db.getTableOrMetaException(tableName, TableType.OLAP); + if (!((OlapTable) table).getTableProperty().getUseSchemaLightChange() + && (request.getGroupCommitMode() != null + && !request.getGroupCommitMode().equals("off_mode"))) { + throw new UserException( + "table light_schema_change is false, can't do stream load with group commit mode"); + } + tables.add((OlapTable) table); + } + + if (tables.isEmpty()) { + throw new MetaNotFoundException("table not found"); + } + + if (result != null) { + OlapTable olapTable = tables.get(0); + result.setDbId(db.getId()); + result.setTableId(olapTable.getId()); + result.setBaseSchemaVersion(olapTable.getBaseSchemaVersion()); + } + } + + /** + * For first-class multi-table scenarios, we should store the mapping between Txn and data source type in a common + * place. Since Kafka is the only data source for now, we handle it here first. + */ + private void buildMultiTableStreamLoadTask(StreamLoadTask baseTaskInfo, long txnId) { + try { + RoutineLoadJob routineLoadJob = Env.getCurrentEnv().getRoutineLoadManager() + .getRoutineLoadJobByMultiLoadTaskTxnId(txnId); + if (routineLoadJob == null) { + return; + } + baseTaskInfo.setMultiTableBaseTaskInfo(routineLoadJob); + } catch (Exception e) { + LOG.warn("failed to build multi table stream load task: {}", e.getMessage()); + } + } + + public void generatePlan(OlapTable table) throws UserException { + if (!table.tryReadLock(timeoutMs, TimeUnit.MILLISECONDS)) { + throw new UserException( + "get table read lock timeout, database=" + request.getDb() + ",table=" + table.getName()); + } + try { + StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request); + if (isMultiTableRequest) { + buildMultiTableStreamLoadTask(streamLoadTask, request.getTxnId()); + } + + StreamLoadPlanner planner = null; + if (Config.isCloudMode()) { + planner = new CloudStreamLoadPlanner(db, table, streamLoadTask, request.getCloudCluster()); + } else { + planner = new StreamLoadPlanner(db, table, streamLoadTask); + } + int index = multiTableFragmentInstanceIdIndex != null + ? 
multiTableFragmentInstanceIdIndex.getAndIncrement() : 0; + if (Config.enable_pipeline_load) { + TPipelineFragmentParams pipeResult = planner.planForPipeline(streamLoadTask.getId(), index); + pipeResult.setTableName(table.getName()); + pipeResult.query_options.setFeProcessUuid(ExecuteEnv.getInstance().getProcessUUID()); + fragmentParams.add(pipeResult); + } else { + TExecPlanFragmentParams result = planner.plan(streamLoadTask.getId(), index); + result.setTableName(table.getName()); + result.query_options.setFeProcessUuid(ExecuteEnv.getInstance().getProcessUUID()); + fragmentParams.add(result); + } + + if (StringUtils.isEmpty(streamLoadTask.getGroupCommit())) { + // add table indexes to transaction state + TransactionState txnState = Env.getCurrentGlobalTransactionMgr() + .getTransactionState(db.getId(), request.getTxnId()); + if (txnState == null) { + throw new UserException("txn does not exist: " + request.getTxnId()); + } + txnState.addTableIndexes(table); + if (request.isPartialUpdate()) { + txnState.setSchemaForPartialUpdate(table); + } + } + } finally { + table.readUnlock(); + } + } + + public void generatePlan() throws UserException, MetaNotFoundException { + setCloudCluster(); + setDbAndTable(); + for (OlapTable table : tables) { + generatePlan(table); + } + } + + public List getFragmentParams() { + return fragmentParams; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java index adbfc27fca22a2..7b156aeff8fe93 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java @@ -32,6 +32,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.io.ByteBufferNetworkInputStream; import org.apache.doris.load.LoadJobRowResult; +import org.apache.doris.load.StreamLoadHandler; import org.apache.doris.mysql.MysqlSerializer; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.SessionVariable; @@ -444,16 +445,21 @@ public HttpPut generateRequestForMySqlLoad( } private String selectBackendForMySqlLoad(String database, String table) throws LoadException { - BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().build(); - List<Long> backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1); - if (backendIds.isEmpty()) { - throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy); + Backend backend = null; + if (Config.isCloudMode()) { + backend = StreamLoadHandler.selectBackend(ConnectContext.get().getCloudCluster(), false); + } else { + BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().build(); + List<Long> backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1); + if (backendIds.isEmpty()) { + throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy); + } + backend = Env.getCurrentSystemInfo().getBackend(backendIds.get(0)); + if (backend == null) { + throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy); + } } - Backend backend = Env.getCurrentSystemInfo().getBackend(backendIds.get(0)); - if (backend == null) { - throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy); - } StringBuilder sb = new StringBuilder(); sb.append("http://"); 
sb.append(backend.getHost()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 60ae26df757130..4dc88507bd8e59 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -44,9 +44,7 @@ import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.catalog.Tablet; import org.apache.doris.catalog.TabletMeta; -import org.apache.doris.cloud.catalog.CloudEnv; import org.apache.doris.cloud.catalog.CloudTablet; -import org.apache.doris.cloud.planner.CloudStreamLoadPlanner; import org.apache.doris.cloud.proto.Cloud.CommitTxnResponse; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.AnalysisException; @@ -74,6 +72,7 @@ import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.insertoverwrite.InsertOverwriteManager; import org.apache.doris.insertoverwrite.InsertOverwriteUtil; +import org.apache.doris.load.StreamLoadHandler; import org.apache.doris.load.routineload.ErrorReason; import org.apache.doris.load.routineload.RoutineLoadJob; import org.apache.doris.load.routineload.RoutineLoadJob.JobState; @@ -82,7 +81,6 @@ import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.planner.OlapTableSink; -import org.apache.doris.planner.StreamLoadPlanner; import org.apache.doris.plsql.metastore.PlsqlPackage; import org.apache.doris.plsql.metastore.PlsqlProcedureKey; import org.apache.doris.plsql.metastore.PlsqlStoredProcedure; @@ -111,7 +109,6 @@ import org.apache.doris.system.Frontend; import org.apache.doris.system.SystemInfoService; import org.apache.doris.tablefunction.MetadataGenerator; -import org.apache.doris.task.StreamLoadTask; import org.apache.doris.thrift.FrontendService; import org.apache.doris.thrift.FrontendServiceVersion; import org.apache.doris.thrift.TAddPlsqlPackageRequest; @@ -272,6 +269,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -287,7 +285,8 @@ public class FrontendServiceImpl implements FrontendService.Iface { private ExecuteEnv exeEnv; // key is txn id, value is index of plan fragment instance; it's used by multi // table request plan - private ConcurrentHashMap<Long, Integer> multiTableFragmentInstanceIdIndexMap = new ConcurrentHashMap<>(64); + private ConcurrentHashMap<Long, AtomicInteger> multiTableFragmentInstanceIdIndexMap = + new ConcurrentHashMap<>(64); private final Map<TUniqueId, ConnectContext> proxyQueryIdToConnCtx = new ConcurrentHashMap<>(64); @@ -1901,41 +1900,43 @@ public TStreamLoadPutResult streamLoadPut(TStreamLoadPutRequest request) { TStatus status = new TStatus(TStatusCode.OK); result.setStatus(status); - // create connect context - ConnectContext ctx = new ConnectContext(); - ctx.setEnv(Env.getCurrentEnv()); - ctx.setQueryId(request.getLoadId()); - ctx.setCurrentUserIdentity(UserIdentity.createAnalyzedUserIdentWithIp(request.getUser(), "%")); - ctx.setQualifiedUser(request.getUser()); - ctx.setBackendId(request.getBackendId()); - ctx.setThreadLocalInfo(); + StreamLoadHandler streamLoadHandler = new StreamLoadHandler(request, null, result, clientAddr); try { + streamLoadHandler.setCloudCluster(); + List<TPipelineWorkloadGroup> 
tWorkloadGroupList = null; // mysql load request does not carry user info; need to fix it later. - boolean hasUserName = !StringUtils.isEmpty(ctx.getQualifiedUser()); + boolean hasUserName = !StringUtils.isEmpty(request.getUser()); if (Config.enable_workload_group && hasUserName) { + ConnectContext ctx = ConnectContext.get(); tWorkloadGroupList = Env.getCurrentEnv().getWorkloadGroupMgr().getWorkloadGroup(ctx); } if (!Strings.isNullOrEmpty(request.getLoadSql())) { - httpStreamPutImpl(request, result, ctx); + httpStreamPutImpl(request, result); if (tWorkloadGroupList != null && tWorkloadGroupList.size() > 0) { result.params.setWorkloadGroups(tWorkloadGroupList); } return result; } else { + streamLoadHandler.generatePlan(); if (Config.enable_pipeline_load) { - result.setPipelineParams(pipelineStreamLoadPutImpl(request)); - if (tWorkloadGroupList != null && tWorkloadGroupList.size() > 0) { - result.pipeline_params.setWorkloadGroups(tWorkloadGroupList); - } + result.setPipelineParams((TPipelineFragmentParams) streamLoadHandler.getFragmentParams().get(0)); } else { - result.setParams(streamLoadPutImpl(request, result)); - if (tWorkloadGroupList != null && tWorkloadGroupList.size() > 0) { - result.params.setWorkloadGroups(tWorkloadGroupList); - } + result.setParams((TExecPlanFragmentParams) streamLoadHandler.getFragmentParams().get(0)); + } + } + if (tWorkloadGroupList != null && tWorkloadGroupList.size() > 0) { + if (Config.enable_pipeline_load) { + result.pipeline_params.setWorkloadGroups(tWorkloadGroupList); + } else { + result.params.setWorkloadGroups(tWorkloadGroupList); } } + } catch (MetaNotFoundException e) { + LOG.warn("failed to rollback txn, id: {}, label: {}", request.getTxnId(), request.getLabel(), e); + status.setStatusCode(TStatusCode.NOT_FOUND); + status.addToErrorMsgs(e.getMessage()); } catch (UserException e) { LOG.warn("failed to get stream load plan: {}", e.getMessage()); status.setStatusCode(TStatusCode.ANALYSIS_ERROR); @@ -1951,24 +1952,6 @@ public TStreamLoadPutResult streamLoadPut(TStreamLoadPutRequest request) { return result; } - /** - * For first-class multi-table scenarios, we should store the mapping between - * Txn and data source type in a common - * place. Since there is only Kafka now, we should do this first. 
- */ - private void buildMultiTableStreamLoadTask(StreamLoadTask baseTaskInfo, long txnId) { - try { - RoutineLoadJob routineLoadJob = Env.getCurrentEnv().getRoutineLoadManager() - .getRoutineLoadJobByMultiLoadTaskTxnId(txnId); - if (routineLoadJob == null) { - return; - } - baseTaskInfo.setMultiTableBaseTaskInfo(routineLoadJob); - } catch (Exception e) { - LOG.warn("failed to build multi table stream load task: {}", e.getMessage()); - } - } - private void deleteMultiTableStreamLoadJobIndex(long txnId) { try { Env.getCurrentEnv().getRoutineLoadManager().removeMultiLoadTaskTxnIdToRoutineLoadJobId(txnId); @@ -1979,78 +1962,22 @@ private void deleteMultiTableStreamLoadJobIndex(long txnId) { @Override public TStreamLoadMultiTablePutResult streamLoadMultiTablePut(TStreamLoadPutRequest request) { - List<OlapTable> olapTables; - Database db; - String fullDbName; TStreamLoadMultiTablePutResult result = new TStreamLoadMultiTablePutResult(); TStatus status = new TStatus(TStatusCode.OK); result.setStatus(status); - List<String> tableNames = request.getTableNames(); - - if (Config.isCloudMode()) { - try { - ConnectContext ctx = new ConnectContext(); - ctx.setThreadLocalInfo(); - ctx.setQualifiedUser(request.getUser()); - ctx.setRemoteIP(request.getUserIp()); - String userName = ClusterNamespace.getNameFromFullName(request.getUser()); - if (userName != null) { - List<UserIdentity> currentUser = Lists.newArrayList(); - try { - Env.getCurrentEnv().getAuth().checkPlainPassword(userName, - request.getUserIp(), request.getPasswd(), currentUser); - } catch (AuthenticationException e) { - throw new UserException(e.formatErrMsg()); - } - Preconditions.checkState(currentUser.size() == 1); - ctx.setCurrentUserIdentity(currentUser.get(0)); - } - LOG.info("one stream multi table load use cloud cluster {}", request.getCloudCluster()); - // ctx.setCloudCluster(); - if (!Strings.isNullOrEmpty(request.getCloudCluster())) { - if (Strings.isNullOrEmpty(request.getUser())) { - ctx.setCloudCluster(request.getCloudCluster()); - } else { - ((CloudEnv) Env.getCurrentEnv()).changeCloudCluster(request.getCloudCluster(), ctx); - } - } - } catch (UserException e) { - LOG.warn("failed to set ConnectContext info: {}", e.getMessage()); - status.setStatusCode(TStatusCode.ANALYSIS_ERROR); - status.addToErrorMsgs(e.getMessage()); - } - } + List planFragmentParamsList = new ArrayList<>(); + multiTableFragmentInstanceIdIndexMap.putIfAbsent(request.getTxnId(), new AtomicInteger(1)); + AtomicInteger index = multiTableFragmentInstanceIdIndexMap.get(request.getTxnId()); + StreamLoadHandler streamLoadHandler = new StreamLoadHandler(request, index, null, + getClientAddrAsString()); try { - if (CollectionUtils.isEmpty(tableNames)) { - throw new MetaNotFoundException("table not found"); - } - fullDbName = request.getDb(); - db = Env.getCurrentEnv().getInternalCatalog().getDbNullable(fullDbName); - if (db == null) { - String dbName = fullDbName; - if (Strings.isNullOrEmpty(request.getCluster())) { - dbName = request.getDb(); - } - throw new UserException("unknown database, database=" + dbName); - } - // todo Whether there will be a large amount of data risk - List<Table> tables = db.getTablesOrEmpty(); - if (CollectionUtils.isEmpty(tables)) { - throw new MetaNotFoundException("table not found"); - } - olapTables = new ArrayList<>(tableNames.size()); - Map<String, OlapTable> olapTableMap = tables.stream() - .filter(OlapTable.class::isInstance) - .map(OlapTable.class::cast) - .collect(Collectors.toMap(OlapTable::getName, olapTable -> olapTable)); - for (String tableName : tableNames) { - if (null == 
olapTableMap.get(tableName)) { - throw new MetaNotFoundException("table not found, table name is " + tableName); - } - olapTables.add(olapTableMap.get(tableName)); + streamLoadHandler.generatePlan(); + planFragmentParamsList.addAll(streamLoadHandler.getFragmentParams()); + if (LOG.isDebugEnabled()) { + LOG.debug("receive stream load multi table put request result: {}", result); } - } catch (UserException exception) { + } catch (UserException exception) { LOG.warn("failed to get stream load plan: {}", exception.getMessage()); status = new TStatus(TStatusCode.ANALYSIS_ERROR); status.addToErrorMsgs(exception.getMessage()); @@ -2059,45 +1986,25 @@ public TStreamLoadMultiTablePutResult streamLoadMultiTablePut(TStreamLoadPutRequ RoutineLoadJob routineLoadJob = Env.getCurrentEnv().getRoutineLoadManager() .getRoutineLoadJobByMultiLoadTaskTxnId(request.getTxnId()); routineLoadJob.updateState(JobState.PAUSED, new ErrorReason(InternalErrorCode.CANNOT_RESUME_ERR, - "failed to get stream load plan, " + exception.getMessage()), false); - } catch (UserException e) { + "failed to get stream load plan, " + exception.getMessage()), false); + } catch (Throwable e) { LOG.warn("catch update routine load job error.", e); } return result; - } - long timeoutMs = request.isSetThriftRpcTimeoutMs() ? request.getThriftRpcTimeoutMs() : 5000; - List planFragmentParamsList = new ArrayList<>(tableNames.size()); - // todo: if is multi table, we need consider the lock time and the timeout - boolean enablePipelineLoad = Config.enable_pipeline_load; - try { - multiTableFragmentInstanceIdIndexMap.putIfAbsent(request.getTxnId(), 1); - for (OlapTable table : olapTables) { - int index = multiTableFragmentInstanceIdIndexMap.get(request.getTxnId()); - if (enablePipelineLoad) { - planFragmentParamsList.add(generatePipelineStreamLoadPut(request, db, fullDbName, table, timeoutMs, - index, true)); - } else { - TExecPlanFragmentParams planFragmentParams = generatePlanFragmentParams(request, db, fullDbName, - table, timeoutMs, index, true); - - planFragmentParamsList.add(planFragmentParams); - } - multiTableFragmentInstanceIdIndexMap.put(request.getTxnId(), ++index); - } - if (LOG.isDebugEnabled()) { - LOG.debug("receive stream load multi table put request result: {}", result); - } } catch (Throwable e) { LOG.warn("catch unknown result.", e); status.setStatusCode(TStatusCode.INTERNAL_ERROR); status.addToErrorMsgs(e.getClass().getSimpleName() + ": " + Strings.nullToEmpty(e.getMessage())); return result; } - if (enablePipelineLoad) { + if (Config.enable_pipeline_load) { result.setPipelineParams(planFragmentParamsList); + LOG.info("receive stream load multi table put request result: {}", result); return result; } result.setParams(planFragmentParamsList); + LOG.info("receive stream load multi table put request result: {}", result); + return result; } @@ -2132,11 +2039,14 @@ private HttpStreamParams initHttpStreamPlan(TStreamLoadPutRequest request, Conne return httpStreamParams; } - private void httpStreamPutImpl(TStreamLoadPutRequest request, TStreamLoadPutResult result, ConnectContext ctx) + private void httpStreamPutImpl(TStreamLoadPutRequest request, TStreamLoadPutResult result) throws UserException { if (LOG.isDebugEnabled()) { LOG.debug("receive http stream put request: {}", request); } + + ConnectContext ctx = ConnectContext.get(); + if (request.isSetAuthCode()) { // TODO(cmy): find a way to check } else if (Strings.isNullOrEmpty(request.getToken())) { @@ -2179,196 +2089,6 @@ private void httpStreamPutImpl(TStreamLoadPutRequest 
request, TStreamLoadPutResu } } - private TExecPlanFragmentParams streamLoadPutImpl(TStreamLoadPutRequest request, TStreamLoadPutResult result) - throws UserException { - if (request.isSetAuthCode()) { - String clientAddr = getClientAddrAsString(); - ConnectContext ctx = new ConnectContext(); - ctx.setThreadLocalInfo(); - ctx.setRemoteIP(clientAddr); - long backendId = request.getBackendId(); - Backend backend = Env.getCurrentSystemInfo().getBackend(backendId); - Preconditions.checkNotNull(backend); - ctx.setCloudCluster(backend.getCloudClusterName()); - LOG.info("streamLoadPutImpl set context: cluster {}", ctx.getCloudCluster()); - } else if (Config.isCloudMode()) { - ConnectContext ctx = new ConnectContext(); - ctx.setThreadLocalInfo(); - ctx.setQualifiedUser(request.getUser()); - ctx.setRemoteIP(request.getUserIp()); - String userName = ClusterNamespace.getNameFromFullName(request.getUser()); - if (userName != null) { - List<UserIdentity> currentUser = Lists.newArrayList(); - try { - Env.getCurrentEnv().getAuth().checkPlainPassword(userName, - request.getUserIp(), request.getPasswd(), currentUser); - } catch (AuthenticationException e) { - throw new UserException(e.formatErrMsg()); - } - Preconditions.checkState(currentUser.size() == 1); - ctx.setCurrentUserIdentity(currentUser.get(0)); - } - - LOG.info("stream load use cloud cluster {}", request.getCloudCluster()); - if (!Strings.isNullOrEmpty(request.getCloudCluster())) { - if (Strings.isNullOrEmpty(request.getUser())) { - // mysql load - ctx.setCloudCluster(request.getCloudCluster()); - } else { - // stream load - ((CloudEnv) Env.getCurrentEnv()).changeCloudCluster(request.getCloudCluster(), ctx); - } - } - - LOG.debug("streamLoadPutImpl set context: cluster {}, setCurrentUserIdentity {}", - ctx.getCloudCluster(), ctx.getCurrentUserIdentity()); - } - - Env env = Env.getCurrentEnv(); - String fullDbName = request.getDb(); - Database db = env.getInternalCatalog().getDbNullable(fullDbName); - if (db == null) { - String dbName = fullDbName; - if (Strings.isNullOrEmpty(request.getCluster())) { - dbName = request.getDb(); - } - throw new UserException("unknown database, database=" + dbName); - } - long timeoutMs = request.isSetThriftRpcTimeoutMs() ? 
request.getThriftRpcTimeoutMs() : 5000; - Table table = db.getTableOrMetaException(request.getTbl(), TableType.OLAP); - if (!table.tryReadLock(timeoutMs, TimeUnit.MILLISECONDS)) { - throw new UserException( - "get table read lock timeout, database=" + fullDbName + ",table=" + table.getName()); - } - try { - if (!((OlapTable) table).getTableProperty().getUseSchemaLightChange() - && (request.getGroupCommitMode() != null - && !request.getGroupCommitMode().equals("off_mode"))) { - throw new UserException( - "table light_schema_change is false, can't do stream load with group commit mode"); - } - result.setDbId(db.getId()); - result.setTableId(table.getId()); - result.setBaseSchemaVersion(((OlapTable) table).getBaseSchemaVersion()); - return generatePlanFragmentParams(request, db, fullDbName, (OlapTable) table, timeoutMs); - } finally { - table.readUnlock(); - } - } - - private TExecPlanFragmentParams generatePlanFragmentParams(TStreamLoadPutRequest request, Database db, - String fullDbName, OlapTable table, - long timeoutMs) throws UserException { - return generatePlanFragmentParams(request, db, fullDbName, table, timeoutMs, 1, false); - } - - private TExecPlanFragmentParams generatePlanFragmentParams(TStreamLoadPutRequest request, Database db, - String fullDbName, OlapTable table, - long timeoutMs, int multiTableFragmentInstanceIdIndex, - boolean isMultiTableRequest) - throws UserException { - if (!table.tryReadLock(timeoutMs, TimeUnit.MILLISECONDS)) { - throw new UserException( - "get table read lock timeout, database=" + fullDbName + ",table=" + table.getName()); - } - try { - StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request); - if (isMultiTableRequest) { - buildMultiTableStreamLoadTask(streamLoadTask, request.getTxnId()); - } - StreamLoadPlanner planner = new StreamLoadPlanner(db, table, streamLoadTask); - TExecPlanFragmentParams plan = planner.plan(streamLoadTask.getId(), multiTableFragmentInstanceIdIndex); - if (StringUtils.isEmpty(streamLoadTask.getGroupCommit())) { - // add table indexes to transaction state - TransactionState txnState = Env.getCurrentGlobalTransactionMgr() - .getTransactionState(db.getId(), request.getTxnId()); - if (txnState == null) { - throw new UserException("txn does not exist: " + request.getTxnId()); - } - txnState.addTableIndexes(table); - if (request.isPartialUpdate()) { - txnState.setSchemaForPartialUpdate(table); - } - } - plan.setTableName(table.getName()); - plan.query_options.setFeProcessUuid(ExecuteEnv.getInstance().getProcessUUID()); - return plan; - } finally { - table.readUnlock(); - } - } - - private TPipelineFragmentParams pipelineStreamLoadPutImpl(TStreamLoadPutRequest request) throws UserException { - Env env = Env.getCurrentEnv(); - String fullDbName = request.getDb(); - Database db = env.getInternalCatalog().getDbNullable(fullDbName); - if (db == null) { - String dbName = fullDbName; - if (Strings.isNullOrEmpty(request.getCluster())) { - dbName = request.getDb(); - } - throw new UserException("unknown database, database=" + dbName); - } - long timeoutMs = request.isSetThriftRpcTimeoutMs() ? 
request.getThriftRpcTimeoutMs() : 5000; - Table table = db.getTableOrMetaException(request.getTbl(), TableType.OLAP); - if (!((OlapTable) table).getTableProperty().getUseSchemaLightChange() && (request.getGroupCommitMode() != null - && !request.getGroupCommitMode().equals("off_mode"))) { - throw new UserException("table light_schema_change is false, can't do stream load with group commit mode"); - } - return this.generatePipelineStreamLoadPut(request, db, fullDbName, (OlapTable) table, timeoutMs, - 1, false); - } - - private TPipelineFragmentParams generatePipelineStreamLoadPut(TStreamLoadPutRequest request, Database db, - String fullDbName, OlapTable table, - long timeoutMs, - int multiTableFragmentInstanceIdIndex, - boolean isMultiTableRequest) - throws UserException { - if (db == null) { - String dbName = fullDbName; - if (Strings.isNullOrEmpty(request.getCluster())) { - dbName = request.getDb(); - } - throw new UserException("unknown database, database=" + dbName); - } - if (!table.tryReadLock(timeoutMs, TimeUnit.MILLISECONDS)) { - throw new UserException( - "get table read lock timeout, database=" + fullDbName + ",table=" + table.getName()); - } - try { - StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request); - if (isMultiTableRequest) { - buildMultiTableStreamLoadTask(streamLoadTask, request.getTxnId()); - } - - StreamLoadPlanner planner = null; - if (Config.isCloudMode()) { - planner = new CloudStreamLoadPlanner(db, table, streamLoadTask, request.getCloudCluster()); - } else { - planner = new StreamLoadPlanner(db, table, streamLoadTask); - } - - TPipelineFragmentParams plan = planner.planForPipeline(streamLoadTask.getId(), - multiTableFragmentInstanceIdIndex); - if (StringUtils.isEmpty(streamLoadTask.getGroupCommit())) { - // add table indexes to transaction state - TransactionState txnState = Env.getCurrentGlobalTransactionMgr() - .getTransactionState(db.getId(), request.getTxnId()); - if (txnState == null) { - throw new UserException("txn does not exist: " + request.getTxnId()); - } - txnState.addTableIndexes(table); - if (request.isPartialUpdate()) { - txnState.setSchemaForPartialUpdate(table); - } - } - return plan; - } finally { - table.readUnlock(); - } - } - @Override public TStatus snapshotLoaderReport(TSnapshotLoaderReportRequest request) throws TException { if (Env.getCurrentEnv().getBackupHandler().report(request.getTaskType(), request.getJobId(), diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/DatabaseTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/DatabaseTest.java index 07ce2f9229b8de..b5ee8331c59541 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/DatabaseTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/DatabaseTest.java @@ -129,7 +129,7 @@ public void getTablesOnIdOrderOrThrowExceptionTest() throws MetaNotFoundExceptio Assert.assertEquals(2, tableList.size()); Assert.assertEquals(2000L, tableList.get(0).getId()); Assert.assertEquals(2001L, tableList.get(1).getId()); - ExceptionChecker.expectThrowsWithMsg(MetaNotFoundException.class, "unknown table, tableId=3000", + ExceptionChecker.expectThrowsWithMsg(MetaNotFoundException.class, "table not found, tableId=3000", () -> db.getTablesOnIdOrderOrThrowException(Lists.newArrayList(3000L))); } @@ -143,9 +143,9 @@ public void getTableOrThrowExceptionTest() throws MetaNotFoundException { Table resultTable2 = db.getTableOrMetaException("baseTable", Table.TableType.OLAP); Assert.assertEquals(table, resultTable1); Assert.assertEquals(table, 
resultTable2); - ExceptionChecker.expectThrowsWithMsg(MetaNotFoundException.class, "unknown table, tableId=3000", + ExceptionChecker.expectThrowsWithMsg(MetaNotFoundException.class, "table not found, tableId=3000", () -> db.getTableOrMetaException(3000L, Table.TableType.OLAP)); - ExceptionChecker.expectThrowsWithMsg(MetaNotFoundException.class, "unknown table, tableName=baseTable1", + ExceptionChecker.expectThrowsWithMsg(MetaNotFoundException.class, "table not found, tableName=baseTable1", () -> db.getTableOrMetaException("baseTable1", Table.TableType.OLAP)); ExceptionChecker.expectThrowsWithMsg(MetaNotFoundException.class, "table type is not BROKER, tableId=2000, type=OLAP",
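Reviewer note (not part of the diff): after this refactor, LoadAction.selectCloudRedirectBackend() and MysqlLoadManager.selectBackendForMySqlLoad() both delegate cloud backend selection to StreamLoadHandler.selectBackend(), and the BE now forwards the cloud_cluster HTTP header into TStreamLoadPutRequest so setCloudCluster() can switch the ConnectContext to the requested compute cluster. A minimal caller sketch in Java follows; it is illustrative only. The cluster name "cluster0" and the db/table names are hypothetical placeholders, and Backend#getHttpPort() is assumed to be the HTTP port accessor used by the URL builder, which is cut off in the MysqlLoadManager hunk above.

    import org.apache.doris.common.LoadException;
    import org.apache.doris.load.StreamLoadHandler;
    import org.apache.doris.system.Backend;

    public class StreamLoadUrlExample {
        // Illustrative only: resolve a stream load URL for a cloud cluster the
        // same way the refactored FE code does (random pick among alive backends,
        // excluding decommissioned ones when group commit is requested).
        static String streamLoadUrlFor(String clusterName, String db, String table) throws LoadException {
            Backend backend = StreamLoadHandler.selectBackend(clusterName, /* groupCommit */ false);
            return "http://" + backend.getHost() + ":" + backend.getHttpPort()
                    + "/api/" + db + "/" + table + "/_stream_load";
        }

        public static void main(String[] args) throws LoadException {
            // "cluster0", "mydb" and "mytable" are placeholders for this sketch.
            System.out.println(streamLoadUrlFor("cluster0", "mydb", "mytable"));
        }
    }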