Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
jacktengg committed Mar 29, 2024
1 parent ac83efc commit 243b57b
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 19 deletions.
18 changes: 13 additions & 5 deletions be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state
uint32_t partition_index) {
auto& partitioned_build_blocks = _shared_state->partitioned_build_blocks;
auto& mutable_block = partitioned_build_blocks[partition_index];
if (!mutable_block || mutable_block->bytes() < 32 * 1024) {
if (!mutable_block ||
mutable_block->allocated_bytes() < vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
--_spilling_task_count;
return Status::OK();
}
Expand Down Expand Up @@ -232,7 +233,8 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat

auto& blocks = _probe_blocks[partition_index];
auto& partitioned_block = _partitioned_blocks[partition_index];
if (partitioned_block && partitioned_block->bytes() >= 32 * 1024) {
if (partitioned_block && partitioned_block->allocated_bytes() >=
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
blocks.emplace_back(partitioned_block->to_block());
partitioned_block.reset();
}
Expand Down Expand Up @@ -705,16 +707,22 @@ size_t PartitionedHashJoinProbeOperatorX::revocable_mem_size(RuntimeState* state
for (uint32_t i = spilling_start; i < _partition_count; ++i) {
auto& build_block = partitioned_build_blocks[i];
if (build_block) {
mem_size += build_block->bytes();
auto block_bytes = build_block->allocated_bytes();
if (block_bytes >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
mem_size += build_block->allocated_bytes();
}
}

for (auto& block : probe_blocks[i]) {
mem_size += block.bytes();
mem_size += block.allocated_bytes();
}

auto& partitioned_block = local_state._partitioned_blocks[i];
if (partitioned_block) {
mem_size += partitioned_block->bytes();
auto block_bytes = partitioned_block->allocated_bytes();
if (block_bytes >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
mem_size += block_bytes;
}
}
}
return mem_size;
Expand Down
8 changes: 6 additions & 2 deletions be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
vectorized::SpillStreamSPtr& spilling_stream = _shared_state->spilled_streams[i];
auto& mutable_block = _shared_state->partitioned_build_blocks[i];

if (!mutable_block || mutable_block->bytes() < 32 * 1024) {
if (!mutable_block ||
mutable_block->allocated_bytes() < vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
--_spilling_streams_count;
continue;
}
Expand Down Expand Up @@ -256,7 +257,10 @@ size_t PartitionedHashJoinSinkOperatorX::revocable_mem_size(RuntimeState* state)
for (uint32_t i = 0; i != _partition_count; ++i) {
auto& block = partitioned_blocks[i];
if (block) {
mem_size += block->bytes();
auto block_bytes = block->allocated_bytes();
if (block_bytes >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
mem_size += block_bytes;
}
}
}
return mem_size;
Expand Down
16 changes: 6 additions & 10 deletions be/src/pipeline/pipeline_x/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -379,11 +379,9 @@ class PipelineXLocalState : public PipelineXLocalStateBase {
}

void dec_running_big_mem_op_num(RuntimeState* state) {
if (!_big_mem_op_num_added) {
if (!_big_mem_op_num_deced) {
state->get_query_ctx()->dec_running_big_mem_op_num();
_big_mem_op_num_deced = true;
}
if (_big_mem_op_num_added && !_big_mem_op_num_deced) {
state->get_query_ctx()->dec_running_big_mem_op_num();
_big_mem_op_num_deced = true;
}
}

Expand Down Expand Up @@ -694,11 +692,9 @@ class PipelineXSinkLocalState : public PipelineXSinkLocalStateBase {
}

void dec_running_big_mem_op_num(RuntimeState* state) {
if (!_big_mem_op_num_added) {
if (!_big_mem_op_num_deced) {
state->get_query_ctx()->dec_running_big_mem_op_num();
_big_mem_op_num_deced = true;
}
if (_big_mem_op_num_added && !_big_mem_op_num_deced) {
state->get_query_ctx()->dec_running_big_mem_op_num();
_big_mem_op_num_deced = true;
}
}

Expand Down
6 changes: 4 additions & 2 deletions be/src/pipeline/pipeline_x/pipeline_x_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -338,8 +338,10 @@ bool PipelineXTask::should_revoke_memory(RuntimeState* state, int64_t revocable_

const auto min_revocable_mem_bytes = state->min_revocable_mem();
LOG_EVERY_N(INFO, 5) << "revoke memory, low water mark, revocable_mem_bytes: "
<< revocable_mem_bytes << ", mem_limit_of_op: " << mem_limit_of_op
<< ", min_revocable_mem_bytes: " << min_revocable_mem_bytes;
<< revocable_mem_bytes
<< ", mem_limit_of_op: " << PrettyPrinter::print_bytes(mem_limit_of_op)
<< ", min_revocable_mem_bytes: "
<< PrettyPrinter::print_bytes(min_revocable_mem_bytes);
return (revocable_mem_bytes > mem_limit_of_op ||
revocable_mem_bytes > min_revocable_mem_bytes);
} else {
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/spill/spill_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ class SpillDataDir;

class SpillStream {
public:
// to avoid too many small file writes
static constexpr int MIN_SPILL_WRITE_BATCH_MEM = 32 * 1024;
SpillStream(RuntimeState* state, int64_t stream_id, SpillDataDir* data_dir,
std::string spill_dir, size_t batch_rows, size_t batch_bytes,
RuntimeProfile* profile);
Expand Down

0 comments on commit 243b57b

Please sign in to comment.