Skip to content

Commit

Permalink
[Feature](PreparedStatement) implement general server side prepared (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
eldenmoon authored and ByteYue committed May 15, 2024
1 parent cc63911 commit d5a144d
Show file tree
Hide file tree
Showing 32 changed files with 506 additions and 295 deletions.
9 changes: 7 additions & 2 deletions be/src/pipeline/exec/result_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,13 @@ Status ResultSinkLocalState::open(RuntimeState* state) {
// create writer based on sink type
switch (p._sink_type) {
case TResultSinkType::MYSQL_PROTOCAL:
_writer.reset(new (std::nothrow) vectorized::VMysqlResultWriter(
_sender.get(), _output_vexpr_ctxs, _profile));
if (state->mysql_row_binary_format()) {
_writer.reset(new (std::nothrow) vectorized::VMysqlResultWriter<true>(
_sender.get(), _output_vexpr_ctxs, _profile));
} else {
_writer.reset(new (std::nothrow) vectorized::VMysqlResultWriter<false>(
_sender.get(), _output_vexpr_ctxs, _profile));
}
break;
default:
return Status::InternalError("Unknown result sink type");
Expand Down
5 changes: 5 additions & 0 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,11 @@ class RuntimeState {
_query_options.enable_common_expr_pushdown_for_inverted_index;
};

bool mysql_row_binary_format() const {
return _query_options.__isset.mysql_row_binary_format &&
_query_options.mysql_row_binary_format;
}

bool enable_faster_float_convert() const {
return _query_options.__isset.faster_float_convert && _query_options.faster_float_convert;
}
Expand Down
1 change: 0 additions & 1 deletion be/src/service/point_query_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/jsonb/serialize.h"
#include "vec/sink/vmysql_result_writer.cpp"
#include "vec/sink/vmysql_result_writer.h"

namespace doris {
Expand Down
12 changes: 9 additions & 3 deletions be/src/vec/sink/vresult_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,16 @@ Status VResultSink::prepare(RuntimeState* state) {

// create writer based on sink type
switch (_sink_type) {
case TResultSinkType::MYSQL_PROTOCAL:
_writer.reset(new (std::nothrow)
VMysqlResultWriter(_sender.get(), _output_vexpr_ctxs, _profile));
case TResultSinkType::MYSQL_PROTOCAL: {
if (state->mysql_row_binary_format()) {
_writer.reset(new (std::nothrow) VMysqlResultWriter<true>(
_sender.get(), _output_vexpr_ctxs, _profile));
} else {
_writer.reset(new (std::nothrow) VMysqlResultWriter<false>(
_sender.get(), _output_vexpr_ctxs, _profile));
}
break;
}
case TResultSinkType::ARROW_FLIGHT_PROTOCAL: {
std::shared_ptr<arrow::Schema> arrow_schema;
RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs, &arrow_schema));
Expand Down
4 changes: 4 additions & 0 deletions fe/fe-common/src/main/java/org/apache/doris/catalog/Type.java
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,10 @@ public boolean isInvalid() {
return isScalarType(PrimitiveType.INVALID_TYPE);
}

public boolean isUnsupported() {
return isScalarType(PrimitiveType.UNSUPPORTED);
}

public boolean isValid() {
return !isInvalid();
}
Expand Down
8 changes: 6 additions & 2 deletions fe/fe-core/src/main/cup/sql_parser.cup
Original file line number Diff line number Diff line change
Expand Up @@ -1234,7 +1234,11 @@ stmt ::=
| insert_overwrite_stmt : stmt
{: RESULT = stmt; :}
| update_stmt : stmt
{: RESULT = stmt; :}
{:
RESULT = stmt;
stmt.setPlaceHolders(parser.placeholder_expr_list);
parser.placeholder_expr_list.clear();
:}
| backup_stmt : stmt
{: RESULT = stmt; :}
| restore_stmt : stmt
Expand Down Expand Up @@ -5786,7 +5790,7 @@ expr_or_default ::=
prepare_stmt ::=
KW_PREPARE variable_name:name KW_FROM select_stmt:s
{:
RESULT = new PrepareStmt(s, name, false);
RESULT = new PrepareStmt(s, name);
s.setPlaceHolders(parser.placeholder_expr_list);
parser.placeholder_expr_list.clear();
:}
Expand Down
16 changes: 8 additions & 8 deletions fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,14 @@ public int getCallDepth() {
return callDepth;
}

public void setPrepareStmt(PrepareStmt stmt) {
prepareStmt = stmt;
}

public PrepareStmt getPrepareStmt() {
return prepareStmt;
}

public void setInlineView(boolean inlineView) {
isInlineView = inlineView;
}
Expand All @@ -630,14 +638,6 @@ public void setExplicitViewAlias(String alias) {
explicitViewAlias = alias;
}

public void setPrepareStmt(PrepareStmt stmt) {
prepareStmt = stmt;
}

public PrepareStmt getPrepareStmt() {
return prepareStmt;
}

public String getExplicitViewAlias() {
return explicitViewAlias;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,11 +508,13 @@ public Pair<SlotRef, Expr> extract() {
public void analyzeImpl(Analyzer analyzer) throws AnalysisException {
super.analyzeImpl(analyzer);
this.checkIncludeBitmap();
// Ignore placeholder
if (getChild(0) instanceof PlaceHolderExpr || getChild(1) instanceof PlaceHolderExpr) {
// Ignore placeholder, when it type is invalid.
// Invalid type could happen when analyze prepared point query select statement,
// since the value is occupied but not assigned
if ((getChild(0) instanceof PlaceHolderExpr && getChild(0).type == Type.UNSUPPORTED)
|| (getChild(1) instanceof PlaceHolderExpr && getChild(1).type == Type.UNSUPPORTED)) {
return;
}

for (Expr expr : children) {
if (expr instanceof Subquery) {
Subquery subquery = (Subquery) expr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1471,7 +1471,7 @@ private void checkHllCompatibility() throws AnalysisException {
* failure to convert a string literal to a date literal
*/
public final Expr castTo(Type targetType) throws AnalysisException {
if (this instanceof PlaceHolderExpr && this.type.isInvalid()) {
if (this instanceof PlaceHolderExpr && this.type.isUnsupported()) {
return this;
}
// If the targetType is NULL_TYPE then ignore the cast because NULL_TYPE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,14 +343,6 @@ public String toString() {
return getStringValue();
}

// Parse from binary data, the format follows mysql binary protocal
// see https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_binary_resultset.html.
// Return next offset
public void setupParamFromBinary(ByteBuffer data) {
Preconditions.checkState(false,
"should implement this in derived class. " + this.type.toSql());
}

public static LiteralExpr getLiteralByMysqlType(int mysqlType) throws AnalysisException {
switch (mysqlType) {
// MYSQL_TYPE_TINY
Expand Down Expand Up @@ -499,4 +491,12 @@ public static LiteralExpr getLiteralExprFromThrift(TExprNode node) throws Analys
default: throw new AnalysisException("Wrong type from thrift;");
}
}

// Parse from binary data, the format follows mysql binary protocal
// see https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_binary_resultset.html.
// Return next offset
public void setupParamFromBinary(ByteBuffer data) {
Preconditions.checkState(false,
"should implement this in derived class. " + this.type.toSql());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,17 @@ enum InsertType {
}
}

public NativeInsertStmt(NativeInsertStmt other) {
super(other.label, null, null);
this.tblName = other.tblName;
this.targetPartitionNames = other.targetPartitionNames;
this.label = other.label;
this.queryStmt = other.queryStmt;
this.planHints = other.planHints;
this.targetColumnNames = other.targetColumnNames;
this.isValuesOrConstantSelect = other.isValuesOrConstantSelect;
}

public NativeInsertStmt(InsertTarget target, String label, List<String> cols, InsertSource source,
List<String> hints) {
super(new LabelName(null, label), null, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class PlaceHolderExpr extends LiteralExpr {
int mysqlTypeCode = -1;

public PlaceHolderExpr() {

type = Type.UNSUPPORTED;
}

public void setTypeCode(int mysqlTypeCode) {
Expand All @@ -45,10 +45,12 @@ public void setTypeCode(int mysqlTypeCode) {

protected PlaceHolderExpr(LiteralExpr literal) {
this.lExpr = literal;
this.type = literal.getType();
}

protected PlaceHolderExpr(PlaceHolderExpr other) {
this.lExpr = other.lExpr;
this.type = other.type;
}

public void setLiteral(LiteralExpr literal) {
Expand Down Expand Up @@ -161,7 +163,17 @@ public Expr clone() {

@Override
public String toSqlImpl() {
return getStringValue();
if (this.lExpr == null) {
return "?";
}
return "_placeholder_(" + this.lExpr.toSqlImpl() + ")";
}

// @Override
public Expr reset() {
this.lExpr = null;
this.type = Type.UNSUPPORTED;
return this;
}

@Override
Expand Down
Loading

0 comments on commit d5a144d

Please sign in to comment.