Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
zzzzzzzs committed Jun 5, 2024
1 parent eea6eef commit c2d9f9a
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.Cast;
Expand All @@ -48,22 +47,17 @@
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SqlModeHelper;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.transaction.TransactionStatus;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ProtocolStringList;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -143,8 +137,7 @@ private static boolean isGroupCommitAvailablePlan(PhysicalOlapTableSink<? extend
}

private void handleGroupCommit(ConnectContext ctx, DataSink sink,
PhysicalOlapTableSink<?> physicalOlapTableSink, NereidsPlanner planner)
throws UserException, TException, RpcException, ExecutionException, InterruptedException {
PhysicalOlapTableSink<?> physicalOlapTableSink, NereidsPlanner planner) throws Exception {
// TODO we should refactor this to remove rely on UnionNode
List<InternalService.PDataRow> rows = new ArrayList<>();

Expand Down Expand Up @@ -181,15 +174,8 @@ private void handleGroupCommit(ConnectContext ctx, DataSink sink,
ConnectContext.get().getSessionVariable().getGroupCommit());
PGroupCommitInsertResponse response = groupCommitPlanner.executeGroupCommitInsert(ctx, rows);
TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode());
ProtocolStringList errorMsgsList = response.getStatus().getErrorMsgsList();
// TODO: in legacy, there is a retry, we need to implement
if (code == TStatusCode.DATA_QUALITY_ERROR && !errorMsgsList.isEmpty() && errorMsgsList.get(0)
.contains("schema version not match")) {
LOG.info("group commit insert failed. query id: {}, backend id: {}, status: {}, "
+ "schema version: {}", ctx.queryId(),
groupCommitPlanner.getBackend(), response.getStatus(),
physicalOlapTableSink.getTargetTable().getBaseSchemaVersion());
} else if (code != TStatusCode.OK) {
if (code != TStatusCode.OK) {
String errMsg = "group commit insert failed. backend id: "
+ groupCommitPlanner.getBackend().getId() + ", status: "
+ response.getStatus();
Expand Down Expand Up @@ -220,8 +206,6 @@ protected void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink p

@Override
protected void beforeExec() {
String queryId = DebugUtil.printId(ctx.queryId());
LOG.info("start insert [{}] with query id {} and txn id {}", labelName, queryId, txnId);
}

@Override
Expand All @@ -232,40 +216,21 @@ protected void onComplete() throws UserException {
@Override
protected void onFail(Throwable t) {
errMsg = t.getMessage() == null ? "unknown reason" : t.getMessage();
StringBuilder sb = new StringBuilder();
sb.append("{");
sb.append("'status':'").append(ctx.isTxnModel() ? TransactionStatus.PREPARE.name() : txnStatus.name());
// sb.append("', 'txnId':'").append(txnId).append("'");
if (!Strings.isNullOrEmpty(errMsg)) {
sb.append("', 'err':'").append(errMsg).append("'");
}
sb.append("}");
ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, t.getMessage());
// set insert result in connection context,
// so that user can use `show insert result` to get info of the last insert operation.
ctx.setOrUpdateInsertResult(txnId, labelName, database.getFullName(), table.getName(),
txnStatus, loadedRows, 0);
// update it, so that user can get loaded rows in fe.audit.log
ctx.updateReturnRows((int) loadedRows);
}

@Override
protected void afterExec(StmtExecutor executor) {

}

protected final void execImpl() throws UserException {
protected final void execImpl() throws Exception {
Optional<PhysicalOlapTableSink<?>> plan = (planner.getPhysicalPlan()
.<Set<PhysicalOlapTableSink<?>>>collect(PhysicalSink.class::isInstance)).stream()
.findAny();
PhysicalOlapTableSink<?> olapSink = plan.get();
DataSink sink = planner.getFragments().get(0).getSink();
try {
handleGroupCommit(ctx, sink, olapSink, planner);
} catch (TException | RpcException | ExecutionException | InterruptedException e) {
LOG.error("errors when group commit insert. {}", e);
throw new UserException("errors when group commit insert. " + e.getMessage(), e);
}
handleGroupCommit(ctx, sink, olapSink, planner);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ suite("insert_group_commit_into_max_filter_ratio") {
logger.warn("insert result: " + result + ", expected_row_count: " + expected_row_count + ", sql: " + sql)
}
// assertEquals(result, expected_row_count)
assertTrue(serverInfo.contains("too many filtered rows"))
assertTrue(serverInfo.contains("'status':'ABORTED'") || serverInfo.contains("too many filtered rows"))
// assertFalse(serverInfo.contains("'label':'group_commit_"))
} catch (Exception e) {
logger.info("exception: " + e)
Expand Down

0 comments on commit c2d9f9a

Please sign in to comment.