Skip to content

Commit

Permalink
[GLUTEN-4092][CH] Disable streaming aggregating before bugs fixed
Browse files Browse the repository at this point in the history
What changes were proposed in this pull request?
(Please fill in changes proposed in this fix)

Fixes: #4092

How was this patch tested?
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)

unit tests

(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
  • Loading branch information
lgbo-ustc authored Dec 19, 2023
1 parent f3d1a12 commit 3b0a43a
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite
.set("spark.gluten.sql.columnar.backend.ch.use.v2", "false")
.set("spark.sql.adaptive.enabled", "true")
.set("spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm", "sparkMurmurHash3_32")
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_config.enable_streaming_aggregating",
"true")
}

override protected def createTPCHNotNullTables(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite
.set("spark.sql.shuffle.partitions", "1")
.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
.set("spark.gluten.sql.columnar.backend.ch.use.v2", "false")
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_config.enable_streaming_aggregating",
"true")
}

override protected def createTPCHNotNullTables(): Unit = {
Expand Down
7 changes: 4 additions & 3 deletions cpp-ch/local-engine/Parser/AggregateRelParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,8 @@ void AggregateRelParser::addMergingAggregatedStep()
buildAggregateDescriptions(aggregate_descriptions);
auto settings = getContext()->getSettingsRef();
Aggregator::Params params(grouping_keys, aggregate_descriptions, false, settings.max_threads, settings.max_block_size);

if (settings.distributed_aggregation_memory_efficient)
bool enable_streaming_aggregating = getContext()->getConfigRef().getBool("enable_streaming_aggregating", false);
if (enable_streaming_aggregating)
{
auto merging_step = std::make_unique<GraceMergingAggregatedStep>(getContext(), plan->getCurrentDataStream(), params);
steps.emplace_back(merging_step.get());
Expand Down Expand Up @@ -269,8 +269,9 @@ void AggregateRelParser::addAggregatingStep()
AggregateDescriptions aggregate_descriptions;
buildAggregateDescriptions(aggregate_descriptions);
auto settings = getContext()->getSettingsRef();
bool enable_streaming_aggregating = getContext()->getConfigRef().getBool("enable_streaming_aggregating", false);

if (settings.distributed_aggregation_memory_efficient)
if (enable_streaming_aggregating)
{
// Disable spilling to disk.
Aggregator::Params params(
Expand Down

0 comments on commit 3b0a43a

Please sign in to comment.