Skip to content

Commit

Permalink
[Feature](PreparedStatement) implement general server side prepared
Browse files Browse the repository at this point in the history
fix rebase
  • Loading branch information
eldenmoon committed Apr 17, 2024
1 parent 20b37e7 commit f205219
Show file tree
Hide file tree
Showing 32 changed files with 524 additions and 196 deletions.
75 changes: 75 additions & 0 deletions be/src/olap/column_tree.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <mutex>
#include <utility>

#include "common/status.h"
#include "olap/olap_common.h"
#include "olap/rowset/segment_v2/hierarchical_data_reader.h"
#include "vec/columns/subcolumn_tree.h"
#include "vec/json/path_in_data.h"

namespace doris {
struct SubcolumnReader {
std::unique_ptr<ColumnReader> reader;
std::shared_ptr<const vectorized::IDataType> file_column_type;
};

using RowsetSegmentId = std::pair<RowsetId, uint32_t>;

using SubstreamReaderTree = vectorized::SubcolumnsTree<StreamReader>;
using SegmentSubcolumnReaderMap = std::map<RowsetSegmentId, SubcolumnReader>;
using SubcolumnColumnReaders = vectorized::SubcolumnsTree<SegmentSubcolumnReaderMap>;

// Tree for variant subcolumns
// path -> rowsetid-segmentid-> SubcolumnReader
class GolobalColumnTree {
public:
GolobalColumnTree() = default;
void register_column(const vectorized::PathInData& path, const RowsetId& rowset_id,
uint32_t segment_id, SubcolumnReader&& reader) {
std::lock_guard<std::mutex> lock(_lock);
const auto* node = _readers_tree.find_exact(path);
if (!node) {
}
}
void remove_column(const vectorized::PathInData& path, const RowsetId& rowset_id,
uint32_t segment_id) {
std::lock_guard<std::mutex> lock(_lock);
}
// return nullptr if not found
const SubcolumnReader* get_reader(const vectorized::PathInData& path, RowsetId rowset_id,
uint32_t segment_id) {
std::lock_guard<std::mutex> lock(_lock);
const auto* node = _readers_tree.find_exact(path);
if (!node) {
return nullptr;
}
auto it = node->data.find(RowsetSegmentId {rowset_id, segment_id});
if (it == node->data.end()) {
return nullptr;
}
return &it->second;
}

private:
std::mutex _lock;
SubcolumnColumnReaders _readers_tree;
};

} // namespace doris
9 changes: 7 additions & 2 deletions be/src/pipeline/exec/result_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,13 @@ Status ResultSinkLocalState::open(RuntimeState* state) {
// create writer based on sink type
switch (p._sink_type) {
case TResultSinkType::MYSQL_PROTOCAL:
_writer.reset(new (std::nothrow) vectorized::VMysqlResultWriter(
_sender.get(), _output_vexpr_ctxs, _profile));
if (state->mysql_row_binary_format()) {
_writer.reset(new (std::nothrow) vectorized::VMysqlResultWriter<true>(
_sender.get(), _output_vexpr_ctxs, _profile));
} else {
_writer.reset(new (std::nothrow) vectorized::VMysqlResultWriter<false>(
_sender.get(), _output_vexpr_ctxs, _profile));
}
break;
default:
return Status::InternalError("Unknown result sink type");
Expand Down
5 changes: 5 additions & 0 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,11 @@ class RuntimeState {
_query_options.enable_common_expr_pushdown;
}

bool mysql_row_binary_format() const {
return _query_options.__isset.mysql_row_binary_format &&
_query_options.mysql_row_binary_format;
}

bool enable_faster_float_convert() const {
return _query_options.__isset.faster_float_convert && _query_options.faster_float_convert;
}
Expand Down
1 change: 0 additions & 1 deletion be/src/service/point_query_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/jsonb/serialize.h"
#include "vec/sink/vmysql_result_writer.cpp"
#include "vec/sink/vmysql_result_writer.h"

namespace doris {
Expand Down
12 changes: 9 additions & 3 deletions be/src/vec/sink/vresult_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,16 @@ Status VResultSink::prepare(RuntimeState* state) {

// create writer based on sink type
switch (_sink_type) {
case TResultSinkType::MYSQL_PROTOCAL:
_writer.reset(new (std::nothrow)
VMysqlResultWriter(_sender.get(), _output_vexpr_ctxs, _profile));
case TResultSinkType::MYSQL_PROTOCAL: {
if (state->mysql_row_binary_format()) {
_writer.reset(new (std::nothrow) VMysqlResultWriter<true>(
_sender.get(), _output_vexpr_ctxs, _profile));
} else {
_writer.reset(new (std::nothrow) VMysqlResultWriter<false>(
_sender.get(), _output_vexpr_ctxs, _profile));
}
break;
}
case TResultSinkType::ARROW_FLIGHT_PROTOCAL: {
std::shared_ptr<arrow::Schema> arrow_schema;
RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs, &arrow_schema));
Expand Down
4 changes: 4 additions & 0 deletions fe/fe-common/src/main/java/org/apache/doris/catalog/Type.java
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,10 @@ public boolean isInvalid() {
return isScalarType(PrimitiveType.INVALID_TYPE);
}

public boolean isUnsupported() {
return isScalarType(PrimitiveType.UNSUPPORTED);
}

public boolean isValid() {
return !isInvalid();
}
Expand Down
8 changes: 6 additions & 2 deletions fe/fe-core/src/main/cup/sql_parser.cup
Original file line number Diff line number Diff line change
Expand Up @@ -1209,7 +1209,11 @@ stmt ::=
| insert_overwrite_stmt : stmt
{: RESULT = stmt; :}
| update_stmt : stmt
{: RESULT = stmt; :}
{:
RESULT = stmt;
stmt.setPlaceHolders(parser.placeholder_expr_list);
parser.placeholder_expr_list.clear();
:}
| backup_stmt : stmt
{: RESULT = stmt; :}
| restore_stmt : stmt
Expand Down Expand Up @@ -5539,7 +5543,7 @@ expr_or_default ::=
prepare_stmt ::=
KW_PREPARE variable_name:name KW_FROM select_stmt:s
{:
RESULT = new PrepareStmt(s, name, false);
RESULT = new PrepareStmt(s, name);
s.setPlaceHolders(parser.placeholder_expr_list);
parser.placeholder_expr_list.clear();
:}
Expand Down
16 changes: 8 additions & 8 deletions fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,14 @@ public int getCallDepth() {
return callDepth;
}

public void setPrepareStmt(PrepareStmt stmt) {
prepareStmt = stmt;
}

public PrepareStmt getPrepareStmt() {
return prepareStmt;
}

public void setInlineView(boolean inlineView) {
isInlineView = inlineView;
}
Expand All @@ -630,14 +638,6 @@ public void setExplicitViewAlias(String alias) {
explicitViewAlias = alias;
}

public void setPrepareStmt(PrepareStmt stmt) {
prepareStmt = stmt;
}

public PrepareStmt getPrepareStmt() {
return prepareStmt;
}

public String getExplicitViewAlias() {
return explicitViewAlias;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -534,11 +534,13 @@ public Pair<SlotRef, Expr> extract() {
public void analyzeImpl(Analyzer analyzer) throws AnalysisException {
super.analyzeImpl(analyzer);
this.checkIncludeBitmap();
// Ignore placeholder
if (getChild(0) instanceof PlaceHolderExpr || getChild(1) instanceof PlaceHolderExpr) {
// Ignore placeholder, when it type is invalid.
// Invalid type could happen when analyze prepared point query select statement,
// since the value is occupied but not assigned
if ((getChild(0) instanceof PlaceHolderExpr && getChild(0).type == Type.UNSUPPORTED)
|| (getChild(1) instanceof PlaceHolderExpr && getChild(1).type == Type.UNSUPPORTED)) {
return;
}

for (Expr expr : children) {
if (expr instanceof Subquery) {
Subquery subquery = (Subquery) expr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1529,7 +1529,7 @@ private void checkHllCompatibility() throws AnalysisException {
* failure to convert a string literal to a date literal
*/
public final Expr castTo(Type targetType) throws AnalysisException {
if (this instanceof PlaceHolderExpr && this.type.isInvalid()) {
if (this instanceof PlaceHolderExpr && this.type.isUnsupported()) {
return this;
}
// If the targetType is NULL_TYPE then ignore the cast because NULL_TYPE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,14 +343,6 @@ public String toString() {
return getStringValue();
}

// Parse from binary data, the format follows mysql binary protocal
// see https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_binary_resultset.html.
// Return next offset
public void setupParamFromBinary(ByteBuffer data) {
Preconditions.checkState(false,
"should implement this in derived class. " + this.type.toSql());
}

public static LiteralExpr getLiteralByMysqlType(int mysqlType) throws AnalysisException {
switch (mysqlType) {
// MYSQL_TYPE_TINY
Expand Down Expand Up @@ -499,4 +491,12 @@ public static LiteralExpr getLiteralExprFromThrift(TExprNode node) throws Analys
default: throw new AnalysisException("Wrong type from thrift;");
}
}

// Parse from binary data, the format follows mysql binary protocal
// see https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_binary_resultset.html.
// Return next offset
public void setupParamFromBinary(ByteBuffer data) {
Preconditions.checkState(false,
"should implement this in derived class. " + this.type.toSql());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,17 @@ enum InsertType {
}
}

public NativeInsertStmt(NativeInsertStmt other) {
super(other.label, null, null);
this.tblName = other.tblName;
this.targetPartitionNames = other.targetPartitionNames;
this.label = other.label;
this.queryStmt = other.queryStmt;
this.planHints = other.planHints;
this.targetColumnNames = other.targetColumnNames;
this.isValuesOrConstantSelect = other.isValuesOrConstantSelect;
}

public NativeInsertStmt(InsertTarget target, String label, List<String> cols, InsertSource source,
List<String> hints) {
super(new LabelName(null, label), null, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class PlaceHolderExpr extends LiteralExpr {
int mysqlTypeCode = -1;

public PlaceHolderExpr() {

type = Type.UNSUPPORTED;
}

public void setTypeCode(int mysqlTypeCode) {
Expand Down Expand Up @@ -164,7 +164,10 @@ public Expr clone() {

@Override
public String toSqlImpl() {
return getStringValue();
if (this.lExpr == null) {
return "?";
}
return "_placeholder_(" + this.lExpr.toSqlImpl() + ")";
}

@Override
Expand Down
Loading

0 comments on commit f205219

Please sign in to comment.