Skip to content

Commit

Permalink
xx
Browse files Browse the repository at this point in the history
  • Loading branch information
zuochunwei committed Aug 4, 2023
1 parent 2fa4ca6 commit 429dff6
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 24 deletions.
2 changes: 1 addition & 1 deletion velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ class QueryConfig {

/// Returns the 'is distinct aggregation spilling enabled' flag, read via
/// get<bool>() under the key kDistinctAggregationSpillEnabled.
/// The second argument to get<bool>() is presumably the value used when the
/// key is not set -- TODO confirm against the definition of get<>().
bool distinctAggregationSpillEnabled() const {
// NOTE(review): two consecutive return statements follow, so as written only
// the first one (passing 'false') can execute; the second (passing 'true')
// is unreachable. This looks like a rendered diff whose +/- markers were
// stripped, i.e. the old line followed by its replacement -- confirm which
// one is current before relying on the value.
return get<bool>(kDistinctAggregationSpillEnabled, false);
return get<bool>(kDistinctAggregationSpillEnabled, true);
}

/// Returns 'is join spilling enabled' flag. Must also check the
Expand Down
42 changes: 19 additions & 23 deletions velox/exec/HashAggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,12 +201,12 @@ void HashAggregation::addInput(RowVectorPtr input) {

const bool abandonPartialEarly = isPartialOutput_ && !isGlobal_ &&
abandonPartialAggregationEarly(groupingSet_->numDistinct());
if (isDistinct_) {
if (isDistinct_ && !distinctAggregationSpillEnabled_) {
newDistincts_ = !groupingSet_->hashLookup().newGroups.empty();

if (newDistincts_) {
// Save input to use for output in getOutput().
// input_ = input;
input_ = input;
} else if (abandonPartialEarly) {
// If no new distinct groups (meaning we don't have anything to output)
// and we are abandoning the partial aggregation, then we need to ensure
Expand Down Expand Up @@ -352,40 +352,36 @@ RowVectorPtr HashAggregation::getOutput() {
// - partial aggregation reached memory limit;
// - distinct aggregation has new keys;
// - running in partial streaming mode and have some output ready.
if (!noMoreInput_ && !partialFull_ &&
if (!noMoreInput_ && !partialFull_ && !newDistincts_ &&
!groupingSet_->hasOutput()) {
input_ = nullptr;
return nullptr;
}

#if 0
if (isDistinct_) {
if (isDistinct_ && !distinctAggregationSpillEnabled_) {
if (!newDistincts_) {
if (noMoreInput_) {
finished_ = true;
}
return nullptr;
}

if (!distinctAggregationSpillEnabled_) {
auto lookup = groupingSet_->hashLookup();
auto size = lookup.newGroups.size();
BufferPtr indices = allocateIndices(size, operatorCtx_->pool());
auto indicesPtr = indices->asMutable<vector_size_t>();
std::copy(lookup.newGroups.begin(), lookup.newGroups.end(), indicesPtr);
newDistincts_ = false;
auto output = fillOutput(size, indices);
numOutputRows_ += size;

// Drop reference to input_ to make it singly-referenced at the producer
// and allow for memory reuse.
input_ = nullptr;

resetPartialOutputIfNeed();
return output;
}
auto lookup = groupingSet_->hashLookup();
auto size = lookup.newGroups.size();
BufferPtr indices = allocateIndices(size, operatorCtx_->pool());
auto indicesPtr = indices->asMutable<vector_size_t>();
std::copy(lookup.newGroups.begin(), lookup.newGroups.end(), indicesPtr);
newDistincts_ = false;
auto output = fillOutput(size, indices);
numOutputRows_ += size;

// Drop reference to input_ to make it singly-referenced at the producer
// and allow for memory reuse.
input_ = nullptr;

resetPartialOutputIfNeed();
return output;
}
#endif

const auto batchSize =
isGlobal_ ? 1 : outputBatchRows(groupingSet_->estimateRowSize());
Expand Down

0 comments on commit 429dff6

Please sign in to comment.