Remove threshold-based spilling from HashBuild join
tanjialiang committed Mar 5, 2024
1 parent e4f33f4 commit 89e28a9
Showing 5 changed files with 87 additions and 336 deletions.
154 changes: 19 additions & 135 deletions velox/exec/HashBuild.cpp
@@ -66,17 +66,10 @@ HashBuild::HashBuild(
joinBridge_(operatorCtx_->task()->getHashJoinBridgeLocked(
operatorCtx_->driverCtx()->splitGroupId,
planNodeId())),
spillMemoryThreshold_(
operatorCtx_->driverCtx()->queryConfig().joinSpillMemoryThreshold()),
keyChannelMap_(joinNode_->rightKeys().size()) {
VELOX_CHECK(pool()->trackUsage());
VELOX_CHECK_NOT_NULL(joinBridge_);

spillGroup_ = spillEnabled()
? operatorCtx_->task()->getSpillOperatorGroupLocked(
operatorCtx_->driverCtx()->splitGroupId, planNodeId())
: nullptr;

joinBridge_->addBuilder();

auto inputType = joinNode_->sources()[1]->outputType();
@@ -208,12 +201,8 @@ void HashBuild::setupSpiller(SpillPartition* spillPartition) {
HashBitRange hashBits(
spillConfig.startPartitionBit,
spillConfig.startPartitionBit + spillConfig.joinPartitionBits);
if (spillPartition == nullptr) {
spillGroup_->addOperator(
*this,
operatorCtx_->driver()->shared_from_this(),
[&](const std::vector<Operator*>& operators) { runSpill(operators); });
} else {

if (spillPartition != nullptr) {
LOG(INFO) << "Setup reader to read spilled input from "
<< spillPartition->toString()
<< ", memory pool: " << pool()->name();
@@ -316,12 +305,7 @@ void HashBuild::removeInputRowsForAntiJoinFilter() {

void HashBuild::addInput(RowVectorPtr input) {
checkRunning();

if (!ensureInputFits(input)) {
VELOX_CHECK_NOT_NULL(input_);
VELOX_CHECK(future_.valid());
return;
}
ensureInputFits(input);

TestValue::adjust("facebook::velox::exec::HashBuild::addInput", this);

@@ -425,37 +409,20 @@ void HashBuild::addInput(RowVectorPtr input) {
});
}

bool HashBuild::ensureInputFits(RowVectorPtr& input) {
void HashBuild::ensureInputFits(RowVectorPtr& input) {
// NOTE: we don't need memory reservation if all the partitions are spilling
// as we spill all the input rows to disk directly.
if (!spillEnabled() || spiller_ == nullptr || spiller_->isAllSpilled()) {
return true;
return;
}

// NOTE: we simply reserve memory for all inputs even though some of them are
// spilling directly. It is okay as we will accumulate the extra reservation
// in the operator's memory pool, and won't make any new reservation if there
// is already sufficient reservations.
if (!reserveMemory(input)) {
if (!requestSpill(input)) {
return false;
}
} else {
// Check if any other peer operator has requested group spill.
if (waitSpill(input)) {
return false;
}
}
return true;
}

bool HashBuild::reserveMemory(const RowVectorPtr& input) {
VELOX_CHECK(spillEnabled());

Operator::ReclaimableSectionGuard guard(this);
numSpillRows_ = 0;
numSpillBytes_ = 0;

auto* rows = table_->rows();
const auto numRows = rows->numRows();

@@ -472,19 +439,7 @@ bool HashBuild::reserveMemory(const RowVectorPtr& input) {
memory::testingRunArbitration(pool());
// NOTE: the memory arbitration should have triggered spilling on this
// hash build operator so we return true to indicate have enough memory.
return true;
}

// We check usage from the parent pool to take peers' allocations into
// account.
const auto nodeUsage = pool()->parent()->currentBytes();
if (spillMemoryThreshold_ != 0 && nodeUsage > spillMemoryThreshold_) {
const int64_t bytesToSpill =
nodeUsage * spillConfig()->spillableReservationGrowthPct / 100;
numSpillRows_ = std::max<int64_t>(
1, bytesToSpill / (rows->fixedRowSize() + outOfLineBytesPerRow));
numSpillBytes_ = numSpillRows_ * outOfLineBytesPerRow;
return false;
return;
}
}

@@ -506,14 +461,14 @@ bool HashBuild::reserveMemory(const RowVectorPtr& input) {
// Enough free rows for input rows and enough variable length free
// space for the flat size of the whole vector. If outOfLineBytes
// is 0 there is no need for variable length space.
return true;
return;
}

// If there is variable length data we take the flat size of the
// input as a cap on the new variable length data needed. There must be at
// least 2x the increments in reservation.
if (pool()->availableReservation() > 2 * incrementBytes) {
return true;
return;
}
}

@@ -524,15 +479,12 @@ bool HashBuild::reserveMemory(const RowVectorPtr& input) {
incrementBytes * 2,
currentUsage * spillConfig_->spillableReservationGrowthPct / 100);

if (pool()->maybeReserve(targetIncrementBytes)) {
return true;
if (!pool()->maybeReserve(targetIncrementBytes)) {
LOG(WARNING) << "Failed to reserve " << succinctBytes(targetIncrementBytes)
<< " for memory pool " << pool()->name()
<< ", usage: " << succinctBytes(pool()->currentBytes())
<< ", reservation: " << succinctBytes(pool()->reservedBytes());
}

LOG(WARNING) << "Failed to reserve " << succinctBytes(targetIncrementBytes)
<< " for memory pool " << pool()->name()
<< ", usage: " << succinctBytes(pool()->currentBytes())
<< ", reservation: " << succinctBytes(pool()->reservedBytes());
return true;
}

void HashBuild::spillInput(const RowVectorPtr& input) {
@@ -548,17 +500,18 @@ void HashBuild::spillInput(const RowVectorPtr& input) {
computeSpillPartitions(input);

vector_size_t numSpillInputs = 0;
for (auto row = 0; row < numInput; ++row) {
const auto partition = spillPartitions_[row];
if (FOLLY_UNLIKELY(!activeRows_.isValid(row))) {
for (auto rowIdx = 0; rowIdx < numInput; ++rowIdx) {
const auto partition = spillPartitions_[rowIdx];
if (FOLLY_UNLIKELY(!activeRows_.isValid(rowIdx))) {
continue;
}
if (!spiller_->isSpilled(partition)) {
continue;
}
activeRows_.setValid(row, false);
activeRows_.setValid(rowIdx, false);
++numSpillInputs;
rawSpillInputIndicesBuffers_[partition][numSpillInputs_[partition]++] = row;
rawSpillInputIndicesBuffers_[partition][numSpillInputs_[partition]++] =
rowIdx;
}
if (numSpillInputs == 0) {
return;
Expand Down Expand Up @@ -644,70 +597,6 @@ void HashBuild::spillPartition(
}
}

bool HashBuild::requestSpill(RowVectorPtr& input) {
VELOX_CHECK_GT(numSpillRows_, 0);
VELOX_CHECK_GT(numSpillBytes_, 0);

// If all the partitions have been spilled, then nothing to spill.
if (spiller_->isAllSpilled()) {
return true;
}

input_ = std::move(input);
if (spillGroup_->requestSpill(*this, future_)) {
VELOX_CHECK(future_.valid());
setState(State::kWaitForSpill);
return false;
}
input = std::move(input_);
return true;
}

bool HashBuild::waitSpill(RowVectorPtr& input) {
if (!spillGroup_->needSpill()) {
return false;
}

if (spillGroup_->waitSpill(*this, future_)) {
VELOX_CHECK(future_.valid());
input_ = std::move(input);
setState(State::kWaitForSpill);
return true;
}
return false;
}

void HashBuild::runSpill(const std::vector<Operator*>& spillOperators) {
VELOX_CHECK(spillEnabled());
VELOX_CHECK(!spiller_->state().isAllPartitionSpilled());

uint64_t targetRows = 0;
uint64_t targetBytes = 0;
for (auto& spillOp : spillOperators) {
HashBuild* build = dynamic_cast<HashBuild*>(spillOp);
VELOX_CHECK_NOT_NULL(build);
build->addAndClearSpillTarget(targetRows, targetBytes);
}
VELOX_CHECK_GT(targetRows, 0);
VELOX_CHECK_GT(targetBytes, 0);

// TODO: consider to offload the partition spill processing to an executor to
// run in parallel.
for (auto& spillOp : spillOperators) {
HashBuild* build = dynamic_cast<HashBuild*>(spillOp);
build->spiller_->spill();
build->table_->clear();
build->pool()->release();
}
}

void HashBuild::addAndClearSpillTarget(uint64_t& numRows, uint64_t& numBytes) {
numRows += numSpillRows_;
numSpillRows_ = 0;
numBytes += numSpillBytes_;
numSpillBytes_ = 0;
}

void HashBuild::noMoreInput() {
checkRunning();

@@ -720,10 +609,6 @@ void HashBuild::noMoreInput() {
}

void HashBuild::noMoreInputInternal() {
if (spillEnabled()) {
spillGroup_->operatorStopped(*this);
}

if (!finishHashBuild()) {
return;
}
Expand Down Expand Up @@ -847,7 +732,6 @@ bool HashBuild::finishHashBuild() {
if (joinBridge_->setHashTable(
std::move(table_), std::move(spillPartitions), joinHasNullKeys_)) {
intermediateStateCleared_ = true;
spillGroup_->restart();
}

// Release the unused memory reservation since we have finished the merged
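For context, the reservation-growth policy that ensureInputFits() keeps after this change can be sketched in isolation. The sketch below is illustrative only: FakePool, its fields, and main() are hypothetical stand-ins rather than Velox APIs; the only parts taken from the diff are the max(incrementBytes * 2, currentUsage * spillableReservationGrowthPct / 100) target and the fact that a failed maybeReserve() is merely logged, with spilling left to memory arbitration instead of an operator-local threshold.

#include <algorithm>
#include <cstdint>
#include <iostream>

// Hypothetical stand-in for a memory pool; only maybeReserve() mirrors the
// best-effort semantics used by HashBuild::ensureInputFits() above.
struct FakePool {
  uint64_t capacityBytes{1 << 20};
  uint64_t reservedBytes{0};

  bool maybeReserve(uint64_t bytes) {
    if (reservedBytes + bytes > capacityBytes) {
      return false;  // Reservation failure is tolerated, not fatal.
    }
    reservedBytes += bytes;
    return true;
  }
};

int main() {
  FakePool pool;
  const uint64_t incrementBytes = 64 << 10;  // Estimated bytes needed for the next input.
  const uint64_t currentUsage = 512 << 10;   // Current usage of the operator's pool.
  const uint64_t spillableReservationGrowthPct = 25;  // From the spill config.

  // Same growth rule as in the diff: at least 2x the estimated increment, or
  // a configured percentage of current usage, whichever is larger.
  const uint64_t targetIncrementBytes = std::max<uint64_t>(
      incrementBytes * 2, currentUsage * spillableReservationGrowthPct / 100);

  if (!pool.maybeReserve(targetIncrementBytes)) {
    // With the threshold-based path removed, a failed reservation is only
    // logged; spilling is triggered later by memory arbitration if needed.
    std::cerr << "Failed to reserve " << targetIncrementBytes << " bytes\n";
  }
  return 0;
}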
56 changes: 3 additions & 53 deletions velox/exec/HashBuild.h
@@ -101,9 +101,7 @@ class HashBuild final : public Operator {
// Invoked when operator has finished processing the build input and wait for
// all the other drivers to finish the processing. The last driver that
// reaches to the hash build barrier, is responsible to build the hash table
// merged from all the other drivers. If the disk spilling is enabled, the
// last driver will also restart 'spillGroup_' and add a new hash build
// barrier for the next round of hash table build operation if it needs.
// merged from all the other drivers.
bool finishHashBuild();

// Invoked after the hash table has been built. It waits for any spill data to
@@ -142,27 +140,14 @@ class HashBuild final : public Operator {

// Invoked to ensure there is a sufficient memory to process 'input' by
// reserving a sufficient amount of memory in advance if disk spilling is
// enabled. The function returns true if the disk spilling is not enabled, or
// the memory reservation succeeds. If the memory reservation fails, the
// function will trigger a group spill which needs coordination among the
// other build drivers in the same group. The function returns true if the
// group spill has been inline executed which could happen if there is only
// one driver in the group, or it happens that all the other drivers have
// also requested group spill and this driver is the last one to reach the
// group spill barrier. Otherwise, the function returns false to wait for the
// group spill to run. The operator will transition to 'kWaitForSpill' state
// accordingly.
bool ensureInputFits(RowVectorPtr& input);
// enabled.
void ensureInputFits(RowVectorPtr& input);

// Invoked to ensure there is sufficient memory to build the join table with
// the specified 'numRows' if spilling is enabled. The function throws to fail
// the query if the memory reservation fails.
void ensureTableFits(uint64_t numRows);

// Invoked to reserve memory for 'input' if disk spilling is enabled. The
// function returns true on success, otherwise false.
bool reserveMemory(const RowVectorPtr& input);

// Invoked to compute spill partitions numbers for each row 'input' and spill
// rows to spiller directly if the associated partition(s) is spilling. The
// function will skip processing if disk spilling is not enabled or there is
@@ -190,28 +175,6 @@ class HashBuild final : public Operator {
vector_size_t numInput,
const SpillPartitionNumSet& spillPartitions);

// Invoked to send group spill request to 'spillGroup_'. The function returns
// true if group spill has been inline executed, otherwise returns false. In
// the latter case, the operator will transition to 'kWaitForSpill' state and
// 'input' will be saved in 'input_' to be processed after the group spill has
// been executed.
bool requestSpill(RowVectorPtr& input);

// Invoked to check if it needs to wait for any pending group spill to run.
// The function returns true if it needs to wait, otherwise false. The latter
// case is either because there is no pending group spill or this operator is
// the last one to reach to the group spill barrier and execute the group
// spill inline.
bool waitSpill(RowVectorPtr& input);

// The callback registered to 'spillGroup_' to run group spill on
// 'spillOperators'.
void runSpill(const std::vector<Operator*>& spillOperators);

// Invoked by 'runSpill' to sum up the spill targets from all the operators in
// 'numRows' and 'numBytes'.
void addAndClearSpillTarget(uint64_t& numRows, uint64_t& numBytes);

// Invoked to reset the operator state to restore previously spilled data. It
// setup (recursive) spiller and spill input reader from 'spillInput' received
// from 'joinBride_'. 'spillInput' contains a shard of previously spilled
@@ -248,14 +211,8 @@ class HashBuild final : public Operator {

std::shared_ptr<HashJoinBridge> joinBridge_;

// The maximum memory usage that a hash build can hold before spilling.
// If it is zero, then there is no such limit.
const uint64_t spillMemoryThreshold_;

bool exceededMaxSpillLevelLimit_{false};

std::shared_ptr<SpillOperatorGroup> spillGroup_;

State state_{State::kRunning};

// The row type used for hash table build and disk spilling.
@@ -304,13 +261,6 @@ class HashBuild final : public Operator {
// at least one entry with null join keys.
bool joinHasNullKeys_{false};

// The spill targets set by 'requestSpill()' to request group spill.
uint64_t numSpillRows_{0};
uint64_t numSpillBytes_{0};

// This can be nullptr if either spilling is not allowed or it has been
// transferred to the last hash build operator while in kWaitForBuild state or
// it has been cleared to set up a new one for recursive spilling.
std::unique_ptr<Spiller> spiller_;

// Used to read input from previously spilled data for restoring.
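The header comments above describe two different reservation contracts: ensureInputFits() reserves best-effort, while ensureTableFits() throws to fail the query when reservation fails. A minimal contrast is sketched below under the assumption of a hypothetical pool type; the names reserveBestEffort and reserveOrFail are illustrative, not the Velox memory::MemoryPool API.

#include <cstdint>
#include <stdexcept>

// Hypothetical pool whose reservation always fails, to exercise both paths.
struct FakePool {
  bool maybeReserve(uint64_t /*bytes*/) {
    return false;
  }
};

// Best-effort contract (as described for ensureInputFits): a failed
// reservation is tolerated; the operator proceeds and relies on memory
// arbitration to spill later.
void reserveBestEffort(FakePool& pool, uint64_t bytes) {
  if (!pool.maybeReserve(bytes)) {
    // Log-and-continue in the real operator; nothing is thrown here.
  }
}

// Must-succeed contract (as described for ensureTableFits): a failed
// reservation fails the query.
void reserveOrFail(FakePool& pool, uint64_t bytes) {
  if (!pool.maybeReserve(bytes)) {
    throw std::runtime_error("Cannot reserve memory to build the join table");
  }
}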
2 changes: 1 addition & 1 deletion velox/exec/Operator.h
@@ -707,7 +707,7 @@ class Operator : public BaseRuntimeStatWriter {
/// operator if it happens to be suspended for memory arbitration processing.
/// This only applies to a reclaimable operator.
tsan_atomic<bool> nonReclaimableSection_{false};

public:
/// Holds the last data from addInput until it is processed. Reset after the
/// input is processed.
RowVectorPtr input_;