Merge branch 'branch-3.0' into branch-3.0-pick38226
Mryange authored Oct 6, 2024
2 parents 867ed69 + c21b9f5 commit 6e9f0d2
Showing 33 changed files with 854 additions and 28 deletions.
9 changes: 8 additions & 1 deletion be/src/common/config.cpp
@@ -273,6 +273,8 @@ DEFINE_mInt32(doris_scan_block_max_mb, "67108864");
DEFINE_mInt32(doris_scanner_row_num, "16384");
// single read execute fragment row bytes
DEFINE_mInt32(doris_scanner_row_bytes, "10485760");
// single read execute fragment max run time in milliseconds
DEFINE_mInt32(doris_scanner_max_run_time_ms, "1000");
DEFINE_mInt32(min_bytes_in_scanner_queue, "67108864");
// (Advanced) Maximum size of per-query receive-side buffer
DEFINE_mInt32(exchg_node_buffer_size_bytes, "20485760");
@@ -1005,7 +1007,7 @@ DEFINE_Bool(enable_file_cache, "false");
// or use the default storage value:
// {"path": "memory", "total_size":53687091200}
// Both will use the directory "memory" on the disk instead of the real RAM.
DEFINE_String(file_cache_path, "");
DEFINE_String(file_cache_path, "[{\"path\":\"${DORIS_HOME}/file_cache\"}]");
DEFINE_Int64(file_cache_each_block_size, "1048576"); // 1MB

DEFINE_Bool(clear_file_cache, "false");
@@ -1687,6 +1689,11 @@ bool init(const char* conf_file, bool fill_conf_map, bool must_exist, bool set_t
SET_FIELD(it.second, std::vector<std::string>, fill_conf_map, set_to_default);
}

if (config::is_cloud_mode()) {
auto st = config::set_config("enable_file_cache", "true", true, true);
LOG(INFO) << "set config enable_file_cache " << "true" << " " << st;
}

return true;
}

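A note on the config.cpp hunks above: the lowercase "m" in DEFINE_mInt32 appears to mark a config as runtime-mutable, which is what lets init() flip enable_file_cache on the fly via config::set_config in cloud mode. Below is a minimal sketch of that pattern, using std::atomic as a stand-in for Doris's mutable-config machinery; the names here are illustrative, not Doris APIs.

#include <atomic>
#include <cstdio>
#include <string>

// Stand-in for a runtime-mutable config such as doris_scanner_max_run_time_ms.
std::atomic<int> scanner_max_run_time_ms{1000};

// Stand-in for config::set_config(name, value, ...): updates the value while
// scanner threads may still be reading it concurrently.
bool set_config(const std::string& name, const std::string& value) {
    if (name == "doris_scanner_max_run_time_ms") {
        scanner_max_run_time_ms.store(std::stoi(value), std::memory_order_relaxed);
        return true;
    }
    return false;
}

int main() {
    set_config("doris_scanner_max_run_time_ms", "500");
    std::printf("budget = %d ms\n", scanner_max_run_time_ms.load(std::memory_order_relaxed));
    return 0;
}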
2 changes: 2 additions & 0 deletions be/src/common/config.h
@@ -323,6 +323,8 @@ DECLARE_mInt32(doris_scan_block_max_mb);
DECLARE_mInt32(doris_scanner_row_num);
// single read execute fragment row bytes
DECLARE_mInt32(doris_scanner_row_bytes);
// single read execute fragment max run time in milliseconds
DECLARE_mInt32(doris_scanner_max_run_time_ms);
DECLARE_mInt32(min_bytes_in_scanner_queue);
// (Advanced) Maximum size of per-query receive-side buffer
DECLARE_mInt32(exchg_node_buffer_size_bytes);
5 changes: 5 additions & 0 deletions be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -455,6 +455,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
columns_to_filter[i] = i;
}
IColumn::Filter result_filter;
size_t pre_raw_read_rows = 0;
while (!_state->is_cancelled()) {
// read predicate columns
pre_read_rows = 0;
@@ -466,6 +467,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
DCHECK_EQ(pre_eof, true);
break;
}
pre_raw_read_rows += pre_read_rows;
RETURN_IF_ERROR(_fill_partition_columns(block, pre_read_rows,
_lazy_read_ctx.predicate_partition_columns));
RETURN_IF_ERROR(_fill_missing_columns(block, pre_read_rows,
@@ -518,6 +520,9 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
Block::erase_useless_column(block, origin_column_num);

if (!pre_eof) {
if (pre_raw_read_rows >= config::doris_scanner_row_num) {
break;
}
// If continuous batches are skipped, we can cache them to skip a whole page
_cached_filtered_rows += pre_read_rows;
} else { // pre_eof
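Why the pre_raw_read_rows cap above matters: in a lazy read, the loop keeps decoding predicate columns until enough rows survive the filter, so a highly selective predicate could otherwise chew through an entire row group in a single call. Bounding the raw rows decoded per invocation against config::doris_scanner_row_num keeps each call's latency predictable. A self-contained sketch of the new exit condition, with made-up batch numbers:

#include <cstddef>
#include <cstdio>

int main() {
    const size_t scanner_row_num = 16384; // default of config::doris_scanner_row_num
    const size_t batch_size = 4096;       // rows the caller asked for
    size_t pre_raw_read_rows = 0;         // raw rows decoded so far (pre-filter)
    size_t surviving_rows = 0;            // rows that passed the predicate

    while (surviving_rows < batch_size) {
        const size_t pre_read_rows = 1024; // stand-in: rows decoded this pass
        pre_raw_read_rows += pre_read_rows;
        // Worst case for the old code: the filter rejects every row, so the
        // loop never fills the batch and keeps decoding.
        surviving_rows += 0;
        if (pre_raw_read_rows >= scanner_row_num) {
            break; // new exit: stop after a bounded amount of raw decoding
        }
    }
    std::printf("decoded %zu raw rows, kept %zu\n", pre_raw_read_rows, surviving_rows);
    return 0;
}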
6 changes: 6 additions & 0 deletions be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -232,6 +232,8 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
Thread::set_thread_nice_value();
}
#endif
MonotonicStopWatch max_run_time_watch;
max_run_time_watch.start();
scanner->update_wait_worker_timer();
scanner->start_scan_cpu_timer();
Status status = Status::OK();
@@ -266,6 +268,10 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
eos = true;
break;
}
if (max_run_time_watch.elapsed_time() >
config::doris_scanner_max_run_time_ms * 1e6) {
break;
}
BlockUPtr free_block = ctx->get_free_block(first_read);
if (free_block == nullptr) {
break;
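The scheduler change above pairs with the new doris_scanner_max_run_time_ms config: a scan task now yields its worker thread once it has run for the budgeted wall time, instead of looping until the queue or memory limits stop it. MonotonicStopWatch::elapsed_time() evidently reports nanoseconds, which is why the check multiplies the millisecond config by 1e6. A sketch of the same budget check using std::chrono in place of Doris's stopwatch (a small budget is used here so the example finishes quickly; the real default is 1000 ms):

#include <chrono>
#include <cstdio>

int main() {
    const int max_run_time_ms = 50; // stand-in for doris_scanner_max_run_time_ms
    const auto start = std::chrono::steady_clock::now();
    long blocks_read = 0;

    while (true) {
        const auto elapsed_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
                std::chrono::steady_clock::now() - start).count();
        if (elapsed_ns > static_cast<long long>(max_run_time_ms) * 1000000) {
            break; // budget exhausted: yield; the scanner is rescheduled later
        }
        ++blocks_read; // stand-in for reading one block from the scanner
    }
    std::printf("read %ld blocks within the %d ms budget\n", blocks_read, max_run_time_ms);
    return 0;
}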
2 changes: 1 addition & 1 deletion be/src/vec/exec/scan/vfile_scanner.cpp
@@ -333,8 +333,8 @@ Status VFileScanner::_get_block_wrapped(RuntimeState* state, Block* block, bool*
// or not found in the file column schema.
RETURN_IF_ERROR(_truncate_char_or_varchar_columns(block));
}
break;
}
break;
} while (true);

// Update filtered rows and unselected rows for load, reset counter.
@@ -39,9 +39,11 @@
public class OracleJdbcExecutor extends BaseJdbcExecutor {
private static final Logger LOG = Logger.getLogger(OracleJdbcExecutor.class);
private final CharsetDecoder utf8Decoder = StandardCharsets.UTF_8.newDecoder();
private final boolean isNewJdbcVersion;

public OracleJdbcExecutor(byte[] thriftParams) throws Exception {
super(thriftParams);
isNewJdbcVersion = isJdbcVersionGreaterThanOrEqualTo("12.2.0");
}

@Override
@@ -65,7 +67,7 @@ protected void initializeBlock(int columnCount, String[] replaceStringList, int

@Override
protected Object getColumnValue(int columnIndex, ColumnType type, String[] replaceStringList) throws SQLException {
if (isJdbcVersionGreaterThanOrEqualTo("12.2.0")) {
if (isNewJdbcVersion) {
return newGetColumnValue(columnIndex, type, replaceStringList);
} else {
return oldGetColumnValue(columnIndex, type, replaceStringList);
@@ -192,6 +192,11 @@ public void analyze(Analyzer analyzer) throws AnalysisException {
"GROUP BY expression must not contain aggregate functions: "
+ groupingExpr.toSql());
}
if (groupingExpr.contains(GroupingFunctionCallExpr.class)) {
throw new AnalysisException(
"GROUP BY expression must not contain grouping scalar functions: "
+ groupingExpr.toSql());
}
if (groupingExpr.contains(AnalyticExpr.class)) {
// reference the original expr in the error msg
throw new AnalysisException(
@@ -153,7 +153,7 @@ private String generateQueryString() {
query = q1.append(q2);
} else if (metaDataPos == 2) {
query = new StringBuilder("select partition_id as PartitionId, partition_name as PartitionName"
+ "FROM " + TABLE_NAME.toString()
+ " FROM " + TABLE_NAME.toString()
+ " where " + whereExpr.get(0)
+ " and " + whereExpr.get(1)
+ "group by cluster_id, cluster_name, table_id, "
@@ -98,6 +98,7 @@ public static JdbcClient createJdbcClient(JdbcClientConfig jdbcClientConfig) {
}

protected JdbcClient(JdbcClientConfig jdbcClientConfig) {
System.setProperty("com.zaxxer.hikari.useWeakReferences", "true");
this.catalogName = jdbcClientConfig.getCatalog();
this.jdbcUser = jdbcClientConfig.getUser();
this.isOnlySpecifiedDatabase = Boolean.parseBoolean(jdbcClientConfig.getOnlySpecifiedDatabase());
@@ -0,0 +1,84 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.analyzer;

import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;

import java.util.List;
import java.util.Optional;

/**
* Represents a JDBC table sink plan node that has not been bound.
*/
public class UnboundJdbcTableSink<CHILD_TYPE extends Plan> extends UnboundBaseExternalTableSink<CHILD_TYPE> {

public UnboundJdbcTableSink(List<String> nameParts, List<String> colNames, List<String> hints,
List<String> partitions, CHILD_TYPE child) {
this(nameParts, colNames, hints, partitions, DMLCommandType.NONE,
Optional.empty(), Optional.empty(), child);
}

/**
* constructor
*/
public UnboundJdbcTableSink(List<String> nameParts,
List<String> colNames,
List<String> hints,
List<String> partitions,
DMLCommandType dmlCommandType,
Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties,
CHILD_TYPE child) {
super(nameParts, PlanType.LOGICAL_UNBOUND_JDBC_TABLE_SINK, ImmutableList.of(), groupExpression,
logicalProperties, colNames, dmlCommandType, child, hints, partitions);
}

@Override
public Plan withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 1,
"UnboundJdbcTableSink should have exactly one child");
return new UnboundJdbcTableSink<>(nameParts, colNames, hints, partitions,
dmlCommandType, Optional.empty(), Optional.empty(), children.get(0));
}

@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitUnboundJdbcTableSink(this, context);
}

@Override
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
return new UnboundJdbcTableSink<>(nameParts, colNames, hints, partitions,
dmlCommandType, groupExpression, Optional.of(getLogicalProperties()), child());
}

@Override
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
return new UnboundJdbcTableSink<>(nameParts, colNames, hints, partitions,
dmlCommandType, groupExpression, logicalProperties, children.get(0));
}
}
@@ -23,6 +23,7 @@
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.datasource.jdbc.JdbcExternalCatalog;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.exceptions.ParseException;
import org.apache.doris.nereids.trees.plans.Plan;
@@ -78,6 +79,9 @@ public static LogicalSink<? extends Plan> createUnboundTableSink(List<String> na
} else if (curCatalog instanceof IcebergExternalCatalog) {
return new UnboundIcebergTableSink<>(nameParts, colNames, hints, partitions,
dmlCommandType, Optional.empty(), Optional.empty(), plan);
} else if (curCatalog instanceof JdbcExternalCatalog) {
return new UnboundJdbcTableSink<>(nameParts, colNames, hints, partitions,
dmlCommandType, Optional.empty(), Optional.empty(), plan);
}
throw new RuntimeException("Load data to " + curCatalog.getClass().getSimpleName() + " is not supported.");
}
@@ -109,20 +113,16 @@ public static LogicalSink<? extends Plan> createUnboundTableSinkMaybeOverwrite(L
dmlCommandType, Optional.empty(), Optional.empty(), plan);
} else if (curCatalog instanceof IcebergExternalCatalog && !isAutoDetectPartition) {
return new UnboundIcebergTableSink<>(nameParts, colNames, hints, partitions,
dmlCommandType, Optional.empty(), Optional.empty(), plan);
}
// TODO: we need to support insert into other catalog
try {
if (ConnectContext.get() != null) {
ConnectContext.get().getSessionVariable().enableFallbackToOriginalPlannerOnce();
}
} catch (Exception e) {
// ignore this.
dmlCommandType, Optional.empty(), Optional.empty(), plan);
} else if (curCatalog instanceof JdbcExternalCatalog) {
return new UnboundJdbcTableSink<>(nameParts, colNames, hints, partitions,
dmlCommandType, Optional.empty(), Optional.empty(), plan);
}

throw new AnalysisException(
(isOverwrite ? "insert overwrite" : "insert") + " data to " + curCatalog.getClass().getSimpleName()
+ " is not supported."
+ (isAutoDetectPartition
? " PARTITION(*) is only supported in overwrite partition for OLAP table" : ""));
? " PARTITION(*) is only supported in overwrite partition for OLAP table" : ""));
}
}
@@ -56,6 +56,7 @@
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.iceberg.source.IcebergScanNode;
import org.apache.doris.datasource.jdbc.JdbcExternalTable;
import org.apache.doris.datasource.jdbc.sink.JdbcTableSink;
import org.apache.doris.datasource.jdbc.source.JdbcScanNode;
import org.apache.doris.datasource.lakesoul.LakeSoulExternalTable;
import org.apache.doris.datasource.lakesoul.source.LakeSoulScanNode;
@@ -128,6 +129,7 @@
import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalIntersect;
import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOdbcScan;
@@ -496,6 +498,24 @@ public PlanFragment visitPhysicalIcebergTableSink(PhysicalIcebergTableSink<? ext
return rootFragment;
}

@Override
public PlanFragment visitPhysicalJdbcTableSink(PhysicalJdbcTableSink<? extends Plan> jdbcTableSink,
PlanTranslatorContext context) {
PlanFragment rootFragment = jdbcTableSink.child().accept(this, context);
rootFragment.setOutputPartition(DataPartition.UNPARTITIONED);
List<Column> targetTableColumns = jdbcTableSink.getCols();
List<String> insertCols = targetTableColumns.stream()
.map(Column::getName)
.collect(Collectors.toList());

JdbcTableSink sink = new JdbcTableSink(
((JdbcExternalTable) jdbcTableSink.getTargetTable()).getJdbcTable(),
insertCols
);
rootFragment.setSink(sink);
return rootFragment;
}

@Override
public PlanFragment visitPhysicalFileSink(PhysicalFileSink<? extends Plan> fileSink,
PlanTranslatorContext context) {
@@ -26,6 +26,7 @@
import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalIcebergTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.VariableMgr;
@@ -67,6 +68,13 @@ public Plan visitLogicalIcebergTableSink(
return tableSink;
}

@Override
public Plan visitLogicalJdbcTableSink(
LogicalJdbcTableSink<? extends Plan> tableSink, StatementContext context) {
turnOffPageCache(context);
return tableSink;
}

private void turnOffPageCache(StatementContext context) {
SessionVariable sessionVariable = context.getConnectContext().getSessionVariable();
// set temporary session value, and then revert value in the 'finally block' of StmtExecutor#execute
@@ -40,6 +40,7 @@
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
@@ -152,6 +153,14 @@ public Void visitPhysicalIcebergTableSink(
return null;
}

@Override
public Void visitPhysicalJdbcTableSink(
PhysicalJdbcTableSink<? extends Plan> jdbcTableSink, PlanContext context) {
// Always use gather properties for jdbcTableSink
addRequestPropertyToChildren(PhysicalProperties.GATHER);
return null;
}

@Override
public Void visitPhysicalResultSink(PhysicalResultSink<? extends Plan> physicalResultSink, PlanContext context) {
if (context.getSessionVariable().enableParallelResultSink()
@@ -68,6 +68,7 @@
import org.apache.doris.nereids.rules.implementation.LogicalIcebergTableSinkToPhysicalIcebergTableSink;
import org.apache.doris.nereids.rules.implementation.LogicalIntersectToPhysicalIntersect;
import org.apache.doris.nereids.rules.implementation.LogicalJdbcScanToPhysicalJdbcScan;
import org.apache.doris.nereids.rules.implementation.LogicalJdbcTableSinkToPhysicalJdbcTableSink;
import org.apache.doris.nereids.rules.implementation.LogicalJoinToHashJoin;
import org.apache.doris.nereids.rules.implementation.LogicalJoinToNestedLoopJoin;
import org.apache.doris.nereids.rules.implementation.LogicalLimitToPhysicalLimit;
@@ -190,6 +191,7 @@ public class RuleSet {
.add(new LogicalOlapTableSinkToPhysicalOlapTableSink())
.add(new LogicalHiveTableSinkToPhysicalHiveTableSink())
.add(new LogicalIcebergTableSinkToPhysicalIcebergTableSink())
.add(new LogicalJdbcTableSinkToPhysicalJdbcTableSink())
.add(new LogicalFileSinkToPhysicalFileSink())
.add(new LogicalResultSinkToPhysicalResultSink())
.add(new LogicalDeferMaterializeResultSinkToPhysicalDeferMaterializeResultSink())
@@ -32,6 +32,7 @@ public enum RuleType {
BINDING_RESULT_SINK(RuleTypeClass.REWRITE),
BINDING_INSERT_HIVE_TABLE(RuleTypeClass.REWRITE),
BINDING_INSERT_ICEBERG_TABLE(RuleTypeClass.REWRITE),
BINDING_INSERT_JDBC_TABLE(RuleTypeClass.REWRITE),
BINDING_INSERT_TARGET_TABLE(RuleTypeClass.REWRITE),
INIT_MATERIALIZATION_HOOK_FOR_FILE_SINK(RuleTypeClass.REWRITE),
INIT_MATERIALIZATION_HOOK_FOR_TABLE_SINK(RuleTypeClass.REWRITE),
@@ -437,6 +438,7 @@ public enum RuleType {
LOGICAL_OLAP_TABLE_SINK_TO_PHYSICAL_OLAP_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_HIVE_TABLE_SINK_TO_PHYSICAL_HIVE_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_ICEBERG_TABLE_SINK_TO_PHYSICAL_ICEBERG_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_JDBC_TABLE_SINK_TO_PHYSICAL_JDBC_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_RESULT_SINK_TO_PHYSICAL_RESULT_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_DEFER_MATERIALIZE_RESULT_SINK_TO_PHYSICAL_DEFER_MATERIALIZE_RESULT_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_FILE_SINK_TO_PHYSICAL_FILE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),