diff --git a/velox/exec/HashBuild.cpp b/velox/exec/HashBuild.cpp
index 0b7e64fcdf70..201a806fca75 100644
--- a/velox/exec/HashBuild.cpp
+++ b/velox/exec/HashBuild.cpp
@@ -678,7 +678,6 @@ bool HashBuild::finishHashBuild() {
   std::vector<HashBuild*> otherBuilds;
   otherBuilds.reserve(peers.size());
-  uint64_t numRows = table_->rows()->numRows();
   for (auto& peer : peers) {
     auto op = peer->findOperator(planNodeId());
     HashBuild* build = dynamic_cast<HashBuild*>(op);
@@ -696,13 +695,10 @@ bool HashBuild::finishHashBuild() {
           !build->stateCleared_,
           "Internal state for a peer is empty. It might have already"
           " been closed.");
-      numRows += build->table_->rows()->numRows();
     }
     otherBuilds.push_back(build);
   }

-  ensureTableFits(numRows);
-
   std::vector<std::unique_ptr<BaseHashTable>> otherTables;
   otherTables.reserve(peers.size());
   SpillPartitionSet spillPartitions;
@@ -723,20 +719,34 @@ bool HashBuild::finishHashBuild() {
       spiller->finishSpill(spillPartitions);
     }
   }
-  bool allowDuplicateRows = table_->rows()->nextOffset() != 0;
-  if (allowDuplicateRows) {
-    ensureNextRowVectorFits(numRows, otherBuilds);
-  }

   if (spiller_ != nullptr) {
     spiller_->finishSpill(spillPartitions);
     removeEmptyPartitions(spillPartitions);
   }

-  // TODO: re-enable parallel join build with spilling triggered after
-  // https://github.com/facebookincubator/velox/issues/3567 is fixed.
+  // TODO: Get an accurate signal from the hash table on whether parallel join
+  // build is going to be applied. Currently the hash table might still decide
+  // not to trigger parallel join build.
   const bool allowParallelJoinBuild =
       !otherTables.empty() && spillPartitions.empty();
+  ensureTableFits(otherBuilds, otherTables, allowParallelJoinBuild);
+
+  SCOPE_EXIT {
+    // Make a guard to release the unused memory reservation since we have
+    // finished the merged table build. The guard makes sure the memory
+    // reserved for the other operators is released even when exceptions are
+    // thrown, to prevent memory leaks. We cannot rely on the other operators'
+    // own cleanup because, when exceptions are thrown, their cleanup might
+    // have already finished.
+    pool()->release();
+    for (auto* build : otherBuilds) {
+      build->pool()->release();
+    }
+  };
+
+  // TODO: Re-enable parallel join build with spilling triggered after
+  // https://github.com/facebookincubator/velox/issues/3567 is fixed.
   CpuWallTiming timing;
   {
     CpuWallTimer cpuWallTimer{timing};
@@ -757,25 +767,19 @@ bool HashBuild::finishHashBuild() {
   if (spillEnabled()) {
     stateCleared_ = true;
   }
-
-  // Release the unused memory reservation since we have finished the merged
-  // table build.
-  pool()->release();
-  if (allowDuplicateRows) {
-    for (auto* build : otherBuilds) {
-      build->pool()->release();
-    }
-  }
   return true;
 }

-void HashBuild::ensureTableFits(uint64_t numRows) {
+void HashBuild::ensureTableFits(
+    const std::vector<HashBuild*>& otherBuilds,
+    const std::vector<std::unique_ptr<BaseHashTable>>& otherTables,
+    bool isParallelJoin) {
   // NOTE: we don't need memory reservation if all the partitions have been
   // spilled as nothing need to be built.
-  if (!spillEnabled() || spiller_ == nullptr || spiller_->isAllSpilled() ||
-      numRows == 0) {
+  if (!spillEnabled() || spiller_ == nullptr || spiller_->isAllSpilled()) {
     return;
   }
+  VELOX_CHECK_EQ(otherBuilds.size(), otherTables.size());

   // Test-only spill path.
   if (testingTriggerSpill(pool()->name())) {
@@ -784,58 +788,82 @@ void HashBuild::ensureTableFits(uint64_t numRows) {
     return;
   }

-  // NOTE: reserve a bit more memory to consider the extra memory used for
-  // parallel table build operation.
-  const uint64_t bytesToReserve = table_->estimateHashTableSize(numRows) * 1.1;
+  TestValue::adjust("facebook::velox::exec::HashBuild::ensureTableFits", this);
+
+  const auto dupRowOverheadBytes = sizeof(char*) + sizeof(NextRowVector);
+
+  uint64_t totalNumRows{0};
+  uint64_t lastBuildBytesToReserve{0};
+  bool allowDuplicateRows{false};
   {
-    Operator::ReclaimableSectionGuard guard(this);
-    TestValue::adjust(
-        "facebook::velox::exec::HashBuild::ensureTableFits", this);
-    if (pool()->maybeReserve(bytesToReserve)) {
-      return;
+    std::lock_guard<std::mutex> l(mutex_);
+    const auto numRows = table_->rows()->numRows();
+    totalNumRows += numRows;
+    allowDuplicateRows = table_->rows()->nextOffset() != 0;
+    if (allowDuplicateRows) {
+      lastBuildBytesToReserve += numRows * dupRowOverheadBytes;
     }
   }
-  LOG(WARNING) << "Failed to reserve " << succinctBytes(bytesToReserve)
-               << " for memory pool " << pool()->name()
-               << ", usage: " << succinctBytes(pool()->usedBytes())
-               << ", reservation: " << succinctBytes(pool()->reservedBytes());
-}
+  for (auto i = 0; i < otherTables.size(); i++) {
+    auto& otherTable = otherTables[i];
+    VELOX_CHECK_NOT_NULL(otherTable);
+    auto& otherBuild = otherBuilds[i];
+    const auto& rowContainer = otherTable->rows();
+    int64_t numRows{0};
+    {
+      std::lock_guard<std::mutex> l(otherBuild->mutex_);
+      numRows = rowContainer->numRows();
+    }
+    if (numRows == 0) {
+      continue;
+    }

-void HashBuild::ensureNextRowVectorFits(
-    uint64_t numRows,
-    const std::vector<HashBuild*>& otherBuilds) {
-  if (!spillEnabled()) {
-    return;
+    totalNumRows += numRows;
+    if (!allowDuplicateRows) {
+      continue;
+    }
+
+    const auto dupRowBytesToReserve = numRows * dupRowOverheadBytes;
+    if (!isParallelJoin) {
+      lastBuildBytesToReserve += dupRowBytesToReserve;
+      continue;
+    }
+
+    Operator::ReclaimableSectionGuard guard(otherBuild);
+    auto* otherPool = otherBuild->pool();
+
+    // Reserve memory for next-row-vector allocations in the other build
+    // operators if this is a parallel join build. Otherwise all
+    // next-row-vectors are allocated from the last build operator.
+    if (!otherPool->maybeReserve(dupRowBytesToReserve)) {
+      LOG(WARNING)
+          << "Failed to reserve " << succinctBytes(dupRowBytesToReserve)
+          << " for duplicate row memory allocation from non-last memory pool "
+          << otherPool->name()
+          << ", usage: " << succinctBytes(otherPool->usedBytes())
+          << ", reservation: " << succinctBytes(otherPool->reservedBytes());
+    }
   }

-  TestValue::adjust(
-      "facebook::velox::exec::HashBuild::ensureNextRowVectorFits", this);
+  if (totalNumRows == 0) {
+    return;
+  }

-  // The memory allocation for next-row-vectors may stuck in
-  // 'SharedArbitrator::growCapacity' when memory arbitrating is also
-  // triggered. Reserve memory for next-row-vectors to prevent this issue.
-  auto bytesToReserve = numRows * (sizeof(char*) + sizeof(NextRowVector));
+  // NOTE: reserve a bit more memory to consider the extra memory used for
+  // parallel table build operation.
+  lastBuildBytesToReserve += table_->estimateHashTableSize(totalNumRows) * 1.1;
   {
     Operator::ReclaimableSectionGuard guard(this);
-    if (!pool()->maybeReserve(bytesToReserve)) {
-      LOG(WARNING) << "Failed to reserve " << succinctBytes(bytesToReserve)
-                   << " for memory pool " << pool()->name()
-                   << ", usage: " << succinctBytes(pool()->usedBytes())
-                   << ", reservation: "
-                   << succinctBytes(pool()->reservedBytes());
-    }
-  }
-  for (auto* build : otherBuilds) {
-    Operator::ReclaimableSectionGuard guard(build);
-    if (!build->pool()->maybeReserve(bytesToReserve)) {
-      LOG(WARNING) << "Failed to reserve " << succinctBytes(bytesToReserve)
-                   << " for memory pool " << build->pool()->name()
-                   << ", usage: " << succinctBytes(build->pool()->usedBytes())
-                   << ", reservation: "
-                   << succinctBytes(build->pool()->reservedBytes());
+    if (pool()->maybeReserve(lastBuildBytesToReserve)) {
+      return;
     }
   }
+
+  LOG(WARNING) << "Failed to reserve " << succinctBytes(lastBuildBytesToReserve)
+               << " for last build memory pool " << pool()->name()
+               << ", usage: " << succinctBytes(pool()->usedBytes())
+               << ", reservation: " << succinctBytes(pool()->reservedBytes());
 }

 void HashBuild::postHashBuildProcess() {
diff --git a/velox/exec/HashBuild.h b/velox/exec/HashBuild.h
index 53461bcf081f..562e57a73e0c 100644
--- a/velox/exec/HashBuild.h
+++ b/velox/exec/HashBuild.h
@@ -141,17 +141,12 @@ class HashBuild final : public Operator {
   // enabled.
   void ensureInputFits(RowVectorPtr& input);

-  // Invoked to ensure there is sufficient memory to build the join table with
-  // the specified 'numRows' if spilling is enabled. The function throws to fail
-  // the query if the memory reservation fails.
-  void ensureTableFits(uint64_t numRows);
-
-  // Invoked to ensure there is sufficient memory to build the next-row-vectors
-  // with the specified 'numRows' if spilling is enabled. The function throws to
-  // fail the query if the memory reservation fails.
-  void ensureNextRowVectorFits(
-      uint64_t numRows,
-      const std::vector<HashBuild*>& otherBuilds);
+  // Invoked to ensure there is sufficient memory to build the join table. The
+  // function throws to fail the query if the memory reservation fails.
+  void ensureTableFits(
+      const std::vector<HashBuild*>& otherBuilds,
+      const std::vector<std::unique_ptr<BaseHashTable>>& otherTables,
+      bool isParallelJoin);

   // Invoked to compute spill partitions numbers for each row 'input' and spill
   // rows to spiller directly if the associated partition(s) is spilling. The
diff --git a/velox/exec/tests/HashJoinTest.cpp b/velox/exec/tests/HashJoinTest.cpp
index e43654f3df71..c5986183b270 100644
--- a/velox/exec/tests/HashJoinTest.cpp
+++ b/velox/exec/tests/HashJoinTest.cpp
@@ -7077,6 +7077,91 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringTableBuild) {
       .run();
 }

+DEBUG_ONLY_TEST_F(HashJoinTest, exceptionDuringFinishJoinBuild) {
+  // This test makes sure there is no memory leak when exceptions are thrown
+  // while preparing the join table in parallel.
+  auto memoryManager = memory::memoryManager();
+  const auto& arbitrator = memoryManager->arbitrator();
+  const uint64_t numDrivers = 2;
+  const auto expectedFreeCapacityBytes = arbitrator->stats().freeCapacityBytes;
+
+  const uint64_t numBuildSideRows = 500;
+  auto buildKeyVector = makeFlatVector<int64_t>(
+      numBuildSideRows,
+      [](vector_size_t row) { return folly::Random::rand64(); });
+  auto buildSideVector =
+      makeRowVector({"b0", "b1"}, {buildKeyVector, buildKeyVector});
+  std::vector<RowVectorPtr> buildSideVectors;
+  for (int i = 0; i < numDrivers; ++i) {
+    buildSideVectors.push_back(buildSideVector);
+  }
+  createDuckDbTable("build", buildSideVectors);
+
+  const uint64_t numProbeSideRows = 10;
+  auto probeKeyVector = makeFlatVector<int64_t>(
+      numProbeSideRows,
+      [&](vector_size_t row) { return buildKeyVector->valueAt(row); });
+  auto probeSideVector =
+      makeRowVector({"p0", "p1"}, {probeKeyVector, probeKeyVector});
+  std::vector<RowVectorPtr> probeSideVectors;
+  for (int i = 0; i < numDrivers; ++i) {
+    probeSideVectors.push_back(probeSideVector);
+  }
+  createDuckDbTable("probe", probeSideVectors);
+
+  ASSERT_EQ(arbitrator->stats().freeCapacityBytes, expectedFreeCapacityBytes);
+
+  // We set the task to fail right before we reserve memory for other
+  // operators. We rely on the driver suspension before parallel join build to
+  // throw exceptions (suspension on an already terminated task throws).
+  SCOPED_TESTVALUE_SET(
+      "facebook::velox::exec::HashBuild::ensureTableFits",
+      std::function<void(HashBuild*)>([&](HashBuild* buildOp) {
+        try {
+          VELOX_FAIL("Simulated failure");
+        } catch (VeloxException& e) {
+          buildOp->testingOperatorCtx()->task()->setError(
+              std::current_exception());
+        }
+      }));
+
+  std::vector<RowVectorPtr> probeInput = {probeSideVector};
+  std::vector<RowVectorPtr> buildInput = {buildSideVector};
+  auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
+  const auto spillDirectory = exec::test::TempDirectoryPath::create();
+
+  ASSERT_EQ(arbitrator->stats().freeCapacityBytes, expectedFreeCapacityBytes);
+  VELOX_ASSERT_THROW(
+      AssertQueryBuilder(duckDbQueryRunner_)
+          .spillDirectory(spillDirectory->getPath())
+          .config(core::QueryConfig::kSpillEnabled, true)
+          .config(core::QueryConfig::kJoinSpillEnabled, true)
+          .queryCtx(
+              newQueryCtx(memoryManager, executor_.get(), kMemoryCapacity))
+          .maxDrivers(numDrivers)
+          .plan(PlanBuilder(planNodeIdGenerator)
+                    .values(probeInput, true)
+                    .hashJoin(
+                        {"p0"},
+                        {"b0"},
+                        PlanBuilder(planNodeIdGenerator)
+                            .values(buildInput, true)
+                            .planNode(),
+                        "",
+                        {"p0", "p1", "b0", "b1"},
+                        core::JoinType::kInner)
+                    .planNode())
+          .assertResults(
+              "SELECT probe.p0, probe.p1, build.b0, build.b1 FROM probe "
+              "INNER JOIN build ON probe.p0 = build.b0"),
+      "Simulated failure");
+  // This test uses an on-demand created memory manager instead of the global
+  // one. We need to make sure any used memory is cleaned up before exiting
+  // the scope.
+  waitForAllTasksToBeDeleted();
+  ASSERT_EQ(arbitrator->stats().freeCapacityBytes, expectedFreeCapacityBytes);
+}
+
 DEBUG_ONLY_TEST_F(HashJoinTest, arbitrationTriggeredDuringParallelJoinBuild) {
   std::unique_ptr<memory::MemoryManager> memoryManager = createMemoryManager();
   const auto& arbitrator = memoryManager->arbitrator();
@@ -7165,8 +7250,11 @@ DEBUG_ONLY_TEST_F(HashJoinTest, arbitrationTriggeredByEnsureJoinTableFit) {
       .injectSpill(false)
       .verifier([&](const std::shared_ptr<Task>& task, bool /*unused*/) {
         auto opStats = toOperatorStats(task->taskStats());
-        ASSERT_GT(opStats.at("HashProbe").spilledBytes, 0);
-        ASSERT_GT(opStats.at("HashBuild").spilledBytes, 0);
+        ASSERT_GT(
+            opStats.at("HashBuild")
+                .runtimeStats["memoryArbitrationWallNanos"]
+                .count,
+            0);
       })
       .run();
 }
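
Illustrative sketch (not part of the patch): the snippet below mirrors the reservation math that the new HashBuild::ensureTableFits() performs on the last build operator when the join build is not parallel, assuming NextRowVector can be stood in for by a std::vector of row pointers; lastBuildReservation() and its parameters are hypothetical names, and the real code obtains the table size estimate from table_->estimateHashTableSize(totalNumRows).

#include <cstdint>
#include <vector>

// Stand-in for the real NextRowVector type; only its size matters here.
using NextRowVector = std::vector<char*>;

// Bytes the last build operator reserves for the non-parallel join build:
// per-row overhead for duplicate-row chains (a next-row pointer plus a
// NextRowVector) across all builds, plus the estimated hash table size padded
// by ~10% head room for the extra memory used during table build.
uint64_t lastBuildReservation(
    uint64_t totalNumRows,
    bool allowDuplicateRows,
    uint64_t estimatedHashTableBytes) {
  const uint64_t dupRowOverheadBytes = sizeof(char*) + sizeof(NextRowVector);
  uint64_t bytes = 0;
  if (allowDuplicateRows) {
    bytes += totalNumRows * dupRowOverheadBytes;
  }
  bytes += static_cast<uint64_t>(estimatedHashTableBytes * 1.1);
  return bytes;
}

When the build is parallel, the patch instead reserves each peer's duplicate-row bytes from that peer's own pool (under a ReclaimableSectionGuard), so only the last operator's own duplicate-row overhead plus the table estimate lands in the last pool.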