Skip to content

Commit

Permalink
Centralize probe table spill to join bridge
Browse files Browse the repository at this point in the history
  • Loading branch information
tanjialiang committed Oct 22, 2024
1 parent 1051173 commit 6faf6d7
Show file tree
Hide file tree
Showing 9 changed files with 245 additions and 215 deletions.
1 change: 0 additions & 1 deletion velox/core/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -1697,7 +1697,6 @@ class HashJoinNode : public AbstractJoinNode {

private:
void addDetails(std::stringstream& stream) const override;

const bool nullAware_;
};

Expand Down
64 changes: 11 additions & 53 deletions velox/exec/HashBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,12 @@ HashBuild::HashBuild(

const auto numKeys = joinNode_->rightKeys().size();
keyChannels_.reserve(numKeys);
std::vector<std::string> names;
names.reserve(inputType->size());
std::vector<TypePtr> types;
types.reserve(inputType->size());

for (int i = 0; i < numKeys; ++i) {
auto& key = joinNode_->rightKeys()[i];
auto channel = exprToChannel(key.get(), inputType);
keyChannelMap_[channel] = i;
keyChannels_.emplace_back(channel);
names.emplace_back(inputType->nameOf(channel));
types.emplace_back(inputType->childAt(channel));
}

// Identify the non-key build side columns and make a decoder for each.
Expand All @@ -106,12 +100,10 @@ HashBuild::HashBuild(
if (keyChannelMap_.find(i) == keyChannelMap_.end()) {
dependentChannels_.emplace_back(i);
decoders_.emplace_back(std::make_unique<DecodedVector>());
names.emplace_back(inputType->nameOf(i));
types.emplace_back(inputType->childAt(i));
}
}

tableType_ = ROW(std::move(names), std::move(types));
tableType_ = joinTableType(joinNode_);
setupTable();
setupSpiller();
stateCleared_ = false;
Expand Down Expand Up @@ -1053,13 +1045,13 @@ void HashBuild::reclaim(
VELOX_CHECK_NOT_NULL(driver);
VELOX_CHECK(!nonReclaimableSection_);

const auto* config = spillConfig();
VELOX_CHECK_NOT_NULL(config);
if (UNLIKELY(exceededMaxSpillLevelLimit_)) {
// 'canReclaim()' already checks that the spill level does not exceed the
// max limit. There is only a small chance that, between the time
// 'canReclaim()' is checked and the time the actual reclaim happens, the
// operator has spilled such that the spill level exceeds the max.
const auto* config = spillConfig();
VELOX_CHECK_NOT_NULL(config);
LOG(WARNING)
<< "Can't reclaim from hash build operator, exceeded maximum spill "
"level of "
Expand Down Expand Up @@ -1089,6 +1081,7 @@ void HashBuild::reclaim(
VELOX_CHECK(task->pauseRequested());
const std::vector<Operator*> operators =
task->findPeerOperators(operatorCtx_->driverCtx()->pipelineId, this);

for (auto* op : operators) {
HashBuild* buildOp = dynamic_cast<HashBuild*>(op);
VELOX_CHECK_NOT_NULL(buildOp);
Expand All @@ -1106,53 +1099,18 @@ void HashBuild::reclaim(
}
}

struct SpillResult {
const std::exception_ptr error{nullptr};

explicit SpillResult(std::exception_ptr _error) : error(_error) {}
};

std::vector<std::shared_ptr<AsyncSource<SpillResult>>> spillTasks;
auto* spillExecutor = spillConfig()->executor;
std::vector<Spiller*> spillers;
for (auto* op : operators) {
HashBuild* buildOp = static_cast<HashBuild*>(op);
spillTasks.push_back(
memory::createAsyncMemoryReclaimTask<SpillResult>([buildOp]() {
try {
buildOp->spiller_->spill();
buildOp->table_->clear();
// Release the minimum reserved memory.
buildOp->pool()->release();
return std::make_unique<SpillResult>(nullptr);
} catch (const std::exception& e) {
LOG(ERROR) << "Spill from hash build pool "
<< buildOp->pool()->name() << " failed: " << e.what();
// The exception is captured and thrown by the caller.
return std::make_unique<SpillResult>(std::current_exception());
}
}));
if ((operators.size() > 1) && (spillExecutor != nullptr)) {
spillExecutor->add([source = spillTasks.back()]() { source->prepare(); });
}
spillers.push_back(buildOp->spiller_.get());
}

auto syncGuard = folly::makeGuard([&]() {
for (auto& spillTask : spillTasks) {
// We consume the result for the pending tasks. This is a cleanup in the
// guard and must not throw. The first error is already captured before
// this runs.
try {
spillTask->move();
} catch (const std::exception&) {
}
}
});
spillHashJoinTableFromSpillers(spillers, config->executor);

for (auto& spillTask : spillTasks) {
const auto result = spillTask->move();
if (result->error) {
std::rethrow_exception(result->error);
}
for (auto* op : operators) {
HashBuild* buildOp = static_cast<HashBuild*>(op);
buildOp->table_->clear();
buildOp->pool()->release();
}
}

Expand Down
18 changes: 9 additions & 9 deletions velox/exec/HashBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@

namespace facebook::velox::exec {

// Builds a hash table for use in HashProbe. This is the final
// Operator in a build side Driver. The build side pipeline has
// multiple Drivers, each with its own HashBuild. The build finishes
// when the last Driver of the build pipeline finishes. Hence finishHashBuild()
// has a barrier where the last one to enter gathers the data
// accumulated by the other Drivers and makes the join hash
// table. This table is then passed to the probe side pipeline via
// JoinBridge. After this, all build side Drivers finish and free
// their state.
/// Builds a hash table for use in HashProbe. This is the final
/// Operator in a build side Driver. The build side pipeline has
/// multiple Drivers, each with its own HashBuild. The build finishes
/// when the last Driver of the build pipeline finishes. Hence finish()
/// has a barrier where the last one to enter gathers the data
/// accumulated by the other Drivers and makes the join hash
/// table. This table is then passed to the probe side pipeline via
/// JoinBridge. After this, all build side Drivers finish and free
/// their state.
class HashBuild final : public Operator {
public:
/// Define the internal execution state for hash build.
Expand Down
150 changes: 150 additions & 0 deletions velox/exec/HashJoinBridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,45 @@
*/

#include "velox/exec/HashJoinBridge.h"
#include "velox/common/memory/MemoryArbitrator.h"

namespace facebook::velox::exec {
namespace {
// Column name "__probedFlag". Presumably the name of the extra probed-flag
// column in the spilled join table row type; its uses are outside this view.
// The anonymous namespace already gives internal linkage, so no 'static' is
// needed, and 'constexpr' makes the pointer itself immutable.
constexpr const char* kSpillProbedFlagColumnName = "__probedFlag";
} // namespace

// Returns the row type of the hash join table built for 'joinNode'. The type
// is derived from the output type of the join's second source
// ('sources()[1]', the build side): the join key columns come first, in the
// order of 'rightKeys()', followed by all the remaining (non-key) columns in
// their input order.
RowTypePtr joinTableType(
    const std::shared_ptr<const core::HashJoinNode>& joinNode) {
  const auto& inputType = joinNode->sources()[1]->outputType();
  const auto& rightKeys = joinNode->rightKeys();
  const auto numColumns = inputType->size();

  std::vector<std::string> names;
  names.reserve(numColumns);
  std::vector<TypePtr> types;
  types.reserve(numColumns);
  // Input channels that are used as join keys.
  std::unordered_set<uint32_t> keyChannelSet;
  keyChannelSet.reserve(rightKeys.size());

  // Key columns first, in join key order.
  for (const auto& key : rightKeys) {
    const auto channel = exprToChannel(key.get(), inputType);
    keyChannelSet.insert(channel);
    names.emplace_back(inputType->nameOf(channel));
    types.emplace_back(inputType->childAt(channel));
  }

  // Then append the non-key build side columns in input order.
  for (uint32_t i = 0; i < numColumns; ++i) {
    if (keyChannelSet.count(i) == 0) {
      names.emplace_back(inputType->nameOf(i));
      types.emplace_back(inputType->childAt(i));
    }
  }

  return ROW(std::move(names), std::move(types));
}

void HashJoinBridge::start() {
std::lock_guard<std::mutex> l(mutex_);
started_ = true;
Expand All @@ -33,6 +66,123 @@ void HashJoinBridge::addBuilder() {
++numBuilders_;
}

namespace {
// Creates a hash join build spiller over 'subTableRows', the row container of
// one of the sub-tables of the join table. This function only constructs the
// spiller; the caller runs the spill and then collects the spilled partitions
// from the returned spiller. Creating one spiller per sub-table lets the
// caller parallelize the table spilling.
//
// 'joinNode' must not be null; it provides the join type and the table type
// used to derive the spill row type. 'hashBitRange', 'spillConfig' and
// 'stats' are forwarded to the spiller as is.
std::unique_ptr<Spiller> createSpiller(
    RowContainer* subTableRows,
    HashBitRange hashBitRange,
    const std::shared_ptr<const core::HashJoinNode>& joinNode,
    const common::SpillConfig* spillConfig,
    folly::Synchronized<common::SpillStats>* stats) {
  VELOX_CHECK_NOT_NULL(joinNode);
  const auto joinType = joinNode->joinType();
  const auto tableType = joinTableType(joinNode);
  return std::make_unique<Spiller>(
      Spiller::Type::kHashJoinBuild,
      joinType,
      subTableRows,
      hashJoinTableSpillType(tableType, joinType),
      hashBitRange,
      spillConfig,
      stats);
}
} // namespace

// Runs 'spill()' on every spiller in 'spillers', each wrapped in an async
// memory reclaim task, and returns one result per spiller in the same order as
// 'spillers'. If 'spillExecutor' is not null, all tasks except the first one
// are submitted to it. If any task reports an error, the first such error (in
// spiller order) is rethrown and no results are returned; the scope guard
// below still consumes the remaining tasks before the function exits.
std::vector<std::unique_ptr<HashJoinTableSpillResult>>
spillHashJoinTableFromSpillers(
const std::vector<Spiller*>& spillers,
folly::Executor* spillExecutor) {
std::vector<std::shared_ptr<AsyncSource<HashJoinTableSpillResult>>>
spillTasks;
for (auto* spiller : spillers) {
spillTasks.push_back(
memory::createAsyncMemoryReclaimTask<HashJoinTableSpillResult>(
[spiller]() {
try {
spiller->spill();
return std::make_unique<HashJoinTableSpillResult>(spiller);
} catch (const std::exception& e) {
LOG(ERROR) << "Spill from hash join bridge failed: "
<< e.what();
// The exception is captured and thrown by the caller.
return std::make_unique<HashJoinTableSpillResult>(
std::current_exception());
}
}));
// The size check is false right after the first push_back, so the first
// task is never handed to 'spillExecutor'.
// NOTE(review): presumably the first task then runs on the calling thread
// when it is moved below, and a single spiller is always spilled inline --
// confirm against AsyncSource semantics.
if ((spillTasks.size() > 1) && (spillExecutor != nullptr)) {
spillExecutor->add([source = spillTasks.back()]() { source->prepare(); });
}
}

// Runs on every exit path, including the rethrow below. It references
// 'spillTasks' by reference, so it must be declared after the tasks have
// been created.
auto syncGuard = folly::makeGuard([&]() {
for (auto& spillTask : spillTasks) {
// We consume the result for the pending tasks. This is a cleanup in the
// guard and must not throw. The first error is already captured before
// this runs.
try {
spillTask->move();
} catch (const std::exception&) {
}
}
});

// Collect the results in spiller order, stopping at the first reported error.
std::vector<std::unique_ptr<HashJoinTableSpillResult>> spillResults;
for (auto& spillTask : spillTasks) {
auto result = spillTask->move();
if (result->error) {
std::rethrow_exception(result->error);
}
spillResults.push_back(std::move(result));
}
return spillResults;
}

// Spills all the rows of 'table' and returns the resulting non-empty spill
// partitions. One spiller is created for each non-empty row container of the
// table and the spillers are run through spillHashJoinTableFromSpillers()
// with the executor from 'spillConfig'. Returns an empty set if the table has
// no distinct entries or all of its row containers are empty. 'table' and
// 'spillConfig' must not be null.
SpillPartitionSet spillHashJoinTable(
    std::shared_ptr<BaseHashTable> table,
    HashBitRange hashBitRange,
    const std::shared_ptr<const core::HashJoinNode>& joinNode,
    const common::SpillConfig* spillConfig,
    folly::Synchronized<common::SpillStats>* stats) {
  VELOX_CHECK_NOT_NULL(table);
  VELOX_CHECK_NOT_NULL(spillConfig);

  // Nothing to spill if the build side is empty.
  if (table->numDistinct() == 0) {
    return {};
  }

  // 'ownedSpillers' keeps the spillers alive until the partitions have been
  // collected; 'spillerPtrs' is the non-owning view handed to the spill
  // routine.
  std::vector<std::unique_ptr<Spiller>> ownedSpillers;
  std::vector<Spiller*> spillerPtrs;
  const std::vector<RowContainer*> containers = table->allRows();
  for (auto* container : containers) {
    if (container->numRows() != 0) {
      ownedSpillers.push_back(createSpiller(
          container, hashBitRange, joinNode, spillConfig, stats));
      spillerPtrs.push_back(ownedSpillers.back().get());
    }
  }
  if (ownedSpillers.empty()) {
    return {};
  }

  auto results =
      spillHashJoinTableFromSpillers(spillerPtrs, spillConfig->executor);

  // Errors are rethrown by the call above, so every returned result is
  // expected to be error free.
  SpillPartitionSet partitions;
  for (auto& result : results) {
    VELOX_CHECK_NULL(result->error);
    result->spiller->finishSpill(partitions);
  }

  // Drop the empty spilled partitions so that the hash probe side doesn't
  // trigger unnecessary spilling for them.
  removeEmptyPartitions(partitions);
  return partitions;
}

void HashJoinBridge::setHashTable(
std::unique_ptr<BaseHashTable> table,
SpillPartitionSet spillPartitionSet,
Expand Down
Loading

0 comments on commit 6faf6d7

Please sign in to comment.