diff --git a/velox/common/memory/SharedArbitrator.cpp b/velox/common/memory/SharedArbitrator.cpp index 743c8c68eb4a8..23e9be597925a 100644 --- a/velox/common/memory/SharedArbitrator.cpp +++ b/velox/common/memory/SharedArbitrator.cpp @@ -758,10 +758,8 @@ uint64_t SharedArbitrator::reclaim( VELOX_MEM_LOG(ERROR) << "Failed to reclaim from memory pool " << pool->name() << ", aborting it: " << e.what(); abort(pool, std::current_exception()); - // Free up all the free capacity from the aborted pool as the associated - // query has failed at this point. - pool->shrink(); } + pool->shrink(); const uint64_t newCapacity = pool->capacity(); VELOX_CHECK_GE(oldCapacity, newCapacity); reclaimedBytes = oldCapacity - newCapacity; diff --git a/velox/common/memory/tests/MockSharedArbitratorTest.cpp b/velox/common/memory/tests/MockSharedArbitratorTest.cpp index d53a3779a8bd8..c08cadb155e67 100644 --- a/velox/common/memory/tests/MockSharedArbitratorTest.cpp +++ b/velox/common/memory/tests/MockSharedArbitratorTest.cpp @@ -323,10 +323,13 @@ class MockMemoryOperator { allocIt = allocations_.erase(allocIt); } totalBytes_ -= bytesReclaimed; + const auto oldReservedBytes = pool_->reservedBytes(); for (const auto& allocation : allocationsToFree) { pool_->free(allocation.buffer, allocation.size); } - return pool_->shrink(targetBytes); + const auto newReservedBytes = pool_->reservedBytes(); + VELOX_CHECK_GE(oldReservedBytes, newReservedBytes); + return newReservedBytes - oldReservedBytes; } void abort(MemoryPool* pool) { @@ -676,11 +679,12 @@ TEST_F(MockSharedArbitrationTest, shrinkPools) { std::string debugString() const { std::stringstream tasksOss; for (const auto& testTask : testTasks) { + tasksOss << "["; tasksOss << testTask.debugString(); - tasksOss << ","; + tasksOss << "], "; } return fmt::format( - "taskTests: [{}], targetBytes: {}, expectedFreedBytes: {}, expectedFreeCapacity: {}, expectedReservedFreeCapacity: {}, allowSpill: {}, allowAbort: {}", + "testTasks: [{}], targetBytes: {}, expectedFreedBytes: {}, expectedFreeCapacity: {}, expectedReservedFreeCapacity: {}, allowSpill: {}, allowAbort: {}", tasksOss.str(), succinctBytes(targetBytes), succinctBytes(expectedFreedBytes), @@ -905,8 +909,8 @@ TEST_F(MockSharedArbitrationTest, shrinkPools) { memoryPoolReserveCapacity, false}}, 12 << 20, - 12 << 20, - 18 << 20, + 16 << 20, + 22 << 20, reservedMemoryCapacity, true, false}, diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp index 932289c52f53f..3b2a51223115c 100644 --- a/velox/exec/Operator.cpp +++ b/velox/exec/Operator.cpp @@ -650,8 +650,11 @@ uint64_t Operator::MemoryReclaimer::reclaim( auto reclaimBytes = memory::MemoryReclaimer::run( [&]() { + const auto oldReservedBytes = pool->reservedBytes(); op_->reclaim(targetBytes, stats); - return pool->shrink(targetBytes); + const auto newReservedBytes = pool->reservedBytes(); + VELOX_CHECK_GE(oldReservedBytes, newReservedBytes); + return oldReservedBytes - newReservedBytes; }, stats); diff --git a/velox/exec/tests/AggregationTest.cpp b/velox/exec/tests/AggregationTest.cpp index 66505d6d53e0e..b9cbd57f6b270 100644 --- a/velox/exec/tests/AggregationTest.cpp +++ b/velox/exec/tests/AggregationTest.cpp @@ -2238,7 +2238,7 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringReserve) { folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_), reclaimerStats_); ASSERT_GT(reclaimerStats_.reclaimExecTimeUs, 0); - ASSERT_GT(reclaimerStats_.reclaimedBytes, 0); + ASSERT_GE(reclaimerStats_.reclaimedBytes, 0); reclaimerStats_.reset(); // The hash table itself in the grouping set is not cleared so it still // uses some memory. diff --git a/velox/exec/tests/HashJoinTest.cpp b/velox/exec/tests/HashJoinTest.cpp index d3ae9ec21603e..cf82308bf9ec0 100644 --- a/velox/exec/tests/HashJoinTest.cpp +++ b/velox/exec/tests/HashJoinTest.cpp @@ -5895,7 +5895,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringOutputProcessing) { op, folly::Random::oneIn(2) ? 0 : folly::Random::rand32(), reclaimerStats_); - ASSERT_GT(reclaimerStats_.reclaimedBytes, 0); + ASSERT_GE(reclaimerStats_.reclaimedBytes, 0); ASSERT_GT(reclaimerStats_.reclaimExecTimeUs, 0); // No reclaim as the operator has started output processing. ASSERT_EQ(usedMemoryBytes, op->pool()->currentBytes());