Fix hash build not able to reclaim in finishHashBuild
tanjialiang committed Sep 16, 2024
1 parent 696f7d9 commit 40918a1
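In short, the fix moves the memory reservation for the merged join table ahead of the point where finishHashBuild() takes over the peers' tables, so the reservation runs while the operator is still in a reclaimable state. A condensed sketch of the new control flow, pieced together from the diff below (locking details and error checks elided; an illustration, not the verbatim implementation):

// Sketch only: condensed from the HashBuild.cpp diff in this commit.
bool HashBuild::finishHashBuild() {
  // ... wait for all peer HashBuild drivers of this join to finish ...
  std::vector<HashBuild*> otherBuilds;
  otherBuilds.reserve(peers.size());

  // Count the rows buffered by this operator and by every peer.
  uint64_t numRows{0};
  {
    std::lock_guard<std::mutex> l(mutex_);
    numRows += table_->rows()->numRows();
  }
  for (auto& peer : peers) {
    auto* build = dynamic_cast<HashBuild*>(peer->findOperator(planNodeId()));
    if (build != this) {
      numRows += build->table_->rows()->numRows(); // peer mutex held in the real code
    }
    otherBuilds.push_back(build);
  }

  // Reserve space for the merged table before the peers' tables are merged,
  // while memory arbitration can still reclaim (spill) this operator.
  ensureTableFits(numRows);

  // ... collect peer tables and spill partitions, merge, hand off to probe ...
  return true;
}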
Showing 3 changed files with 41 additions and 32 deletions.
40 changes: 12 additions & 28 deletions velox/exec/HashBuild.cpp
@@ -678,6 +678,11 @@ bool HashBuild::finishHashBuild() {

   std::vector<HashBuild*> otherBuilds;
   otherBuilds.reserve(peers.size());
+  uint64_t numRows{0};
+  {
+    std::lock_guard<std::mutex> l(mutex_);
+    numRows += table_->rows()->numRows();
+  }
   for (auto& peer : peers) {
     auto op = peer->findOperator(planNodeId());
     HashBuild* build = dynamic_cast<HashBuild*>(op);
@@ -695,10 +700,13 @@ bool HashBuild::finishHashBuild() {
           !build->stateCleared_,
           "Internal state for a peer is empty. It might have already"
           " been closed.");
+      numRows += build->table_->rows()->numRows();
     }
     otherBuilds.push_back(build);
   }
 
+  ensureTableFits(numRows);
+
   std::vector<std::unique_ptr<BaseHashTable>> otherTables;
   otherTables.reserve(peers.size());
   SpillPartitionSet spillPartitions;
@@ -730,7 +738,6 @@ bool HashBuild::finishHashBuild() {
   // it might decide it is not going to trigger parallel join build.
   const bool allowParallelJoinBuild =
       !otherTables.empty() && spillPartitions.empty();
-  ensureTableFits(otherBuilds, otherTables, allowParallelJoinBuild);
 
   SCOPE_EXIT {
     // Make a guard to release the unused memory reservation since we have
@@ -773,16 +780,13 @@ bool HashBuild::finishHashBuild() {
   return true;
 }
 
-void HashBuild::ensureTableFits(
-    const std::vector<HashBuild*>& otherBuilds,
-    const std::vector<std::unique_ptr<BaseHashTable>>& otherTables,
-    bool isParallelJoin) {
+void HashBuild::ensureTableFits(uint64_t numRows) {
   // NOTE: we don't need memory reservation if all the partitions have been
   // spilled as nothing need to be built.
-  if (!spillEnabled() || spiller_ == nullptr || spiller_->isAllSpilled()) {
+  if (!spillEnabled() || spiller_ == nullptr || spiller_->isAllSpilled() ||
+      numRows == 0) {
     return;
   }
-  VELOX_CHECK_EQ(otherBuilds.size(), otherTables.size());
 
   // Test-only spill path.
   if (testingTriggerSpill(pool()->name())) {
@@ -793,32 +797,12 @@ void HashBuild::ensureTableFits(

   TestValue::adjust("facebook::velox::exec::HashBuild::ensureTableFits", this);
 
-  uint64_t totalNumRows{0};
-  {
-    std::lock_guard<std::mutex> l(mutex_);
-    totalNumRows += table_->rows()->numRows();
-  }
-
-  for (auto i = 0; i < otherTables.size(); ++i) {
-    auto& otherTable = otherTables[i];
-    VELOX_CHECK_NOT_NULL(otherTable);
-    auto& otherBuild = otherBuilds[i];
-    {
-      std::lock_guard<std::mutex> l(otherBuild->mutex_);
-      totalNumRows += otherTable->rows()->numRows();
-    }
-  }
-
-  if (totalNumRows == 0) {
-    return;
-  }
-
   // NOTE: reserve a bit more memory to consider the extra memory used for
   // parallel table build operation.
   //
   // TODO: make this query configurable.
   const uint64_t memoryBytesToReserve =
-      table_->estimateHashTableSize(totalNumRows) * 1.1;
+      table_->estimateHashTableSize(numRows) * 1.1;
   {
     Operator::ReclaimableSectionGuard guard(this);
     if (pool()->maybeReserve(memoryBytesToReserve)) {
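The hunk above is truncated right after pool()->maybeReserve(). The visible lines already show the pattern the fix relies on: the reservation is sized from numRows with about 10% headroom and is taken inside an Operator::ReclaimableSectionGuard, so the memory arbitrator can reclaim (spill) this hash build while the reservation is pending. A minimal sketch of that step, assuming nothing beyond the lines shown:

// Reservation step inside HashBuild::ensureTableFits(numRows); sketch based on
// the visible hunk above, with the code past maybeReserve() omitted.
const uint64_t memoryBytesToReserve =
    table_->estimateHashTableSize(numRows) * 1.1; // headroom for parallel build
{
  // While this guard is alive the operator sits in a reclaimable section, so a
  // concurrent arbitration request can spill it instead of being blocked.
  Operator::ReclaimableSectionGuard guard(this);
  if (pool()->maybeReserve(memoryBytesToReserve)) {
    // Reservation succeeded; handling continues past the end of the hunk.
  }
}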
5 changes: 1 addition & 4 deletions velox/exec/HashBuild.h
@@ -143,10 +143,7 @@ class HashBuild final : public Operator {

   // Invoked to ensure there is sufficient memory to build the join table. The
   // function throws to fail the query if the memory reservation fails.
-  void ensureTableFits(
-      const std::vector<HashBuild*>& otherBuilds,
-      const std::vector<std::unique_ptr<BaseHashTable>>& otherTables,
-      bool isParallelJoin);
+  void ensureTableFits(uint64_t numRows);
 
   // Invoked to compute spill partitions numbers for each row 'input' and spill
   // rows to spiller directly if the associated partition(s) is spilling. The
28 changes: 28 additions & 0 deletions velox/exec/tests/HashJoinTest.cpp
@@ -6217,6 +6217,34 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringWaitForProbe) {
   ASSERT_EQ(reclaimerStats_.numNonReclaimableAttempts, 1);
 }
 
+DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringFinishHashBuild) {
+  // Use manual spill injection other than spill injection framework. This is
+  // because spill injection framework does not allow fine grain spill within a
+  // single operator (We do not want to spill during addInput() but only during
+  // finishHashBuild()).
+  SCOPED_TESTVALUE_SET(
+      "facebook::velox::exec::HashBuild::ensureTableFits",
+      std::function<void(Operator*)>(([&](Operator* op) {
+        Operator::ReclaimableSectionGuard guard(op);
+        memory::testingRunArbitration(op->pool());
+      })));
+  auto tempDirectory = exec::test::TempDirectoryPath::create();
+  HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get())
+      .numDrivers(numDrivers_)
+      .injectSpill(false)
+      .spillDirectory(tempDirectory->getPath())
+      .keyTypes({BIGINT()})
+      .probeVectors(1600, 5)
+      .buildVectors(1500, 5)
+      .referenceQuery(
+          "SELECT t_k0, t_data, u_k0, u_data FROM t, u WHERE t.t_k0 = u.u_k0")
+      .verifier([&](const std::shared_ptr<Task>& task, bool /*unused*/) {
+        const auto statsPair = taskSpilledStats(*task);
+        ASSERT_GT(statsPair.first.spilledBytes, 0);
+      })
+      .run();
+}
+
 DEBUG_ONLY_TEST_F(HashJoinTest, hashBuildAbortDuringOutputProcessing) {
   const auto buildVectors = makeVectors(buildType_, 10, 128);
   const auto probeVectors = makeVectors(probeType_, 5, 128);
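The new test hooks into that reservation step through Velox's test-value mechanism rather than the generic spill-injection framework, so the forced spill lands exactly inside ensureTableFits(). Roughly how the two sides pair up, using only identifiers that appear in the diffs above:

// Production side (HashBuild.cpp): ensureTableFits() announces a named test point.
TestValue::adjust("facebook::velox::exec::HashBuild::ensureTableFits", this);

// Test side (HashJoinTest.cpp): register a callback under the same name. When
// the build operator reaches the test point, the callback runs with that
// operator, enters a reclaimable section, and forces memory arbitration, which
// should spill the build in the middle of finishHashBuild().
SCOPED_TESTVALUE_SET(
    "facebook::velox::exec::HashBuild::ensureTableFits",
    std::function<void(Operator*)>([&](Operator* op) {
      Operator::ReclaimableSectionGuard guard(op);
      memory::testingRunArbitration(op->pool());
    }));

// The verifier then only needs to assert that some bytes were spilled:
//   ASSERT_GT(taskSpilledStats(*task).first.spilledBytes, 0);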
