Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Mryange committed Mar 30, 2024
1 parent 2b94cd1 commit 197a4ec
Showing 1 changed file with 17 additions and 8 deletions.
25 changes: 17 additions & 8 deletions be/src/pipeline/pipeline_x/pipeline_x_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,16 +189,25 @@ Status PipelineXTask::_open() {
_dry_run = _sink->should_dry_run(_state);
for (auto& o : _operators) {
auto* local_state = _state->get_local_state(o->operator_id());
auto st = local_state->open(_state);
if (st.is<ErrorCode::PIP_WAIT_FOR_RF>()) {
DCHECK(_filter_dependency);
_blocked_dep = _filter_dependency->is_blocked_by(this);
if (_blocked_dep) {
set_state(PipelineTaskState::BLOCKED_FOR_RF);
// Here, it needs to loop twice because it's possible that when "open" happens,
// the filter is not ready yet.
// However, during the execution of "is_blocked_by," the filter may become ready,
// so it needs to be "open" again.
for (size_t i = 0; i < 2; i++) {
auto st = local_state->open(_state);
if (st.is<ErrorCode::PIP_WAIT_FOR_RF>()) {
DCHECK(_filter_dependency);
_blocked_dep = _filter_dependency->is_blocked_by(this);
if (_blocked_dep) {
set_state(PipelineTaskState::BLOCKED_FOR_RF);
RETURN_IF_ERROR(st);
} else if (i == 1) {
return Status::InternalError("Unknown RF error, task was blocked by RF twice");
}
} else {
RETURN_IF_ERROR(st);
break;
}
} else {
RETURN_IF_ERROR(st);
}
}
RETURN_IF_ERROR(_state->get_sink_local_state()->open(_state));
Expand Down

0 comments on commit 197a4ec

Please sign in to comment.