diff --git a/velox/common/base/SpillConfig.cpp b/velox/common/base/SpillConfig.cpp index ca52d79f63662..71188fc5c6dc0 100644 --- a/velox/common/base/SpillConfig.cpp +++ b/velox/common/base/SpillConfig.cpp @@ -33,7 +33,6 @@ SpillConfig::SpillConfig( int32_t _maxSpillLevel, uint64_t _maxSpillRunRows, uint64_t _writerFlushThresholdSize, - int32_t _testSpillPct, const std::string& _compressionKind, const std::string& _fileCreateConfig) : getSpillDirPathCb(std::move(_getSpillDirPathCb)), @@ -52,7 +51,6 @@ SpillConfig::SpillConfig( maxSpillLevel(_maxSpillLevel), maxSpillRunRows(_maxSpillRunRows), writerFlushThresholdSize(_writerFlushThresholdSize), - testSpillPct(_testSpillPct), compressionKind(common::stringToCompressionKind(_compressionKind)), fileCreateConfig(_fileCreateConfig) { VELOX_USER_CHECK_GE( diff --git a/velox/common/base/SpillConfig.h b/velox/common/base/SpillConfig.h index 27ff401df40e8..e018f2ad2f0b7 100644 --- a/velox/common/base/SpillConfig.h +++ b/velox/common/base/SpillConfig.h @@ -60,7 +60,6 @@ struct SpillConfig { int32_t _maxSpillLevel, uint64_t _maxSpillRunRows, uint64_t _writerFlushThresholdSize, - int32_t _testSpillPct, const std::string& _compressionKind, const std::string& _fileCreateConfig = {}); @@ -137,10 +136,6 @@ struct SpillConfig { /// writer by flushing its buffered data to disk. uint64_t writerFlushThresholdSize; - /// Percentage of input batches to be spilled for testing. 0 means no - /// spilling for test. - int32_t testSpillPct; - /// CompressionKind when spilling, CompressionKind_NONE means no compression. common::CompressionKind compressionKind; diff --git a/velox/common/base/tests/SpillConfigTest.cpp b/velox/common/base/tests/SpillConfigTest.cpp index 501634c2c1d7b..11b1f42748560 100644 --- a/velox/common/base/tests/SpillConfigTest.cpp +++ b/velox/common/base/tests/SpillConfigTest.cpp @@ -40,7 +40,6 @@ TEST(SpillConfig, spillLevel) { 0, 0, 0, - 0, "none"); struct { uint8_t bitOffset; @@ -127,7 +126,6 @@ TEST(SpillConfig, spillLevelLimit) { testData.maxSpillLevel, 0, 0, - 0, "none"); ASSERT_EQ( @@ -175,7 +173,6 @@ TEST(SpillConfig, spillableReservationPercentages) { 0, 1'000'000, 0, - 0, "none"); }; diff --git a/velox/connectors/hive/tests/HiveDataSinkTest.cpp b/velox/connectors/hive/tests/HiveDataSinkTest.cpp index 426d93f51b9c2..12515f2de9983 100644 --- a/velox/connectors/hive/tests/HiveDataSinkTest.cpp +++ b/velox/connectors/hive/tests/HiveDataSinkTest.cpp @@ -98,7 +98,6 @@ class HiveDataSinkTest : public exec::test::HiveConnectorTestBase { 0, 0, writerFlushThreshold, - 0, "none"); } diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index 39bf52cfdab92..9597ec273cd8b 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -230,8 +230,6 @@ class QueryConfig { /// value is set to 100 GB. static constexpr const char* kMaxSpillBytes = "max_spill_bytes"; - static constexpr const char* kTestingSpillPct = "testing.spill_pct"; - /// The max allowed spilling level with zero being the initial spilling level. /// This only applies for hash build spilling which might trigger recursive /// spilling when the build table is too big. If it is set to -1, then there @@ -563,12 +561,6 @@ class QueryConfig { return get(kTopNRowNumberSpillEnabled, true); } - /// Returns a percentage of aggregation or join input batches that will be - /// forced to spill for testing. 0 means no extra spilling. - int32_t testingSpillPct() const { - return get(kTestingSpillPct, 0); - } - int32_t maxSpillLevel() const { return get(kMaxSpillLevel, 4); } diff --git a/velox/dwio/dwrf/test/E2EWriterTest.cpp b/velox/dwio/dwrf/test/E2EWriterTest.cpp index 54ff2c723e2a3..f23a1d77c2d86 100644 --- a/velox/dwio/dwrf/test/E2EWriterTest.cpp +++ b/velox/dwio/dwrf/test/E2EWriterTest.cpp @@ -258,7 +258,6 @@ class E2EWriterTest : public testing::Test { 0, 0, writerFlushThresholdSize, - 0, "none"); } diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp index 551ac2131a95b..e37022f97ff6e 100644 --- a/velox/exec/Driver.cpp +++ b/velox/exec/Driver.cpp @@ -159,7 +159,6 @@ std::optional DriverCtx::makeSpillConfig( queryConfig.maxSpillLevel(), queryConfig.maxSpillRunRows(), queryConfig.writerFlushThresholdBytes(), - queryConfig.testingSpillPct(), queryConfig.spillCompressionKind(), queryConfig.spillFileCreateConfig()); } diff --git a/velox/exec/GroupingSet.cpp b/velox/exec/GroupingSet.cpp index 63035396ddd0f..0af733800e627 100644 --- a/velox/exec/GroupingSet.cpp +++ b/velox/exec/GroupingSet.cpp @@ -823,9 +823,7 @@ void GroupingSet::ensureInputFits(const RowVectorPtr& input) { const int64_t flatBytes = input->estimateFlatSize(); // Test-only spill path. - if (spillConfig_->testSpillPct > 0 && - (folly::hasher()(++spillTestCounter_)) % 100 <= - spillConfig_->testSpillPct) { + if (testingTriggerSpill()) { spill(); return; } @@ -895,9 +893,7 @@ void GroupingSet::ensureOutputFits() { } // Test-only spill path. - if (spillConfig_->testSpillPct > 0 && - (folly::hasher()(++spillTestCounter_)) % 100 <= - spillConfig_->testSpillPct) { + if (testingTriggerSpill()) { spill(RowContainerIterator{}); return; } diff --git a/velox/exec/GroupingSet.h b/velox/exec/GroupingSet.h index 40b4d9296afbf..db42f6f4190af 100644 --- a/velox/exec/GroupingSet.h +++ b/velox/exec/GroupingSet.h @@ -348,10 +348,6 @@ class GroupingSet { // Pool of the OperatorCtx. Used for spilling. memory::MemoryPool& pool_; - // Counts input batches and triggers spilling if folly hash of this % 100 <= - // 'spillConfig_->testSpillPct'. - uint64_t spillTestCounter_{0}; - // True if partial aggregation has been given up as non-productive. bool abandonedPartialAggregation_{false}; diff --git a/velox/exec/fuzzer/AggregationFuzzerBase.cpp b/velox/exec/fuzzer/AggregationFuzzerBase.cpp index 04d9249822972..260eaf4e7953d 100644 --- a/velox/exec/fuzzer/AggregationFuzzerBase.cpp +++ b/velox/exec/fuzzer/AggregationFuzzerBase.cpp @@ -373,12 +373,13 @@ velox::test::ResultOrError AggregationFuzzerBase::execute( builder.configs(queryConfigs_); + int32_t spillPct{0}; if (injectSpill) { spillDirectory = exec::test::TempDirectoryPath::create(); builder.spillDirectory(spillDirectory->path) .config(core::QueryConfig::kSpillEnabled, "true") - .config(core::QueryConfig::kAggregationSpillEnabled, "true") - .config(core::QueryConfig::kTestingSpillPct, "100"); + .config(core::QueryConfig::kAggregationSpillEnabled, "true"); + spillPct = 100; } if (abandonPartial) { @@ -392,6 +393,7 @@ velox::test::ResultOrError AggregationFuzzerBase::execute( builder.splits(splits); } + TestScopedSpillInjection scopedSpillInjection(spillPct); resultOrError.result = builder.maxDrivers(maxDrivers).copyResults(pool_.get()); } catch (VeloxUserError& e) { diff --git a/velox/exec/tests/AggregationTest.cpp b/velox/exec/tests/AggregationTest.cpp index 56ec6f3b3fe4a..05406c2ebd662 100644 --- a/velox/exec/tests/AggregationTest.cpp +++ b/velox/exec/tests/AggregationTest.cpp @@ -1650,12 +1650,12 @@ TEST_F(AggregationTest, spillDuringOutputProcessing) { const int numOutputRows = 5; auto tempDirectory = exec::test::TempDirectoryPath::create(); core::PlanNodeId aggrNodeId; + TestScopedSpillInjection scopedSpillInjection(100); auto task = AssertQueryBuilder(duckDbQueryRunner_) .spillDirectory(tempDirectory->path) .config(QueryConfig::kSpillEnabled, true) .config(QueryConfig::kAggregationSpillEnabled, true) - .config(QueryConfig::kTestingSpillPct, "100") // Set very large output buffer size, the number of output rows is // effectively controlled by 'kPreferredOutputBatchBytes'. .config( @@ -1818,11 +1818,11 @@ TEST_F(AggregationTest, distinctWithSpilling) { createDuckDbTable(vectors); auto spillDirectory = exec::test::TempDirectoryPath::create(); core::PlanNodeId aggrNodeId; + TestScopedSpillInjection scopedSpillInjection(100); auto task = AssertQueryBuilder(duckDbQueryRunner_) .spillDirectory(spillDirectory->path) .config(QueryConfig::kSpillEnabled, true) .config(QueryConfig::kAggregationSpillEnabled, true) - .config(QueryConfig::kTestingSpillPct, "100") .plan(PlanBuilder() .values(vectors) .singleAggregation({"c0"}, {}, {}) @@ -1841,12 +1841,12 @@ TEST_F(AggregationTest, spillingForAggrsWithDistinct) { createDuckDbTable(vectors); auto spillDirectory = exec::test::TempDirectoryPath::create(); core::PlanNodeId aggrNodeId; + TestScopedSpillInjection scopedSpillInjection(100); auto task = AssertQueryBuilder(duckDbQueryRunner_) .spillDirectory(spillDirectory->path) .config(QueryConfig::kSpillEnabled, true) .config(QueryConfig::kAggregationSpillEnabled, true) - .config(QueryConfig::kTestingSpillPct, "100") .plan(PlanBuilder() .values(vectors) .singleAggregation({"c1"}, {"count(DISTINCT c0)"}, {}) @@ -1857,7 +1857,6 @@ TEST_F(AggregationTest, spillingForAggrsWithDistinct) { const auto& queryConfig = task->queryCtx()->queryConfig(); ASSERT_TRUE(queryConfig.spillEnabled()); ASSERT_TRUE(queryConfig.aggregationSpillEnabled()); - ASSERT_EQ(100, queryConfig.testingSpillPct()); ASSERT_EQ(toPlanStats(task->taskStats()).at(aggrNodeId).spilledBytes, 0); OperatorTestBase::deleteTaskAndCheckSpillDirectory(task); } @@ -1871,11 +1870,11 @@ TEST_F(AggregationTest, spillingForAggrsWithSorting) { auto testPlan = [&](const core::PlanNodePtr& plan, const std::string& sql) { SCOPED_TRACE(sql); + TestScopedSpillInjection scopedSpillInjection(100); auto task = AssertQueryBuilder(duckDbQueryRunner_) .spillDirectory(spillDirectory->path) .config(QueryConfig::kSpillEnabled, true) .config(QueryConfig::kAggregationSpillEnabled, true) - .config(QueryConfig::kTestingSpillPct, "100") .plan(plan) .assertResults(sql); @@ -1970,12 +1969,12 @@ TEST_F(AggregationTest, preGroupedAggregationWithSpilling) { createDuckDbTable(vectors); auto spillDirectory = exec::test::TempDirectoryPath::create(); core::PlanNodeId aggrNodeId; + TestScopedSpillInjection scopedSpillInjection(100); auto task = AssertQueryBuilder(duckDbQueryRunner_) .spillDirectory(spillDirectory->path) .config(QueryConfig::kSpillEnabled, true) .config(QueryConfig::kAggregationSpillEnabled, true) - .config(QueryConfig::kTestingSpillPct, "100") .plan(PlanBuilder() .values(vectors) .aggregation( diff --git a/velox/exec/tests/SortBufferTest.cpp b/velox/exec/tests/SortBufferTest.cpp index 3f35d456f7e51..df4209bafa74d 100644 --- a/velox/exec/tests/SortBufferTest.cpp +++ b/velox/exec/tests/SortBufferTest.cpp @@ -58,7 +58,6 @@ class SortBufferTest : public OperatorTestBase { 0, 0, 0, - 0, "none"); } @@ -301,7 +300,6 @@ TEST_F(SortBufferTest, batchOutput) { 0, 0, 0, - 100, // testSpillPct "none"); auto sortBuffer = std::make_unique( inputType_, @@ -399,7 +397,6 @@ TEST_F(SortBufferTest, spill) { 0, 0, 0, - 0, "none"); auto sortBuffer = std::make_unique( inputType_, diff --git a/velox/exec/tests/TableWriteTest.cpp b/velox/exec/tests/TableWriteTest.cpp index 6c63c897f8df6..09af603cb0680 100644 --- a/velox/exec/tests/TableWriteTest.cpp +++ b/velox/exec/tests/TableWriteTest.cpp @@ -346,6 +346,7 @@ class TableWriteTest : public HiveConnectorTestBase { } const auto spillDirectory = exec::test::TempDirectoryPath::create(); + TestScopedSpillInjection scopedSpillInjection(100); return AssertQueryBuilder(plan, duckDbQueryRunner_) .spillDirectory(spillDirectory->path) .maxDrivers( @@ -357,7 +358,6 @@ class TableWriteTest : public HiveConnectorTestBase { std::to_string(numPartitionedTableWriterCount_)) .config(core::QueryConfig::kSpillEnabled, "true") .config(QueryConfig::kWriterSpillEnabled, "true") - .config(QueryConfig::kTestingSpillPct, "100") .copyResults(pool()); } diff --git a/velox/exec/tests/TaskTest.cpp b/velox/exec/tests/TaskTest.cpp index 32398da7abf6c..2d602352b82ec 100644 --- a/velox/exec/tests/TaskTest.cpp +++ b/velox/exec/tests/TaskTest.cpp @@ -1383,8 +1383,7 @@ TEST_F(TaskTest, spillDirectoryLifecycleManagement) { params.queryCtx = std::make_shared(driverExecutor_.get()); params.queryCtx->testingOverrideConfigUnsafe( {{core::QueryConfig::kSpillEnabled, "true"}, - {core::QueryConfig::kAggregationSpillEnabled, "true"}, - {core::QueryConfig::kTestingSpillPct, "100"}}); + {core::QueryConfig::kAggregationSpillEnabled, "true"}}); params.maxDrivers = 1; auto cursor = TaskCursor::create(params); @@ -1394,6 +1393,7 @@ TEST_F(TaskTest, spillDirectoryLifecycleManagement) { rootTempDir->path + "/spillDirectoryLifecycleManagement"; task->setSpillDirectory(tmpDirectoryPath, false); + TestScopedSpillInjection scopedSpillInjection(100); while (cursor->moveNext()) { } ASSERT_TRUE(waitForTaskCompletion(task.get(), 5'000'000)); diff --git a/velox/functions/lib/aggregates/tests/utils/AggregationTestBase.cpp b/velox/functions/lib/aggregates/tests/utils/AggregationTestBase.cpp index 178ac443fea17..01f72fc092146 100644 --- a/velox/functions/lib/aggregates/tests/utils/AggregationTestBase.cpp +++ b/velox/functions/lib/aggregates/tests/utils/AggregationTestBase.cpp @@ -24,6 +24,7 @@ #include "velox/dwio/dwrf/writer/Writer.h" #include "velox/exec/AggregateCompanionSignatures.h" #include "velox/exec/PlanNodeStats.h" +#include "velox/exec/Spill.h" #include "velox/exec/tests/utils/TempDirectoryPath.h" #include "velox/exec/tests/utils/TempFilePath.h" #include "velox/expression/Expr.h" @@ -376,12 +377,12 @@ void AggregationTestBase::testAggregationsWithCompanion( AssertQueryBuilder queryBuilder(builder.planNode(), duckDbQueryRunner_); queryBuilder.configs(config) - .config(core::QueryConfig::kTestingSpillPct, 100) .config(core::QueryConfig::kSpillEnabled, true) .config(core::QueryConfig::kAggregationSpillEnabled, true) .spillDirectory(spillDirectory->path) .maxDrivers(4); + exec::TestScopedSpillInjection scopedSpillInjection(100); auto task = assertResults(queryBuilder); // Expect > 0 spilled bytes unless there was no input. @@ -785,12 +786,12 @@ void AggregationTestBase::testAggregationsImpl( memory::spillMemoryPool()->stats().peakBytes; AssertQueryBuilder queryBuilder(builder.planNode(), duckDbQueryRunner_); queryBuilder.configs(config) - .config(core::QueryConfig::kTestingSpillPct, 100) .config(core::QueryConfig::kSpillEnabled, true) .config(core::QueryConfig::kAggregationSpillEnabled, true) .spillDirectory(spillDirectory->path) .maxDrivers(4); + exec::TestScopedSpillInjection scopedSpillInjection(100); auto task = assertResults(queryBuilder); // Expect > 0 spilled bytes unless there was no input. @@ -871,11 +872,12 @@ void AggregationTestBase::testAggregationsImpl( auto spillDirectory = exec::test::TempDirectoryPath::create(); AssertQueryBuilder queryBuilder(builder.planNode(), duckDbQueryRunner_); - queryBuilder.configs(config).config(core::QueryConfig::kTestingSpillPct, "100") + queryBuilder.configs(config) .config(core::QueryConfig::kSpillEnabled, "true") .config(core::QueryConfig::kAggregationSpillEnabled, "true") .spillDirectory(spillDirectory->path); + TestScopedSpillInjection scopedSpillInjection(100); auto task = assertResults(queryBuilder); // Expect > 0 spilled bytes unless there was no input. diff --git a/velox/functions/sparksql/aggregates/tests/FirstAggregateTest.cpp b/velox/functions/sparksql/aggregates/tests/FirstAggregateTest.cpp index 63ccc85d3b2d7..6792dec1537c0 100644 --- a/velox/functions/sparksql/aggregates/tests/FirstAggregateTest.cpp +++ b/velox/functions/sparksql/aggregates/tests/FirstAggregateTest.cpp @@ -590,8 +590,8 @@ TEST_F(FirstAggregateTest, spillingAndSorting) { auto spillDirectory = exec::test::TempDirectoryPath::create(); + exec::TestScopedSpillInjection scopedSpillInjection(100); results = AssertQueryBuilder(plan) - .config(core::QueryConfig::kTestingSpillPct, "100") .config(core::QueryConfig::kSpillEnabled, "true") .config(core::QueryConfig::kAggregationSpillEnabled, "true") .spillDirectory(spillDirectory->path)