Skip to content

Commit

Permalink
Refactor spilling for RowNumber
Browse files Browse the repository at this point in the history
  • Loading branch information
tanjialiang committed Sep 24, 2024
1 parent dcaae29 commit be19f61
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 49 deletions.
67 changes: 20 additions & 47 deletions velox/exec/RowNumber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,42 +96,21 @@ void RowNumber::addInput(RowVectorPtr input) {
input_ = std::move(input);
}

void RowNumber::addSpillInput() {
VELOX_CHECK_NOT_NULL(input_);
VELOX_CHECK_NULL(inputSpiller_);
ensureInputFits(input_);
if (input_ == nullptr) {
VELOX_CHECK_NOT_NULL(inputSpiller_);
// Memory arbitration might be triggered by ensureInputFits() which will
// spill 'input_'.
return;
}

const auto numInput = input_->size();
SelectivityVector rows(numInput);

VELOX_CHECK(spillConfig_.has_value());
table_->prepareForGroupProbe(
*lookup_, input_, rows, spillConfig_->startPartitionBit);
table_->groupProbe(*lookup_, spillConfig_->startPartitionBit);

// Initialize new partitions with zeros.
for (auto i : lookup_->newGroups) {
setNumRows(lookup_->hits[i], 0);
}
}

void RowNumber::noMoreInput() {
Operator::noMoreInput();

if (inputSpiller_ != nullptr) {
inputSpiller_->finishSpill(spillInputPartitionSet_);
inputSpiller_.reset();
removeEmptyPartitions(spillInputPartitionSet_);
restoreNextSpillPartition();
finishSpillInputAndRestoreNext();
}
}

void RowNumber::finishSpillInputAndRestoreNext() {
inputSpiller_->finishSpill(spillInputPartitionSet_);
inputSpiller_.reset();
removeEmptyPartitions(spillInputPartitionSet_);
restoreNextSpillPartition();
}

void RowNumber::restoreNextSpillPartition() {
if (spillInputPartitionSet_.empty()) {
return;
Expand Down Expand Up @@ -181,10 +160,11 @@ void RowNumber::restoreNextSpillPartition() {

spillInputPartitionSet_.erase(it);

spillInputReader_->nextBatch(input_);
VELOX_CHECK_NOT_NULL(input_);
RowVectorPtr unspilledInput;
spillInputReader_->nextBatch(unspilledInput);
VELOX_CHECK_NOT_NULL(unspilledInput);
// NOTE: spillInputReader_ will at least produce one batch output.
addSpillInput();
addInput(std::move(unspilledInput));
}

void RowNumber::ensureInputFits(const RowVectorPtr& input) {
Expand Down Expand Up @@ -339,19 +319,17 @@ RowVectorPtr RowNumber::getOutput() {
output = fillOutput(numInput, nullptr);
}

input_ = nullptr;
if (spillInputReader_ != nullptr) {
if (spillInputReader_->nextBatch(input_)) {
addSpillInput();
RowVectorPtr unspilledInput;
if (spillInputReader_->nextBatch(unspilledInput)) {
addInput(std::move(unspilledInput));
} else {
input_ = nullptr;
spillInputReader_ = nullptr;
table_->clear();
restoreNextSpillPartition();
}
} else {
input_ = nullptr;
}

return output;
}

Expand Down Expand Up @@ -522,22 +500,17 @@ void RowNumber::spillInput(
}

void RowNumber::recursiveSpillInput() {
RowVectorPtr input;
while (spillInputReader_->nextBatch(input)) {
spillInput(input, pool());
RowVectorPtr unspilledInput;
while (spillInputReader_->nextBatch(unspilledInput)) {
spillInput(unspilledInput, pool());

if (operatorCtx_->driver()->shouldYield()) {
yield_ = true;
return;
}
}

inputSpiller_->finishSpill(spillInputPartitionSet_);
inputSpiller_.reset();
spillInputReader_ = nullptr;

removeEmptyPartitions(spillInputPartitionSet_);
restoreNextSpillPartition();
finishSpillInputAndRestoreNext();
}

void RowNumber::setSpillPartitionBits(
Expand Down
6 changes: 4 additions & 2 deletions velox/exec/RowNumber.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ class RowNumber : public Operator {

void spill();

void addSpillInput();

void restoreNextSpillPartition();

SpillPartitionNumSet spillHashTable();
Expand All @@ -78,6 +76,10 @@ class RowNumber : public Operator {

FlatVector<int64_t>& getOrCreateRowNumberVector(vector_size_t size);

// Finishes the current input spilling and restore the next processing
// partition.
void finishSpillInputAndRestoreNext();

// Used by recursive spill processing to read the spilled input data from the
// previous spill run through 'spillInputReader_' and then spill them back
// into a number of sub-partitions. After that, the function restores one of
Expand Down

0 comments on commit be19f61

Please sign in to comment.