Skip to content

Commit

Permalink
xx
Browse files Browse the repository at this point in the history
  • Loading branch information
zuochunwei committed Aug 4, 2023
1 parent 82433ff commit 5b87685
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 2,166 deletions.
21 changes: 19 additions & 2 deletions velox/core/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -681,8 +681,25 @@ class AggregationNode : public PlanNode {
// (https://github.com/facebookincubator/velox/issues/3263) and pre-grouped
// aggregation (https://github.com/facebookincubator/velox/issues/3264). We
// will add support later to re-enable.
return (isFinal() || isSingle()) && !(aggregates().empty()) &&
preGroupedKeys().empty() && queryConfig.aggregationSpillEnabled();
if (!queryConfig.aggregationSpillEnabled()) {
return false;
}

if (!isFinal() && !isSingle()) {
return false;
}

if (!preGroupedKeys().empty()) {
return false;
}

// aggregates().empty() means distinct aggregate
if (aggregates().empty() &&
!queryConfig.distinctAggregationSpillEnabled()) {
return false;
}

return true;
}

bool isFinal() const {
Expand Down
9 changes: 9 additions & 0 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ class QueryConfig {
static constexpr const char* kAggregationSpillEnabled =
"aggregation_spill_enabled";

/// Distinct aggregation spilling flag
static constexpr const char* kDistinctAggregationSpillEnabled =
"distinct_aggregation_spill_enabled";

/// Join spilling flag, only applies if "spill_enabled" flag is set.
static constexpr const char* kJoinSpillEnabled = "join_spill_enabled";

Expand Down Expand Up @@ -329,6 +333,11 @@ class QueryConfig {
return get<bool>(kAggregationSpillEnabled, true);
}

/// Returns 'is distinct aggregation spilling enabled' flag.
bool distinctAggregationSpillEnabled() const {
return get<bool>(kDistinctAggregationSpillEnabled, true);
}

/// Returns 'is join spilling enabled' flag. Must also check the
/// spillEnabled()!
bool joinSpillEnabled() const {
Expand Down
13 changes: 8 additions & 5 deletions velox/exec/HashAggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ HashAggregation::HashAggregation(
spillConfig_.has_value() ? &spillConfig_.value() : nullptr,
&nonReclaimableSection_,
operatorCtx_.get());

distinctAggregationSpillEnabled_ =
driverCtx->queryConfig().distinctAggregationSpillEnabled();
}

bool HashAggregation::abandonPartialAggregationEarly(int64_t numOutput) const {
Expand Down Expand Up @@ -198,7 +201,7 @@ void HashAggregation::addInput(RowVectorPtr input) {

const bool abandonPartialEarly = isPartialOutput_ && !isGlobal_ &&
abandonPartialAggregationEarly(groupingSet_->numDistinct());
if (isDistinct_) {
if (isDistinct_ && !distinctAggregationSpillEnabled_) {
newDistincts_ = !groupingSet_->hashLookup().newGroups.empty();

if (newDistincts_) {
Expand Down Expand Up @@ -349,13 +352,13 @@ RowVectorPtr HashAggregation::getOutput() {
// - partial aggregation reached memory limit;
// - distinct aggregation has new keys;
// - running in partial streaming mode and have some output ready.
if (!noMoreInput_ && !partialFull_ && !newDistincts_ &&
if (!noMoreInput_ && !partialFull_ && !newDistincts_ &&
!groupingSet_->hasOutput()) {
input_ = nullptr;
return nullptr;
}

if (isDistinct_) {
if (isDistinct_ && !distinctAggregationSpillEnabled_) {
if (!newDistincts_) {
if (noMoreInput_) {
finished_ = true;
Expand All @@ -372,8 +375,8 @@ RowVectorPtr HashAggregation::getOutput() {
auto output = fillOutput(size, indices);
numOutputRows_ += size;

// Drop reference to input_ to make it singly-referenced at the producer and
// allow for memory reuse.
// Drop reference to input_ to make it singly-referenced at the producer
// and allow for memory reuse.
input_ = nullptr;

resetPartialOutputIfNeed();
Expand Down
2 changes: 2 additions & 0 deletions velox/exec/HashAggregation.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ class HashAggregation : public Operator {
int64_t maxPartialAggregationMemoryUsage_;
std::unique_ptr<GroupingSet> groupingSet_;

bool distinctAggregationSpillEnabled_{false};

bool partialFull_ = false;
bool newDistincts_ = false;
bool finished_ = false;
Expand Down
Loading

0 comments on commit 5b87685

Please sign in to comment.