diff --git a/velox/exec/HashBuild.cpp b/velox/exec/HashBuild.cpp
index df08e859020b7..eb20a0e6f4125 100644
--- a/velox/exec/HashBuild.cpp
+++ b/velox/exec/HashBuild.cpp
@@ -66,17 +66,10 @@ HashBuild::HashBuild(
       joinBridge_(operatorCtx_->task()->getHashJoinBridgeLocked(
           operatorCtx_->driverCtx()->splitGroupId,
           planNodeId())),
-      spillMemoryThreshold_(
-          operatorCtx_->driverCtx()->queryConfig().joinSpillMemoryThreshold()),
       keyChannelMap_(joinNode_->rightKeys().size()) {
   VELOX_CHECK(pool()->trackUsage());
   VELOX_CHECK_NOT_NULL(joinBridge_);
 
-  spillGroup_ = spillEnabled()
-      ? operatorCtx_->task()->getSpillOperatorGroupLocked(
-            operatorCtx_->driverCtx()->splitGroupId, planNodeId())
-      : nullptr;
-
   joinBridge_->addBuilder();
 
   auto inputType = joinNode_->sources()[1]->outputType();
@@ -208,12 +201,8 @@ void HashBuild::setupSpiller(SpillPartition* spillPartition) {
   HashBitRange hashBits(
       spillConfig.startPartitionBit,
       spillConfig.startPartitionBit + spillConfig.joinPartitionBits);
-  if (spillPartition == nullptr) {
-    spillGroup_->addOperator(
-        *this,
-        operatorCtx_->driver()->shared_from_this(),
-        [&](const std::vector<Operator*>& operators) { runSpill(operators); });
-  } else {
+
+  if (spillPartition != nullptr) {
     LOG(INFO) << "Setup reader to read spilled input from "
               << spillPartition->toString()
               << ", memory pool: " << pool()->name();
@@ -316,12 +305,7 @@ void HashBuild::removeInputRowsForAntiJoinFilter() {
 
 void HashBuild::addInput(RowVectorPtr input) {
   checkRunning();
-
-  if (!ensureInputFits(input)) {
-    VELOX_CHECK_NOT_NULL(input_);
-    VELOX_CHECK(future_.valid());
-    return;
-  }
+  ensureInputFits(input);
 
   TestValue::adjust("facebook::velox::exec::HashBuild::addInput", this);
@@ -425,37 +409,20 @@ void HashBuild::addInput(RowVectorPtr input) {
   });
 }
 
-bool HashBuild::ensureInputFits(RowVectorPtr& input) {
+void HashBuild::ensureInputFits(RowVectorPtr& input) {
   // NOTE: we don't need memory reservation if all the partitions are spilling
   // as we spill all the input rows to disk directly.
   if (!spillEnabled() || spiller_ == nullptr || spiller_->isAllSpilled()) {
-    return true;
+    return;
   }
 
   // NOTE: we simply reserve memory all inputs even though some of them are
   // spilling directly. It is okay as we will accumulate the extra reservation
   // in the operator's memory pool, and won't make any new reservation if there
   // is already sufficient reservations.
-  if (!reserveMemory(input)) {
-    if (!requestSpill(input)) {
-      return false;
-    }
-  } else {
-    // Check if any other peer operator has requested group spill.
-    if (waitSpill(input)) {
-      return false;
-    }
-  }
-  return true;
-}
-
-bool HashBuild::reserveMemory(const RowVectorPtr& input) {
   VELOX_CHECK(spillEnabled());
   Operator::ReclaimableSectionGuard guard(this);
-  numSpillRows_ = 0;
-  numSpillBytes_ = 0;
-
   auto* rows = table_->rows();
   const auto numRows = rows->numRows();
@@ -472,19 +439,7 @@ bool HashBuild::reserveMemory(const RowVectorPtr& input) {
       memory::testingRunArbitration(pool());
       // NOTE: the memory arbitration should have triggered spilling on this
       // hash build operator so we return true to indicate have enough memory.
-      return true;
-    }
-
-    // We check usage from the parent pool to take peers' allocations into
-    // account.
-    const auto nodeUsage = pool()->parent()->currentBytes();
-    if (spillMemoryThreshold_ != 0 && nodeUsage > spillMemoryThreshold_) {
-      const int64_t bytesToSpill =
-          nodeUsage * spillConfig()->spillableReservationGrowthPct / 100;
-      numSpillRows_ = std::max<int64_t>(
-          1, bytesToSpill / (rows->fixedRowSize() + outOfLineBytesPerRow));
-      numSpillBytes_ = numSpillRows_ * outOfLineBytesPerRow;
-      return false;
+      return;
     }
   }
@@ -506,14 +461,14 @@ bool HashBuild::reserveMemory(const RowVectorPtr& input) {
       // Enough free rows for input rows and enough variable length free
       // space for the flat size of the whole vector. If outOfLineBytes
      // is 0 there is no need for variable length space.
-      return true;
+      return;
     }
 
     // If there is variable length data we take the flat size of the
     // input as a cap on the new variable length data needed. There must be at
     // least 2x the increments in reservation.
     if (pool()->availableReservation() > 2 * incrementBytes) {
-      return true;
+      return;
     }
   }
@@ -524,15 +479,12 @@ bool HashBuild::reserveMemory(const RowVectorPtr& input) {
       incrementBytes * 2,
       currentUsage * spillConfig_->spillableReservationGrowthPct / 100);
 
-  if (pool()->maybeReserve(targetIncrementBytes)) {
-    return true;
+  if (!pool()->maybeReserve(targetIncrementBytes)) {
+    LOG(WARNING) << "Failed to reserve " << succinctBytes(targetIncrementBytes)
+                 << " for memory pool " << pool()->name()
+                 << ", usage: " << succinctBytes(pool()->currentBytes())
+                 << ", reservation: " << succinctBytes(pool()->reservedBytes());
   }
-
-  LOG(WARNING) << "Failed to reserve " << succinctBytes(targetIncrementBytes)
-               << " for memory pool " << pool()->name()
-               << ", usage: " << succinctBytes(pool()->currentBytes())
-               << ", reservation: " << succinctBytes(pool()->reservedBytes());
-  return true;
 }
 
 void HashBuild::spillInput(const RowVectorPtr& input) {
@@ -548,17 +500,18 @@ void HashBuild::spillInput(const RowVectorPtr& input) {
   computeSpillPartitions(input);
 
   vector_size_t numSpillInputs = 0;
-  for (auto row = 0; row < numInput; ++row) {
-    const auto partition = spillPartitions_[row];
-    if (FOLLY_UNLIKELY(!activeRows_.isValid(row))) {
+  for (auto rowIdx = 0; rowIdx < numInput; ++rowIdx) {
+    const auto partition = spillPartitions_[rowIdx];
+    if (FOLLY_UNLIKELY(!activeRows_.isValid(rowIdx))) {
       continue;
     }
     if (!spiller_->isSpilled(partition)) {
       continue;
     }
-    activeRows_.setValid(row, false);
+    activeRows_.setValid(rowIdx, false);
     ++numSpillInputs;
-    rawSpillInputIndicesBuffers_[partition][numSpillInputs_[partition]++] = row;
+    rawSpillInputIndicesBuffers_[partition][numSpillInputs_[partition]++] =
+        rowIdx;
   }
   if (numSpillInputs == 0) {
     return;
@@ -644,70 +597,6 @@ void HashBuild::spillPartition(
   }
 }
 
-bool HashBuild::requestSpill(RowVectorPtr& input) {
-  VELOX_CHECK_GT(numSpillRows_, 0);
-  VELOX_CHECK_GT(numSpillBytes_, 0);
-
-  // If all the partitions have been spilled, then nothing to spill.
-  if (spiller_->isAllSpilled()) {
-    return true;
-  }
-
-  input_ = std::move(input);
-  if (spillGroup_->requestSpill(*this, future_)) {
-    VELOX_CHECK(future_.valid());
-    setState(State::kWaitForSpill);
-    return false;
-  }
-  input = std::move(input_);
-  return true;
-}
-
-bool HashBuild::waitSpill(RowVectorPtr& input) {
-  if (!spillGroup_->needSpill()) {
-    return false;
-  }
-
-  if (spillGroup_->waitSpill(*this, future_)) {
-    VELOX_CHECK(future_.valid());
-    input_ = std::move(input);
-    setState(State::kWaitForSpill);
-    return true;
-  }
-  return false;
-}
-
-void HashBuild::runSpill(const std::vector<Operator*>& spillOperators) {
-  VELOX_CHECK(spillEnabled());
-  VELOX_CHECK(!spiller_->state().isAllPartitionSpilled());
-
-  uint64_t targetRows = 0;
-  uint64_t targetBytes = 0;
-  for (auto& spillOp : spillOperators) {
-    HashBuild* build = dynamic_cast<HashBuild*>(spillOp);
-    VELOX_CHECK_NOT_NULL(build);
-    build->addAndClearSpillTarget(targetRows, targetBytes);
-  }
-  VELOX_CHECK_GT(targetRows, 0);
-  VELOX_CHECK_GT(targetBytes, 0);
-
-  // TODO: consider to offload the partition spill processing to an executor to
-  // run in parallel.
-  for (auto& spillOp : spillOperators) {
-    HashBuild* build = dynamic_cast<HashBuild*>(spillOp);
-    build->spiller_->spill();
-    build->table_->clear();
-    build->pool()->release();
-  }
-}
-
-void HashBuild::addAndClearSpillTarget(uint64_t& numRows, uint64_t& numBytes) {
-  numRows += numSpillRows_;
-  numSpillRows_ = 0;
-  numBytes += numSpillBytes_;
-  numSpillBytes_ = 0;
-}
-
 void HashBuild::noMoreInput() {
   checkRunning();
@@ -720,10 +609,6 @@ void HashBuild::noMoreInput() {
 }
 
 void HashBuild::noMoreInputInternal() {
-  if (spillEnabled()) {
-    spillGroup_->operatorStopped(*this);
-  }
-
   if (!finishHashBuild()) {
     return;
   }
@@ -848,7 +734,6 @@ bool HashBuild::finishHashBuild() {
       std::move(table_), std::move(spillPartitions), joinHasNullKeys_);
   if (spillEnabled()) {
     intermediateStateCleared_ = true;
-    spillGroup_->restart();
   }
 
   // Release the unused memory reservation since we have finished the merged
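With the group-spill machinery gone, `ensureInputFits()` above reduces to a best-effort reservation: it tries to reserve headroom through `maybeReserve()` and, on failure, only logs a warning and carries on; the actual spill is left to the memory arbitrator, which reclaims the operator through the `Operator::reclaim()` path while it sits inside a `ReclaimableSectionGuard`. A minimal, self-contained sketch of that reservation pattern follows (toy stand-in types, not the real Velox `memory::MemoryPool` API):

```cpp
#include <algorithm>
#include <cstdint>
#include <iostream>

// Toy stand-in for a memory pool with non-throwing reservations.
struct MemoryPool {
  int64_t capacity;
  int64_t used{0};
  int64_t reserved{0};

  // Best-effort reservation: returns false instead of throwing.
  bool maybeReserve(int64_t bytes) {
    if (used + reserved + bytes > capacity) {
      return false;
    }
    reserved += bytes;
    return true;
  }

  int64_t availableReservation() const {
    return reserved;
  }
};

// Mirrors the shape of the new void-returning ensureInputFits(): no bool
// result and no caller-side wait-for-spill branch.
void ensureInputFits(
    MemoryPool& pool,
    int64_t incrementBytes,
    int32_t growthPct) {
  // Existing reservation already covers twice the worst-case increment.
  if (pool.availableReservation() > 2 * incrementBytes) {
    return;
  }
  const int64_t target =
      std::max<int64_t>(incrementBytes * 2, pool.used * growthPct / 100);
  if (!pool.maybeReserve(target)) {
    // Non-fatal: the arbitrator is expected to reclaim (spill) later.
    std::cerr << "Failed to reserve " << target << " bytes\n";
  }
}

int main() {
  MemoryPool pool{1 << 20}; // 1MB capacity.
  pool.used = 512 << 10;    // 512KB already in use.
  ensureInputFits(pool, 128 << 10, 25); // Succeeds, records a reservation.
  ensureInputFits(pool, 512 << 10, 25); // Fails, logs, and keeps going.
}
```

Because a failed reservation is no longer an error, the operator never parks in a `kWaitForSpill` state; the arbitrator alone decides when the build must spill, which is what lets this patch delete the cross-driver coordination.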
diff --git a/velox/exec/HashBuild.h b/velox/exec/HashBuild.h
index 1c3b790fb5421..d3c267e234cbb 100644
--- a/velox/exec/HashBuild.h
+++ b/velox/exec/HashBuild.h
@@ -101,9 +101,7 @@ class HashBuild final : public Operator {
   // Invoked when operator has finished processing the build input and wait for
   // all the other drivers to finish the processing. The last driver that
   // reaches to the hash build barrier, is responsible to build the hash table
-  // merged from all the other drivers. If the disk spilling is enabled, the
-  // last driver will also restart 'spillGroup_' and add a new hash build
-  // barrier for the next round of hash table build operation if it needs.
+  // merged from all the other drivers.
   bool finishHashBuild();
 
   // Invoked after the hash table has been built. It waits for any spill data to
@@ -142,27 +140,14 @@ class HashBuild final : public Operator {
   // Invoked to ensure there is a sufficient memory to process 'input' by
   // reserving a sufficient amount of memory in advance if disk spilling is
-  // enabled. The function returns true if the disk spilling is not enabled, or
-  // the memory reservation succeeds. If the memory reservation fails, the
-  // function will trigger a group spill which needs coordination among the
-  // other build drivers in the same group. The function returns true if the
-  // group spill has been inline executed which could happen if there is only
-  // one driver in the group, or it happens that all the other drivers have
-  // also requested group spill and this driver is the last one to reach the
-  // group spill barrier. Otherwise, the function returns false to wait for the
-  // group spill to run. The operator will transition to 'kWaitForSpill' state
-  // accordingly.
-  bool ensureInputFits(RowVectorPtr& input);
+  // enabled.
+  void ensureInputFits(RowVectorPtr& input);
 
   // Invoked to ensure there is sufficient memory to build the join table with
   // the specified 'numRows' if spilling is enabled. The function throws to fail
   // the query if the memory reservation fails.
   void ensureTableFits(uint64_t numRows);
 
-  // Invoked to reserve memory for 'input' if disk spilling is enabled. The
-  // function returns true on success, otherwise false.
-  bool reserveMemory(const RowVectorPtr& input);
-
   // Invoked to compute spill partitions numbers for each row 'input' and spill
   // rows to spiller directly if the associated partition(s) is spilling. The
   // function will skip processing if disk spilling is not enabled or there is
@@ -190,28 +175,6 @@ class HashBuild final : public Operator {
       vector_size_t numInput,
       const SpillPartitionNumSet& spillPartitions);
 
-  // Invoked to send group spill request to 'spillGroup_'. The function returns
-  // true if group spill has been inline executed, otherwise returns false. In
-  // the latter case, the operator will transition to 'kWaitForSpill' state and
-  // 'input' will be saved in 'input_' to be processed after the group spill has
-  // been executed.
-  bool requestSpill(RowVectorPtr& input);
-
-  // Invoked to check if it needs to wait for any pending group spill to run.
-  // The function returns true if it needs to wait, otherwise false. The latter
-  // case is either because there is no pending group spill or this operator is
-  // the last one to reach to the group spill barrier and execute the group
-  // spill inline.
-  bool waitSpill(RowVectorPtr& input);
-
-  // The callback registered to 'spillGroup_' to run group spill on
-  // 'spillOperators'.
-  void runSpill(const std::vector<Operator*>& spillOperators);
-
-  // Invoked by 'runSpill' to sum up the spill targets from all the operators in
-  // 'numRows' and 'numBytes'.
-  void addAndClearSpillTarget(uint64_t& numRows, uint64_t& numBytes);
-
   // Invoked to reset the operator state to restore previously spilled data. It
   // setup (recursive) spiller and spill input reader from 'spillInput' received
   // from 'joinBride_'. 'spillInput' contains a shard of previously spilled
@@ -248,14 +211,8 @@ class HashBuild final : public Operator {
 
   std::shared_ptr<HashJoinBridge> joinBridge_;
 
-  // The maximum memory usage that a hash build can hold before spilling.
-  // If it is zero, then there is no such limit.
-  const uint64_t spillMemoryThreshold_;
-
   bool exceededMaxSpillLevelLimit_{false};
 
-  std::shared_ptr<SpillOperatorGroup> spillGroup_;
-
   State state_{State::kRunning};
 
   // The row type used for hash table build and disk spilling.
@@ -304,13 +261,6 @@ class HashBuild final : public Operator {
   // at least one entry with null join keys.
   bool joinHasNullKeys_{false};
 
-  // The spill targets set by 'requestSpill()' to request group spill.
-  uint64_t numSpillRows_{0};
-  uint64_t numSpillBytes_{0};
-
-  // This can be nullptr if either spilling is not allowed or it has been
-  // transferred to the last hash build operator while in kWaitForBuild state or
-  // it has been cleared to set up a new one for recursive spilling.
   std::unique_ptr<Spiller> spiller_;
 
   // Used to read input from previously spilled data for restoring.
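The test changes below replace threshold-triggered spilling with explicit injection: `TestScopedSpillInjection(100)` forces spillable operators to spill while it is in scope. Conceptually it is an RAII override of a process-wide spill probability, roughly like this toy sketch (illustrative names only, not the actual Velox test utility):

```cpp
#include <cstdint>
#include <cstdlib>
#include <iostream>

namespace toy {

// Process-wide injection percentage: 0 = never, 100 = always.
int32_t gSpillPct = 0;

// RAII override, restored on scope exit; the rough shape of the
// TestScopedSpillInjection helper used by the tests below.
class TestScopedSpillInjection {
 public:
  explicit TestScopedSpillInjection(int32_t spillPct) : old_(gSpillPct) {
    gSpillPct = spillPct;
  }
  ~TestScopedSpillInjection() {
    gSpillPct = old_;
  }

 private:
  const int32_t old_;
};

// The check a spillable operator performs at its spill decision point.
bool testingTriggerSpill() {
  return gSpillPct > 0 && (std::rand() % 100) < gSpillPct;
}

} // namespace toy

int main() {
  {
    toy::TestScopedSpillInjection inject(100);
    std::cout << "in scope: " << toy::testingTriggerSpill() << "\n"; // 1
  }
  std::cout << "out of scope: " << toy::testingTriggerSpill() << "\n"; // 0
}
```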
diff --git a/velox/exec/tests/HashJoinTest.cpp b/velox/exec/tests/HashJoinTest.cpp
index e1d76f7bfd721..6a71c4d6f90ba 100644
--- a/velox/exec/tests/HashJoinTest.cpp
+++ b/velox/exec/tests/HashJoinTest.cpp
@@ -24,6 +24,7 @@
 #include "velox/exec/HashBuild.h"
 #include "velox/exec/HashJoinBridge.h"
 #include "velox/exec/PlanNodeStats.h"
+#include "velox/exec/SharedArbitrator.h"
 #include "velox/exec/tests/utils/ArbitratorTestUtil.h"
 #include "velox/exec/tests/utils/AssertQueryBuilder.h"
 #include "velox/exec/tests/utils/Cursor.h"
@@ -371,11 +372,6 @@ class HashJoinBuilder {
     return *this;
   }
 
-  HashJoinBuilder& spillMemoryThreshold(uint64_t threshold) {
-    spillMemoryThreshold_ = threshold;
-    return *this;
-  }
-
   HashJoinBuilder& injectSpill(bool injectSpill) {
     injectSpill_ = injectSpill;
     return *this;
@@ -583,7 +579,15 @@ class HashJoinBuilder {
         builder.splits(splitEntry.first, splitEntry.second);
       }
     }
-    auto queryCtx = std::make_shared<core::QueryCtx>(executor_);
+    auto queryCtx = std::make_shared<core::QueryCtx>(
+        executor_,
+        core::QueryConfig{{}},
+        std::unordered_map<std::string, std::shared_ptr<Config>>{},
+        cache::AsyncDataCache::getInstance(),
+        memory::MemoryManager::getInstance()->addRootPool(
+            "query_pool",
+            memory::kMaxMemory,
+            memory::MemoryReclaimer::create()));
     std::shared_ptr<exec::test::TempDirectoryPath> spillDirectory;
     int32_t spillPct{0};
     if (injectSpill) {
@@ -596,15 +600,6 @@ class HashJoinBuilder {
       // many spilled vectors in a spilled file to trigger recursive spilling.
       config(core::QueryConfig::kSpillWriteBufferSize, std::to_string(0));
       spillPct = 100;
-    } else if (spillMemoryThreshold_ != 0) {
-      spillDirectory = exec::test::TempDirectoryPath::create();
-      builder.spillDirectory(spillDirectory->path);
-      config(core::QueryConfig::kSpillEnabled, "true");
-      config(core::QueryConfig::kMaxSpillLevel, std::to_string(maxSpillLevel));
-      config(core::QueryConfig::kJoinSpillEnabled, "true");
-      config(
-          core::QueryConfig::kJoinSpillMemoryThreshold,
-          std::to_string(spillMemoryThreshold_));
     } else if (!spillDirectory_.empty()) {
       builder.spillDirectory(spillDirectory_);
       config(core::QueryConfig::kSpillEnabled, "true");
@@ -679,9 +674,9 @@ class HashJoinBuilder {
           ASSERT_GE(
               memory::spillMemoryPool()->stats().peakBytes,
               peakSpillMemoryUsage);
         }
-        // NOTE: if 'spillDirectory_' is not empty and spill threshold is not
-        // set, the test might trigger spilling by its own.
-      } else if (spillDirectory_.empty() && spillMemoryThreshold_ == 0) {
+        // NOTE: if 'spillDirectory_' is not empty, the test might trigger
+        // spilling on its own.
+      } else if (spillDirectory_.empty()) {
         ASSERT_EQ(statsPair.first.spilledRows, 0);
         ASSERT_EQ(statsPair.first.spilledInputBytes, 0);
         ASSERT_EQ(statsPair.first.spilledBytes, 0);
@@ -729,7 +724,6 @@ class HashJoinBuilder {
   std::vector<std::string> joinOutputLayout_;
   std::vector<std::string> outputProjections_;
 
-  uint64_t spillMemoryThreshold_{0};
   bool injectSpill_{true};
   // If not set, then the test will run the test with different settings:
   // 0, 2.
@@ -750,16 +744,21 @@ class HashJoinBuilder {
 
 class HashJoinTest : public HiveConnectorTestBase {
  protected:
-  static void SetUpTestCase() {
-    FLAGS_velox_testing_enable_arbitration = true;
-    HiveConnectorTestBase::SetUpTestCase();
-  }
-
   HashJoinTest() : HashJoinTest(TestParam(1)) {}
 
   explicit HashJoinTest(const TestParam& param)
      : numDrivers_(param.numDrivers) {}
 
+  static void SetUpTestCase() {
+    FLAGS_velox_testing_enable_arbitration = true;
+    OperatorTestBase::SetUpTestCase();
+  }
+
+  static void TearDownTestCase() {
+    FLAGS_velox_testing_enable_arbitration = false;
+    OperatorTestBase::TearDownTestCase();
+  }
+
   void SetUp() override {
     HiveConnectorTestBase::SetUp();
@@ -1010,104 +1009,6 @@ TEST_P(MultiThreadedHashJoinTest, emptyProbe) {
       .run();
 }
 
-TEST_P(MultiThreadedHashJoinTest, emptyProbeWithSpillMemoryThreshold) {
-  HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get())
-      .numDrivers(numDrivers_)
-      .keyTypes({BIGINT()})
-      .probeVectors(0, 5)
-      .buildVectors(1500, 5)
-      .injectSpill(false)
-      .spillMemoryThreshold(1)
-      .maxSpillLevel(0)
-      .referenceQuery(
-          "SELECT t_k0, t_data, u_k0, u_data FROM t, u WHERE t_k0 = u_k0")
-      .verifier([&](const std::shared_ptr<Task>& task, bool /*unused*/) {
-        const auto statsPair = taskSpilledStats(*task);
-        ASSERT_GT(statsPair.first.spilledRows, 0);
-        ASSERT_GT(statsPair.first.spilledBytes, 0);
-        ASSERT_GT(statsPair.first.spilledPartitions, 0);
-        ASSERT_GT(statsPair.first.spilledFiles, 0);
-        ASSERT_EQ(statsPair.second.spilledRows, 0);
-        ASSERT_EQ(statsPair.second.spilledBytes, 0);
-        ASSERT_GT(statsPair.second.spilledPartitions, 0);
-        ASSERT_EQ(statsPair.second.spilledFiles, 0);
-      })
-      .run();
-}
-
-DEBUG_ONLY_TEST_P(
-    MultiThreadedHashJoinTest,
-    raceBetweenTaskTerminationAndThesholdTriggeredSpill) {
-  std::atomic<Task*> task{nullptr};
-  SCOPED_TESTVALUE_SET(
-      "facebook::velox::exec::Driver::runInternal::addInput",
-      std::function<void(Operator*)>([&](Operator* op) {
-        if (op->operatorType() != "HashBuild") {
-          return;
-        }
-        task = op->testingOperatorCtx()->task().get();
-      }));
-  SCOPED_TESTVALUE_SET(
-      "facebook::velox::exec::SpillOperatorGroup::runSpill",
-      std::function<void(Operator*)>([&](Operator* op) {
-        ASSERT_TRUE(task.load() != nullptr);
-        task.load()->requestAbort();
-        // Wait a bit to ensure the other peer hash build drivers have been
-        // closed.
-        std::this_thread::sleep_for(std::chrono::milliseconds(1'000));
-      }));
-
-  VectorFuzzer fuzzer({.vectorSize = 10}, pool());
-  const int32_t numBuildVectors = 2;
-  std::vector<RowVectorPtr> buildVectors;
-  for (int32_t i = 0; i < numBuildVectors; ++i) {
-    auto vector = fuzzer.fuzzRow(buildType_);
-    // Build the build vector with the same join key to make sure there is only
-    // one hash build operator running.
-    vector->childAt(0) = makeFlatVector<int64_t>(
-        vector->size(), [](auto /*unused*/) { return 1; });
-    buildVectors.push_back(std::move(vector));
-  }
-  const int32_t numProbeVectors = 2;
-  std::vector<RowVectorPtr> probeVectors;
-  for (int32_t i = 0; i < numProbeVectors; ++i) {
-    probeVectors.push_back(fuzzer.fuzzRow(probeType_));
-  }
-
-  createDuckDbTable("t", probeVectors);
-  createDuckDbTable("u", buildVectors);
-
-  auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
-  auto plan = PlanBuilder(planNodeIdGenerator)
-                  .values(probeVectors, true)
-                  .hashJoin(
-                      {"t_k1"},
-                      {"u_k1"},
-                      PlanBuilder(planNodeIdGenerator)
-                          .values(buildVectors, true)
-                          // NOTE: add local partition here to ensure all the
-                          // build inputs go to the same hash build operator.
-                          .localPartition({"u_k1"})
-                          .planNode(),
-                      "",
-                      {"t_k1", "t_k2"},
-                      core::JoinType::kInner)
-                  .planNode();
-
-  VELOX_ASSERT_THROW(
-      HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get())
-          .planNode(plan)
-          .injectSpill(false)
-          .checkSpillStats(false)
-          .spillMemoryThreshold(1)
-          .maxSpillLevel(0)
-          .numDrivers(numDrivers_)
-          .referenceQuery(
-              "SELECT t.t_k1, t.t_k2 from t, u WHERE t.t_k1 = u.u_k1")
-          .run(),
-      "Aborted for external error");
-}
-
 TEST_P(MultiThreadedHashJoinTest, normalizedKey) {
   HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get())
       .numDrivers(numDrivers_)
@@ -5976,7 +5877,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, hashBuildAbortDuringOutputProcessing) {
   }
 }
 
-DEBUG_ONLY_TEST_F(HashJoinTest, hashBuildAbortDuringInputgProcessing) {
+DEBUG_ONLY_TEST_F(HashJoinTest, hashBuildAbortDuringInputProcessing) {
   constexpr int64_t kMaxBytes = 1LL << 30; // 1GB
   VectorFuzzer fuzzer({.vectorSize = 1000}, pool());
   const int32_t numBuildVectors = 10;
@@ -6219,7 +6120,6 @@ TEST_F(HashJoinTest, leftJoinWithMissAtEndOfBatch) {
         .planNode(plan)
         .injectSpill(false)
         .checkSpillStats(false)
-        .spillMemoryThreshold(1)
         .maxSpillLevel(0)
         .numDrivers(1)
         .config(
@@ -6271,7 +6171,6 @@ TEST_F(HashJoinTest, leftJoinWithMissAtEndOfBatchMultipleBuildMatches) {
         .planNode(plan)
         .injectSpill(false)
         .checkSpillStats(false)
-        .spillMemoryThreshold(1)
         .maxSpillLevel(0)
         .numDrivers(1)
         .config(
@@ -6350,7 +6249,6 @@ DEBUG_ONLY_TEST_F(HashJoinTest, minSpillableMemoryReservation) {
 }
 
 TEST_F(HashJoinTest, exceededMaxSpillLevel) {
-  constexpr int64_t kMaxBytes = 1LL << 30; // 1GB
   VectorFuzzer fuzzer({.vectorSize = 1000}, pool());
   const int32_t numBuildVectors = 10;
   std::vector<RowVectorPtr> buildVectors;
@@ -6383,12 +6281,17 @@ TEST_F(HashJoinTest, exceededMaxSpillLevel) {
   auto tempDirectory = exec::test::TempDirectoryPath::create();
   const int exceededMaxSpillLevelCount =
       common::globalSpillStats().spillMaxLevelExceededCount;
+  SCOPED_TESTVALUE_SET(
+      "facebook::velox::exec::HashBuild::addInput",
+      std::function<void(exec::HashBuild*)>(([&](exec::HashBuild* hashBuild) {
+        Operator::ReclaimableSectionGuard guard(hashBuild);
+        testingRunArbitration(hashBuild->pool());
+      })));
   HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get())
       .numDrivers(1)
      .planNode(plan)
-      .injectSpill(false) // Always trigger spilling.
-      .spillMemoryThreshold(1)
+      .injectSpill(false)
       .maxSpillLevel(0)
       .spillDirectory(tempDirectory->path)
       .referenceQuery(
@@ -6444,13 +6347,12 @@ TEST_F(HashJoinTest, maxSpillBytes) {
   for (const auto& testData : testSettings) {
     SCOPED_TRACE(testData.debugString());
     try {
+      TestScopedSpillInjection scopedSpillInjection(100);
       AssertQueryBuilder(plan)
           .spillDirectory(spillDirectory->path)
           .queryCtx(queryCtx)
           .config(core::QueryConfig::kSpillEnabled, true)
           .config(core::QueryConfig::kJoinSpillEnabled, true)
-          // Set a small capacity to trigger threshold based spilling
-          .config(core::QueryConfig::kJoinSpillMemoryThreshold, 5 << 20)
           .config(core::QueryConfig::kMaxSpillBytes, testData.maxSpilledBytes)
           .copyResults(pool_.get());
       ASSERT_FALSE(testData.expectedExceedLimit);
@@ -6464,6 +6366,7 @@ TEST_F(HashJoinTest, maxSpillBytes) {
           e.errorCode(), facebook::velox::error_code::kSpillLimitExceeded);
     }
   }
+  waitForAllTasksToBeDeleted();
 }
 
 TEST_F(HashJoinTest, onlyHashBuildMaxSpillBytes) {
@@ -6501,13 +6404,12 @@ TEST_F(HashJoinTest, onlyHashBuildMaxSpillBytes) {
   for (const auto& testData : testSettings) {
     SCOPED_TRACE(testData.debugString());
     try {
+      TestScopedSpillInjection scopedSpillInjection(100);
       AssertQueryBuilder(plan)
           .spillDirectory(spillDirectory->path)
          .queryCtx(queryCtx)
           .config(core::QueryConfig::kSpillEnabled, true)
           .config(core::QueryConfig::kJoinSpillEnabled, true)
-          // Set a small capacity to trigger threshold based spilling
-          .config(core::QueryConfig::kJoinSpillMemoryThreshold, 5 << 20)
           .config(core::QueryConfig::kMaxSpillBytes, testData.maxSpilledBytes)
           .copyResults(pool_.get());
       ASSERT_FALSE(testData.expectedExceedLimit);
@@ -6682,8 +6584,6 @@ TEST_F(HashJoinTest, reclaimFromCompletedJoinBuilder) {
 }
 
 TEST_F(HashJoinTest, reclaimFromJoinBuilderWithMultiDrivers) {
-  std::unique_ptr<memory::MemoryManager> memoryManager = createMemoryManager();
-  const auto& arbitrator = memoryManager->arbitrator();
   auto rowType = ROW({
       {"c0", INTEGER()},
       {"c1", INTEGER()},
@@ -6691,13 +6591,30 @@ TEST_F(HashJoinTest, reclaimFromJoinBuilderWithMultiDrivers) {
       {"c2", VARCHAR()},
   });
   const auto vectors = createVectors(rowType, 64 << 20, fuzzerOpts_);
   const int numDrivers = 4;
+
+  memory::MemoryManagerOptions options;
+  options.allocatorCapacity = 8L << 30;
+  auto memoryManagerWithoutArbitrator =
+      std::make_unique<memory::MemoryManager>(options);
   const auto expectedResult =
-      runHashJoinTask(vectors, nullptr, numDrivers, pool(), false).data;
+      runHashJoinTask(
+          vectors,
+          newQueryCtx(memoryManagerWithoutArbitrator, executor_, 8L << 30),
+          numDrivers,
+          pool(),
+          false)
+          .data;
+
+  auto memoryManagerWithArbitrator = createMemoryManager();
+  const auto& arbitrator = memoryManagerWithArbitrator->arbitrator();
   // Create a query ctx with a small capacity to trigger spilling.
-  std::shared_ptr<core::QueryCtx> queryCtx =
-      newQueryCtx(memoryManager, executor_, 128 << 20);
   auto result = runHashJoinTask(
-      vectors, queryCtx, numDrivers, pool(), true, expectedResult);
+      vectors,
+      newQueryCtx(memoryManagerWithArbitrator, executor_, 128 << 20),
+      numDrivers,
+      pool(),
+      true,
+      expectedResult);
   auto taskStats = exec::toPlanStats(result.task->taskStats());
   auto& planStats = taskStats.at(result.planNodeId);
   ASSERT_GT(planStats.spilledBytes, 0);
diff --git a/velox/exec/tests/utils/OperatorTestBase.cpp b/velox/exec/tests/utils/OperatorTestBase.cpp
index 4f1ae5c15238c..0faba034d2107 100644
--- a/velox/exec/tests/utils/OperatorTestBase.cpp
+++ b/velox/exec/tests/utils/OperatorTestBase.cpp
@@ -20,7 +20,6 @@
 #include "velox/common/memory/MallocAllocator.h"
 #include "velox/common/memory/SharedArbitrator.h"
 #include "velox/common/testutil/TestValue.h"
-#include "velox/dwio/common/FileSink.h"
 #include "velox/exec/Exchange.h"
 #include "velox/exec/OutputBufferManager.h"
 #include "velox/exec/tests/utils/AssertQueryBuilder.h"
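Taken together, the patch moves the spill trigger out of the operator (the deleted `joinSpillMemoryThreshold` check) and into the memory arbitrator: when an allocation cannot be satisfied, the arbitrator picks victims among participants that report reclaimable memory and calls `reclaim()` on them, and the hash build spills inside that call. A self-contained toy sketch of the interaction, assuming a greedy largest-first victim policy (the real SharedArbitrator also handles capacity growth, aborts, and concurrency):

```cpp
#include <cstdint>
#include <iostream>
#include <vector>

// Toy interface playing the role of a memory-reclaimable participant.
struct Reclaimable {
  virtual ~Reclaimable() = default;
  virtual int64_t reclaimableBytes() const = 0;
  virtual int64_t reclaim() = 0; // Returns bytes freed (e.g. by spilling).
};

// Stand-in for a hash build operator: reclaim() spills the row container.
struct HashBuildLike : Reclaimable {
  int64_t tableBytes{0};
  int64_t reclaimableBytes() const override {
    return tableBytes;
  }
  int64_t reclaim() override {
    std::cout << "spilling " << tableBytes << " bytes to disk\n";
    const auto freed = tableBytes;
    tableBytes = 0; // Table rows moved to spill files.
    return freed;
  }
};

// On memory pressure, repeatedly reclaim from the largest participant
// until the request is covered or nothing reclaimable remains.
int64_t arbitrate(std::vector<Reclaimable*>& ops, int64_t neededBytes) {
  int64_t freed = 0;
  while (freed < neededBytes) {
    Reclaimable* victim = nullptr;
    for (auto* op : ops) {
      if (victim == nullptr ||
          op->reclaimableBytes() > victim->reclaimableBytes()) {
        victim = op;
      }
    }
    if (victim == nullptr || victim->reclaimableBytes() == 0) {
      break; // Nothing left to reclaim.
    }
    freed += victim->reclaim();
  }
  return freed;
}

int main() {
  HashBuildLike a, b;
  a.tableBytes = 64 << 20;
  b.tableBytes = 32 << 20;
  std::vector<Reclaimable*> ops{&a, &b};
  arbitrate(ops, 80 << 20); // Spills a (64MB), then b (32MB).
}
```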