diff --git a/velox/common/memory/tests/MemoryArbitratorTest.cpp b/velox/common/memory/tests/MemoryArbitratorTest.cpp index 9ea3434dac9ae..940da805792f4 100644 --- a/velox/common/memory/tests/MemoryArbitratorTest.cpp +++ b/velox/common/memory/tests/MemoryArbitratorTest.cpp @@ -221,6 +221,7 @@ class FakeTestArbitrator : public MemoryArbitrator { } uint64_t shrinkCapacity( + memory::MemoryPool* /*unused*/, const std::vector>& /*unused*/, uint64_t /*unused*/) override { VELOX_NYI(); diff --git a/velox/common/memory/tests/MemoryManagerTest.cpp b/velox/common/memory/tests/MemoryManagerTest.cpp index 17d83ef9cfcbe..cf89fa4b91756 100644 --- a/velox/common/memory/tests/MemoryManagerTest.cpp +++ b/velox/common/memory/tests/MemoryManagerTest.cpp @@ -132,6 +132,7 @@ class FakeTestArbitrator : public MemoryArbitrator { } uint64_t shrinkCapacity( + memory::MemoryPool* /*unused*/, const std::vector>& /*unused*/, uint64_t /*unused*/) override { VELOX_NYI(); diff --git a/velox/common/memory/tests/MockSharedArbitratorTest.cpp b/velox/common/memory/tests/MockSharedArbitratorTest.cpp index 0071a04c1d673..6ae088faf6974 100644 --- a/velox/common/memory/tests/MockSharedArbitratorTest.cpp +++ b/velox/common/memory/tests/MockSharedArbitratorTest.cpp @@ -234,6 +234,7 @@ class MockMemoryOperator { void* allocate(uint64_t bytes) { VELOX_CHECK_EQ(bytes % pool_->alignment(), 0); void* buffer = pool_->allocate(bytes); + allocations_.size(); std::lock_guard l(mu_); totalBytes_ += bytes; allocations_.emplace(buffer, bytes); diff --git a/velox/exec/HashBuild.cpp b/velox/exec/HashBuild.cpp index 86bb552ac4181..b1d4572288a3b 100644 --- a/velox/exec/HashBuild.cpp +++ b/velox/exec/HashBuild.cpp @@ -66,17 +66,10 @@ HashBuild::HashBuild( joinBridge_(operatorCtx_->task()->getHashJoinBridgeLocked( operatorCtx_->driverCtx()->splitGroupId, planNodeId())), - spillMemoryThreshold_( - operatorCtx_->driverCtx()->queryConfig().joinSpillMemoryThreshold()), keyChannelMap_(joinNode_->rightKeys().size()) { VELOX_CHECK(pool()->trackUsage()); VELOX_CHECK_NOT_NULL(joinBridge_); - spillGroup_ = spillEnabled() - ? operatorCtx_->task()->getSpillOperatorGroupLocked( - operatorCtx_->driverCtx()->splitGroupId, planNodeId()) - : nullptr; - joinBridge_->addBuilder(); auto inputType = joinNode_->sources()[1]->outputType(); @@ -208,12 +201,8 @@ void HashBuild::setupSpiller(SpillPartition* spillPartition) { HashBitRange hashBits( spillConfig.startPartitionBit, spillConfig.startPartitionBit + spillConfig.joinPartitionBits); - if (spillPartition == nullptr) { - spillGroup_->addOperator( - *this, - operatorCtx_->driver()->shared_from_this(), - [&](const std::vector& operators) { runSpill(operators); }); - } else { + + if (spillPartition != nullptr) { LOG(INFO) << "Setup reader to read spilled input from " << spillPartition->toString() << ", memory pool: " << pool()->name(); @@ -236,6 +225,8 @@ void HashBuild::setupSpiller(SpillPartition* spillPartition) { hashBits = HashBitRange(startBit, startBit + spillConfig.joinPartitionBits); } + LOG(INFO) << "Setup spiller at spill level " + << spillConfig.joinSpillLevel(hashBits.begin()); spiller_ = std::make_unique( Spiller::Type::kHashJoinBuild, table_->rows(), @@ -437,25 +428,12 @@ bool HashBuild::ensureInputFits(RowVectorPtr& input) { // spilling directly. It is okay as we will accumulate the extra reservation // in the operator's memory pool, and won't make any new reservation if there // is already sufficient reservations. - if (!reserveMemory(input)) { - if (!requestSpill(input)) { - return false; - } - } else { - // Check if any other peer operator has requested group spill. - if (waitSpill(input)) { - return false; - } - } - return true; + return reserveMemory(input); } bool HashBuild::reserveMemory(const RowVectorPtr& input) { VELOX_CHECK(spillEnabled()); - numSpillRows_ = 0; - numSpillBytes_ = 0; - auto* rows = table_->rows(); const auto numRows = rows->numRows(); @@ -469,21 +447,12 @@ bool HashBuild::reserveMemory(const RowVectorPtr& input) { if (numRows != 0) { // Test-only spill path. if (testingTriggerSpill()) { - numSpillRows_ = std::max(1, numRows / 10); - numSpillBytes_ = numSpillRows_ * outOfLineBytesPerRow; - return false; - } - - // We check usage from the parent pool to take peers' allocations into - // account. - const auto nodeUsage = pool()->parent()->currentBytes(); - if (spillMemoryThreshold_ != 0 && nodeUsage > spillMemoryThreshold_) { - const int64_t bytesToSpill = - nodeUsage * spillConfig()->spillableReservationGrowthPct / 100; - numSpillRows_ = std::max( - 1, bytesToSpill / (rows->fixedRowSize() + outOfLineBytesPerRow)); - numSpillBytes_ = numSpillRows_ * outOfLineBytesPerRow; - return false; + Operator::ReclaimableSectionGuard guard(this); + SuspendedSection suspendedSection(operatorCtx_->driver()); + auto shrinkedBytes = memory::memoryManager()->shrinkPools( + std::numeric_limits::max()); + LOG(INFO) << "shrinkedBytes " << succinctBytes(shrinkedBytes); + return true; } } @@ -540,6 +509,8 @@ bool HashBuild::reserveMemory(const RowVectorPtr& input) { void HashBuild::spillInput(const RowVectorPtr& input) { VELOX_CHECK_EQ(input->size(), activeRows_.size()); + // Q(jtan6): Why do we have !spiller_->isAnySpilled() condition here? If + // nothing has spilled we don't spill? if (!spillEnabled() || spiller_ == nullptr || !spiller_->isAnySpilled() || !activeRows_.hasSelections()) { return; @@ -550,17 +521,15 @@ void HashBuild::spillInput(const RowVectorPtr& input) { computeSpillPartitions(input); vector_size_t numSpillInputs = 0; - for (auto row = 0; row < numInput; ++row) { - const auto partition = spillPartitions_[row]; - if (FOLLY_UNLIKELY(!activeRows_.isValid(row))) { - continue; - } - if (!spiller_->isSpilled(partition)) { + for (auto rowIdx = 0; rowIdx < numInput; ++rowIdx) { + const auto partition = spillPartitions_[rowIdx]; + if (FOLLY_UNLIKELY(!activeRows_.isValid(rowIdx))) { continue; } - activeRows_.setValid(row, false); + activeRows_.setValid(rowIdx, false); ++numSpillInputs; - rawSpillInputIndicesBuffers_[partition][numSpillInputs_[partition]++] = row; + rawSpillInputIndicesBuffers_[partition][numSpillInputs_[partition]++] = + rowIdx; } if (numSpillInputs == 0) { return; @@ -646,70 +615,6 @@ void HashBuild::spillPartition( } } -bool HashBuild::requestSpill(RowVectorPtr& input) { - VELOX_CHECK_GT(numSpillRows_, 0); - VELOX_CHECK_GT(numSpillBytes_, 0); - - // If all the partitions have been spilled, then nothing to spill. - if (spiller_->isAllSpilled()) { - return true; - } - - input_ = std::move(input); - if (spillGroup_->requestSpill(*this, future_)) { - VELOX_CHECK(future_.valid()); - setState(State::kWaitForSpill); - return false; - } - input = std::move(input_); - return true; -} - -bool HashBuild::waitSpill(RowVectorPtr& input) { - if (!spillGroup_->needSpill()) { - return false; - } - - if (spillGroup_->waitSpill(*this, future_)) { - VELOX_CHECK(future_.valid()); - input_ = std::move(input); - setState(State::kWaitForSpill); - return true; - } - return false; -} - -void HashBuild::runSpill(const std::vector& spillOperators) { - VELOX_CHECK(spillEnabled()); - VELOX_CHECK(!spiller_->state().isAllPartitionSpilled()); - - uint64_t targetRows = 0; - uint64_t targetBytes = 0; - for (auto& spillOp : spillOperators) { - HashBuild* build = dynamic_cast(spillOp); - VELOX_CHECK_NOT_NULL(build); - build->addAndClearSpillTarget(targetRows, targetBytes); - } - VELOX_CHECK_GT(targetRows, 0); - VELOX_CHECK_GT(targetBytes, 0); - - // TODO: consider to offload the partition spill processing to an executor to - // run in parallel. - for (auto& spillOp : spillOperators) { - HashBuild* build = dynamic_cast(spillOp); - build->spiller_->spill(); - build->table_->clear(); - build->pool()->release(); - } -} - -void HashBuild::addAndClearSpillTarget(uint64_t& numRows, uint64_t& numBytes) { - numRows += numSpillRows_; - numSpillRows_ = 0; - numBytes += numSpillBytes_; - numSpillBytes_ = 0; -} - void HashBuild::noMoreInput() { checkRunning(); @@ -722,10 +627,6 @@ void HashBuild::noMoreInput() { } void HashBuild::noMoreInputInternal() { - if (spillEnabled()) { - spillGroup_->operatorStopped(*this); - } - if (!finishHashBuild()) { return; } @@ -849,7 +750,6 @@ bool HashBuild::finishHashBuild() { if (joinBridge_->setHashTable( std::move(table_), std::move(spillPartitions), joinHasNullKeys_)) { intermediateStateCleared_ = true; - spillGroup_->restart(); } // Release the unused memory reservation since we have finished the merged diff --git a/velox/exec/HashBuild.h b/velox/exec/HashBuild.h index 6359569fccfdc..0b080f22f8216 100644 --- a/velox/exec/HashBuild.h +++ b/velox/exec/HashBuild.h @@ -101,9 +101,7 @@ class HashBuild final : public Operator { // Invoked when operator has finished processing the build input and wait for // all the other drivers to finish the processing. The last driver that // reaches to the hash build barrier, is responsible to build the hash table - // merged from all the other drivers. If the disk spilling is enabled, the - // last driver will also restart 'spillGroup_' and add a new hash build - // barrier for the next round of hash table build operation if it needs. + // merged from all the other drivers. bool finishHashBuild(); // Invoked after the hash table has been built. It waits for any spill data to @@ -194,28 +192,6 @@ class HashBuild final : public Operator { vector_size_t numInput, const SpillPartitionNumSet& spillPartitions); - // Invoked to send group spill request to 'spillGroup_'. The function returns - // true if group spill has been inline executed, otherwise returns false. In - // the latter case, the operator will transition to 'kWaitForSpill' state and - // 'input' will be saved in 'input_' to be processed after the group spill has - // been executed. - bool requestSpill(RowVectorPtr& input); - - // Invoked to check if it needs to wait for any pending group spill to run. - // The function returns true if it needs to wait, otherwise false. The latter - // case is either because there is no pending group spill or this operator is - // the last one to reach to the group spill barrier and execute the group - // spill inline. - bool waitSpill(RowVectorPtr& input); - - // The callback registered to 'spillGroup_' to run group spill on - // 'spillOperators'. - void runSpill(const std::vector& spillOperators); - - // Invoked by 'runSpill' to sum up the spill targets from all the operators in - // 'numRows' and 'numBytes'. - void addAndClearSpillTarget(uint64_t& numRows, uint64_t& numBytes); - // Invoked to reset the operator state to restore previously spilled data. It // setup (recursive) spiller and spill input reader from 'spillInput' received // from 'joinBride_'. 'spillInput' contains a shard of previously spilled @@ -255,14 +231,8 @@ class HashBuild final : public Operator { std::shared_ptr joinBridge_; - // The maximum memory usage that a hash build can hold before spilling. - // If it is zero, then there is no such limit. - const uint64_t spillMemoryThreshold_; - bool exceededMaxSpillLevelLimit_{false}; - std::shared_ptr spillGroup_; - State state_{State::kRunning}; // The row type used for hash table build and disk spilling. @@ -315,13 +285,6 @@ class HashBuild final : public Operator { // 'testSpillPct_';. uint64_t spillTestCounter_{0}; - // The spill targets set by 'requestSpill()' to request group spill. - uint64_t numSpillRows_{0}; - uint64_t numSpillBytes_{0}; - - // This can be nullptr if either spilling is not allowed or it has been - // trsnaferred to the last hash build operator while in kWaitForBuild state or - // it has been cleared to setup a new one for recursive spilling. std::unique_ptr spiller_; // Used to read input from previously spilled data for restoring. diff --git a/velox/exec/tests/HashJoinTest.cpp b/velox/exec/tests/HashJoinTest.cpp index 08dc5b8f76041..016c93cdd847c 100644 --- a/velox/exec/tests/HashJoinTest.cpp +++ b/velox/exec/tests/HashJoinTest.cpp @@ -24,6 +24,7 @@ #include "velox/exec/HashBuild.h" #include "velox/exec/HashJoinBridge.h" #include "velox/exec/PlanNodeStats.h" +#include "velox/exec/SharedArbitrator.h" #include "velox/exec/TableScan.h" #include "velox/exec/tests/utils/ArbitratorTestUtil.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" @@ -34,6 +35,8 @@ #include "velox/exec/tests/utils/VectorTestUtil.h" #include "velox/vector/fuzzer/VectorFuzzer.h" +DECLARE_bool(velox_test_enable_arbitration); + using namespace facebook::velox; using namespace facebook::velox::exec; using namespace facebook::velox::exec::test; @@ -201,6 +204,12 @@ using JoinResultsVerifier = class HashJoinBuilder { public: + enum SpillOption { + WITHOUT_INJECTION, + WITH_INJECTION, + WITH_AND_WITHOUT_INJECTION + }; + HashJoinBuilder( memory::MemoryPool& pool, DuckDbQueryRunner& duckDbQueryRunner, @@ -372,13 +381,8 @@ class HashJoinBuilder { return *this; } - HashJoinBuilder& spillMemoryThreshold(uint64_t threshold) { - spillMemoryThreshold_ = threshold; - return *this; - } - - HashJoinBuilder& injectSpill(bool injectSpill) { - injectSpill_ = injectSpill; + HashJoinBuilder& spillOption(SpillOption spillOption) { + spillOption_ = spillOption; return *this; } @@ -561,8 +565,12 @@ class HashJoinBuilder { } void runTest(const core::PlanNodePtr& planNode) { - runTest(planNode, false, maxSpillLevel_.value_or(-1)); - if (injectSpill_) { + if (spillOption_ == SpillOption::WITHOUT_INJECTION || + spillOption_ == SpillOption::WITH_AND_WITHOUT_INJECTION) { + runTest(planNode, false, maxSpillLevel_.value_or(-1)); + } + if (spillOption_ == SpillOption::WITH_INJECTION || + spillOption_ == SpillOption::WITH_AND_WITHOUT_INJECTION) { if (maxSpillLevel_.has_value()) { runTest(planNode, true, maxSpillLevel_.value(), 100); } else { @@ -584,7 +592,15 @@ class HashJoinBuilder { builder.splits(splitEntry.first, splitEntry.second); } } - auto queryCtx = std::make_shared(executor_); + auto queryCtx = std::make_shared( + executor_, + core::QueryConfig{{}}, + std::unordered_map>{}, + cache::AsyncDataCache::getInstance(), + memory::MemoryManager::getInstance()->addRootPool( + "query_pool", + memory::kMaxMemory, + memory::MemoryReclaimer::create())); std::shared_ptr spillDirectory; if (injectSpill) { spillDirectory = exec::test::TempDirectoryPath::create(); @@ -596,15 +612,6 @@ class HashJoinBuilder { // many spilled vectors in a spilled file to trigger recursive spilling. config(core::QueryConfig::kSpillWriteBufferSize, std::to_string(0)); config(core::QueryConfig::kTestingSpillPct, "100"); - } else if (spillMemoryThreshold_ != 0) { - spillDirectory = exec::test::TempDirectoryPath::create(); - builder.spillDirectory(spillDirectory->path); - config(core::QueryConfig::kSpillEnabled, "true"); - config(core::QueryConfig::kMaxSpillLevel, std::to_string(maxSpillLevel)); - config(core::QueryConfig::kJoinSpillEnabled, "true"); - config( - core::QueryConfig::kJoinSpillMemoryThreshold, - std::to_string(spillMemoryThreshold_)); } else if (!spillDirectory_.empty()) { builder.spillDirectory(spillDirectory_); config(core::QueryConfig::kSpillEnabled, "true"); @@ -661,9 +668,9 @@ class HashJoinBuilder { ASSERT_GE( memory::spillMemoryPool()->stats().peakBytes, peakSpillMemoryUsage); } - // NOTE: if 'spillDirectory_' is not empty and spill threshold is not - // set, the test might trigger spilling by its own. - } else if (spillDirectory_.empty() && spillMemoryThreshold_ == 0) { + // NOTE: if 'spillDirectory_' is not empty, the test might trigger + // spilling by its own. + } else if (spillDirectory_.empty()) { ASSERT_EQ(statsPair.first.spilledRows, 0); ASSERT_EQ(statsPair.first.spilledInputBytes, 0); ASSERT_EQ(statsPair.first.spilledBytes, 0); @@ -711,8 +718,7 @@ class HashJoinBuilder { std::vector joinOutputLayout_; std::vector outputProjections_; - uint64_t spillMemoryThreshold_{0}; - bool injectSpill_{true}; + SpillOption spillOption_{SpillOption::WITH_AND_WITHOUT_INJECTION}; // If not set, then the test will run the test with different settings: // 0, 2. std::optional maxSpillLevel_; @@ -737,6 +743,16 @@ class HashJoinTest : public HiveConnectorTestBase { explicit HashJoinTest(const TestParam& param) : numDrivers_(param.numDrivers) {} + static void SetUpTestCase() { + FLAGS_velox_test_enable_arbitration = true; + OperatorTestBase::SetUpTestCase(); + } + + static void TearDownTestCase() { + FLAGS_velox_test_enable_arbitration = false; + OperatorTestBase::TearDownTestCase(); + } + void SetUp() override { HiveConnectorTestBase::SetUp(); @@ -987,104 +1003,6 @@ TEST_P(MultiThreadedHashJoinTest, emptyProbe) { .run(); } -TEST_P(MultiThreadedHashJoinTest, emptyProbeWithSpillMemoryThreshold) { - HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get()) - .numDrivers(numDrivers_) - .keyTypes({BIGINT()}) - .probeVectors(0, 5) - .buildVectors(1500, 5) - .injectSpill(false) - .spillMemoryThreshold(1) - .maxSpillLevel(0) - .referenceQuery( - "SELECT t_k0, t_data, u_k0, u_data FROM t, u WHERE t_k0 = u_k0") - .verifier([&](const std::shared_ptr& task, bool /*unused*/) { - const auto statsPair = taskSpilledStats(*task); - ASSERT_GT(statsPair.first.spilledRows, 0); - ASSERT_GT(statsPair.first.spilledBytes, 0); - ASSERT_GT(statsPair.first.spilledPartitions, 0); - ASSERT_GT(statsPair.first.spilledFiles, 0); - ASSERT_EQ(statsPair.second.spilledRows, 0); - ASSERT_EQ(statsPair.second.spilledBytes, 0); - ASSERT_GT(statsPair.second.spilledPartitions, 0); - ASSERT_EQ(statsPair.second.spilledFiles, 0); - }) - .run(); -} - -DEBUG_ONLY_TEST_P( - MultiThreadedHashJoinTest, - raceBetweenTaskTerminationAndThesholdTriggeredSpill) { - std::atomic task{nullptr}; - SCOPED_TESTVALUE_SET( - "facebook::velox::exec::Driver::runInternal::addInput", - std::function([&](Operator* op) { - if (op->operatorType() != "HashBuild") { - return; - } - task = op->testingOperatorCtx()->task().get(); - })); - SCOPED_TESTVALUE_SET( - "facebook::velox::exec::SpillOperatorGroup::runSpill", - std::function([&](Operator* op) { - ASSERT_TRUE(task.load() != nullptr); - task.load()->requestAbort(); - // Wait a bit to ensure the other peer hash build drivers have been - // closed. - std::this_thread::sleep_for(std::chrono::milliseconds(1'000)); - })); - - VectorFuzzer fuzzer({.vectorSize = 10}, pool()); - const int32_t numBuildVectors = 2; - std::vector buildVectors; - for (int32_t i = 0; i < numBuildVectors; ++i) { - auto vector = fuzzer.fuzzRow(buildType_); - // Build the build vector with the same join key to make sure there is only - // one hash build operator running. - vector->childAt(0) = makeFlatVector( - vector->size(), [](auto /*unused*/) { return 1; }); - buildVectors.push_back(std::move(vector)); - } - const int32_t numProbeVectors = 2; - std::vector probeVectors; - for (int32_t i = 0; i < numProbeVectors; ++i) { - probeVectors.push_back(fuzzer.fuzzRow(probeType_)); - } - - createDuckDbTable("t", probeVectors); - createDuckDbTable("u", buildVectors); - - auto planNodeIdGenerator = std::make_shared(); - auto plan = PlanBuilder(planNodeIdGenerator) - .values(probeVectors, true) - .hashJoin( - {"t_k1"}, - {"u_k1"}, - PlanBuilder(planNodeIdGenerator) - .values(buildVectors, true) - // NOTE: add local partition here to ensure all the - // build inputs go to the same hash build operator. - .localPartition({"u_k1"}) - .planNode(), - "", - {"t_k1", "t_k2"}, - core::JoinType::kInner) - .planNode(); - - VELOX_ASSERT_THROW( - HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get()) - .planNode(plan) - .injectSpill(false) - .checkSpillStats(false) - .spillMemoryThreshold(1) - .maxSpillLevel(0) - .numDrivers(numDrivers_) - .referenceQuery( - "SELECT t.t_k1, t.t_k2 from t, u WHERE t.t_k1 = u.u_k1") - .run(), - "Aborted for external error"); -} - TEST_P(MultiThreadedHashJoinTest, normalizedKey) { HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get()) .numDrivers(numDrivers_) @@ -1118,7 +1036,7 @@ DEBUG_ONLY_TEST_P(MultiThreadedHashJoinTest, parallelJoinBuildCheck) { .buildVectors(1500, 5) .referenceQuery( "SELECT t_k0, t_k1, t_data, u_k0, u_k1, u_data FROM t, u WHERE t_k0 = u_k0 AND t_k1 = u_k1") - .injectSpill(false) + .spillOption(HashJoinBuilder::SpillOption::WITHOUT_INJECTION) .run(); ASSERT_EQ(numDrivers_ == 1, !isParallelBuild); } @@ -1140,7 +1058,7 @@ DEBUG_ONLY_TEST_P( .buildVectors(1500, 5) .referenceQuery( "SELECT t_k0, t_k1, t_data, u_k0, u_k1, u_data FROM t, u WHERE t_k0 = u_k0 AND t_k1 = u_k1") - .injectSpill(false) + .spillOption(HashJoinBuilder::SpillOption::WITHOUT_INJECTION) .run(), "Aborted for external error"); } @@ -3733,7 +3651,7 @@ TEST_F(HashJoinTest, semiProjectWithFilter) { .planNode(plan) .referenceQuery(fmt::format( "SELECT t0, t1, t0 IN (SELECT u0 FROM u WHERE {}) FROM t", filter)) - .injectSpill(false) + .spillOption(HashJoinBuilder::SpillOption::WITHOUT_INJECTION) .run(); plan = makePlan(false /*nullAware*/, filter); @@ -3745,7 +3663,7 @@ TEST_F(HashJoinTest, semiProjectWithFilter) { .referenceQuery(fmt::format( "SELECT t0, t1, EXISTS (SELECT * FROM u WHERE (u0 is not null OR t0 is not null) AND u0 = t0 AND {}) FROM t", filter)) - .injectSpill(false) + .spillOption(HashJoinBuilder::SpillOption::WITHOUT_INJECTION) .run(); } } @@ -4881,7 +4799,7 @@ TEST_F(HashJoinTest, dynamicFiltersAppliedToPreloadedSplits) { HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get()) .planNode(std::move(op)) .config(core::QueryConfig::kMaxSplitPreloadPerDriver, "3") - .injectSpill(false) + .spillOption(HashJoinBuilder::SpillOption::WITHOUT_INJECTION) .inputSplits({{probeScanId, probeSplits}}) .referenceQuery("select p.p0 from p, b where b.b0 = p.p1") .checkSpillStats(false) @@ -4998,7 +4916,7 @@ TEST_F(HashJoinTest, smallOutputBatchSize) { .planNode(std::move(plan)) .config(core::QueryConfig::kPreferredOutputBatchRows, std::to_string(10)) .referenceQuery("SELECT c0, u_c1 FROM t, u WHERE c0 = u_c0 AND c1 < u_c1") - .injectSpill(false) + .spillOption(HashJoinBuilder::SpillOption::WITHOUT_INJECTION) .run(); } @@ -5254,7 +5172,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringInputProcessing) { .numDrivers(numDrivers_) .planNode(plan) .queryPool(std::move(queryPool)) - .injectSpill(false) + .spillOption(HashJoinBuilder::SpillOption::WITHOUT_INJECTION) .spillDirectory(testData.spillEnabled ? tempDirectory->path : "") .referenceQuery( "SELECT t_k1, t_k2, t_v1, u_k1, u_k2, u_v1 FROM t, u WHERE t.t_k1 = u.u_k1") @@ -5263,9 +5181,9 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringInputProcessing) { const auto statsPair = taskSpilledStats(*task); if (testData.expectedReclaimable) { ASSERT_GT(statsPair.first.spilledBytes, 0); - ASSERT_EQ(statsPair.first.spilledPartitions, 4); + ASSERT_GE(statsPair.first.spilledPartitions, 4); ASSERT_GT(statsPair.second.spilledBytes, 0); - ASSERT_EQ(statsPair.second.spilledPartitions, 4); + ASSERT_GE(statsPair.second.spilledPartitions, 4); verifyTaskSpilledRuntimeStats(*task, true); } else { ASSERT_EQ(statsPair.first.spilledBytes, 0); @@ -5407,7 +5325,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringReserve) { .numDrivers(numDrivers_) .planNode(plan) .queryPool(std::move(queryPool)) - .injectSpill(false) + .spillOption(HashJoinBuilder::SpillOption::WITH_AND_WITHOUT_INJECTION) .spillDirectory(tempDirectory->path) .referenceQuery( "SELECT t_k1, t_k2, t_v1, u_k1, u_k2, u_v1 FROM t, u WHERE t.t_k1 = u.u_k1") @@ -5415,9 +5333,9 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringReserve) { .verifier([&](const std::shared_ptr& task, bool /*unused*/) { const auto statsPair = taskSpilledStats(*task); ASSERT_GT(statsPair.first.spilledBytes, 0); - ASSERT_EQ(statsPair.first.spilledPartitions, 4); + ASSERT_GE(statsPair.first.spilledPartitions, 4); ASSERT_GT(statsPair.second.spilledBytes, 0); - ASSERT_EQ(statsPair.second.spilledPartitions, 4); + ASSERT_GE(statsPair.second.spilledPartitions, 4); verifyTaskSpilledRuntimeStats(*task, true); }) .run(); @@ -5537,7 +5455,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringAllocation) { .numDrivers(numDrivers_) .planNode(plan) .queryPool(std::move(queryPool)) - .injectSpill(false) + .spillOption(HashJoinBuilder::SpillOption::WITHOUT_INJECTION) .spillDirectory(enableSpilling ? tempDirectory->path : "") .referenceQuery( "SELECT t_k1, t_k2, t_v1, u_k1, u_k2, u_v1 FROM t, u WHERE t.t_k1 = u.u_k1") @@ -5656,7 +5574,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringOutputProcessing) { .numDrivers(numDrivers_) .planNode(plan) .queryPool(std::move(queryPool)) - .injectSpill(false) + .spillOption(HashJoinBuilder::SpillOption::WITHOUT_INJECTION) .spillDirectory(enableSpilling ? tempDirectory->path : "") .referenceQuery( "SELECT t_k1, t_k2, t_v1, u_k1, u_k2, u_v1 FROM t, u WHERE t.t_k1 = u.u_k1") @@ -5802,7 +5720,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringWaitForProbe) { .numDrivers(numDrivers_) .planNode(plan) .queryPool(std::move(queryPool)) - .injectSpill(false) + .spillOption(HashJoinBuilder::SpillOption::WITHOUT_INJECTION) .spillDirectory(tempDirectory->path) .referenceQuery( "SELECT t_k1, t_k2, t_v1, u_k1, u_k2, u_v1 FROM t, u WHERE t.t_k1 = u.u_k1") @@ -5931,7 +5849,8 @@ DEBUG_ONLY_TEST_F(HashJoinTest, hashBuildAbortDuringOutputProcessing) { .numDrivers(numDrivers_) .planNode(plan) .queryPool(std::move(queryPool)) - .injectSpill(false) + .spillOption( + HashJoinBuilder::SpillOption::WITHOUT_INJECTION) .referenceQuery( "SELECT t_k1, t_k2, t_v1, u_k1, u_k2, u_v1 FROM t, u WHERE t.t_k1 = u.u_k1") .run(), @@ -5953,7 +5872,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, hashBuildAbortDuringOutputProcessing) { } } -DEBUG_ONLY_TEST_F(HashJoinTest, hashBuildAbortDuringInputgProcessing) { +DEBUG_ONLY_TEST_F(HashJoinTest, hashBuildAbortDuringInputProcessing) { constexpr int64_t kMaxBytes = 1LL << 30; // 1GB VectorFuzzer fuzzer({.vectorSize = 1000}, pool()); const int32_t numBuildVectors = 10; @@ -6037,7 +5956,8 @@ DEBUG_ONLY_TEST_F(HashJoinTest, hashBuildAbortDuringInputgProcessing) { .numDrivers(numDrivers_) .planNode(plan) .queryPool(std::move(queryPool)) - .injectSpill(false) + .spillOption( + HashJoinBuilder::SpillOption::WITHOUT_INJECTION) .referenceQuery( "SELECT t_k1, t_k2, t_v1, u_k1, u_k2, u_v1 FROM t, u WHERE t.t_k1 = u.u_k1") .run(), @@ -6143,7 +6063,8 @@ DEBUG_ONLY_TEST_F(HashJoinTest, hashProbeAbortDuringInputProcessing) { .numDrivers(numDrivers_) .planNode(plan) .queryPool(std::move(queryPool)) - .injectSpill(false) + .spillOption( + HashJoinBuilder::SpillOption::WITHOUT_INJECTION) .referenceQuery( "SELECT t_k1, t_k2, t_v1, u_k1, u_k2, u_v1 FROM t, u WHERE t.t_k1 = u.u_k1") .run(), @@ -6194,9 +6115,8 @@ TEST_F(HashJoinTest, leftJoinWithMissAtEndOfBatch) { HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get()) .planNode(plan) - .injectSpill(false) + .spillOption(HashJoinBuilder::SpillOption::WITH_INJECTION) .checkSpillStats(false) - .spillMemoryThreshold(1) .maxSpillLevel(0) .numDrivers(1) .config( @@ -6246,9 +6166,8 @@ TEST_F(HashJoinTest, leftJoinWithMissAtEndOfBatchMultipleBuildMatches) { HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get()) .planNode(plan) - .injectSpill(false) + .spillOption(HashJoinBuilder::SpillOption::WITH_INJECTION) .checkSpillStats(false) - .spillMemoryThreshold(1) .maxSpillLevel(0) .numDrivers(1) .config( @@ -6318,7 +6237,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, minSpillableMemoryReservation) { HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get()) .numDrivers(numDrivers_) .planNode(plan) - .injectSpill(false) + .spillOption(HashJoinBuilder::SpillOption::WITHOUT_INJECTION) .spillDirectory(tempDirectory->path) .referenceQuery( "SELECT t_k1, t_k2, t_v1, u_k1, u_k2, u_v1 FROM t, u WHERE t.t_k1 = u.u_k1") @@ -6363,9 +6282,8 @@ TEST_F(HashJoinTest, exceededMaxSpillLevel) { HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get()) .numDrivers(1) .planNode(plan) - .injectSpill(false) // Always trigger spilling. - .spillMemoryThreshold(1) + .spillOption(HashJoinBuilder::SpillOption::WITH_INJECTION) .maxSpillLevel(0) .spillDirectory(tempDirectory->path) .referenceQuery( @@ -6426,6 +6344,7 @@ TEST_F(HashJoinTest, maxSpillBytes) { .queryCtx(queryCtx) .config(core::QueryConfig::kSpillEnabled, true) .config(core::QueryConfig::kJoinSpillEnabled, true) + // TOOD(jtan6): Use global flag to trigger this spilling // Set a small capacity to trigger threshold based spilling .config(core::QueryConfig::kJoinSpillMemoryThreshold, 5 << 20) .config(core::QueryConfig::kMaxSpillBytes, testData.maxSpilledBytes) @@ -6483,6 +6402,7 @@ TEST_F(HashJoinTest, onlyHashBuildMaxSpillBytes) { .queryCtx(queryCtx) .config(core::QueryConfig::kSpillEnabled, true) .config(core::QueryConfig::kJoinSpillEnabled, true) + // TOOD(jtan6): Use global flag to trigger this spilling // Set a small capacity to trigger threshold based spilling .config(core::QueryConfig::kJoinSpillMemoryThreshold, 5 << 20) .config(core::QueryConfig::kMaxSpillBytes, testData.maxSpilledBytes) diff --git a/velox/exec/tests/utils/OperatorTestBase.cpp b/velox/exec/tests/utils/OperatorTestBase.cpp index 4871d58ca3254..5eb84124692e3 100644 --- a/velox/exec/tests/utils/OperatorTestBase.cpp +++ b/velox/exec/tests/utils/OperatorTestBase.cpp @@ -19,11 +19,9 @@ #include "velox/common/file/FileSystems.h" #include "velox/common/memory/MallocAllocator.h" #include "velox/common/testutil/TestValue.h" -#include "velox/dwio/common/FileSink.h" #include "velox/exec/Exchange.h" #include "velox/exec/OutputBufferManager.h" #include "velox/exec/SharedArbitrator.h" -#include "velox/exec/tests/utils/ArbitratorTestUtil.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/functions/prestosql/aggregates/RegisterAggregateFunctions.h" #include "velox/functions/prestosql/registration/RegistrationFunctions.h" @@ -32,6 +30,10 @@ #include "velox/parse/TypeResolver.h" #include "velox/serializers/PrestoSerializer.h" +DEFINE_bool( + velox_test_enable_arbitration, + false, + "Enable to turn on arbitration for tests"); DECLARE_bool(velox_memory_leak_check_enabled); DECLARE_bool(velox_enable_memory_usage_track_in_default_memory_pool); @@ -53,11 +55,17 @@ OperatorTestBase::~OperatorTestBase() { } void OperatorTestBase::SetUpTestCase() { + memory::MemoryManagerOptions options; + options.allocatorCapacity = 8L << 30; FLAGS_velox_enable_memory_usage_track_in_default_memory_pool = true; FLAGS_velox_memory_leak_check_enabled = true; exec::SharedArbitrator::registerFactory(); - memory::MemoryManagerOptions options; - options.allocatorCapacity = 8L << 30; + if (FLAGS_velox_test_enable_arbitration) { + options.arbitratorCapacity = 4L << 30; + options.arbitratorKind = "SHARED"; + options.checkUsageLeak = true; + options.arbitrationStateCheckCb = memoryArbitrationStateCheck; + } memory::MemoryManager::testingSetInstance(options); asyncDataCache_ = cache::AsyncDataCache::create(memoryManager()->allocator()); cache::AsyncDataCache::setInstance(asyncDataCache_.get()); diff --git a/velox/exec/tests/utils/OperatorTestBase.h b/velox/exec/tests/utils/OperatorTestBase.h index 54eb0d6b97eb4..59563a7cd24bd 100644 --- a/velox/exec/tests/utils/OperatorTestBase.h +++ b/velox/exec/tests/utils/OperatorTestBase.h @@ -47,6 +47,8 @@ class OperatorTestBase : public testing::Test, static void TearDownTestCase(); + static void setupMemory(memory::MemoryManagerOptions options = {}); + void createDuckDbTable(const std::vector& data) { duckDbQueryRunner_.createTable("tmp", data); }