diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index bb9b842705b373c..d212938f122cd16 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -2004,6 +2004,7 @@ void calc_delete_bimtap_callback(CloudStorageEngine& engine, const TAgentTaskReq const auto& calc_delete_bitmap_req = req.calc_delete_bitmap_req; CloudEngineCalcDeleteBitmapTask engine_task(engine, calc_delete_bitmap_req, &error_tablet_ids, &succ_tablet_ids); + SCOPED_ATTACH_TASK(engine_task.mem_tracker()); status = engine_task.execute(); TFinishTaskRequest finish_task_request; diff --git a/be/src/cloud/cloud_delta_writer.cpp b/be/src/cloud/cloud_delta_writer.cpp index ef5f4dc33f7eb92..23e00dfb7469e51 100644 --- a/be/src/cloud/cloud_delta_writer.cpp +++ b/be/src/cloud/cloud_delta_writer.cpp @@ -52,7 +52,8 @@ Status CloudDeltaWriter::batch_init(std::vector writers) { if (writer->_is_init || writer->_is_cancelled) { return Status::OK(); } - return writer->init(); + Status st = writer->init(); // included in SCOPED_ATTACH_TASK + return st; }); } diff --git a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp index 2ff9b8e91ba9e88..9c784b1ced77e76 100644 --- a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp +++ b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp @@ -17,6 +17,8 @@ #include "cloud/cloud_engine_calc_delete_bitmap_task.h" +#include + #include #include "cloud/cloud_meta_mgr.h" @@ -29,6 +31,7 @@ #include "olap/tablet_meta.h" #include "olap/txn_manager.h" #include "olap/utils.h" +#include "runtime/memory/mem_tracker_limiter.h" namespace doris { @@ -38,7 +41,10 @@ CloudEngineCalcDeleteBitmapTask::CloudEngineCalcDeleteBitmapTask( : _engine(engine), _cal_delete_bitmap_req(cal_delete_bitmap_req), _error_tablet_ids(error_tablet_ids), - _succ_tablet_ids(succ_tablet_ids) {} + _succ_tablet_ids(succ_tablet_ids) { + _mem_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::SCHEMA_CHANGE, + "CloudEngineCalcDeleteBitmapTask"); +} void CloudEngineCalcDeleteBitmapTask::add_error_tablet_id(int64_t tablet_id, const Status& err) { std::lock_guard lck(_mutex); @@ -126,9 +132,14 @@ CloudTabletCalcDeleteBitmapTask::CloudTabletCalcDeleteBitmapTask( _engine_calc_delete_bitmap_task(engine_task), _tablet(tablet), _transaction_id(transaction_id), - _version(version) {} + _version(version) { + _mem_tracker = MemTrackerLimiter::create_shared( + MemTrackerLimiter::Type::SCHEMA_CHANGE, + fmt::format("CloudTabletCalcDeleteBitmapTask#_transaction_id={}", _transaction_id)); +} void CloudTabletCalcDeleteBitmapTask::handle() const { + SCOPED_ATTACH_TASK(_mem_tracker); RowsetSharedPtr rowset; DeleteBitmapPtr delete_bitmap; RowsetIdUnorderedSet rowset_ids; diff --git a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.h b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.h index 514f1b059d5be64..ad5486b9cb60295 100644 --- a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.h +++ b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.h @@ -28,6 +28,7 @@ namespace doris { class CloudEngineCalcDeleteBitmapTask; +class MemTrackerLimiter; class CloudTabletCalcDeleteBitmapTask { public: @@ -46,6 +47,7 @@ class CloudTabletCalcDeleteBitmapTask { std::shared_ptr _tablet; int64_t _transaction_id; int64_t _version; + std::shared_ptr _mem_tracker; }; class CloudEngineCalcDeleteBitmapTask : public EngineTask { diff --git a/be/src/io/cache/fs_file_cache_storage.cpp b/be/src/io/cache/fs_file_cache_storage.cpp index 11d1e714fb8d4a1..35f4112ae8b83e3 100644 --- a/be/src/io/cache/fs_file_cache_storage.cpp +++ b/be/src/io/cache/fs_file_cache_storage.cpp @@ -264,6 +264,7 @@ Status FSFileCacheStorage::rebuild_data_structure() const { auto rebuild_dir = [&](std::filesystem::directory_iterator& upgrade_key_it) -> Status { for (; upgrade_key_it != std::filesystem::directory_iterator(); ++upgrade_key_it) { if (upgrade_key_it->path().filename().native().find('_') == std::string::npos) { + RETURN_IF_ERROR(fs->delete_directory(upgrade_key_it->path().native() + "_0")); RETURN_IF_ERROR( fs->rename(upgrade_key_it->path(), upgrade_key_it->path().native() + "_0")); } diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index 64c946e6daabdb0..6e0042da7d26318 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -631,7 +631,8 @@ AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPla _is_first_phase(tnode.agg_node.__isset.is_first_phase && tnode.agg_node.is_first_phase), _pool(pool), _limit(tnode.limit), - _have_conjuncts(tnode.__isset.vconjunct && !tnode.vconjunct.nodes.empty()), + _have_conjuncts((tnode.__isset.vconjunct && !tnode.vconjunct.nodes.empty()) || + (tnode.__isset.conjuncts && !tnode.conjuncts.empty())), _partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0] : std::vector {}), _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate) {} diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp index fec2edc71b8908a..f074afce374c08a 100644 --- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp @@ -42,8 +42,8 @@ Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkSta } _runtime_filters.resize(p._runtime_filter_descs.size()); for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) { - RETURN_IF_ERROR(state->register_producer_runtime_filter(p._runtime_filter_descs[i], false, - &_runtime_filters[i], false)); + RETURN_IF_ERROR(state->register_producer_runtime_filter( + p._runtime_filter_descs[i], p._need_local_merge, &_runtime_filters[i], false)); } return Status::OK(); } @@ -51,9 +51,11 @@ Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkSta NestedLoopJoinBuildSinkOperatorX::NestedLoopJoinBuildSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, - const DescriptorTbl& descs) + const DescriptorTbl& descs, + bool need_local_merge) : JoinBuildSinkOperatorX(pool, operator_id, tnode, descs), + _need_local_merge(need_local_merge), _is_output_left_side_only(tnode.nested_loop_join_node.__isset.is_output_left_side_only && tnode.nested_loop_join_node.is_output_left_side_only), _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {} diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.h b/be/src/pipeline/exec/nested_loop_join_build_operator.h index 801d4ff88ea2c03..52f723b13aed154 100644 --- a/be/src/pipeline/exec/nested_loop_join_build_operator.h +++ b/be/src/pipeline/exec/nested_loop_join_build_operator.h @@ -78,7 +78,7 @@ class NestedLoopJoinBuildSinkOperatorX final : public JoinBuildSinkOperatorX { public: NestedLoopJoinBuildSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, - const DescriptorTbl& descs); + const DescriptorTbl& descs, bool need_local_merge); Status init(const TDataSink& tsink) override { return Status::InternalError( "{} should not init with TDataSink", @@ -105,6 +105,7 @@ class NestedLoopJoinBuildSinkOperatorX final vectorized::VExprContextSPtrs _filter_src_expr_ctxs; + bool _need_local_merge; const bool _is_output_left_side_only; RowDescriptor _row_descriptor; }; diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 55f6506115726eb..6e321f6ca7e4a83 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -1092,8 +1092,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN _dag[downstream_pipeline_id].push_back(build_side_pipe->id()); DataSinkOperatorXPtr sink; - sink.reset( - new NestedLoopJoinBuildSinkOperatorX(pool, next_sink_operator_id(), tnode, descs)); + sink.reset(new NestedLoopJoinBuildSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, + _need_local_merge)); sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(build_side_pipe->set_sink(sink)); RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode, _runtime_state.get())); diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/AddPartitionRecord.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/AddPartitionRecord.java index 9bc5ff7da0fd0e5..cd872bec1bc0ead 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/AddPartitionRecord.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/AddPartitionRecord.java @@ -84,7 +84,7 @@ public AddPartitionRecord(long commitSeq, PartitionPersistInfo partitionPersistI sb.append(")"); } sb.append("(\"version_info\" = \""); - sb.append(partition.getVisibleVersion()).append("\""); + sb.append(partition.getCachedVisibleVersion()).append("\""); sb.append(");"); this.sql = sb.toString(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java index 1c9ec4e49a35856..a41502e395f91cd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java @@ -161,6 +161,10 @@ public void updateVisibleVersionAndTime(long visibleVersion, long visibleVersion this.setVisibleVersionAndTime(visibleVersion, visibleVersionTime); } + public long getCachedVisibleVersion() { + return visibleVersion; + } + public long getVisibleVersion() { return visibleVersion; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java index 6336be08ee473b1..0f613b6c442d618 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java @@ -100,6 +100,11 @@ public void setCachedVisibleVersion(long version) { lock.unlock(); } + @Override + public long getCachedVisibleVersion() { + return super.getVisibleVersion(); + } + @Override public long getVisibleVersion() { if (LOG.isDebugEnabled()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java index 666b131061ec7df..c1beae813bf2116 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java @@ -391,6 +391,7 @@ public Set computeInputSlotIds(Analyzer analyzer) throws NotImplementedE } } allRequestIds.forEach(allUnRequestIds::remove); + groupingSlotIds.forEach(allUnRequestIds::remove); allUnRequestIds.forEach(result::remove); } return result; diff --git a/regression-test/data/correctness_p0/test_inlineview_with_project.out b/regression-test/data/correctness_p0/test_inlineview_with_project.out index fad0b3fe4460a5c..e6ed3ee5706de91 100644 --- a/regression-test/data/correctness_p0/test_inlineview_with_project.out +++ b/regression-test/data/correctness_p0/test_inlineview_with_project.out @@ -26,3 +26,6 @@ -- !select5 -- 3 +-- !select5 -- +3 + diff --git a/regression-test/data/nereids_p0/aggregate/aggregate.out b/regression-test/data/nereids_p0/aggregate/aggregate.out index 584fc855e0b7141..f81a14f0ce02b9a 100644 --- a/regression-test/data/nereids_p0/aggregate/aggregate.out +++ b/regression-test/data/nereids_p0/aggregate/aggregate.out @@ -377,3 +377,5 @@ TESTING AGAIN -- !select_quantile_percent -- 5000.0 +-- !having_with_limit -- +7 -32767.0 diff --git a/regression-test/suites/correctness_p0/test_inlineview_with_project.groovy b/regression-test/suites/correctness_p0/test_inlineview_with_project.groovy index 78eb76b5c42cbf1..e68c2033e559848 100644 --- a/regression-test/suites/correctness_p0/test_inlineview_with_project.groovy +++ b/regression-test/suites/correctness_p0/test_inlineview_with_project.groovy @@ -603,6 +603,67 @@ suite("test_inlineview_with_project") { ) as a; """ + qt_select5 """ + select + count(*) + from + ( + select + cast(random() * 10000000000000000 as bigint) as id, + ga.column1 as column1, + ga.column6 as column2, + CAST(count(CAST(ga.column1 AS CHAR)) AS CHAR) as column3 + from + ( + select + t1.id as id, + upper(t1.caseId) as column1, + t1.`timestamp` as column2, + lower(t1.content) as column6 + from + ( + select + id, + caseId, + content, + `timestamp` + from + ( + select + id, + caseId, + content, + `timestamp` + from + dr_user_test_t2 + ) aaa + ) t1 + left join ( + select + id, + caseId, + content, + `timestamp` + from + ( + select + id, + caseId, + content, + `timestamp` + from + dr_user_test_t2 + ) bbb + ) t2 on t1.id = t2.id + ) as ga + group by + ga.column1, + ga.column6 + ) as tda; + + + """ + sql """DROP TABLE IF EXISTS `dr_user_test_t1`;""" sql """DROP TABLE IF EXISTS `dr_user_test_t2`;""" } diff --git a/regression-test/suites/nereids_p0/aggregate/aggregate.groovy b/regression-test/suites/nereids_p0/aggregate/aggregate.groovy index 2446fcf09239bcc..760333f3e7821f7 100644 --- a/regression-test/suites/nereids_p0/aggregate/aggregate.groovy +++ b/regression-test/suites/nereids_p0/aggregate/aggregate.groovy @@ -343,4 +343,10 @@ suite("aggregate") { """ exception "aggregate function cannot contain aggregate parameters" } + + sql " set parallel_pipeline_task_num = 1; " + sql " set enable_pipeline_x_engine = 1; " + qt_having_with_limit """ + select k1 as k, avg(k2) as k2 from tempbaseall group by k1 having k2 < -32765 limit 1; + """ }