From 9907dd4a2f5ac3027ddac58f2298b2a7898ef39d Mon Sep 17 00:00:00 2001 From: Tang Siyang Date: Fri, 30 Jun 2023 23:30:15 +0800 Subject: [PATCH] fix reviewed problems --- docs/en/docs/admin-manual/config/fe-config.md | 2 +- .../docs/admin-manual/config/fe-config.md | 2 +- .../java/org/apache/doris/common/Config.java | 6 -- .../org/apache/doris/analysis/SyncStmt.java | 2 +- .../java/org/apache/doris/qe/DdlExecutor.java | 3 + .../org/apache/doris/qe/MasterOpExecutor.java | 89 ++++++++++++------- .../org/apache/doris/qe/StmtExecutor.java | 25 +----- .../doris/service/FrontendServiceImpl.java | 13 ++- gensrc/thrift/FrontendService.thrift | 7 +- 9 files changed, 71 insertions(+), 78 deletions(-) diff --git a/docs/en/docs/admin-manual/config/fe-config.md b/docs/en/docs/admin-manual/config/fe-config.md index 8475c7f39cee78..ad55044db52982 100644 --- a/docs/en/docs/admin-manual/config/fe-config.md +++ b/docs/en/docs/admin-manual/config/fe-config.md @@ -1010,7 +1010,7 @@ This variable is a session variable, and the session level takes effect. Default: false -This variable is both a session variable and global config option. +This variable is a session variable, and the session level takes effect. - Type: boolean - Description: To enable strong consistency reading. In scenarios where strong consistency is required, this variable can be set to true to make loaded data immediately visible to queries. diff --git a/docs/zh-CN/docs/admin-manual/config/fe-config.md b/docs/zh-CN/docs/admin-manual/config/fe-config.md index 30ae2b87673243..39dd8f9a628cf9 100644 --- a/docs/zh-CN/docs/admin-manual/config/fe-config.md +++ b/docs/zh-CN/docs/admin-manual/config/fe-config.md @@ -1010,7 +1010,7 @@ colocate join PlanFragment instance 的 memory_limit = exec_mem_limit / min (que 默认值:false -该变量可以作为全局配置项和session variable生效。 +该变量为 session variable,session 级别生效。 - 类型:boolean - 描述:用以开启强一致性读。在强一致性的需求场景下可将该将该变量设置为true,使导入数据在查询时实时可见。 diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 35cf796b500ebe..fd4e19f574dabf 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2023,10 +2023,4 @@ public class Config extends ConfigBase { "是否禁止使用 WITH REOSOURCE 语句创建 Catalog。", "Whether to disable creating catalog with WITH RESOURCE statement."}) public static boolean disallow_create_catalog_with_resource = true; - - @ConfField(description = { - "是否开启强一致性读。", - "Whether to enable strong consistency read." - }) - public static boolean enable_strong_consistency_read = false; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SyncStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SyncStmt.java index 5e8fe00c9a539f..59b9d3cd8af82a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SyncStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SyncStmt.java @@ -27,6 +27,6 @@ public void analyze(Analyzer analyzer) throws UserException { @Override public RedirectStatus getRedirectStatus() { - return RedirectStatus.NO_FORWARD; + return RedirectStatus.FORWARD_WITH_SYNC; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java index 452556c705dc9a..15b7bcc883ab67 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java @@ -110,6 +110,7 @@ import org.apache.doris.analysis.SetUserPropertyStmt; import org.apache.doris.analysis.StopRoutineLoadStmt; import org.apache.doris.analysis.StopSyncJobStmt; +import org.apache.doris.analysis.SyncStmt; import org.apache.doris.analysis.TruncateTableStmt; import org.apache.doris.analysis.UninstallPluginStmt; import org.apache.doris.catalog.EncryptKeyHelper; @@ -229,6 +230,8 @@ public static void execute(Env env, DdlStmt ddlStmt) throws Exception { env.getBackupHandler().createRepository((CreateRepositoryStmt) ddlStmt); } else if (ddlStmt instanceof DropRepositoryStmt) { env.getBackupHandler().dropRepository((DropRepositoryStmt) ddlStmt); + } else if (ddlStmt instanceof SyncStmt) { + return; } else if (ddlStmt instanceof TruncateTableStmt) { env.truncateTable((TruncateTableStmt) ddlStmt); } else if (ddlStmt instanceof AdminRepairTableStmt) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java index 9bbbd598d6f7fe..e692e3b8532f19 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java @@ -55,6 +55,8 @@ public class MasterOpExecutor { private boolean shouldNotRetry; + private boolean isSyncJournalOnly; + public MasterOpExecutor(OriginStatement originStmt, ConnectContext ctx, RedirectStatus status, boolean isQuery) { this.originStmt = originStmt; this.ctx = ctx; @@ -66,6 +68,15 @@ public MasterOpExecutor(OriginStatement originStmt, ConnectContext ctx, Redirect this.thriftTimeoutMs = (int) (ctx.getExecTimeout() * 1000 * RPC_TIMEOUT_COEFFICIENT); // if isQuery=false, we shouldn't retry twice when catch exception because of Idempotency this.shouldNotRetry = !isQuery; + this.isSyncJournalOnly = false; + } + + /** + * used for simply syncing journal with master under strong consistency mode + */ + public MasterOpExecutor(ConnectContext ctx) { + this(null, ctx, RedirectStatus.FORWARD_WITH_SYNC, true); + this.isSyncJournalOnly = true; } public void execute() throws Exception { @@ -100,36 +111,12 @@ private void forward() throws Exception { // may throw NullPointerException. add err msg throw new Exception("Failed to get master client.", e); } - TMasterOpRequest params = new TMasterOpRequest(); - params.setCluster(ctx.getClusterName()); - params.setSql(originStmt.originStmt); - params.setStmtIdx(originStmt.idx); - params.setUser(ctx.getQualifiedUser()); - params.setDb(ctx.getDatabase()); - params.setUserIp(ctx.getRemoteIP()); - params.setStmtId(ctx.getStmtId()); - params.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift()); - - // query options - params.setQueryOptions(ctx.getSessionVariable().getQueryOptionVariables()); - // session variables - params.setSessionVariables(ctx.getSessionVariable().getForwardVariables()); - - // create a trace carrier - Map traceCarrier = new HashMap<>(); - // Inject the request with the current context - Telemetry.getOpenTelemetry().getPropagators().getTextMapPropagator() - .inject(Context.current(), traceCarrier, (carrier, key, value) -> carrier.put(key, value)); - // carrier send tracing to master - params.setTraceCarrier(traceCarrier); - - if (null != ctx.queryId()) { - params.setQueryId(ctx.queryId()); + final TMasterOpRequest params = buildForwardParams(); + final StringBuilder forwardMsg = new StringBuilder(String.format("forward to Master %s", thriftAddress)); + if (!isSyncJournalOnly) { + forwardMsg.append(", statement: %s").append(ctx.getStmtId()); } - //node ident - params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost()); - params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort()); - LOG.info("Forward statement {} to Master {}", ctx.getStmtId(), thriftAddress); + LOG.info(forwardMsg.toString()); boolean isReturnToPool = false; try { @@ -137,9 +124,8 @@ private void forward() throws Exception { isReturnToPool = true; } catch (TTransportException e) { // wrap the raw exception. - Exception exception = new ForwardToMasterException( - String.format("Forward statement %s to Master %s failed", ctx.getStmtId(), - thriftAddress), e); + forwardMsg.append(" : failed"); + Exception exception = new ForwardToMasterException(String.format(forwardMsg.toString()), e); boolean ok = ClientPool.frontendPool.reopen(client, thriftTimeoutMs); if (!ok) { @@ -148,7 +134,7 @@ private void forward() throws Exception { if (shouldNotRetry || e.getType() == TTransportException.TIMED_OUT) { throw exception; } else { - LOG.warn("Forward statement " + ctx.getStmtId() + " to Master " + thriftAddress + " twice", e); + LOG.warn(forwardMsg.append(" twice").toString(), e); try { result = client.forward(params); isReturnToPool = true; @@ -165,6 +151,43 @@ private void forward() throws Exception { } } + private TMasterOpRequest buildForwardParams() { + TMasterOpRequest params = new TMasterOpRequest(); + //node ident + params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost()); + params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort()); + if (isSyncJournalOnly) { + params.setSyncJournalOnly(true); + return params; + } + params.setCluster(ctx.getClusterName()); + params.setSql(originStmt.originStmt); + params.setStmtIdx(originStmt.idx); + params.setUser(ctx.getQualifiedUser()); + params.setDb(ctx.getDatabase()); + params.setUserIp(ctx.getRemoteIP()); + params.setStmtId(ctx.getStmtId()); + params.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift()); + + // query options + params.setQueryOptions(ctx.getSessionVariable().getQueryOptionVariables()); + // session variables + params.setSessionVariables(ctx.getSessionVariable().getForwardVariables()); + + // create a trace carrier + Map traceCarrier = new HashMap<>(); + // Inject the request with the current context + Telemetry.getOpenTelemetry().getPropagators().getTextMapPropagator() + .inject(Context.current(), traceCarrier, (carrier, key, value) -> carrier.put(key, value)); + // carrier send tracing to master + params.setTraceCarrier(traceCarrier); + + if (null != ctx.queryId()) { + params.setQueryId(ctx.queryId()); + } + return params; + } + public ByteBuffer getOutputPacket() { if (result == null) { return null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 2dbfd0b647f5eb..9149d738c7c7af 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -725,8 +725,6 @@ public void executeByLegacy(TUniqueId queryId) throws Exception { handleLoadStmt(); } else if (parsedStmt instanceof UpdateStmt) { handleUpdateStmt(); - } else if (parsedStmt instanceof SyncStmt) { - syncJournal(); } else if (parsedStmt instanceof DdlStmt) { if (parsedStmt instanceof DeleteStmt && ((DeleteStmt) parsedStmt).getFromClause() != null) { handleDeleteStmt(); @@ -790,30 +788,11 @@ public void executeByLegacy(TUniqueId queryId) throws Exception { } private void syncJournalIfNeeded() throws Exception { - if (!context.getSessionVariable().enableStrongConsistencyRead - || !Config.enable_strong_consistency_read) { - return; - } - syncJournal(); - } - - /** - * fetch master's max journal id and wait for edit log replaying - */ - private void syncJournal() throws Exception { final Env env = context.getEnv(); - if (env.isMaster()) { + if (env.isMaster() || !context.getSessionVariable().enableStrongConsistencyRead) { return; } - String masterHost = env.getMasterHost(); - int masterRpcPort = env.getMasterRpcPort(); - TNetworkAddress thriftAddress = new TNetworkAddress(masterHost, masterRpcPort); - final int timeoutMs = context.getExecTimeout() * 1000; - final Client client = ClientPool.frontendPool.borrowObject(thriftAddress, timeoutMs); - final TGetMasterMaxJournalIdReply reply = client.getMasterMaxJournalId(); - Preconditions.checkNotNull(reply, "max journal id should be not null."); - final long maxJournalId = reply.getMaxJournalId(); - env.getJournalObservable().waitOn(maxJournalId, timeoutMs); + new MasterOpExecutor(context).execute(); } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 03678a45aa39d3..884cbd3331bb80 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -182,6 +182,7 @@ import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; +import java.nio.ByteBuffer; import java.time.Instant; import java.time.ZoneId; import java.time.ZonedDateTime; @@ -898,6 +899,11 @@ public TMasterOpResult forward(TMasterOpRequest params) throws TException { LOG.warn("reject request from invalid host. client: {}", params.getClientNodeHost()); throw new TException("request from invalid host was rejected."); } + if (params.isSetSyncJournalOnly() && params.syncJournalOnly) { + final TMasterOpResult result = new TMasterOpResult(); + result.setMaxJournalId(Env.getCurrentEnv().getMaxJournalId()); + return result; + } // add this log so that we can track this stmt LOG.debug("receive forwarded stmt {} from FE: {}", params.getStmtId(), params.getClientNodeHost()); @@ -2553,11 +2559,4 @@ public TGetMasterTokenResult getMasterToken(TGetMasterTokenRequest request) thro return result; } - - @Override - public TGetMasterMaxJournalIdReply getMasterMaxJournalId() throws TException { - final Env env = Env.getCurrentEnv(); - Preconditions.checkState(env.isMaster(), "should get max journal id from master"); - return new TGetMasterMaxJournalIdReply(env.getMaxJournalId()); - } } diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 43ebde63a1046b..c58bebf80cb0e9 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -468,6 +468,7 @@ struct TMasterOpRequest { 21: optional map trace_carrier 22: optional string clientNodeHost 23: optional i32 clientNodePort + 24: optional bool syncJournalOnly // if set to true, this request means to do nothing but just sync max journal id of master } struct TColumnDefinition { @@ -1041,10 +1042,6 @@ struct TGetMasterTokenResult { 2: optional string token } -struct TGetMasterMaxJournalIdReply { - 1: i64 maxJournalId -} - service FrontendService { TGetDbsResult getDbNames(1: TGetDbsParams params) TGetTablesResult getTableNames(1: TGetTablesParams params) @@ -1107,6 +1104,4 @@ service FrontendService { TGetTabletReplicaInfosResult getTabletReplicaInfos(1: TGetTabletReplicaInfosRequest request) TGetMasterTokenResult getMasterToken(1: TGetMasterTokenRequest request) - - TGetMasterMaxJournalIdReply getMasterMaxJournalId() }