Remove threshold based spilling from HashBuild join
tanjialiang committed Feb 28, 2024
1 parent e6a986c commit 6910619
Showing 8 changed files with 105 additions and 309 deletions.
1 change: 1 addition & 0 deletions velox/common/memory/tests/MemoryArbitratorTest.cpp
@@ -221,6 +221,7 @@ class FakeTestArbitrator : public MemoryArbitrator {
}

uint64_t shrinkCapacity(
memory::MemoryPool* /*unused*/,
const std::vector<std::shared_ptr<MemoryPool>>& /*unused*/,
uint64_t /*unused*/) override {
VELOX_NYI();
1 change: 1 addition & 0 deletions velox/common/memory/tests/MemoryManagerTest.cpp
@@ -132,6 +132,7 @@ class FakeTestArbitrator : public MemoryArbitrator {
}

uint64_t shrinkCapacity(
memory::MemoryPool* /*unused*/,
const std::vector<std::shared_ptr<MemoryPool>>& /*unused*/,
uint64_t /*unused*/) override {
VELOX_NYI();
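Both fake test arbitrators above are updated only to match the widened shrinkCapacity() signature, which now takes the requesting memory pool as its first argument. A minimal sketch of such a stub, assuming nothing beyond what this diff shows (the base-class declaration and the rest of the test class are not visible here, and the parameter names are placeholders):

// Sketch only: signature inferred from this diff, not from the Velox headers.
class FakeTestArbitrator : public MemoryArbitrator {
 public:
  // The new first argument identifies the pool on whose behalf capacity is
  // being shrunk; these test doubles never exercise it and simply bail out.
  uint64_t shrinkCapacity(
      memory::MemoryPool* /*requestor*/,
      const std::vector<std::shared_ptr<MemoryPool>>& /*candidates*/,
      uint64_t /*targetBytes*/) override {
    VELOX_NYI();
  }
};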
1 change: 1 addition & 0 deletions velox/common/memory/tests/MockSharedArbitratorTest.cpp
@@ -234,6 +234,7 @@ class MockMemoryOperator {
void* allocate(uint64_t bytes) {
VELOX_CHECK_EQ(bytes % pool_->alignment(), 0);
void* buffer = pool_->allocate(bytes);
allocations_.size();
std::lock_guard<std::mutex> l(mu_);
totalBytes_ += bytes;
allocations_.emplace(buffer, bytes);
138 changes: 19 additions & 119 deletions velox/exec/HashBuild.cpp
@@ -66,17 +66,10 @@ HashBuild::HashBuild(
joinBridge_(operatorCtx_->task()->getHashJoinBridgeLocked(
operatorCtx_->driverCtx()->splitGroupId,
planNodeId())),
spillMemoryThreshold_(
operatorCtx_->driverCtx()->queryConfig().joinSpillMemoryThreshold()),
keyChannelMap_(joinNode_->rightKeys().size()) {
VELOX_CHECK(pool()->trackUsage());
VELOX_CHECK_NOT_NULL(joinBridge_);

spillGroup_ = spillEnabled()
? operatorCtx_->task()->getSpillOperatorGroupLocked(
operatorCtx_->driverCtx()->splitGroupId, planNodeId())
: nullptr;

joinBridge_->addBuilder();

auto inputType = joinNode_->sources()[1]->outputType();
@@ -208,12 +201,8 @@ void HashBuild::setupSpiller(SpillPartition* spillPartition) {
HashBitRange hashBits(
spillConfig.startPartitionBit,
spillConfig.startPartitionBit + spillConfig.joinPartitionBits);
if (spillPartition == nullptr) {
spillGroup_->addOperator(
*this,
operatorCtx_->driver()->shared_from_this(),
[&](const std::vector<Operator*>& operators) { runSpill(operators); });
} else {

if (spillPartition != nullptr) {
LOG(INFO) << "Setup reader to read spilled input from "
<< spillPartition->toString()
<< ", memory pool: " << pool()->name();
@@ -236,6 +225,8 @@ void HashBuild::setupSpiller(SpillPartition* spillPartition) {
hashBits = HashBitRange(startBit, startBit + spillConfig.joinPartitionBits);
}

LOG(INFO) << "Setup spiller at spill level "
<< spillConfig.joinSpillLevel(hashBits.begin());
spiller_ = std::make_unique<Spiller>(
Spiller::Type::kHashJoinBuild,
table_->rows(),
@@ -437,25 +428,12 @@ bool HashBuild::ensureInputFits(RowVectorPtr& input) {
// spilling directly. It is okay as we will accumulate the extra reservation
// in the operator's memory pool, and won't make any new reservation if there
// are already sufficient reservations.
if (!reserveMemory(input)) {
if (!requestSpill(input)) {
return false;
}
} else {
// Check if any other peer operator has requested group spill.
if (waitSpill(input)) {
return false;
}
}
return true;
return reserveMemory(input);
}

bool HashBuild::reserveMemory(const RowVectorPtr& input) {
VELOX_CHECK(spillEnabled());

numSpillRows_ = 0;
numSpillBytes_ = 0;

auto* rows = table_->rows();
const auto numRows = rows->numRows();

@@ -469,21 +447,12 @@ bool HashBuild::reserveMemory(const RowVectorPtr& input) {
if (numRows != 0) {
// Test-only spill path.
if (testingTriggerSpill()) {
numSpillRows_ = std::max<int64_t>(1, numRows / 10);
numSpillBytes_ = numSpillRows_ * outOfLineBytesPerRow;
return false;
}

// We check usage from the parent pool to take peers' allocations into
// account.
const auto nodeUsage = pool()->parent()->currentBytes();
if (spillMemoryThreshold_ != 0 && nodeUsage > spillMemoryThreshold_) {
const int64_t bytesToSpill =
nodeUsage * spillConfig()->spillableReservationGrowthPct / 100;
numSpillRows_ = std::max<int64_t>(
1, bytesToSpill / (rows->fixedRowSize() + outOfLineBytesPerRow));
numSpillBytes_ = numSpillRows_ * outOfLineBytesPerRow;
return false;
Operator::ReclaimableSectionGuard guard(this);
SuspendedSection suspendedSection(operatorCtx_->driver());
auto shrinkedBytes = memory::memoryManager()->shrinkPools(
std::numeric_limits<int64_t>::max());
LOG(INFO) << "shrinkedBytes " << succinctBytes(shrinkedBytes);
return true;
}
}

@@ -540,6 +509,8 @@ bool HashBuild::reserveMemory(const RowVectorPtr& input) {
void HashBuild::spillInput(const RowVectorPtr& input) {
VELOX_CHECK_EQ(input->size(), activeRows_.size());

// Q(jtan6): Why do we have the !spiller_->isAnySpilled() condition here? If
// nothing has spilled yet, we don't spill?
if (!spillEnabled() || spiller_ == nullptr || !spiller_->isAnySpilled() ||
!activeRows_.hasSelections()) {
return;
@@ -550,17 +521,15 @@ void HashBuild::spillInput(const RowVectorPtr& input) {
computeSpillPartitions(input);

vector_size_t numSpillInputs = 0;
for (auto row = 0; row < numInput; ++row) {
const auto partition = spillPartitions_[row];
if (FOLLY_UNLIKELY(!activeRows_.isValid(row))) {
continue;
}
if (!spiller_->isSpilled(partition)) {
for (auto rowIdx = 0; rowIdx < numInput; ++rowIdx) {
const auto partition = spillPartitions_[rowIdx];
if (FOLLY_UNLIKELY(!activeRows_.isValid(rowIdx))) {
continue;
}
activeRows_.setValid(row, false);
activeRows_.setValid(rowIdx, false);
++numSpillInputs;
rawSpillInputIndicesBuffers_[partition][numSpillInputs_[partition]++] = row;
rawSpillInputIndicesBuffers_[partition][numSpillInputs_[partition]++] =
rowIdx;
}
if (numSpillInputs == 0) {
return;
@@ -646,70 +615,6 @@ void HashBuild::spillPartition(
}
}

bool HashBuild::requestSpill(RowVectorPtr& input) {
VELOX_CHECK_GT(numSpillRows_, 0);
VELOX_CHECK_GT(numSpillBytes_, 0);

// If all the partitions have been spilled, then nothing to spill.
if (spiller_->isAllSpilled()) {
return true;
}

input_ = std::move(input);
if (spillGroup_->requestSpill(*this, future_)) {
VELOX_CHECK(future_.valid());
setState(State::kWaitForSpill);
return false;
}
input = std::move(input_);
return true;
}

bool HashBuild::waitSpill(RowVectorPtr& input) {
if (!spillGroup_->needSpill()) {
return false;
}

if (spillGroup_->waitSpill(*this, future_)) {
VELOX_CHECK(future_.valid());
input_ = std::move(input);
setState(State::kWaitForSpill);
return true;
}
return false;
}

void HashBuild::runSpill(const std::vector<Operator*>& spillOperators) {
VELOX_CHECK(spillEnabled());
VELOX_CHECK(!spiller_->state().isAllPartitionSpilled());

uint64_t targetRows = 0;
uint64_t targetBytes = 0;
for (auto& spillOp : spillOperators) {
HashBuild* build = dynamic_cast<HashBuild*>(spillOp);
VELOX_CHECK_NOT_NULL(build);
build->addAndClearSpillTarget(targetRows, targetBytes);
}
VELOX_CHECK_GT(targetRows, 0);
VELOX_CHECK_GT(targetBytes, 0);

// TODO: consider offloading the partition spill processing to an executor to
// run in parallel.
for (auto& spillOp : spillOperators) {
HashBuild* build = dynamic_cast<HashBuild*>(spillOp);
build->spiller_->spill();
build->table_->clear();
build->pool()->release();
}
}

void HashBuild::addAndClearSpillTarget(uint64_t& numRows, uint64_t& numBytes) {
numRows += numSpillRows_;
numSpillRows_ = 0;
numBytes += numSpillBytes_;
numSpillBytes_ = 0;
}

void HashBuild::noMoreInput() {
checkRunning();

@@ -722,10 +627,6 @@ void HashBuild::noMoreInput() {
}

void HashBuild::noMoreInputInternal() {
if (spillEnabled()) {
spillGroup_->operatorStopped(*this);
}

if (!finishHashBuild()) {
return;
}
@@ -849,7 +750,6 @@ bool HashBuild::finishHashBuild() {
if (joinBridge_->setHashTable(
std::move(table_), std::move(spillPartitions), joinHasNullKeys_)) {
intermediateStateCleared_ = true;
spillGroup_->restart();
}

// Release the unused memory reservation since we have finished the merged
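Taken together, the HashBuild.cpp changes drop the joinSpillMemoryThreshold-driven spill target and leave the test-only path as the sole explicit trigger, routed through global memory arbitration. A condensed sketch of that path, using only the names that appear in the hunks above (ReclaimableSectionGuard, SuspendedSection, memoryManager()->shrinkPools); the local variable name is illustrative rather than a verbatim copy of the file:

// Test-only trigger: instead of spilling against a local byte threshold, mark
// this operator reclaimable, suspend its driver, and ask the memory manager to
// shrink all pools, which drives reclamation (and spilling) via the arbitrator.
if (testingTriggerSpill()) {
  Operator::ReclaimableSectionGuard guard(this);
  SuspendedSection suspendedSection(operatorCtx_->driver());
  const auto freedBytes = memory::memoryManager()->shrinkPools(
      std::numeric_limits<int64_t>::max());
  LOG(INFO) << "Shrunk " << succinctBytes(freedBytes);
  return true;
}

The design point is that the arbitrator, not each HashBuild node, now decides when the build side must spill, so a per-node byte threshold no longer has a role.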
39 changes: 1 addition & 38 deletions velox/exec/HashBuild.h
@@ -101,9 +101,7 @@ class HashBuild final : public Operator {
// Invoked when the operator has finished processing the build input and waits
// for all the other drivers to finish processing. The last driver that reaches
// the hash build barrier is responsible for building the hash table
// merged from all the other drivers. If the disk spilling is enabled, the
// last driver will also restart 'spillGroup_' and add a new hash build
// barrier for the next round of hash table build operation if it needs.
// merged from all the other drivers.
bool finishHashBuild();

// Invoked after the hash table has been built. It waits for any spill data to
@@ -194,28 +192,6 @@ class HashBuild final : public Operator {
vector_size_t numInput,
const SpillPartitionNumSet& spillPartitions);

// Invoked to send group spill request to 'spillGroup_'. The function returns
// true if the group spill has been executed inline, otherwise returns false. In
// the latter case, the operator will transition to 'kWaitForSpill' state and
// 'input' will be saved in 'input_' to be processed after the group spill has
// been executed.
bool requestSpill(RowVectorPtr& input);

// Invoked to check if it needs to wait for any pending group spill to run.
// The function returns true if it needs to wait, otherwise false. The latter
// case is either because there is no pending group spill or this operator is
// the last one to reach the group spill barrier and execute the group
// spill inline.
bool waitSpill(RowVectorPtr& input);

// The callback registered to 'spillGroup_' to run group spill on
// 'spillOperators'.
void runSpill(const std::vector<Operator*>& spillOperators);

// Invoked by 'runSpill' to sum up the spill targets from all the operators in
// 'numRows' and 'numBytes'.
void addAndClearSpillTarget(uint64_t& numRows, uint64_t& numBytes);

// Invoked to reset the operator state to restore previously spilled data. It
// sets up the (recursive) spiller and spill input reader from 'spillInput'
// received from 'joinBridge_'. 'spillInput' contains a shard of previously spilled
@@ -255,14 +231,8 @@ class HashBuild final : public Operator {

std::shared_ptr<HashJoinBridge> joinBridge_;

// The maximum memory usage that a hash build can hold before spilling.
// If it is zero, then there is no such limit.
const uint64_t spillMemoryThreshold_;

bool exceededMaxSpillLevelLimit_{false};

std::shared_ptr<SpillOperatorGroup> spillGroup_;

State state_{State::kRunning};

// The row type used for hash table build and disk spilling.
@@ -315,13 +285,6 @@ class HashBuild final : public Operator {
// 'testSpillPct_'.
uint64_t spillTestCounter_{0};

// The spill targets set by 'requestSpill()' to request group spill.
uint64_t numSpillRows_{0};
uint64_t numSpillBytes_{0};

// This can be nullptr if either spilling is not allowed or it has been
// transferred to the last hash build operator while in kWaitForBuild state or
// it has been cleared to set up a new one for recursive spilling.
std::unique_ptr<Spiller> spiller_;

// Used to read input from previously spilled data for restoring.
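With spillMemoryThreshold_, spillGroup_, and the requestSpill()/waitSpill()/runSpill()/addAndClearSpillTarget() machinery removed from the header, the build side spills only when the memory arbitrator reclaims from it. A rough sketch of what such an arbitration-driven spill step looks like, reusing the calls from the removed runSpill() body; the function name is hypothetical and the real reclaim entry point is outside this diff:

// Hypothetical helper showing the reclaim-driven replacement for the old
// group-spill callback; not the actual Velox implementation.
void HashBuild::spillUnderArbitration() {
  if (spiller_ == nullptr || spiller_->state().isAllPartitionSpilled()) {
    return;  // Nothing left in memory to spill for this operator.
  }
  spiller_->spill();   // Write the accumulated build-side rows to disk.
  table_->clear();     // Drop the in-memory hash table contents.
  pool()->release();   // Return the freed reservation to the arbitrator.
}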