Skip to content

Commit

Permalink
Let operator not shrink pool after reclaim
Browse files Browse the repository at this point in the history
  • Loading branch information
tanjialiang committed May 23, 2024
1 parent 8f5d346 commit 1892cbb
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 10 deletions.
3 changes: 1 addition & 2 deletions velox/common/memory/SharedArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -758,10 +758,9 @@ uint64_t SharedArbitrator::reclaim(
VELOX_MEM_LOG(ERROR) << "Failed to reclaim from memory pool "
<< pool->name() << ", aborting it: " << e.what();
abort(pool, std::current_exception());
// Free up all the free capacity from the aborted pool as the associated
// query has failed at this point.
pool->shrink();
}
pool->shrink(bytesToReclaim);
const uint64_t newCapacity = pool->capacity();
VELOX_CHECK_GE(oldCapacity, newCapacity);
reclaimedBytes = oldCapacity - newCapacity;
Expand Down
10 changes: 7 additions & 3 deletions velox/common/memory/tests/MockSharedArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,10 +323,13 @@ class MockMemoryOperator {
allocIt = allocations_.erase(allocIt);
}
totalBytes_ -= bytesReclaimed;
const auto oldReservedBytes = pool_->reservedBytes();
for (const auto& allocation : allocationsToFree) {
pool_->free(allocation.buffer, allocation.size);
}
return pool_->shrink(targetBytes);
const auto newReservedBytes = pool_->reservedBytes();
VELOX_CHECK_GE(oldReservedBytes, newReservedBytes);
return newReservedBytes - oldReservedBytes;
}

void abort(MemoryPool* pool) {
Expand Down Expand Up @@ -676,11 +679,12 @@ TEST_F(MockSharedArbitrationTest, shrinkPools) {
std::string debugString() const {
std::stringstream tasksOss;
for (const auto& testTask : testTasks) {
tasksOss << "[";
tasksOss << testTask.debugString();
tasksOss << ",";
tasksOss << "], ";
}
return fmt::format(
"taskTests: [{}], targetBytes: {}, expectedFreedBytes: {}, expectedFreeCapacity: {}, expectedReservedFreeCapacity: {}, allowSpill: {}, allowAbort: {}",
"testTasks: [{}], targetBytes: {}, expectedFreedBytes: {}, expectedFreeCapacity: {}, expectedReservedFreeCapacity: {}, allowSpill: {}, allowAbort: {}",
tasksOss.str(),
succinctBytes(targetBytes),
succinctBytes(expectedFreedBytes),
Expand Down
5 changes: 4 additions & 1 deletion velox/exec/Operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -650,8 +650,11 @@ uint64_t Operator::MemoryReclaimer::reclaim(

auto reclaimBytes = memory::MemoryReclaimer::run(
[&]() {
const auto oldReservedBytes = pool->reservedBytes();
op_->reclaim(targetBytes, stats);
return pool->shrink(targetBytes);
const auto newReservedBytes = pool->reservedBytes();
VELOX_CHECK_GE(oldReservedBytes, newReservedBytes);
return oldReservedBytes - newReservedBytes;
},
stats);

Expand Down
2 changes: 1 addition & 1 deletion velox/exec/tests/AggregationTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2238,7 +2238,7 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringReserve) {
folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_),
reclaimerStats_);
ASSERT_GT(reclaimerStats_.reclaimExecTimeUs, 0);
ASSERT_GT(reclaimerStats_.reclaimedBytes, 0);
ASSERT_GE(reclaimerStats_.reclaimedBytes, 0);
reclaimerStats_.reset();
// The hash table itself in the grouping set is not cleared so it still
// uses some memory.
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/tests/HashJoinTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5895,7 +5895,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringOutputProcessing) {
op,
folly::Random::oneIn(2) ? 0 : folly::Random::rand32(),
reclaimerStats_);
ASSERT_GT(reclaimerStats_.reclaimedBytes, 0);
ASSERT_GE(reclaimerStats_.reclaimedBytes, 0);
ASSERT_GT(reclaimerStats_.reclaimExecTimeUs, 0);
// No reclaim as the operator has started output processing.
ASSERT_EQ(usedMemoryBytes, op->pool()->currentBytes());
Expand Down Expand Up @@ -6040,7 +6040,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringWaitForProbe) {
op,
folly::Random::oneIn(2) ? 0 : folly::Random::rand32(),
reclaimerStats_);
ASSERT_GT(reclaimerStats_.reclaimedBytes, 0);
ASSERT_GE(reclaimerStats_.reclaimedBytes, 0);
ASSERT_GT(reclaimerStats_.reclaimExecTimeUs, 0);
// No reclaim as the build operator is not in building table state.
ASSERT_EQ(usedMemoryBytes, op->pool()->currentBytes());
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/tests/OrderByTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1033,7 +1033,7 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringOutputProcessing) {
ASSERT_GT(reclaimableBytes, 0);
reclaimerStats_.reset();
reclaimAndRestoreCapacity(op, reclaimableBytes, reclaimerStats_);
ASSERT_EQ(reclaimerStats_.reclaimedBytes, reclaimableBytes);
ASSERT_LE(reclaimerStats_.reclaimedBytes, reclaimableBytes);
ASSERT_GT(reclaimerStats_.reclaimExecTimeUs, 0);
} else {
ASSERT_EQ(reclaimableBytes, 0);
Expand Down

0 comments on commit 1892cbb

Please sign in to comment.