From 48e2cb230b4241a078e5795ba9e44f513975e196 Mon Sep 17 00:00:00 2001 From: deardeng <565620795@qq.com> Date: Wed, 25 Sep 2024 15:03:04 +0800 Subject: [PATCH 01/11] =?UTF-8?q?[fix](case)=20Fix=20some=20non=20cloud=20?= =?UTF-8?q?regression=20case=20`failed=20to=20get=20cloud=20c=E2=80=A6=20(?= =?UTF-8?q?#41249)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit …ompute group` ## Proposed changes Issue Number: close #xxx --- .../org/apache/doris/qe/MasterOpExecutor.java | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java index 371c39b077c3a1..e4e058772f946f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java @@ -22,6 +22,7 @@ import org.apache.doris.catalog.Env; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ClientPool; +import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.thrift.FrontendService; @@ -211,15 +212,17 @@ private TMasterOpRequest buildStmtForwardParams() throws AnalysisException { params.setStmtId(ctx.getStmtId()); params.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift()); - String cluster = ""; - try { - cluster = ctx.getCloudCluster(false); - } catch (Exception e) { - LOG.warn("failed to get cloud compute group", e); - throw new AnalysisException("failed to get cloud compute group", e); - } - if (!Strings.isNullOrEmpty(cluster)) { - params.setCloudCluster(cluster); + if (Config.isCloudMode()) { + String cluster = ""; + try { + cluster = ctx.getCloudCluster(false); + } catch (Exception e) { + LOG.warn("failed to get cloud compute group", e); + throw new AnalysisException("failed to get cloud compute group", e); + } + if (!Strings.isNullOrEmpty(cluster)) { + params.setCloudCluster(cluster); + } } // query options From 3a6c4b7d89aa0f4df1ecc3681e1978f3193ef8b3 Mon Sep 17 00:00:00 2001 From: feiniaofeiafei <53502832+feiniaofeiafei@users.noreply.github.com> Date: Tue, 3 Sep 2024 10:38:44 +0800 Subject: [PATCH 02/11] [Fix](planner) legacy planner repeat group by has grouping report error (#40281) --- .../apache/doris/analysis/GroupByClause.java | 5 ++ .../grouping_sets/valid_grouping.groovy | 51 +++++++++++++++++++ 2 files changed, 56 insertions(+) create mode 100644 regression-test/suites/nereids_rules_p0/grouping_sets/valid_grouping.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/GroupByClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/GroupByClause.java index f6305e611da442..9b5055944674b2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/GroupByClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/GroupByClause.java @@ -192,6 +192,11 @@ public void analyze(Analyzer analyzer) throws AnalysisException { "GROUP BY expression must not contain aggregate functions: " + groupingExpr.toSql()); } + if (groupingExpr.contains(GroupingFunctionCallExpr.class)) { + throw new AnalysisException( + "GROUP BY expression must not contain grouping scalar functions: " + + groupingExpr.toSql()); + } if (groupingExpr.contains(AnalyticExpr.class)) { // reference the original expr in the error msg throw new AnalysisException( diff --git a/regression-test/suites/nereids_rules_p0/grouping_sets/valid_grouping.groovy b/regression-test/suites/nereids_rules_p0/grouping_sets/valid_grouping.groovy new file mode 100644 index 00000000000000..dd69f23f027f21 --- /dev/null +++ b/regression-test/suites/nereids_rules_p0/grouping_sets/valid_grouping.groovy @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("valid_grouping"){ + sql "SET enable_fallback_to_original_planner=true" + sql "drop table if exists valid_grouping" + sql """ + CREATE TABLE `valid_grouping` ( + `a` INT NULL, + `b` VARCHAR(10) NULL, + `c` INT NULL, + `d` INT NULL + ) ENGINE=OLAP + DUPLICATE KEY(`a`, `b`) + DISTRIBUTED BY RANDOM BUCKETS AUTO + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + sql "insert into valid_grouping values(1,'d2',3,5);" + test { + sql """select + b, 'day' as DT_TYPE + from valid_grouping + group by grouping sets ( (grouping_id(b)),(b));""" + exception("GROUP BY expression must not contain grouping scalar functions: grouping_id(`b`)") + } + + test { + sql """select + b, 'day' as DT_TYPE + from valid_grouping + group by grouping sets ( (grouping(b)),(b));""" + exception("GROUP BY expression must not contain grouping scalar functions: grouping(`b`)") + } + +} \ No newline at end of file From d3f157103227e71e294f1f1bedc9a2dcc82fac1b Mon Sep 17 00:00:00 2001 From: Qi Chen Date: Mon, 30 Sep 2024 15:42:56 +0800 Subject: [PATCH 03/11] [Opt](scanner-scheduler) Opt scanner scheduler starvation issue. (#41485) ## Proposed changes Backport #40641 --- be/src/common/config.cpp | 2 ++ be/src/common/config.h | 2 ++ be/src/vec/exec/format/parquet/vparquet_group_reader.cpp | 5 +++++ be/src/vec/exec/scan/scanner_scheduler.cpp | 6 ++++++ be/src/vec/exec/scan/vfile_scanner.cpp | 2 +- 5 files changed, 16 insertions(+), 1 deletion(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 2696400ca1db17..c621738c1a3ce1 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -273,6 +273,8 @@ DEFINE_mInt32(doris_scan_block_max_mb, "67108864"); DEFINE_mInt32(doris_scanner_row_num, "16384"); // single read execute fragment row bytes DEFINE_mInt32(doris_scanner_row_bytes, "10485760"); +// single read execute fragment max run time millseconds +DEFINE_mInt32(doris_scanner_max_run_time_ms, "1000"); DEFINE_mInt32(min_bytes_in_scanner_queue, "67108864"); // (Advanced) Maximum size of per-query receive-side buffer DEFINE_mInt32(exchg_node_buffer_size_bytes, "20485760"); diff --git a/be/src/common/config.h b/be/src/common/config.h index c9801275424afd..0b5a282f26fbf0 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -323,6 +323,8 @@ DECLARE_mInt32(doris_scan_block_max_mb); DECLARE_mInt32(doris_scanner_row_num); // single read execute fragment row bytes DECLARE_mInt32(doris_scanner_row_bytes); +// single read execute fragment max run time millseconds +DECLARE_mInt32(doris_scanner_max_run_time_ms); DECLARE_mInt32(min_bytes_in_scanner_queue); // (Advanced) Maximum size of per-query receive-side buffer DECLARE_mInt32(exchg_node_buffer_size_bytes); diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp index 576ed58a284f8f..b601064daeb1e4 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp @@ -455,6 +455,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re columns_to_filter[i] = i; } IColumn::Filter result_filter; + size_t pre_raw_read_rows = 0; while (!_state->is_cancelled()) { // read predicate columns pre_read_rows = 0; @@ -466,6 +467,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re DCHECK_EQ(pre_eof, true); break; } + pre_raw_read_rows += pre_read_rows; RETURN_IF_ERROR(_fill_partition_columns(block, pre_read_rows, _lazy_read_ctx.predicate_partition_columns)); RETURN_IF_ERROR(_fill_missing_columns(block, pre_read_rows, @@ -518,6 +520,9 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re Block::erase_useless_column(block, origin_column_num); if (!pre_eof) { + if (pre_raw_read_rows >= config::doris_scanner_row_num) { + break; + } // If continuous batches are skipped, we can cache them to skip a whole page _cached_filtered_rows += pre_read_rows; } else { // pre_eof diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index 7f868fba5a666e..f3f91ec6f2da0b 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -232,6 +232,8 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx, Thread::set_thread_nice_value(); } #endif + MonotonicStopWatch max_run_time_watch; + max_run_time_watch.start(); scanner->update_wait_worker_timer(); scanner->start_scan_cpu_timer(); Status status = Status::OK(); @@ -266,6 +268,10 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx, eos = true; break; } + if (max_run_time_watch.elapsed_time() > + config::doris_scanner_max_run_time_ms * 1e6) { + break; + } BlockUPtr free_block = ctx->get_free_block(first_read); if (free_block == nullptr) { break; diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 899c618dcc5045..52375e2e9c011a 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -333,8 +333,8 @@ Status VFileScanner::_get_block_wrapped(RuntimeState* state, Block* block, bool* // or not found in the file column schema. RETURN_IF_ERROR(_truncate_char_or_varchar_columns(block)); } - break; } + break; } while (true); // Update filtered rows and unselected rows for load, reset counter. From 222c46c08eb9775642312a2226fc7459ec89a773 Mon Sep 17 00:00:00 2001 From: Yongqiang YANG Date: Mon, 30 Sep 2024 16:08:36 +0800 Subject: [PATCH 04/11] [fix](cloud) enable_file_cache for cloud by default (#41502) --- be/src/common/config.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index c621738c1a3ce1..9b9d16db778e2b 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1689,6 +1689,11 @@ bool init(const char* conf_file, bool fill_conf_map, bool must_exist, bool set_t SET_FIELD(it.second, std::vector, fill_conf_map, set_to_default); } + if (config::is_cloud_mode()) { + auto st = config::set_config("enable_file_cache", "true", true); + LOG(INFO) << "set config enable_file_cache " << "true" << " " << st; + } + return true; } From 009115cfccab8bf4f0a135b4ad1dbb60ddd7d141 Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Mon, 30 Sep 2024 16:17:59 +0800 Subject: [PATCH 05/11] [fix](jdbc catalog) Fixed FE memory leak by enabling weak references in HikariCP (#40773) (#41503) bp #40773 Co-authored-by: zy-kkk --- .../java/org/apache/doris/datasource/jdbc/client/JdbcClient.java | 1 + 1 file changed, 1 insertion(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java index cd64efcd80f2cd..155afea1217064 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java @@ -98,6 +98,7 @@ public static JdbcClient createJdbcClient(JdbcClientConfig jdbcClientConfig) { } protected JdbcClient(JdbcClientConfig jdbcClientConfig) { + System.setProperty("com.zaxxer.hikari.useWeakReferences", "true"); this.catalogName = jdbcClientConfig.getCatalog(); this.jdbcUser = jdbcClientConfig.getUser(); this.isOnlySpecifiedDatabase = Boolean.parseBoolean(jdbcClientConfig.getOnlySpecifiedDatabase()); From 5bc77b150ba9410343f14344b5c8067fa03edfe0 Mon Sep 17 00:00:00 2001 From: Yongqiang YANG Date: Mon, 30 Sep 2024 17:03:06 +0800 Subject: [PATCH 06/11] [improve](cloud) set default vaule for file_path (#41507) --- be/src/common/config.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 9b9d16db778e2b..d1cf65c37241ff 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1007,7 +1007,7 @@ DEFINE_Bool(enable_file_cache, "false"); // or use the default storage value: // {"path": "memory", "total_size":53687091200} // Both will use the directory "memory" on the disk instead of the real RAM. -DEFINE_String(file_cache_path, ""); +DEFINE_String(file_cache_path, "[{\"path\":\"${DORIS_HOME}/file_cache\"}]"); DEFINE_Int64(file_cache_each_block_size, "1048576"); // 1MB DEFINE_Bool(clear_file_cache, "false"); From 04e842df520a9a10d4f1035f78765d7dbe311e2c Mon Sep 17 00:00:00 2001 From: zy-kkk Date: Mon, 30 Sep 2024 22:46:00 +0800 Subject: [PATCH 07/11] [3.0][improvement](jdbc catalog) support jdbc external catalog insert stmt in nereids (#41511) pick (#39813) --- .../analyzer/UnboundJdbcTableSink.java | 84 ++++++++++ .../analyzer/UnboundTableSinkCreator.java | 20 +-- .../translator/PhysicalPlanTranslator.java | 20 +++ .../TurnOffPageCacheForInsertIntoSelect.java | 8 + .../properties/RequestPropertyDeriver.java | 9 ++ .../apache/doris/nereids/rules/RuleSet.java | 2 + .../apache/doris/nereids/rules/RuleType.java | 2 + .../nereids/rules/analysis/BindSink.java | 77 ++++++++- ...lJdbcTableSinkToPhysicalJdbcTableSink.java | 48 ++++++ .../doris/nereids/trees/plans/PlanType.java | 3 + .../BaseExternalTableInsertExecutor.java | 2 +- .../insert/InsertIntoTableCommand.java | 27 +++- .../plans/commands/insert/InsertUtils.java | 9 ++ .../insert/JdbcInsertCommandContext.java | 24 +++ .../commands/insert/JdbcInsertExecutor.java | 113 +++++++++++++ .../plans/logical/LogicalJdbcTableSink.java | 151 ++++++++++++++++++ .../PhysicalBaseExternalTableSink.java | 4 + .../plans/physical/PhysicalJdbcTableSink.java | 109 +++++++++++++ .../trees/plans/visitor/SinkVisitor.java | 15 ++ .../transaction/JdbcTransactionManager.java | 42 +++++ .../doris/transaction/TransactionType.java | 3 +- 21 files changed, 758 insertions(+), 14 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundJdbcTableSink.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalJdbcTableSinkToPhysicalJdbcTableSink.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/JdbcInsertCommandContext.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/JdbcInsertExecutor.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalJdbcTableSink.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalJdbcTableSink.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/transaction/JdbcTransactionManager.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundJdbcTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundJdbcTableSink.java new file mode 100644 index 00000000000000..53367cf9c21ae6 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundJdbcTableSink.java @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.analyzer; + +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Optional; + +/** + * Represent an jdbc table sink plan node that has not been bound. + */ +public class UnboundJdbcTableSink extends UnboundBaseExternalTableSink { + + public UnboundJdbcTableSink(List nameParts, List colNames, List hints, + List partitions, CHILD_TYPE child) { + this(nameParts, colNames, hints, partitions, DMLCommandType.NONE, + Optional.empty(), Optional.empty(), child); + } + + /** + * constructor + */ + public UnboundJdbcTableSink(List nameParts, + List colNames, + List hints, + List partitions, + DMLCommandType dmlCommandType, + Optional groupExpression, + Optional logicalProperties, + CHILD_TYPE child) { + super(nameParts, PlanType.LOGICAL_UNBOUND_JDBC_TABLE_SINK, ImmutableList.of(), groupExpression, + logicalProperties, colNames, dmlCommandType, child, hints, partitions); + } + + @Override + public Plan withChildren(List children) { + Preconditions.checkArgument(children.size() == 1, + "UnboundJdbcTableSink should have exactly one child"); + return new UnboundJdbcTableSink<>(nameParts, colNames, hints, partitions, + dmlCommandType, Optional.empty(), Optional.empty(), children.get(0)); + } + + @Override + public R accept(PlanVisitor visitor, C context) { + return visitor.visitUnboundJdbcTableSink(this, context); + } + + @Override + public Plan withGroupExpression(Optional groupExpression) { + return new UnboundJdbcTableSink<>(nameParts, colNames, hints, partitions, + dmlCommandType, groupExpression, Optional.of(getLogicalProperties()), child()); + } + + @Override + public Plan withGroupExprLogicalPropChildren(Optional groupExpression, + Optional logicalProperties, List children) { + return new UnboundJdbcTableSink<>(nameParts, colNames, hints, partitions, + dmlCommandType, groupExpression, logicalProperties, children.get(0)); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java index 6c361c36f055a3..8ca58f977578a6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java @@ -23,6 +23,7 @@ import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.datasource.hive.HMSExternalCatalog; import org.apache.doris.datasource.iceberg.IcebergExternalCatalog; +import org.apache.doris.datasource.jdbc.JdbcExternalCatalog; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.exceptions.ParseException; import org.apache.doris.nereids.trees.plans.Plan; @@ -78,6 +79,9 @@ public static LogicalSink createUnboundTableSink(List na } else if (curCatalog instanceof IcebergExternalCatalog) { return new UnboundIcebergTableSink<>(nameParts, colNames, hints, partitions, dmlCommandType, Optional.empty(), Optional.empty(), plan); + } else if (curCatalog instanceof JdbcExternalCatalog) { + return new UnboundJdbcTableSink<>(nameParts, colNames, hints, partitions, + dmlCommandType, Optional.empty(), Optional.empty(), plan); } throw new RuntimeException("Load data to " + curCatalog.getClass().getSimpleName() + " is not supported."); } @@ -109,20 +113,16 @@ public static LogicalSink createUnboundTableSinkMaybeOverwrite(L dmlCommandType, Optional.empty(), Optional.empty(), plan); } else if (curCatalog instanceof IcebergExternalCatalog && !isAutoDetectPartition) { return new UnboundIcebergTableSink<>(nameParts, colNames, hints, partitions, - dmlCommandType, Optional.empty(), Optional.empty(), plan); - } - // TODO: we need to support insert into other catalog - try { - if (ConnectContext.get() != null) { - ConnectContext.get().getSessionVariable().enableFallbackToOriginalPlannerOnce(); - } - } catch (Exception e) { - // ignore this. + dmlCommandType, Optional.empty(), Optional.empty(), plan); + } else if (curCatalog instanceof JdbcExternalCatalog) { + return new UnboundJdbcTableSink<>(nameParts, colNames, hints, partitions, + dmlCommandType, Optional.empty(), Optional.empty(), plan); } + throw new AnalysisException( (isOverwrite ? "insert overwrite" : "insert") + " data to " + curCatalog.getClass().getSimpleName() + " is not supported." + (isAutoDetectPartition - ? " PARTITION(*) is only supported in overwrite partition for OLAP table" : "")); + ? " PARTITION(*) is only supported in overwrite partition for OLAP table" : "")); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 960df63a62f539..49d0f6ae90ac97 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -56,6 +56,7 @@ import org.apache.doris.datasource.iceberg.IcebergExternalTable; import org.apache.doris.datasource.iceberg.source.IcebergScanNode; import org.apache.doris.datasource.jdbc.JdbcExternalTable; +import org.apache.doris.datasource.jdbc.sink.JdbcTableSink; import org.apache.doris.datasource.jdbc.source.JdbcScanNode; import org.apache.doris.datasource.lakesoul.LakeSoulExternalTable; import org.apache.doris.datasource.lakesoul.source.LakeSoulScanNode; @@ -128,6 +129,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalIntersect; import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcScan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit; import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin; import org.apache.doris.nereids.trees.plans.physical.PhysicalOdbcScan; @@ -496,6 +498,24 @@ public PlanFragment visitPhysicalIcebergTableSink(PhysicalIcebergTableSink jdbcTableSink, + PlanTranslatorContext context) { + PlanFragment rootFragment = jdbcTableSink.child().accept(this, context); + rootFragment.setOutputPartition(DataPartition.UNPARTITIONED); + List targetTableColumns = jdbcTableSink.getCols(); + List insertCols = targetTableColumns.stream() + .map(Column::getName) + .collect(Collectors.toList()); + + JdbcTableSink sink = new JdbcTableSink( + ((JdbcExternalTable) jdbcTableSink.getTargetTable()).getJdbcTable(), + insertCols + ); + rootFragment.setSink(sink); + return rootFragment; + } + @Override public PlanFragment visitPhysicalFileSink(PhysicalFileSink fileSink, PlanTranslatorContext context) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/TurnOffPageCacheForInsertIntoSelect.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/TurnOffPageCacheForInsertIntoSelect.java index ab817c2f1d7c56..2479af68fbece9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/TurnOffPageCacheForInsertIntoSelect.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/TurnOffPageCacheForInsertIntoSelect.java @@ -26,6 +26,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink; import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalIcebergTableSink; +import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink; import org.apache.doris.qe.SessionVariable; import org.apache.doris.qe.VariableMgr; @@ -67,6 +68,13 @@ public Plan visitLogicalIcebergTableSink( return tableSink; } + @Override + public Plan visitLogicalJdbcTableSink( + LogicalJdbcTableSink tableSink, StatementContext context) { + turnOffPageCache(context); + return tableSink; + } + private void turnOffPageCache(StatementContext context) { SessionVariable sessionVariable = context.getConnectContext().getSessionVariable(); // set temporary session value, and then revert value in the 'finally block' of StmtExecutor#execute diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java index 0b4929e0a87566..e184ce2777d7e5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java @@ -40,6 +40,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin; import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink; +import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit; import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink; @@ -152,6 +153,14 @@ public Void visitPhysicalIcebergTableSink( return null; } + @Override + public Void visitPhysicalJdbcTableSink( + PhysicalJdbcTableSink jdbcTableSink, PlanContext context) { + // Always use gather properties for jdbcTableSink + addRequestPropertyToChildren(PhysicalProperties.GATHER); + return null; + } + @Override public Void visitPhysicalResultSink(PhysicalResultSink physicalResultSink, PlanContext context) { if (context.getSessionVariable().enableParallelResultSink() diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java index be4d8b390c9f1f..26868665b10806 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java @@ -68,6 +68,7 @@ import org.apache.doris.nereids.rules.implementation.LogicalIcebergTableSinkToPhysicalIcebergTableSink; import org.apache.doris.nereids.rules.implementation.LogicalIntersectToPhysicalIntersect; import org.apache.doris.nereids.rules.implementation.LogicalJdbcScanToPhysicalJdbcScan; +import org.apache.doris.nereids.rules.implementation.LogicalJdbcTableSinkToPhysicalJdbcTableSink; import org.apache.doris.nereids.rules.implementation.LogicalJoinToHashJoin; import org.apache.doris.nereids.rules.implementation.LogicalJoinToNestedLoopJoin; import org.apache.doris.nereids.rules.implementation.LogicalLimitToPhysicalLimit; @@ -190,6 +191,7 @@ public class RuleSet { .add(new LogicalOlapTableSinkToPhysicalOlapTableSink()) .add(new LogicalHiveTableSinkToPhysicalHiveTableSink()) .add(new LogicalIcebergTableSinkToPhysicalIcebergTableSink()) + .add(new LogicalJdbcTableSinkToPhysicalJdbcTableSink()) .add(new LogicalFileSinkToPhysicalFileSink()) .add(new LogicalResultSinkToPhysicalResultSink()) .add(new LogicalDeferMaterializeResultSinkToPhysicalDeferMaterializeResultSink()) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java index f2c572f7779e91..d1a48899873f8d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java @@ -32,6 +32,7 @@ public enum RuleType { BINDING_RESULT_SINK(RuleTypeClass.REWRITE), BINDING_INSERT_HIVE_TABLE(RuleTypeClass.REWRITE), BINDING_INSERT_ICEBERG_TABLE(RuleTypeClass.REWRITE), + BINDING_INSERT_JDBC_TABLE(RuleTypeClass.REWRITE), BINDING_INSERT_TARGET_TABLE(RuleTypeClass.REWRITE), INIT_MATERIALIZATION_HOOK_FOR_FILE_SINK(RuleTypeClass.REWRITE), INIT_MATERIALIZATION_HOOK_FOR_TABLE_SINK(RuleTypeClass.REWRITE), @@ -437,6 +438,7 @@ public enum RuleType { LOGICAL_OLAP_TABLE_SINK_TO_PHYSICAL_OLAP_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_HIVE_TABLE_SINK_TO_PHYSICAL_HIVE_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_ICEBERG_TABLE_SINK_TO_PHYSICAL_ICEBERG_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION), + LOGICAL_JDBC_TABLE_SINK_TO_PHYSICAL_JDBC_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_RESULT_SINK_TO_PHYSICAL_RESULT_SINK_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_DEFER_MATERIALIZE_RESULT_SINK_TO_PHYSICAL_DEFER_MATERIALIZE_RESULT_SINK_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_FILE_SINK_TO_PHYSICAL_FILE_SINK_RULE(RuleTypeClass.IMPLEMENTATION), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java index 6d8ad94242b53c..05027b856740c7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java @@ -32,10 +32,13 @@ import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.datasource.iceberg.IcebergExternalDatabase; import org.apache.doris.datasource.iceberg.IcebergExternalTable; +import org.apache.doris.datasource.jdbc.JdbcExternalDatabase; +import org.apache.doris.datasource.jdbc.JdbcExternalTable; import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.analyzer.Scope; import org.apache.doris.nereids.analyzer.UnboundHiveTableSink; import org.apache.doris.nereids.analyzer.UnboundIcebergTableSink; +import org.apache.doris.nereids.analyzer.UnboundJdbcTableSink; import org.apache.doris.nereids.analyzer.UnboundSlot; import org.apache.doris.nereids.analyzer.UnboundTableSink; import org.apache.doris.nereids.exceptions.AnalysisException; @@ -58,6 +61,7 @@ import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalIcebergTableSink; +import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; @@ -109,7 +113,8 @@ public List buildRules() { // TODO: bind hive taget table RuleType.BINDING_INSERT_HIVE_TABLE.build(unboundHiveTableSink().thenApply(this::bindHiveTableSink)), RuleType.BINDING_INSERT_ICEBERG_TABLE.build( - unboundIcebergTableSink().thenApply(this::bindIcebergTableSink)) + unboundIcebergTableSink().thenApply(this::bindIcebergTableSink)), + RuleType.BINDING_INSERT_JDBC_TABLE.build(unboundJdbcTableSink().thenApply(this::bindJdbcTableSink)) ); } @@ -524,6 +529,64 @@ private Plan bindIcebergTableSink(MatchingContext> return boundSink.withChildAndUpdateOutput(fullOutputProject); } + private Plan bindJdbcTableSink(MatchingContext> ctx) { + UnboundJdbcTableSink sink = ctx.root; + Pair pair = bind(ctx.cascadesContext, sink); + JdbcExternalDatabase database = pair.first; + JdbcExternalTable table = pair.second; + LogicalPlan child = ((LogicalPlan) sink.child()); + + List bindColumns; + if (sink.getColNames().isEmpty()) { + bindColumns = table.getBaseSchema(true).stream().collect(ImmutableList.toImmutableList()); + } else { + bindColumns = sink.getColNames().stream().map(cn -> { + Column column = table.getColumn(cn); + if (column == null) { + throw new AnalysisException(String.format("column %s is not found in table %s", + cn, table.getName())); + } + return column; + }).collect(ImmutableList.toImmutableList()); + } + LogicalJdbcTableSink boundSink = new LogicalJdbcTableSink<>( + database, + table, + bindColumns, + child.getOutput().stream() + .map(NamedExpression.class::cast) + .collect(ImmutableList.toImmutableList()), + sink.getDMLCommandType(), + Optional.empty(), + Optional.empty(), + child); + // we need to insert all the columns of the target table + if (boundSink.getCols().size() != child.getOutput().size()) { + throw new AnalysisException("insert into cols should be corresponding to the query output"); + } + Map columnToOutput = getJdbcColumnToOutput(bindColumns, child); + // We don't need to insert unmentioned columns, only user specified columns + LogicalProject outputProject = getOutputProjectByCoercion(bindColumns, child, columnToOutput); + return boundSink.withChildAndUpdateOutput(outputProject); + } + + private static Map getJdbcColumnToOutput( + List bindColumns, LogicalPlan child) { + Map columnToOutput = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); + + for (int i = 0; i < bindColumns.size(); i++) { + Column column = bindColumns.get(i); + NamedExpression outputExpr = child.getOutput().get(i); + Alias output = new Alias( + TypeCoercionUtils.castIfNotSameType(outputExpr, DataType.fromCatalogType(column.getType())), + column.getName() + ); + columnToOutput.put(column.getName(), output); + } + + return columnToOutput; + } + private Pair bind(CascadesContext cascadesContext, UnboundTableSink sink) { List tableQualifier = RelationUtil.getQualifierName(cascadesContext.getConnectContext(), sink.getNameParts()); @@ -567,6 +630,18 @@ private Pair bind(CascadesContext throw new AnalysisException("the target table of insert into is not an iceberg table"); } + private Pair bind(CascadesContext cascadesContext, + UnboundJdbcTableSink sink) { + List tableQualifier = RelationUtil.getQualifierName(cascadesContext.getConnectContext(), + sink.getNameParts()); + Pair, TableIf> pair = RelationUtil.getDbAndTable(tableQualifier, + cascadesContext.getConnectContext().getEnv()); + if (pair.second instanceof JdbcExternalTable) { + return Pair.of(((JdbcExternalDatabase) pair.first), (JdbcExternalTable) pair.second); + } + throw new AnalysisException("the target table of insert into is not an jdbc table"); + } + private List bindPartitionIds(OlapTable table, List partitions, boolean temp) { return partitions.isEmpty() ? ImmutableList.of() diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalJdbcTableSinkToPhysicalJdbcTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalJdbcTableSinkToPhysicalJdbcTableSink.java new file mode 100644 index 00000000000000..960350c6117586 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalJdbcTableSinkToPhysicalJdbcTableSink.java @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.rules.implementation; + +import org.apache.doris.nereids.rules.Rule; +import org.apache.doris.nereids.rules.RuleType; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcTableSink; +import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcTableSink; + +import java.util.Optional; + +/** + * Implementation rule that convert logical JdbcTableSink to physical JdbcTableSink. + */ +public class LogicalJdbcTableSinkToPhysicalJdbcTableSink extends OneImplementationRuleFactory { + @Override + public Rule build() { + return logicalJdbcTableSink().thenApply(ctx -> { + LogicalJdbcTableSink sink = ctx.root; + return new PhysicalJdbcTableSink<>( + sink.getDatabase(), + sink.getTargetTable(), + sink.getCols(), + sink.getOutputExprs(), + Optional.empty(), + sink.getLogicalProperties(), + null, + null, + sink.child()); + }).toRule(RuleType.LOGICAL_JDBC_TABLE_SINK_TO_PHYSICAL_JDBC_TABLE_SINK_RULE); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java index 9f451732bdc886..f3587b379210de 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java @@ -49,9 +49,11 @@ public enum PlanType { LOGICAL_OLAP_TABLE_SINK, LOGICAL_HIVE_TABLE_SINK, LOGICAL_ICEBERG_TABLE_SINK, + LOGICAL_JDBC_TABLE_SINK, LOGICAL_RESULT_SINK, LOGICAL_UNBOUND_OLAP_TABLE_SINK, LOGICAL_UNBOUND_HIVE_TABLE_SINK, + LOGICAL_UNBOUND_JDBC_TABLE_SINK, LOGICAL_UNBOUND_RESULT_SINK, // logical others @@ -103,6 +105,7 @@ public enum PlanType { PHYSICAL_OLAP_TABLE_SINK, PHYSICAL_HIVE_TABLE_SINK, PHYSICAL_ICEBERG_TABLE_SINK, + PHYSICAL_JDBC_TABLE_SINK, PHYSICAL_RESULT_SINK, // physical others diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java index a3aa33f96ab02c..e456d171df5986 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java @@ -46,8 +46,8 @@ * Insert executor for base external table */ public abstract class BaseExternalTableInsertExecutor extends AbstractInsertExecutor { + protected static final long INVALID_TXN_ID = -1L; private static final Logger LOG = LogManager.getLogger(BaseExternalTableInsertExecutor.class); - private static final long INVALID_TXN_ID = -1L; protected long txnId = INVALID_TXN_ID; protected TransactionStatus txnStatus = TransactionStatus.ABORTED; protected final TransactionManager transactionManager; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java index 55f16c20e09ede..38d0d8386307cf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java @@ -18,6 +18,7 @@ package org.apache.doris.nereids.trees.plans.commands.insert; import org.apache.doris.analysis.StmtType; +import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.TableIf; @@ -26,12 +27,14 @@ import org.apache.doris.common.util.ProfileManager.ProfileType; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.datasource.iceberg.IcebergExternalTable; +import org.apache.doris.datasource.jdbc.JdbcExternalTable; import org.apache.doris.load.loadv2.LoadStatistic; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.nereids.NereidsPlanner; import org.apache.doris.nereids.analyzer.UnboundTableSink; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.glue.LogicalPlanAdapter; +import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.plans.Explainable; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; @@ -41,8 +44,11 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation; import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink; +import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink; +import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation; import org.apache.doris.nereids.trees.plans.physical.PhysicalSink; +import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.planner.DataSink; import org.apache.doris.qe.ConnectContext; @@ -53,6 +59,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.List; import java.util.Objects; import java.util.Optional; @@ -191,9 +198,27 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor IcebergExternalTable icebergExternalTable = (IcebergExternalTable) targetTableIf; insertExecutor = new IcebergInsertExecutor(ctx, icebergExternalTable, label, planner, Optional.of(insertCtx.orElse((new BaseExternalTableInsertCommandContext()))), emptyInsert); + } else if (physicalSink instanceof PhysicalJdbcTableSink) { + boolean emptyInsert = childIsEmptyRelation(physicalSink); + List cols = ((PhysicalJdbcTableSink) physicalSink).getCols(); + List slots = ((PhysicalJdbcTableSink) physicalSink).getOutput(); + if (physicalSink.children().size() == 1) { + if (physicalSink.child(0) instanceof PhysicalOneRowRelation + || physicalSink.child(0) instanceof PhysicalUnion) { + for (int i = 0; i < cols.size(); i++) { + if (!(cols.get(i).isAllowNull()) && slots.get(i).nullable()) { + throw new AnalysisException("Column `" + cols.get(i).getName() + + "` is not nullable, but the inserted value is nullable."); + } + } + } + } + JdbcExternalTable jdbcExternalTable = (JdbcExternalTable) targetTableIf; + insertExecutor = new JdbcInsertExecutor(ctx, jdbcExternalTable, label, planner, + Optional.of(insertCtx.orElse((new JdbcInsertCommandContext()))), emptyInsert); } else { // TODO: support other table types - throw new AnalysisException("insert into command only support [olap, hive, iceberg] table"); + throw new AnalysisException("insert into command only support [olap, hive, iceberg, jdbc] table"); } if (!insertExecutor.isEmptyInsert()) { insertExecutor.beginTransaction(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java index c5c2197faf0444..49e7858f6faf65 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java @@ -28,9 +28,11 @@ import org.apache.doris.common.Config; import org.apache.doris.common.FormatOptions; import org.apache.doris.datasource.hive.HMSExternalTable; +import org.apache.doris.datasource.jdbc.JdbcExternalTable; import org.apache.doris.nereids.analyzer.UnboundAlias; import org.apache.doris.nereids.analyzer.UnboundHiveTableSink; import org.apache.doris.nereids.analyzer.UnboundIcebergTableSink; +import org.apache.doris.nereids.analyzer.UnboundJdbcTableSink; import org.apache.doris.nereids.analyzer.UnboundOneRowRelation; import org.apache.doris.nereids.analyzer.UnboundTableSink; import org.apache.doris.nereids.exceptions.AnalysisException; @@ -266,6 +268,11 @@ public static Plan normalizePlan(Plan plan, TableIf table, Optional) unboundLogicalSink).isPartialUpdate()) { @@ -408,6 +415,8 @@ public static TableIf getTargetTable(Plan plan, ConnectContext ctx) { unboundTableSink = (UnboundHiveTableSink) plan; } else if (plan instanceof UnboundIcebergTableSink) { unboundTableSink = (UnboundIcebergTableSink) plan; + } else if (plan instanceof UnboundJdbcTableSink) { + unboundTableSink = (UnboundJdbcTableSink) plan; } else { throw new AnalysisException("the root of plan should be" + " [UnboundTableSink, UnboundHiveTableSink, UnboundIcebergTableSink]," diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/JdbcInsertCommandContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/JdbcInsertCommandContext.java new file mode 100644 index 00000000000000..71df7e417e6a8f --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/JdbcInsertCommandContext.java @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands.insert; + +/** + * For iceberg External Table + */ +public class JdbcInsertCommandContext extends BaseExternalTableInsertCommandContext { +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/JdbcInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/JdbcInsertExecutor.java new file mode 100644 index 00000000000000..928b17edf38933 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/JdbcInsertExecutor.java @@ -0,0 +1,113 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands.insert; + +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.UserException; +import org.apache.doris.common.profile.SummaryProfile; +import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.datasource.jdbc.JdbcExternalTable; +import org.apache.doris.nereids.NereidsPlanner; +import org.apache.doris.nereids.trees.plans.physical.PhysicalSink; +import org.apache.doris.planner.DataSink; +import org.apache.doris.planner.PlanFragment; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.QueryState; +import org.apache.doris.transaction.TransactionStatus; +import org.apache.doris.transaction.TransactionType; + +import com.google.common.base.Strings; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Optional; + +/** + * Insert executor for jdbc table + */ +public class JdbcInsertExecutor extends BaseExternalTableInsertExecutor { + private static final Logger LOG = LogManager.getLogger(JdbcInsertExecutor.class); + + /** + * constructor + */ + public JdbcInsertExecutor(ConnectContext ctx, JdbcExternalTable table, + String labelName, NereidsPlanner planner, + Optional insertCtx, + boolean emptyInsert) { + super(ctx, table, labelName, planner, insertCtx, emptyInsert); + } + + @Override + public void beginTransaction() { + // do nothing + } + + @Override + protected void onComplete() throws UserException { + if (ctx.getState().getStateType() == QueryState.MysqlStateType.ERR) { + LOG.warn("errors when abort txn. {}", ctx.getQueryIdentifier()); + } else { + summaryProfile.ifPresent(profile -> profile.setTransactionBeginTime(transactionType())); + summaryProfile.ifPresent(SummaryProfile::setTransactionEndTime); + txnStatus = TransactionStatus.COMMITTED; + } + } + + @Override + protected void onFail(Throwable t) { + errMsg = t.getMessage() == null ? "unknown reason" : t.getMessage(); + String queryId = DebugUtil.printId(ctx.queryId()); + // if any throwable being thrown during insert operation, first we should abort this txn + LOG.warn("insert [{}] with query id {} failed", labelName, queryId, t); + StringBuilder sb = new StringBuilder(t.getMessage()); + if (txnId != INVALID_TXN_ID) { + LOG.warn("insert [{}] with query id {} abort txn {} failed", labelName, queryId, txnId); + if (!Strings.isNullOrEmpty(coordinator.getTrackingUrl())) { + sb.append(". url: ").append(coordinator.getTrackingUrl()); + } + } + ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, t.getMessage()); + } + + @Override + protected void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink physicalSink) { + // do nothing + } + + @Override + protected void setCollectCommitInfoFunc() { + // do nothing + } + + @Override + protected void doBeforeCommit() throws UserException { + // do nothing + } + + @Override + protected TransactionType transactionType() { + return TransactionType.JDBC; + } + + @Override + protected void beforeExec() { + String queryId = DebugUtil.printId(ctx.queryId()); + LOG.info("start insert [{}] with query id {} and txn id {}", labelName, queryId, txnId); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalJdbcTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalJdbcTableSink.java new file mode 100644 index 00000000000000..b4027383916599 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalJdbcTableSink.java @@ -0,0 +1,151 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.logical; + +import org.apache.doris.catalog.Column; +import org.apache.doris.datasource.jdbc.JdbcExternalDatabase; +import org.apache.doris.datasource.jdbc.JdbcExternalTable; +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.PropagateFuncDeps; +import org.apache.doris.nereids.trees.plans.algebra.Sink; +import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.nereids.util.Utils; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +/** + * logical jdbc table sink for insert command + */ +public class LogicalJdbcTableSink extends LogicalTableSink + implements Sink, PropagateFuncDeps { + // bound data sink + private final JdbcExternalDatabase database; + private final JdbcExternalTable targetTable; + private final DMLCommandType dmlCommandType; + + /** + * constructor + */ + public LogicalJdbcTableSink(JdbcExternalDatabase database, + JdbcExternalTable targetTable, + List cols, + List outputExprs, + DMLCommandType dmlCommandType, + Optional groupExpression, + Optional logicalProperties, + CHILD_TYPE child) { + super(PlanType.LOGICAL_JDBC_TABLE_SINK, outputExprs, groupExpression, logicalProperties, cols, child); + this.database = Objects.requireNonNull(database, "database != null in LogicalJdbcTableSink"); + this.targetTable = Objects.requireNonNull(targetTable, "targetTable != null in LogicalJdbcTableSink"); + this.dmlCommandType = dmlCommandType; + } + + public Plan withChildAndUpdateOutput(Plan child) { + List output = child.getOutput().stream() + .map(NamedExpression.class::cast) + .collect(ImmutableList.toImmutableList()); + return new LogicalJdbcTableSink<>(database, targetTable, cols, output, + dmlCommandType, Optional.empty(), Optional.empty(), child); + } + + @Override + public Plan withChildren(List children) { + Preconditions.checkArgument(children.size() == 1, "LogicalJdbcTableSink only accepts one child"); + return new LogicalJdbcTableSink<>(database, targetTable, cols, outputExprs, + dmlCommandType, Optional.empty(), Optional.empty(), children.get(0)); + } + + @Override + public LogicalSink withOutputExprs(List outputExprs) { + return new LogicalJdbcTableSink<>(database, targetTable, cols, outputExprs, + dmlCommandType, Optional.empty(), Optional.empty(), child()); + } + + public JdbcExternalDatabase getDatabase() { + return database; + } + + public JdbcExternalTable getTargetTable() { + return targetTable; + } + + public DMLCommandType getDmlCommandType() { + return dmlCommandType; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof LogicalJdbcTableSink)) { + return false; + } + if (!super.equals(o)) { + return false; + } + LogicalJdbcTableSink that = (LogicalJdbcTableSink) o; + return dmlCommandType == that.dmlCommandType + && Objects.equals(database, that.database) + && Objects.equals(targetTable, that.targetTable) && Objects.equals(cols, that.cols); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), database, targetTable, dmlCommandType); + } + + @Override + public String toString() { + return Utils.toSqlString("LogicalJdbcTableSink[" + id.asInt() + "]", + "outputExprs", outputExprs, + "database", database.getFullName(), + "targetTable", targetTable.getName(), + "cols", cols, + "dmlCommandType", dmlCommandType + ); + } + + @Override + public R accept(PlanVisitor visitor, C context) { + return visitor.visitLogicalJdbcTableSink(this, context); + } + + @Override + public Plan withGroupExpression(Optional groupExpression) { + return new LogicalJdbcTableSink<>(database, targetTable, cols, outputExprs, + dmlCommandType, groupExpression, Optional.of(getLogicalProperties()), child()); + } + + @Override + public Plan withGroupExprLogicalPropChildren(Optional groupExpression, + Optional logicalProperties, List children) { + return new LogicalJdbcTableSink<>(database, targetTable, cols, outputExprs, + dmlCommandType, groupExpression, logicalProperties, children.get(0)); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalBaseExternalTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalBaseExternalTableSink.java index 82483c63a40412..7c99886f06dffe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalBaseExternalTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalBaseExternalTableSink.java @@ -71,6 +71,10 @@ public ExternalTable getTargetTable() { return targetTable; } + public List getCols() { + return cols; + } + @Override public List getExpressions() { return ImmutableList.of(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalJdbcTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalJdbcTableSink.java new file mode 100644 index 00000000000000..2b0f12c1dea62a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalJdbcTableSink.java @@ -0,0 +1,109 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.physical; + +import org.apache.doris.catalog.Column; +import org.apache.doris.datasource.jdbc.JdbcExternalDatabase; +import org.apache.doris.datasource.jdbc.JdbcExternalTable; +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.statistics.Statistics; + +import java.util.List; +import java.util.Optional; + +/** physical jdbc sink */ +public class PhysicalJdbcTableSink extends PhysicalBaseExternalTableSink { + + /** + * constructor + */ + public PhysicalJdbcTableSink(JdbcExternalDatabase database, + JdbcExternalTable targetTable, + List cols, + List outputExprs, + Optional groupExpression, + LogicalProperties logicalProperties, + CHILD_TYPE child) { + this(database, targetTable, cols, outputExprs, groupExpression, logicalProperties, + PhysicalProperties.GATHER, null, child); + } + + /** + * constructor + */ + public PhysicalJdbcTableSink(JdbcExternalDatabase database, + JdbcExternalTable targetTable, + List cols, + List outputExprs, + Optional groupExpression, + LogicalProperties logicalProperties, + PhysicalProperties physicalProperties, + Statistics statistics, + CHILD_TYPE child) { + super(PlanType.PHYSICAL_JDBC_TABLE_SINK, database, targetTable, cols, outputExprs, groupExpression, + logicalProperties, physicalProperties, statistics, child); + } + + @Override + public Plan withChildren(List children) { + return new PhysicalJdbcTableSink<>( + (JdbcExternalDatabase) database, (JdbcExternalTable) targetTable, + cols, outputExprs, groupExpression, + getLogicalProperties(), physicalProperties, statistics, children.get(0)); + } + + @Override + public R accept(PlanVisitor visitor, C context) { + return visitor.visitPhysicalJdbcTableSink(this, context); + } + + @Override + public Plan withGroupExpression(Optional groupExpression) { + return new PhysicalJdbcTableSink<>( + (JdbcExternalDatabase) database, (JdbcExternalTable) targetTable, cols, outputExprs, + groupExpression, getLogicalProperties(), child()); + } + + @Override + public Plan withGroupExprLogicalPropChildren(Optional groupExpression, + Optional logicalProperties, List children) { + return new PhysicalJdbcTableSink<>( + (JdbcExternalDatabase) database, (JdbcExternalTable) targetTable, cols, outputExprs, + groupExpression, logicalProperties.get(), children.get(0)); + } + + @Override + public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) { + return new PhysicalJdbcTableSink<>( + (JdbcExternalDatabase) database, (JdbcExternalTable) targetTable, cols, outputExprs, + groupExpression, getLogicalProperties(), physicalProperties, statistics, child()); + } + + @Override + public PhysicalProperties getRequirePhysicalProperties() { + // Since JDBC tables do not have partitioning, return a default physical property. + // GATHER implies that all data is gathered to a single location, which is a common requirement for JDBC sinks. + return PhysicalProperties.GATHER; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java index e0b8a1dddc1706..289687476b2cf3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java @@ -19,6 +19,7 @@ import org.apache.doris.nereids.analyzer.UnboundHiveTableSink; import org.apache.doris.nereids.analyzer.UnboundIcebergTableSink; +import org.apache.doris.nereids.analyzer.UnboundJdbcTableSink; import org.apache.doris.nereids.analyzer.UnboundResultSink; import org.apache.doris.nereids.analyzer.UnboundTableSink; import org.apache.doris.nereids.trees.plans.Plan; @@ -26,6 +27,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink; import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalIcebergTableSink; +import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink; import org.apache.doris.nereids.trees.plans.logical.LogicalSink; @@ -34,6 +36,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalFileSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink; +import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalSink; @@ -68,6 +71,10 @@ default R visitUnboundIcebergTableSink(UnboundIcebergTableSink u return visitLogicalSink(unboundTableSink, context); } + default R visitUnboundJdbcTableSink(UnboundJdbcTableSink unboundTableSink, C context) { + return visitLogicalSink(unboundTableSink, context); + } + default R visitUnboundResultSink(UnboundResultSink unboundResultSink, C context) { return visitLogicalSink(unboundResultSink, context); } @@ -96,6 +103,10 @@ default R visitLogicalIcebergTableSink(LogicalIcebergTableSink i return visitLogicalTableSink(icebergTableSink, context); } + default R visitLogicalJdbcTableSink(LogicalJdbcTableSink jdbcTableSink, C context) { + return visitLogicalTableSink(jdbcTableSink, context); + } + default R visitLogicalResultSink(LogicalResultSink logicalResultSink, C context) { return visitLogicalSink(logicalResultSink, context); } @@ -129,6 +140,10 @@ default R visitPhysicalIcebergTableSink(PhysicalIcebergTableSink return visitPhysicalTableSink(icebergTableSink, context); } + default R visitPhysicalJdbcTableSink(PhysicalJdbcTableSink jdbcTableSink, C context) { + return visitPhysicalTableSink(jdbcTableSink, context); + } + default R visitPhysicalResultSink(PhysicalResultSink physicalResultSink, C context) { return visitPhysicalSink(physicalResultSink, context); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/JdbcTransactionManager.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/JdbcTransactionManager.java new file mode 100644 index 00000000000000..a0a1cc28803d4e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/JdbcTransactionManager.java @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.transaction; + +import org.apache.doris.common.UserException; + +public class JdbcTransactionManager implements TransactionManager { + @Override + public long begin() { + return 0; + } + + @Override + public void commit(long id) throws UserException { + + } + + @Override + public void rollback(long id) { + + } + + @Override + public Transaction getTransaction(long id) { + return null; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java index 2372c199738116..c83f61888901c4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionType.java @@ -20,5 +20,6 @@ public enum TransactionType { UNKNOWN, HMS, - ICEBERG + ICEBERG, + JDBC } From d61a23d1d48f535e1bc6039080026c987af02247 Mon Sep 17 00:00:00 2001 From: zy-kkk Date: Tue, 1 Oct 2024 21:04:59 +0800 Subject: [PATCH 08/11] [3.0][fix](oracle scan) Fix performance issues caused by version judgment (#41513) pick (#41407) --- .../main/java/org/apache/doris/jdbc/OracleJdbcExecutor.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/OracleJdbcExecutor.java b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/OracleJdbcExecutor.java index 6f38895335b986..344e88b96c2095 100644 --- a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/OracleJdbcExecutor.java +++ b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/OracleJdbcExecutor.java @@ -39,9 +39,11 @@ public class OracleJdbcExecutor extends BaseJdbcExecutor { private static final Logger LOG = Logger.getLogger(OracleJdbcExecutor.class); private final CharsetDecoder utf8Decoder = StandardCharsets.UTF_8.newDecoder(); + private final boolean isNewJdbcVersion; public OracleJdbcExecutor(byte[] thriftParams) throws Exception { super(thriftParams); + isNewJdbcVersion = isJdbcVersionGreaterThanOrEqualTo("12.2.0"); } @Override @@ -65,7 +67,7 @@ protected void initializeBlock(int columnCount, String[] replaceStringList, int @Override protected Object getColumnValue(int columnIndex, ColumnType type, String[] replaceStringList) throws SQLException { - if (isJdbcVersionGreaterThanOrEqualTo("12.2.0")) { + if (isNewJdbcVersion) { return newGetColumnValue(columnIndex, type, replaceStringList); } else { return oldGetColumnValue(columnIndex, type, replaceStringList); From ff79d390600b398b5a2fca74b29cd372c886153d Mon Sep 17 00:00:00 2001 From: Yongqiang YANG Date: Wed, 2 Oct 2024 09:47:57 +0800 Subject: [PATCH 09/11] [fix] fix enable_file_cache --- be/src/common/config.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index d1cf65c37241ff..0cda0ffdcc515c 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1690,7 +1690,7 @@ bool init(const char* conf_file, bool fill_conf_map, bool must_exist, bool set_t } if (config::is_cloud_mode()) { - auto st = config::set_config("enable_file_cache", "true", true); + auto st = config::set_config("enable_file_cache", "true", true, true); LOG(INFO) << "set config enable_file_cache " << "true" << " " << st; } From ccae8ddac03a1a9c06098448d73efbe332f504bd Mon Sep 17 00:00:00 2001 From: Yongqiang YANG Date: Wed, 2 Oct 2024 20:58:14 +0800 Subject: [PATCH 10/11] [fix](cache) fix show cache hotspot syntax error (#41519) --- .../java/org/apache/doris/analysis/ShowCacheHotSpotStmt.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowCacheHotSpotStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowCacheHotSpotStmt.java index 35234870a85157..cf978033d796d7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowCacheHotSpotStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowCacheHotSpotStmt.java @@ -153,7 +153,7 @@ private String generateQueryString() { query = q1.append(q2); } else if (metaDataPos == 2) { query = new StringBuilder("select partition_id as PartitionId, partition_name as PartitionName" - + "FROM " + TABLE_NAME.toString() + + " FROM " + TABLE_NAME.toString() + " where " + whereExpr.get(0) + " and " + whereExpr.get(1) + "group by cluster_id, cluster_name, table_id, " From c21b9f5bce5142a409e9a49230505a0830eb2eb0 Mon Sep 17 00:00:00 2001 From: Yongqiang YANG Date: Sat, 5 Oct 2024 16:21:57 +0800 Subject: [PATCH 11/11] bump to 3.0.2-rc03 --- gensrc/script/gen_build_version.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gensrc/script/gen_build_version.sh b/gensrc/script/gen_build_version.sh index dd4ab72ae5bbdd..57117e483c5bd1 100755 --- a/gensrc/script/gen_build_version.sh +++ b/gensrc/script/gen_build_version.sh @@ -31,7 +31,7 @@ build_version_prefix="doris" build_version_major=3 build_version_minor=0 build_version_patch=2 -build_version_rc_version="rc02" +build_version_rc_version="rc03" build_version="${build_version_prefix}-${build_version_major}.${build_version_minor}.${build_version_patch}-${build_version_rc_version}"