Skip to content

Commit

Permalink
fix reviewed problems
Browse files Browse the repository at this point in the history
  • Loading branch information
TangSiyang2001 committed Jun 30, 2023
1 parent b4b8cdb commit 9907dd4
Show file tree
Hide file tree
Showing 9 changed files with 71 additions and 78 deletions.
2 changes: 1 addition & 1 deletion docs/en/docs/admin-manual/config/fe-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -1010,7 +1010,7 @@ This variable is a session variable, and the session level takes effect.

Default: false

This variable is both a session variable and global config option.
This variable is a session variable, and the session level takes effect.

- Type: boolean
- Description: To enable strong consistency reading. In scenarios where strong consistency is required, this variable can be set to true to make loaded data immediately visible to queries.
Expand Down
2 changes: 1 addition & 1 deletion docs/zh-CN/docs/admin-manual/config/fe-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -1010,7 +1010,7 @@ colocate join PlanFragment instance 的 memory_limit = exec_mem_limit / min (que

默认值:false

该变量可以作为全局配置项和session variable生效
该变量为 session variable,session 级别生效

- 类型:boolean
- 描述:用以开启强一致性读。在强一致性的需求场景下可将该将该变量设置为true,使导入数据在查询时实时可见。
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2023,10 +2023,4 @@ public class Config extends ConfigBase {
"是否禁止使用 WITH REOSOURCE 语句创建 Catalog。",
"Whether to disable creating catalog with WITH RESOURCE statement."})
public static boolean disallow_create_catalog_with_resource = true;

@ConfField(description = {
"是否开启强一致性读。",
"Whether to enable strong consistency read."
})
public static boolean enable_strong_consistency_read = false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,6 @@ public void analyze(Analyzer analyzer) throws UserException {

@Override
public RedirectStatus getRedirectStatus() {
return RedirectStatus.NO_FORWARD;
return RedirectStatus.FORWARD_WITH_SYNC;
}
}
3 changes: 3 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
import org.apache.doris.analysis.SetUserPropertyStmt;
import org.apache.doris.analysis.StopRoutineLoadStmt;
import org.apache.doris.analysis.StopSyncJobStmt;
import org.apache.doris.analysis.SyncStmt;
import org.apache.doris.analysis.TruncateTableStmt;
import org.apache.doris.analysis.UninstallPluginStmt;
import org.apache.doris.catalog.EncryptKeyHelper;
Expand Down Expand Up @@ -229,6 +230,8 @@ public static void execute(Env env, DdlStmt ddlStmt) throws Exception {
env.getBackupHandler().createRepository((CreateRepositoryStmt) ddlStmt);
} else if (ddlStmt instanceof DropRepositoryStmt) {
env.getBackupHandler().dropRepository((DropRepositoryStmt) ddlStmt);
} else if (ddlStmt instanceof SyncStmt) {
return;
} else if (ddlStmt instanceof TruncateTableStmt) {
env.truncateTable((TruncateTableStmt) ddlStmt);
} else if (ddlStmt instanceof AdminRepairTableStmt) {
Expand Down
89 changes: 56 additions & 33 deletions fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public class MasterOpExecutor {

private boolean shouldNotRetry;

private boolean isSyncJournalOnly;

public MasterOpExecutor(OriginStatement originStmt, ConnectContext ctx, RedirectStatus status, boolean isQuery) {
this.originStmt = originStmt;
this.ctx = ctx;
Expand All @@ -66,6 +68,15 @@ public MasterOpExecutor(OriginStatement originStmt, ConnectContext ctx, Redirect
this.thriftTimeoutMs = (int) (ctx.getExecTimeout() * 1000 * RPC_TIMEOUT_COEFFICIENT);
// if isQuery=false, we shouldn't retry twice when catch exception because of Idempotency
this.shouldNotRetry = !isQuery;
this.isSyncJournalOnly = false;
}

/**
* used for simply syncing journal with master under strong consistency mode
*/
public MasterOpExecutor(ConnectContext ctx) {
this(null, ctx, RedirectStatus.FORWARD_WITH_SYNC, true);
this.isSyncJournalOnly = true;
}

public void execute() throws Exception {
Expand Down Expand Up @@ -100,46 +111,21 @@ private void forward() throws Exception {
// may throw NullPointerException. add err msg
throw new Exception("Failed to get master client.", e);
}
TMasterOpRequest params = new TMasterOpRequest();
params.setCluster(ctx.getClusterName());
params.setSql(originStmt.originStmt);
params.setStmtIdx(originStmt.idx);
params.setUser(ctx.getQualifiedUser());
params.setDb(ctx.getDatabase());
params.setUserIp(ctx.getRemoteIP());
params.setStmtId(ctx.getStmtId());
params.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift());

// query options
params.setQueryOptions(ctx.getSessionVariable().getQueryOptionVariables());
// session variables
params.setSessionVariables(ctx.getSessionVariable().getForwardVariables());

// create a trace carrier
Map<String, String> traceCarrier = new HashMap<>();
// Inject the request with the current context
Telemetry.getOpenTelemetry().getPropagators().getTextMapPropagator()
.inject(Context.current(), traceCarrier, (carrier, key, value) -> carrier.put(key, value));
// carrier send tracing to master
params.setTraceCarrier(traceCarrier);

if (null != ctx.queryId()) {
params.setQueryId(ctx.queryId());
final TMasterOpRequest params = buildForwardParams();
final StringBuilder forwardMsg = new StringBuilder(String.format("forward to Master %s", thriftAddress));
if (!isSyncJournalOnly) {
forwardMsg.append(", statement: %s").append(ctx.getStmtId());
}
//node ident
params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort());
LOG.info("Forward statement {} to Master {}", ctx.getStmtId(), thriftAddress);
LOG.info(forwardMsg.toString());

boolean isReturnToPool = false;
try {
result = client.forward(params);
isReturnToPool = true;
} catch (TTransportException e) {
// wrap the raw exception.
Exception exception = new ForwardToMasterException(
String.format("Forward statement %s to Master %s failed", ctx.getStmtId(),
thriftAddress), e);
forwardMsg.append(" : failed");
Exception exception = new ForwardToMasterException(String.format(forwardMsg.toString()), e);

boolean ok = ClientPool.frontendPool.reopen(client, thriftTimeoutMs);
if (!ok) {
Expand All @@ -148,7 +134,7 @@ private void forward() throws Exception {
if (shouldNotRetry || e.getType() == TTransportException.TIMED_OUT) {
throw exception;
} else {
LOG.warn("Forward statement " + ctx.getStmtId() + " to Master " + thriftAddress + " twice", e);
LOG.warn(forwardMsg.append(" twice").toString(), e);
try {
result = client.forward(params);
isReturnToPool = true;
Expand All @@ -165,6 +151,43 @@ private void forward() throws Exception {
}
}

private TMasterOpRequest buildForwardParams() {
TMasterOpRequest params = new TMasterOpRequest();
//node ident
params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort());
if (isSyncJournalOnly) {
params.setSyncJournalOnly(true);
return params;
}
params.setCluster(ctx.getClusterName());
params.setSql(originStmt.originStmt);
params.setStmtIdx(originStmt.idx);
params.setUser(ctx.getQualifiedUser());
params.setDb(ctx.getDatabase());
params.setUserIp(ctx.getRemoteIP());
params.setStmtId(ctx.getStmtId());
params.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift());

// query options
params.setQueryOptions(ctx.getSessionVariable().getQueryOptionVariables());
// session variables
params.setSessionVariables(ctx.getSessionVariable().getForwardVariables());

// create a trace carrier
Map<String, String> traceCarrier = new HashMap<>();
// Inject the request with the current context
Telemetry.getOpenTelemetry().getPropagators().getTextMapPropagator()
.inject(Context.current(), traceCarrier, (carrier, key, value) -> carrier.put(key, value));
// carrier send tracing to master
params.setTraceCarrier(traceCarrier);

if (null != ctx.queryId()) {
params.setQueryId(ctx.queryId());
}
return params;
}

public ByteBuffer getOutputPacket() {
if (result == null) {
return null;
Expand Down
25 changes: 2 additions & 23 deletions fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -725,8 +725,6 @@ public void executeByLegacy(TUniqueId queryId) throws Exception {
handleLoadStmt();
} else if (parsedStmt instanceof UpdateStmt) {
handleUpdateStmt();
} else if (parsedStmt instanceof SyncStmt) {
syncJournal();
} else if (parsedStmt instanceof DdlStmt) {
if (parsedStmt instanceof DeleteStmt && ((DeleteStmt) parsedStmt).getFromClause() != null) {
handleDeleteStmt();
Expand Down Expand Up @@ -790,30 +788,11 @@ public void executeByLegacy(TUniqueId queryId) throws Exception {
}

private void syncJournalIfNeeded() throws Exception {
if (!context.getSessionVariable().enableStrongConsistencyRead
|| !Config.enable_strong_consistency_read) {
return;
}
syncJournal();
}

/**
* fetch master's max journal id and wait for edit log replaying
*/
private void syncJournal() throws Exception {
final Env env = context.getEnv();
if (env.isMaster()) {
if (env.isMaster() || !context.getSessionVariable().enableStrongConsistencyRead) {
return;
}
String masterHost = env.getMasterHost();
int masterRpcPort = env.getMasterRpcPort();
TNetworkAddress thriftAddress = new TNetworkAddress(masterHost, masterRpcPort);
final int timeoutMs = context.getExecTimeout() * 1000;
final Client client = ClientPool.frontendPool.borrowObject(thriftAddress, timeoutMs);
final TGetMasterMaxJournalIdReply reply = client.getMasterMaxJournalId();
Preconditions.checkNotNull(reply, "max journal id should be not null.");
final long maxJournalId = reply.getMaxJournalId();
env.getJournalObservable().waitOn(maxJournalId, timeoutMs);
new MasterOpExecutor(context).execute();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;

import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
Expand Down Expand Up @@ -898,6 +899,11 @@ public TMasterOpResult forward(TMasterOpRequest params) throws TException {
LOG.warn("reject request from invalid host. client: {}", params.getClientNodeHost());
throw new TException("request from invalid host was rejected.");
}
if (params.isSetSyncJournalOnly() && params.syncJournalOnly) {
final TMasterOpResult result = new TMasterOpResult();
result.setMaxJournalId(Env.getCurrentEnv().getMaxJournalId());
return result;
}

// add this log so that we can track this stmt
LOG.debug("receive forwarded stmt {} from FE: {}", params.getStmtId(), params.getClientNodeHost());
Expand Down Expand Up @@ -2553,11 +2559,4 @@ public TGetMasterTokenResult getMasterToken(TGetMasterTokenRequest request) thro

return result;
}

@Override
public TGetMasterMaxJournalIdReply getMasterMaxJournalId() throws TException {
final Env env = Env.getCurrentEnv();
Preconditions.checkState(env.isMaster(), "should get max journal id from master");
return new TGetMasterMaxJournalIdReply(env.getMaxJournalId());
}
}
7 changes: 1 addition & 6 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,7 @@ struct TMasterOpRequest {
21: optional map<string, string> trace_carrier
22: optional string clientNodeHost
23: optional i32 clientNodePort
24: optional bool syncJournalOnly // if set to true, this request means to do nothing but just sync max journal id of master
}

struct TColumnDefinition {
Expand Down Expand Up @@ -1041,10 +1042,6 @@ struct TGetMasterTokenResult {
2: optional string token
}

struct TGetMasterMaxJournalIdReply {
1: i64 maxJournalId
}

service FrontendService {
TGetDbsResult getDbNames(1: TGetDbsParams params)
TGetTablesResult getTableNames(1: TGetTablesParams params)
Expand Down Expand Up @@ -1107,6 +1104,4 @@ service FrontendService {
TGetTabletReplicaInfosResult getTabletReplicaInfos(1: TGetTabletReplicaInfosRequest request)

TGetMasterTokenResult getMasterToken(1: TGetMasterTokenRequest request)

TGetMasterMaxJournalIdReply getMasterMaxJournalId()
}

0 comments on commit 9907dd4

Please sign in to comment.