Merge branch 'master' into fix-table-version-regression-test

xy720 authored Mar 30, 2024
2 parents d8744ec + 2a7876b commit 68b00c6

Showing 17 changed files with 113 additions and 11 deletions.
1 change: 1 addition & 0 deletions be/src/agent/task_worker_pool.cpp
@@ -2004,6 +2004,7 @@ void calc_delete_bimtap_callback(CloudStorageEngine& engine, const TAgentTaskReq
     const auto& calc_delete_bitmap_req = req.calc_delete_bitmap_req;
     CloudEngineCalcDeleteBitmapTask engine_task(engine, calc_delete_bitmap_req, &error_tablet_ids,
                                                 &succ_tablet_ids);
+    SCOPED_ATTACH_TASK(engine_task.mem_tracker());
     status = engine_task.execute();

     TFinishTaskRequest finish_task_request;
3 changes: 2 additions & 1 deletion be/src/cloud/cloud_delta_writer.cpp
@@ -52,7 +52,8 @@ Status CloudDeltaWriter::batch_init(std::vector<CloudDeltaWriter*> writers) {
         if (writer->_is_init || writer->_is_cancelled) {
             return Status::OK();
         }
-        return writer->init();
+        Status st = writer->init(); // included in SCOPED_ATTACH_TASK
+        return st;
     });
 }
15 changes: 13 additions & 2 deletions be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
@@ -17,6 +17,8 @@

 #include "cloud/cloud_engine_calc_delete_bitmap_task.h"

+#include <fmt/format.h>
+
 #include <memory>

 #include "cloud/cloud_meta_mgr.h"
@@ -29,6 +31,7 @@
 #include "olap/tablet_meta.h"
 #include "olap/txn_manager.h"
 #include "olap/utils.h"
+#include "runtime/memory/mem_tracker_limiter.h"

 namespace doris {

@@ -38,7 +41,10 @@ CloudEngineCalcDeleteBitmapTask::CloudEngineCalcDeleteBitmapTask(
         : _engine(engine),
           _cal_delete_bitmap_req(cal_delete_bitmap_req),
           _error_tablet_ids(error_tablet_ids),
-          _succ_tablet_ids(succ_tablet_ids) {}
+          _succ_tablet_ids(succ_tablet_ids) {
+    _mem_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::SCHEMA_CHANGE,
+                                                    "CloudEngineCalcDeleteBitmapTask");
+}

 void CloudEngineCalcDeleteBitmapTask::add_error_tablet_id(int64_t tablet_id, const Status& err) {
     std::lock_guard<std::mutex> lck(_mutex);
@@ -126,9 +132,14 @@ CloudTabletCalcDeleteBitmapTask::CloudTabletCalcDeleteBitmapTask(
           _engine_calc_delete_bitmap_task(engine_task),
           _tablet(tablet),
           _transaction_id(transaction_id),
-          _version(version) {}
+          _version(version) {
+    _mem_tracker = MemTrackerLimiter::create_shared(
+            MemTrackerLimiter::Type::SCHEMA_CHANGE,
+            fmt::format("CloudTabletCalcDeleteBitmapTask#_transaction_id={}", _transaction_id));
+}

 void CloudTabletCalcDeleteBitmapTask::handle() const {
+    SCOPED_ATTACH_TASK(_mem_tracker);
     RowsetSharedPtr rowset;
     DeleteBitmapPtr delete_bitmap;
     RowsetIdUnorderedSet rowset_ids;
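The change above follows a common attribution pattern: create a per-task MemTrackerLimiter in the constructor, then attach it for the lifetime of the work method so every allocation made inside handle() is charged to that task. A minimal self-contained sketch of the scoped-attach idea, using hypothetical stand-ins (MemTracker, ScopedAttachTask, charge) rather than Doris's real thread-context machinery:

    #include <cstdio>
    #include <memory>
    #include <string>

    // Hypothetical stand-in for a MemTrackerLimiter: counts bytes per task.
    struct MemTracker {
        std::string label;
        long consumed = 0;
        explicit MemTracker(std::string l) : label(std::move(l)) {}
    };

    // Thread-local "current tracker"; allocations are charged to it.
    thread_local MemTracker* g_current_tracker = nullptr;

    // RAII guard mimicking SCOPED_ATTACH_TASK: attach on entry, restore on exit.
    class ScopedAttachTask {
    public:
        explicit ScopedAttachTask(MemTracker* t) : _prev(g_current_tracker) {
            g_current_tracker = t;
        }
        ~ScopedAttachTask() { g_current_tracker = _prev; }

    private:
        MemTracker* _prev;
    };

    void charge(long bytes) {
        if (g_current_tracker != nullptr) g_current_tracker->consumed += bytes;
    }

    struct CalcDeleteBitmapTask {
        std::shared_ptr<MemTracker> mem_tracker =
                std::make_shared<MemTracker>("CalcDeleteBitmapTask"); // created in the constructor
        void handle() {
            ScopedAttachTask guard(mem_tracker.get()); // first line of the work method
            charge(1024); // any allocation inside handle() is attributed to this task
        }
    };

    int main() {
        CalcDeleteBitmapTask task;
        task.handle();
        std::printf("%s consumed %ld bytes\n", task.mem_tracker->label.c_str(),
                    task.mem_tracker->consumed);
        return 0;
    }

The RAII guard restores the previous tracker on scope exit, which is what lets attachments nest correctly across tasks and sub-tasks.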
2 changes: 2 additions & 0 deletions be/src/cloud/cloud_engine_calc_delete_bitmap_task.h
@@ -28,6 +28,7 @@
 namespace doris {

 class CloudEngineCalcDeleteBitmapTask;
+class MemTrackerLimiter;

 class CloudTabletCalcDeleteBitmapTask {
 public:
@@ -46,6 +47,7 @@ class CloudTabletCalcDeleteBitmapTask {
     std::shared_ptr<CloudTablet> _tablet;
     int64_t _transaction_id;
     int64_t _version;
+    std::shared_ptr<MemTrackerLimiter> _mem_tracker;
 };

 class CloudEngineCalcDeleteBitmapTask : public EngineTask {
1 change: 1 addition & 0 deletions be/src/io/cache/fs_file_cache_storage.cpp
@@ -264,6 +264,7 @@ Status FSFileCacheStorage::rebuild_data_structure() const {
     auto rebuild_dir = [&](std::filesystem::directory_iterator& upgrade_key_it) -> Status {
         for (; upgrade_key_it != std::filesystem::directory_iterator(); ++upgrade_key_it) {
             if (upgrade_key_it->path().filename().native().find('_') == std::string::npos) {
+                RETURN_IF_ERROR(fs->delete_directory(upgrade_key_it->path().native() + "_0"));
                 RETURN_IF_ERROR(
                         fs->rename(upgrade_key_it->path(), upgrade_key_it->path().native() + "_0"));
             }
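The inserted delete_directory makes the upgrade rename idempotent: if an earlier rebuild was interrupted after creating the "_0" target, a bare rename could fail, so any stale target is removed first. A self-contained sketch of the same idea using std::filesystem (illustrative only; fs->delete_directory and fs->rename above are Doris's own FileSystem API, not what is shown here):

    #include <filesystem>
    #include <iostream>

    namespace fs = std::filesystem;

    // Rename `dir` to `dir_0`, tolerating a leftover target from an earlier
    // interrupted upgrade: remove any stale target before renaming.
    void upgrade_dir(const fs::path& dir) {
        const fs::path target = dir.string() + "_0";
        std::error_code ec;
        fs::remove_all(target, ec); // no-op (and no error) if the target is absent
        fs::rename(dir, target);    // safe now that the stale target is gone
    }

    int main() {
        fs::create_directories("cache_key");   // a pre-upgrade cache directory
        fs::create_directories("cache_key_0"); // stale target from an aborted run
        upgrade_dir("cache_key");
        std::cout << "cache_key_0 exists: " << fs::exists("cache_key_0") << '\n';
        return 0;
    }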
3 changes: 2 additions & 1 deletion be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -631,7 +631,8 @@ AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPla
           _is_first_phase(tnode.agg_node.__isset.is_first_phase && tnode.agg_node.is_first_phase),
           _pool(pool),
           _limit(tnode.limit),
-          _have_conjuncts(tnode.__isset.vconjunct && !tnode.vconjunct.nodes.empty()),
+          _have_conjuncts((tnode.__isset.vconjunct && !tnode.vconjunct.nodes.empty()) ||
+                          (tnode.__isset.conjuncts && !tnode.conjuncts.empty())),
           _partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0]
                                                                : std::vector<TExpr> {}),
           _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate) {}
8 changes: 5 additions & 3 deletions be/src/pipeline/exec/nested_loop_join_build_operator.cpp
@@ -42,18 +42,20 @@ Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkSta
     }
     _runtime_filters.resize(p._runtime_filter_descs.size());
     for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) {
-        RETURN_IF_ERROR(state->register_producer_runtime_filter(p._runtime_filter_descs[i], false,
-                                                                &_runtime_filters[i], false));
+        RETURN_IF_ERROR(state->register_producer_runtime_filter(
+                p._runtime_filter_descs[i], p._need_local_merge, &_runtime_filters[i], false));
     }
     return Status::OK();
 }

 NestedLoopJoinBuildSinkOperatorX::NestedLoopJoinBuildSinkOperatorX(ObjectPool* pool,
                                                                    int operator_id,
                                                                    const TPlanNode& tnode,
-                                                                   const DescriptorTbl& descs)
+                                                                   const DescriptorTbl& descs,
+                                                                   bool need_local_merge)
         : JoinBuildSinkOperatorX<NestedLoopJoinBuildSinkLocalState>(pool, operator_id, tnode,
                                                                     descs),
+          _need_local_merge(need_local_merge),
           _is_output_left_side_only(tnode.nested_loop_join_node.__isset.is_output_left_side_only &&
                                     tnode.nested_loop_join_node.is_output_left_side_only),
           _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {}
3 changes: 2 additions & 1 deletion be/src/pipeline/exec/nested_loop_join_build_operator.h
@@ -78,7 +78,7 @@ class NestedLoopJoinBuildSinkOperatorX final
         : public JoinBuildSinkOperatorX<NestedLoopJoinBuildSinkLocalState> {
 public:
     NestedLoopJoinBuildSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
-                                     const DescriptorTbl& descs);
+                                     const DescriptorTbl& descs, bool need_local_merge);
     Status init(const TDataSink& tsink) override {
         return Status::InternalError(
                 "{} should not init with TDataSink",
@@ -105,6 +105,7 @@ class NestedLoopJoinBuildSinkOperatorX final

     vectorized::VExprContextSPtrs _filter_src_expr_ctxs;

+    bool _need_local_merge;
     const bool _is_output_left_side_only;
     RowDescriptor _row_descriptor;
 };
4 changes: 2 additions & 2 deletions be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -1092,8 +1092,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
         _dag[downstream_pipeline_id].push_back(build_side_pipe->id());

         DataSinkOperatorXPtr sink;
-        sink.reset(
-                new NestedLoopJoinBuildSinkOperatorX(pool, next_sink_operator_id(), tnode, descs));
+        sink.reset(new NestedLoopJoinBuildSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
+                                                        _need_local_merge));
         sink->set_dests_id({op->operator_id()});
         RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
        RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode, _runtime_state.get()));
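The flag threaded from the fragment context down to the build sink plausibly controls whether parallel local build instances merge their runtime filters before a single publish, rather than each instance publishing on its own. A toy sketch of merge-then-publish under that assumption (all names hypothetical):

    #include <bitset>
    #include <iostream>
    #include <mutex>

    // Toy runtime filter: a fixed-size bloom-filter-like bitset.
    using Filter = std::bitset<64>;

    // Shared per-fragment merger: parallel build sinks OR their filters into
    // it, and the last producer to arrive publishes the merged result once.
    class LocalMerger {
    public:
        explicit LocalMerger(int producers) : _remaining(producers) {}
        void merge_and_maybe_publish(const Filter& f) {
            std::lock_guard<std::mutex> lock(_mutex);
            _merged |= f;
            if (--_remaining == 0) {
                std::cout << "publish merged filter: " << _merged << '\n';
            }
        }

    private:
        std::mutex _mutex;
        Filter _merged;
        int _remaining;
    };

    // Build sink: with need_local_merge it funnels through the shared merger,
    // otherwise it publishes its own filter immediately.
    void build_sink_finish(bool need_local_merge, LocalMerger* merger, const Filter& f) {
        if (need_local_merge) {
            merger->merge_and_maybe_publish(f);
        } else {
            std::cout << "publish filter directly: " << f << '\n';
        }
    }

    int main() {
        LocalMerger merger(2); // two parallel local build instances
        Filter a, b;
        a.set(3);
        b.set(17);
        build_sink_finish(true, &merger, a); // merged, not yet published
        build_sink_finish(true, &merger, b); // last producer publishes the union
        return 0;
    }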
@@ -84,7 +84,7 @@ public AddPartitionRecord(long commitSeq, PartitionPersistInfo partitionPersistI
             sb.append(")");
         }
         sb.append("(\"version_info\" = \"");
-        sb.append(partition.getVisibleVersion()).append("\"");
+        sb.append(partition.getCachedVisibleVersion()).append("\"");
         sb.append(");");
         this.sql = sb.toString();
     }
@@ -161,6 +161,10 @@ public void updateVisibleVersionAndTime(long visibleVersion, long visibleVersion
         this.setVisibleVersionAndTime(visibleVersion, visibleVersionTime);
     }

+    public long getCachedVisibleVersion() {
+        return visibleVersion;
+    }
+
     public long getVisibleVersion() {
         return visibleVersion;
     }
@@ -100,6 +100,11 @@ public void setCachedVisibleVersion(long version) {
         lock.unlock();
     }

+    @Override
+    public long getCachedVisibleVersion() {
+        return super.getVisibleVersion();
+    }
+
     @Override
     public long getVisibleVersion() {
         if (LOG.isDebugEnabled()) {
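Reading the two hunks together: in cloud mode getVisibleVersion() appears to consult a more authoritative (likely remote) source, while getCachedVisibleVersion() deliberately returns the last locally known value, which is why AddPartitionRecord switched to it for binlog SQL generation. A minimal C++ sketch of that cached-versus-authoritative split, under the stated assumption that the cloud override performs a remote lookup:

    #include <cstdint>
    #include <iostream>

    // Base partition: the visible version is a plain cached field.
    class Partition {
    public:
        virtual ~Partition() = default;
        // Cheap getter: never leaves the process.
        virtual int64_t get_cached_visible_version() const { return _visible_version; }
        // Authoritative getter: may be overridden to fetch fresher state.
        virtual int64_t get_visible_version() const { return _visible_version; }

    protected:
        int64_t _visible_version = 1;
    };

    // Cloud partition: the authoritative getter consults a (simulated) meta
    // service, while the cached getter deliberately bypasses it.
    class CloudPartition : public Partition {
    public:
        int64_t get_cached_visible_version() const override {
            return Partition::get_visible_version(); // local field only, no lookup
        }
        int64_t get_visible_version() const override {
            return fetch_version_from_meta_service(); // potentially expensive
        }

    private:
        int64_t fetch_version_from_meta_service() const {
            std::cout << "(remote lookup)\n";
            return _visible_version + 1; // pretend the meta service is ahead
        }
    };

    int main() {
        CloudPartition p;
        std::cout << "cached:        " << p.get_cached_visible_version() << '\n';
        std::cout << "authoritative: " << p.get_visible_version() << '\n';
        return 0;
    }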
@@ -391,6 +391,7 @@ public Set<SlotId> computeInputSlotIds(Analyzer analyzer) throws NotImplementedE
             }
         }
         allRequestIds.forEach(allUnRequestIds::remove);
+        groupingSlotIds.forEach(allUnRequestIds::remove);
         allUnRequestIds.forEach(result::remove);
     }
     return result;
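The one-line fix keeps group-by slots alive during input-slot pruning: a slot used only as a grouping key is requested by no aggregate function, so before this change it could land in the un-requested set and be pruned from the result. A small self-contained sketch of the corrected subtraction (integers standing in for SlotIds):

    #include <iostream>
    #include <set>

    int main() {
        std::set<int> result = {1, 2, 3, 4};    // candidate input slot ids
        std::set<int> all_unrequested = {2, 3}; // slots no consumer asked for
        const std::set<int> requested = {3};    // slots aggregate functions need
        const std::set<int> grouping = {2};     // group-by key slots

        // Slots that are requested are not "un-requested".
        for (int id : requested) all_unrequested.erase(id);
        // The fix: group-by slots must survive pruning too.
        for (int id : grouping) all_unrequested.erase(id);
        // Prune what genuinely nobody needs.
        for (int id : all_unrequested) result.erase(id);

        for (int id : result) std::cout << id << ' '; // prints: 1 2 3 4
        std::cout << '\n';
        return 0;
    }

Without the grouping-slot erase, slot 2 would be removed from `result` even though the group-by still needs it.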
@@ -26,3 +26,6 @@
 -- !select5 --
 3

+-- !select5 --
+3
+
2 changes: 2 additions & 0 deletions regression-test/data/nereids_p0/aggregate/aggregate.out
@@ -377,3 +377,5 @@ TESTING AGAIN
 -- !select_quantile_percent --
 5000.0

+-- !having_with_limit --
+7	-32767.0
@@ -603,6 +603,67 @@ suite("test_inlineview_with_project") {
         ) as a;
     """

+    qt_select5 """
+        select
+            count(*)
+        from
+            (
+                select
+                    cast(random() * 10000000000000000 as bigint) as id,
+                    ga.column1 as column1,
+                    ga.column6 as column2,
+                    CAST(count(CAST(ga.column1 AS CHAR)) AS CHAR) as column3
+                from
+                    (
+                        select
+                            t1.id as id,
+                            upper(t1.caseId) as column1,
+                            t1.`timestamp` as column2,
+                            lower(t1.content) as column6
+                        from
+                            (
+                                select
+                                    id,
+                                    caseId,
+                                    content,
+                                    `timestamp`
+                                from
+                                    (
+                                        select
+                                            id,
+                                            caseId,
+                                            content,
+                                            `timestamp`
+                                        from
+                                            dr_user_test_t2
+                                    ) aaa
+                            ) t1
+                            left join (
+                                select
+                                    id,
+                                    caseId,
+                                    content,
+                                    `timestamp`
+                                from
+                                    (
+                                        select
+                                            id,
+                                            caseId,
+                                            content,
+                                            `timestamp`
+                                        from
+                                            dr_user_test_t2
+                                    ) bbb
+                            ) t2 on t1.id = t2.id
+                    ) as ga
+                group by
+                    ga.column1,
+                    ga.column6
+            ) as tda;
+    """
+
     sql """DROP TABLE IF EXISTS `dr_user_test_t1`;"""
     sql """DROP TABLE IF EXISTS `dr_user_test_t2`;"""
 }
6 changes: 6 additions & 0 deletions regression-test/suites/nereids_p0/aggregate/aggregate.groovy
@@ -343,4 +343,10 @@ suite("aggregate") {
         """
         exception "aggregate function cannot contain aggregate parameters"
     }
+
+    sql " set parallel_pipeline_task_num = 1; "
+    sql " set enable_pipeline_x_engine = 1; "
+    qt_having_with_limit """
+        select k1 as k, avg(k2) as k2 from tempbaseall group by k1 having k2 < -32765 limit 1;
+    """
 }
