Skip to content

Commit

Permalink
Revert "Use shrink pools by arbitrator for spill test in hash join un…
Browse files Browse the repository at this point in the history
…i test (facebookincubator#8932)"

This reverts commit f391c02.
  • Loading branch information
PHILO-HE committed Mar 4, 2024
1 parent 4109261 commit cf23b9f
Show file tree
Hide file tree
Showing 11 changed files with 39 additions and 61 deletions.
14 changes: 0 additions & 14 deletions velox/common/memory/MemoryArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -457,18 +457,4 @@ MemoryArbitrationContext* memoryArbitrationContext() {
bool underMemoryArbitration() {
return memoryArbitrationContext() != nullptr;
}

void testingRunArbitration(uint64_t targetBytes, MemoryManager* manager) {
if (manager == nullptr) {
manager = memory::memoryManager();
}
manager->shrinkPools(targetBytes);
}

void testingRunArbitration(MemoryPool* pool, uint64_t targetBytes) {
pool->enterArbitration();
static_cast<MemoryPoolImpl*>(pool)->testingManager()->shrinkPools(
targetBytes);
pool->leaveArbitration();
}
} // namespace facebook::velox::memory
14 changes: 0 additions & 14 deletions velox/common/memory/MemoryArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -383,18 +383,4 @@ MemoryArbitrationContext* memoryArbitrationContext();

/// Returns true if the running thread is under memory arbitration or not.
bool underMemoryArbitration();

/// The function triggers memory arbitration by shrinking memory pools from
/// 'manager' by invoking shrinkPools API. If 'manager' is not set, then it
/// shrinks from the process wide memory manager. If 'targetBytes' is zero, then
/// reclaims all the memory from 'manager' if possible.
class MemoryManager;
void testingRunArbitration(
uint64_t targetBytes = 0,
MemoryManager* manager = nullptr);

/// The function triggers memory arbitration by shrinking memory pools from
/// 'manager' of 'pool' by invoking its shrinkPools API. If 'targetBytes' is
/// zero, then reclaims all the memory from 'manager' if possible.
void testingRunArbitration(MemoryPool* pool, uint64_t targetBytes = 0);
} // namespace facebook::velox::memory
4 changes: 0 additions & 4 deletions velox/common/memory/MemoryPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -679,10 +679,6 @@ class MemoryPoolImpl : public MemoryPool {

void testingSetCapacity(int64_t bytes);

MemoryManager* testingManager() const {
return manager_;
}

MemoryAllocator* testingAllocator() const {
return allocator_;
}
Expand Down
6 changes: 2 additions & 4 deletions velox/core/QueryCtx.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,8 @@ class QueryCtx {

void initPool(const std::string& queryId) {
if (pool_ == nullptr) {
pool_ = memory::memoryManager()->addRootPool(
QueryCtx::generatePoolName(queryId),
memory::kMaxMemory,
memory::MemoryReclaimer::create());
pool_ = memory::deprecatedDefaultMemoryManager().addRootPool(
QueryCtx::generatePoolName(queryId));
}
}

Expand Down
15 changes: 8 additions & 7 deletions velox/exec/HashBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,6 @@ bool HashBuild::ensureInputFits(RowVectorPtr& input) {
bool HashBuild::reserveMemory(const RowVectorPtr& input) {
VELOX_CHECK(spillEnabled());

Operator::ReclaimableSectionGuard guard(this);
numSpillRows_ = 0;
numSpillBytes_ = 0;

Expand All @@ -469,10 +468,9 @@ bool HashBuild::reserveMemory(const RowVectorPtr& input) {
if (numRows != 0) {
// Test-only spill path.
if (testingTriggerSpill()) {
memory::testingRunArbitration(pool());
// NOTE: the memory arbitration should have triggered spilling on this
// hash build operator so we return true to indicate have enough memory.
return true;
numSpillRows_ = std::max<int64_t>(1, numRows / 10);
numSpillBytes_ = numSpillRows_ * outOfLineBytesPerRow;
return false;
}

// We check usage from the parent pool to take peers' allocations into
Expand Down Expand Up @@ -524,8 +522,11 @@ bool HashBuild::reserveMemory(const RowVectorPtr& input) {
incrementBytes * 2,
currentUsage * spillConfig_->spillableReservationGrowthPct / 100);

if (pool()->maybeReserve(targetIncrementBytes)) {
return true;
{
Operator::ReclaimableSectionGuard guard(this);
if (pool()->maybeReserve(targetIncrementBytes)) {
return true;
}
}

LOG(WARNING) << "Failed to reserve " << succinctBytes(targetIncrementBytes)
Expand Down
6 changes: 1 addition & 5 deletions velox/exec/tests/HashJoinTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "velox/exec/HashBuild.h"
#include "velox/exec/HashJoinBridge.h"
#include "velox/exec/PlanNodeStats.h"
#include "velox/exec/TableScan.h"
#include "velox/exec/tests/utils/ArbitratorTestUtil.h"
#include "velox/exec/tests/utils/AssertQueryBuilder.h"
#include "velox/exec/tests/utils/Cursor.h"
Expand Down Expand Up @@ -733,11 +734,6 @@ class HashJoinBuilder {

class HashJoinTest : public HiveConnectorTestBase {
protected:
static void SetUpTestCase() {
FLAGS_velox_testing_enable_arbitration = true;
HiveConnectorTestBase::SetUpTestCase();
}

HashJoinTest() : HashJoinTest(TestParam(1)) {}

explicit HashJoinTest(const TestParam& param)
Expand Down
17 changes: 17 additions & 0 deletions velox/exec/tests/utils/ArbitratorTestUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -348,4 +348,21 @@ QueryTestResult runWriteTask(
}
return result;
}

void testingRunArbitration(
memory::MemoryPool* pool,
uint64_t targetBytes,
memory::MemoryManager* manager) {
if (manager == nullptr) {
manager = memory::memoryManager();
}
if (pool != nullptr) {
pool->enterArbitration();
manager->shrinkPools(targetBytes);
pool->leaveArbitration();
} else {
manager->shrinkPools(targetBytes);
}
}

} // namespace facebook::velox::exec::test
11 changes: 11 additions & 0 deletions velox/exec/tests/utils/ArbitratorTestUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,4 +179,15 @@ QueryTestResult runWriteTask(
const std::string& kHiveConnectorId,
bool enableSpilling,
const RowVectorPtr& expectedResult = nullptr);

/// The function triggers memory arbitration by shrinking memory pools from
/// 'manager' by invoking shrinkPools API. If 'manager' is not set, then it
/// shrinks from the process wide memory manager. If 'pool' is provided, the
/// function puts 'pool' in arbitration state before the arbitration to ease
/// test use. If 'targetBytes' is zero, then reclaims all the memory from
/// 'manager' if possible.
void testingRunArbitration(
memory::MemoryPool* pool = nullptr,
uint64_t targetBytes = 0,
memory::MemoryManager* manager = nullptr);
} // namespace facebook::velox::exec::test
10 changes: 0 additions & 10 deletions velox/exec/tests/utils/OperatorTestBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@

DECLARE_bool(velox_memory_leak_check_enabled);
DECLARE_bool(velox_enable_memory_usage_track_in_default_memory_pool);
DEFINE_bool(
velox_testing_enable_arbitration,
false,
"Enable to turn on arbitration for tests by default");

using namespace facebook::velox::common::testutil;

Expand All @@ -62,12 +58,6 @@ void OperatorTestBase::SetUpTestCase() {
exec::SharedArbitrator::registerFactory();
memory::MemoryManagerOptions options;
options.allocatorCapacity = 8L << 30;
if (FLAGS_velox_testing_enable_arbitration) {
options.arbitratorCapacity = 6L << 30;
options.arbitratorKind = "SHARED";
options.checkUsageLeak = true;
options.arbitrationStateCheckCb = memoryArbitrationStateCheck;
}
memory::MemoryManager::testingSetInstance(options);
asyncDataCache_ = cache::AsyncDataCache::create(memoryManager()->allocator());
cache::AsyncDataCache::setInstance(asyncDataCache_.get());
Expand Down
2 changes: 0 additions & 2 deletions velox/exec/tests/utils/OperatorTestBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
#include "velox/vector/tests/utils/VectorMaker.h"
#include "velox/vector/tests/utils/VectorTestBase.h"

DECLARE_bool(velox_testing_enable_arbitration);

namespace facebook::velox::exec::test {
class OperatorTestBase : public testing::Test,
public velox::test::VectorTestBase {
Expand Down
1 change: 0 additions & 1 deletion velox/expression/tests/FuzzerRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,6 @@ int FuzzerRunner::run(
void FuzzerRunner::runFromGtest(
size_t seed,
const std::unordered_set<std::string>& skipFunctions) {
memory::MemoryManager::testingSetInstance({});
auto signatures = facebook::velox::getFunctionSignatures();
ExpressionFuzzerVerifier(
signatures, seed, getExpressionFuzzerVerifierOptions(skipFunctions))
Expand Down

0 comments on commit cf23b9f

Please sign in to comment.