diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index 20b204c64b15e..51bda9132146e 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -335,7 +335,7 @@ class QueryConfig { /// Returns 'is distinct aggregation spilling enabled' flag. bool distinctAggregationSpillEnabled() const { - return get(kDistinctAggregationSpillEnabled, false); + return get(kDistinctAggregationSpillEnabled, true); } /// Returns 'is join spilling enabled' flag. Must also check the diff --git a/velox/exec/HashAggregation.cpp b/velox/exec/HashAggregation.cpp index 318ea58243ffe..12ca5db24f0fd 100644 --- a/velox/exec/HashAggregation.cpp +++ b/velox/exec/HashAggregation.cpp @@ -201,12 +201,12 @@ void HashAggregation::addInput(RowVectorPtr input) { const bool abandonPartialEarly = isPartialOutput_ && !isGlobal_ && abandonPartialAggregationEarly(groupingSet_->numDistinct()); - if (isDistinct_) { + if (isDistinct_ && !distinctAggregationSpillEnabled_) { newDistincts_ = !groupingSet_->hashLookup().newGroups.empty(); if (newDistincts_) { // Save input to use for output in getOutput(). - // input_ = input; + input_ = input; } else if (abandonPartialEarly) { // If no new distinct groups (meaning we don't have anything to output) // and we are abandoning the partial aggregation, then we need to ensure @@ -352,14 +352,13 @@ RowVectorPtr HashAggregation::getOutput() { // - partial aggregation reached memory limit; // - distinct aggregation has new keys; // - running in partial streaming mode and have some output ready. - if (!noMoreInput_ && !partialFull_ && + if (!noMoreInput_ && !partialFull_ && !newDistincts_ && !groupingSet_->hasOutput()) { input_ = nullptr; return nullptr; } -#if 0 - if (isDistinct_) { + if (isDistinct_ && !distinctAggregationSpillEnabled_) { if (!newDistincts_) { if (noMoreInput_) { finished_ = true; @@ -367,25 +366,22 @@ RowVectorPtr HashAggregation::getOutput() { return nullptr; } - if (!distinctAggregationSpillEnabled_) { - auto lookup = groupingSet_->hashLookup(); - auto size = lookup.newGroups.size(); - BufferPtr indices = allocateIndices(size, operatorCtx_->pool()); - auto indicesPtr = indices->asMutable(); - std::copy(lookup.newGroups.begin(), lookup.newGroups.end(), indicesPtr); - newDistincts_ = false; - auto output = fillOutput(size, indices); - numOutputRows_ += size; - - // Drop reference to input_ to make it singly-referenced at the producer - // and allow for memory reuse. - input_ = nullptr; - - resetPartialOutputIfNeed(); - return output; - } + auto lookup = groupingSet_->hashLookup(); + auto size = lookup.newGroups.size(); + BufferPtr indices = allocateIndices(size, operatorCtx_->pool()); + auto indicesPtr = indices->asMutable(); + std::copy(lookup.newGroups.begin(), lookup.newGroups.end(), indicesPtr); + newDistincts_ = false; + auto output = fillOutput(size, indices); + numOutputRows_ += size; + + // Drop reference to input_ to make it singly-referenced at the producer + // and allow for memory reuse. + input_ = nullptr; + + resetPartialOutputIfNeed(); + return output; } -#endif const auto batchSize = isGlobal_ ? 1 : outputBatchRows(groupingSet_->estimateRowSize());