diff --git a/velox/exec/HashBuild.cpp b/velox/exec/HashBuild.cpp index a1805e69cd2e1..2e3965e177428 100644 --- a/velox/exec/HashBuild.cpp +++ b/velox/exec/HashBuild.cpp @@ -1107,15 +1107,6 @@ std::string HashBuild::stateName(State state) { } } -bool HashBuild::testingTriggerSpill() { - // Test-only spill path. - if (spillConfig()->testSpillPct == 0) { - return false; - } - return folly::hasher()(++spillTestCounter_) % 100 <= - spillConfig()->testSpillPct; -} - void HashBuild::reclaim( uint64_t /*unused*/, memory::MemoryReclaimer::Stats& stats) { diff --git a/velox/exec/HashBuild.h b/velox/exec/HashBuild.h index 84366f5cff2ba..2ddd17181bb26 100644 --- a/velox/exec/HashBuild.h +++ b/velox/exec/HashBuild.h @@ -236,9 +236,6 @@ class HashBuild final : public Operator { void addRuntimeStats(); - // Invoked to check if it needs to trigger spilling for test purpose only. - bool testingTriggerSpill(); - // Indicates if this hash build operator is under non-reclaimable state or // not. bool nonReclaimableState() const; @@ -307,10 +304,6 @@ class HashBuild final : public Operator { // at least one entry with null join keys. bool joinHasNullKeys_{false}; - // Counts input batches and triggers spilling if folly hash of this % 100 <= - // 'testSpillPct_';. - uint64_t spillTestCounter_{0}; - // The spill targets set by 'requestSpill()' to request group spill. uint64_t numSpillRows_{0}; uint64_t numSpillBytes_{0}; diff --git a/velox/exec/RowNumber.cpp b/velox/exec/RowNumber.cpp index 2dcf06a84fe4d..70e555cea1243 100644 --- a/velox/exec/RowNumber.cpp +++ b/velox/exec/RowNumber.cpp @@ -206,7 +206,7 @@ void RowNumber::ensureInputFits(const RowVectorPtr& input) { const auto outOfLineBytesPerRow = outOfLineBytes / numDistinct; // Test-only spill path. - if (spillConfig_->testSpillPct > 0) { + if (testingTriggerSpill()) { spill(); return; } diff --git a/velox/exec/SortBuffer.cpp b/velox/exec/SortBuffer.cpp index f00d2991feef1..cc97841125222 100644 --- a/velox/exec/SortBuffer.cpp +++ b/velox/exec/SortBuffer.cpp @@ -194,9 +194,7 @@ void SortBuffer::ensureInputFits(const VectorPtr& input) { const int64_t flatInputBytes = input->estimateFlatSize(); // Test-only spill path. - if (numRows > 0 && spillConfig_->testSpillPct && - (folly::hasher()(++spillTestCounter_)) % 100 <= - spillConfig_->testSpillPct) { + if (numRows > 0 && testingTriggerSpill()) { spill(); return; } diff --git a/velox/exec/SortBuffer.h b/velox/exec/SortBuffer.h index fa62460d203a6..791cc8b03636e 100644 --- a/velox/exec/SortBuffer.h +++ b/velox/exec/SortBuffer.h @@ -124,8 +124,6 @@ class SortBuffer { // Records the source rows to copy to 'output_' in order. std::vector spillSources_; std::vector spillSourceRows_; - // Counts input batches to trigger spilling for test. - uint64_t spillTestCounter_{0}; // Reusable output vector. RowVectorPtr output_; diff --git a/velox/exec/SortWindowBuild.cpp b/velox/exec/SortWindowBuild.cpp index 24ef55467df12..ce70f011ac570 100644 --- a/velox/exec/SortWindowBuild.cpp +++ b/velox/exec/SortWindowBuild.cpp @@ -88,7 +88,7 @@ void SortWindowBuild::ensureInputFits(const RowVectorPtr& input) { } // Test-only spill path. - if (spillConfig_->testSpillPct > 0) { + if (testingTriggerSpill()) { spill(); return; } diff --git a/velox/exec/Spill.cpp b/velox/exec/Spill.cpp index bbabd65d29ed0..3c96ac234994a 100644 --- a/velox/exec/Spill.cpp +++ b/velox/exec/Spill.cpp @@ -279,4 +279,38 @@ SpillPartitionIdSet toSpillPartitionIdSet( } return partitionIdSet; } + +tsan_atomic& testingSpillPct() { + static tsan_atomic spillPct = 0; + return spillPct; +} + +tsan_atomic& testingSpillCounter() { + static tsan_atomic spillCounter = 0; + return spillCounter; +} + +TestScopedSpillInjection::TestScopedSpillInjection( + int32_t spillPct, + int32_t maxInjections) { + VELOX_CHECK_EQ(testingSpillCounter(), 0); + testingSpillPct() = spillPct; + testingSpillCounter() = maxInjections; +} + +TestScopedSpillInjection::~TestScopedSpillInjection() { + testingSpillPct() = 0; + testingSpillCounter() = 0; +} + +bool testingTriggerSpill() { + // Do not evaluate further if trigger is not set. + if (testingSpillCounter() == 0 || testingSpillPct() == 0) { + return false; + } + if (folly::Random::rand32() % 100 < testingSpillPct()) { + return testingSpillCounter()-- > 0; + } + return false; +} } // namespace facebook::velox::exec diff --git a/velox/exec/Spill.h b/velox/exec/Spill.h index 21ea9d17388e1..d5e3bf18b7ca5 100644 --- a/velox/exec/Spill.h +++ b/velox/exec/Spill.h @@ -434,6 +434,27 @@ class SpillState { /// Generate partition id set from given spill partition set. SpillPartitionIdSet toSpillPartitionIdSet( const SpillPartitionSet& partitionSet); + +/// Scoped spill percentage utility that allows user to set the behavior of +/// triggered spill. +/// 'spillPct' indicates the chance of triggering spilling. 100% means spill +/// will always be triggered. +/// 'maxInjections' indicates the max number of actual triggering. e.g. when +/// 'spillPct' is 20 and 'maxInjections' is 10, continuous calls to +/// testingTriggerSpill() will keep rolling the dice that has a chance of 20% +/// triggering until 10 triggers have been invoked. +class TestScopedSpillInjection { + public: + explicit TestScopedSpillInjection( + int32_t spillPct, + int32_t maxInjections = std::numeric_limits::max()); + + ~TestScopedSpillInjection(); +}; + +/// Test utility that returns true if triggered spill is evaluated to happen, +/// false otherwise. +bool testingTriggerSpill(); } // namespace facebook::velox::exec // Adding the custom hash for SpillPartitionId to std::hash to make it usable diff --git a/velox/exec/TopNRowNumber.cpp b/velox/exec/TopNRowNumber.cpp index 5ad9184c0bd5b..2b7b1cbfd884a 100644 --- a/velox/exec/TopNRowNumber.cpp +++ b/velox/exec/TopNRowNumber.cpp @@ -675,7 +675,7 @@ void TopNRowNumber::ensureInputFits(const RowVectorPtr& input) { } // Test-only spill path. - if (spillConfig_->testSpillPct > 0) { + if (testingTriggerSpill()) { spill(); return; } diff --git a/velox/exec/tests/HashJoinTest.cpp b/velox/exec/tests/HashJoinTest.cpp index 08dc5b8f76041..43ea05ecfdd11 100644 --- a/velox/exec/tests/HashJoinTest.cpp +++ b/velox/exec/tests/HashJoinTest.cpp @@ -586,6 +586,7 @@ class HashJoinBuilder { } auto queryCtx = std::make_shared(executor_); std::shared_ptr spillDirectory; + int32_t spillPct{0}; if (injectSpill) { spillDirectory = exec::test::TempDirectoryPath::create(); builder.spillDirectory(spillDirectory->path); @@ -595,7 +596,7 @@ class HashJoinBuilder { // Disable write buffering to ease test verification. For example, we want // many spilled vectors in a spilled file to trigger recursive spilling. config(core::QueryConfig::kSpillWriteBufferSize, std::to_string(0)); - config(core::QueryConfig::kTestingSpillPct, "100"); + spillPct = 100; } else if (spillMemoryThreshold_ != 0) { spillDirectory = exec::test::TempDirectoryPath::create(); builder.spillDirectory(spillDirectory->path); @@ -636,6 +637,7 @@ class HashJoinBuilder { ASSERT_EQ(memory::spillMemoryPool()->stats().currentBytes, 0); const uint64_t peakSpillMemoryUsage = memory::spillMemoryPool()->stats().peakBytes; + TestScopedSpillInjection scopedSpillInjection(spillPct); auto task = builder.assertResults(referenceQuery_); const auto statsPair = taskSpilledStats(*task); if (injectSpill) { diff --git a/velox/exec/tests/JoinFuzzer.cpp b/velox/exec/tests/JoinFuzzer.cpp index 6338de3223873..4d1b04255af68 100644 --- a/velox/exec/tests/JoinFuzzer.cpp +++ b/velox/exec/tests/JoinFuzzer.cpp @@ -292,14 +292,16 @@ RowVectorPtr JoinFuzzer::execute(const PlanWithSplits& plan, bool injectSpill) { } std::shared_ptr spillDirectory; + int32_t spillPct{0}; if (injectSpill) { spillDirectory = exec::test::TempDirectoryPath::create(); builder.config(core::QueryConfig::kSpillEnabled, "true") .config(core::QueryConfig::kAggregationSpillEnabled, "true") - .config(core::QueryConfig::kTestingSpillPct, "100") .spillDirectory(spillDirectory->path); + spillPct = 100; } + TestScopedSpillInjection scopedSpillInjection(spillPct); auto result = builder.maxDrivers(2).copyResults(pool_.get()); LOG(INFO) << "Results: " << result->toString(); if (VLOG_IS_ON(1)) { diff --git a/velox/exec/tests/OrderByTest.cpp b/velox/exec/tests/OrderByTest.cpp index 2295534ca4bf3..beacf8cb4811a 100644 --- a/velox/exec/tests/OrderByTest.cpp +++ b/velox/exec/tests/OrderByTest.cpp @@ -196,8 +196,8 @@ class OrderByTest : public OperatorTestBase { SCOPED_TRACE("run with spilling"); auto spillDirectory = exec::test::TempDirectoryPath::create(); auto queryCtx = std::make_shared(executor_.get()); + TestScopedSpillInjection scopedSpillInjection(100); queryCtx->testingOverrideConfigUnsafe({ - {core::QueryConfig::kTestingSpillPct, "100"}, {core::QueryConfig::kSpillEnabled, "true"}, {core::QueryConfig::kOrderBySpillEnabled, "true"}, }); diff --git a/velox/exec/tests/RowNumberTest.cpp b/velox/exec/tests/RowNumberTest.cpp index 6d1baf601dcf8..26294cd03b37e 100644 --- a/velox/exec/tests/RowNumberTest.cpp +++ b/velox/exec/tests/RowNumberTest.cpp @@ -51,8 +51,8 @@ TEST_F(RowNumberTest, spill) { makeFlatVector({vectorSize, vectorSize, vectorSize}), }); + TestScopedSpillInjection scopedSpillInjection(100); auto task = AssertQueryBuilder(plan) - .config(core::QueryConfig::kTestingSpillPct, 100) .config(core::QueryConfig::kSpillEnabled, true) .config(core::QueryConfig::kRowNumberSpillEnabled, true) .spillDirectory(spillDirectory->path) @@ -242,12 +242,12 @@ TEST_F(RowNumberTest, maxSpillBytes) { for (const auto& testData : testSettings) { SCOPED_TRACE(testData.debugString()); try { + TestScopedSpillInjection scopedSpillInjection(100); AssertQueryBuilder(plan) .spillDirectory(spillDirectory->path) .queryCtx(queryCtx) .config(core::QueryConfig::kSpillEnabled, true) .config(core::QueryConfig::kRowNumberSpillEnabled, true) - .config(core::QueryConfig::kTestingSpillPct, 100) .config(core::QueryConfig::kMaxSpillBytes, testData.maxSpilledBytes) .copyResults(pool_.get()); ASSERT_FALSE(testData.expectedExceedLimit); @@ -284,12 +284,12 @@ TEST_F(RowNumberTest, memoryUsage) { const std::string spillEnableConfig = std::to_string(spillEnable); std::shared_ptr task; + TestScopedSpillInjection scopedSpillInjection(100); AssertQueryBuilder(plan) .spillDirectory(spillDirectory->path) .queryCtx(queryCtx) .config(core::QueryConfig::kSpillEnabled, spillEnableConfig) .config(core::QueryConfig::kRowNumberSpillEnabled, spillEnableConfig) - .config(core::QueryConfig::kTestingSpillPct, "100") .spillDirectory(spillDirectory->path) .copyResults(pool_.get(), task); diff --git a/velox/exec/tests/SortBufferTest.cpp b/velox/exec/tests/SortBufferTest.cpp index e8c9c4d47cac7..3f35d456f7e51 100644 --- a/velox/exec/tests/SortBufferTest.cpp +++ b/velox/exec/tests/SortBufferTest.cpp @@ -282,6 +282,7 @@ TEST_F(SortBufferTest, batchOutput) { {false, {1024, 1024, 1024}, 1000, {1000, 1000, 1000, 72}}, {true, {1024, 1024, 1024}, 1000, {1000, 1000, 1000, 72}}}; + TestScopedSpillInjection scopedSpillInjection(100); for (const auto& testData : testSettings) { SCOPED_TRACE(testData.debugString()); auto spillDirectory = exec::test::TempDirectoryPath::create(); diff --git a/velox/exec/tests/TableWriteTest.cpp b/velox/exec/tests/TableWriteTest.cpp index 799116e3c2767..6c63c897f8df6 100644 --- a/velox/exec/tests/TableWriteTest.cpp +++ b/velox/exec/tests/TableWriteTest.cpp @@ -275,6 +275,7 @@ class TableWriteTest : public HiveConnectorTestBase { .assertResults(duckDbSql); } const auto spillDirectory = exec::test::TempDirectoryPath::create(); + TestScopedSpillInjection scopedSpillInjection(100); return AssertQueryBuilder(plan, duckDbQueryRunner_) .spillDirectory(spillDirectory->path) .maxDrivers( @@ -286,7 +287,6 @@ class TableWriteTest : public HiveConnectorTestBase { std::to_string(numPartitionedTableWriterCount_)) .config(core::QueryConfig::kSpillEnabled, "true") .config(QueryConfig::kWriterSpillEnabled, "true") - .config(QueryConfig::kTestingSpillPct, "100") .splits(splits) .assertResults(duckDbSql); } @@ -296,6 +296,7 @@ class TableWriteTest : public HiveConnectorTestBase { const std::string& duckDbSql, bool enableSpill = false) { if (!enableSpill) { + TestScopedSpillInjection scopedSpillInjection(100); return AssertQueryBuilder(plan, duckDbQueryRunner_) .maxDrivers( 2 * @@ -308,11 +309,11 @@ class TableWriteTest : public HiveConnectorTestBase { std::to_string(numPartitionedTableWriterCount_)) .config(core::QueryConfig::kSpillEnabled, "true") .config(QueryConfig::kWriterSpillEnabled, "true") - .config(QueryConfig::kTestingSpillPct, "100") .assertResults(duckDbSql); } const auto spillDirectory = exec::test::TempDirectoryPath::create(); + TestScopedSpillInjection scopedSpillInjection(100); return AssertQueryBuilder(plan, duckDbQueryRunner_) .spillDirectory(spillDirectory->path) .maxDrivers( @@ -324,7 +325,6 @@ class TableWriteTest : public HiveConnectorTestBase { std::to_string(numPartitionedTableWriterCount_)) .config(core::QueryConfig::kSpillEnabled, "true") .config(QueryConfig::kWriterSpillEnabled, "true") - .config(QueryConfig::kTestingSpillPct, "100") .assertResults(duckDbSql); } diff --git a/velox/exec/tests/TaskTest.cpp b/velox/exec/tests/TaskTest.cpp index 37c9e56183953..32398da7abf6c 100644 --- a/velox/exec/tests/TaskTest.cpp +++ b/velox/exec/tests/TaskTest.cpp @@ -1372,7 +1372,6 @@ TEST_F(TaskTest, spillDirectoryLifecycleManagement) { makeFlatVector(1'000, [](auto row) { return row % 300; }), makeFlatVector(1'000, [](auto row) { return row; }), }); - core::PlanNodeId aggrNodeId; const auto plan = PlanBuilder() .values({data}) @@ -1441,9 +1440,9 @@ TEST_F(TaskTest, spillDirNotCreated) { params.queryCtx = std::make_shared(driverExecutor_.get()); params.queryCtx->testingOverrideConfigUnsafe( {{core::QueryConfig::kSpillEnabled, "true"}, - {core::QueryConfig::kJoinSpillEnabled, "true"}, - {core::QueryConfig::kTestingSpillPct, "0"}}); + {core::QueryConfig::kJoinSpillEnabled, "true"}}); params.maxDrivers = 1; + TestScopedSpillInjection scopedSpillInjection(100); auto cursor = TaskCursor::create(params); auto* task = cursor->task().get(); diff --git a/velox/exec/tests/TopNRowNumberTest.cpp b/velox/exec/tests/TopNRowNumberTest.cpp index 28034537c403c..bbc9bcd1c61a5 100644 --- a/velox/exec/tests/TopNRowNumberTest.cpp +++ b/velox/exec/tests/TopNRowNumberTest.cpp @@ -132,22 +132,24 @@ TEST_F(TopNRowNumberTest, largeOutput) { .assertResults(sql); // Spilling. - auto task = - AssertQueryBuilder(plan, duckDbQueryRunner_) - .config(core::QueryConfig::kPreferredOutputBatchBytes, "1024") - .config(core::QueryConfig::kTestingSpillPct, "100") - .config(core::QueryConfig::kSpillEnabled, "true") - .config(core::QueryConfig::kTopNRowNumberSpillEnabled, "true") - .spillDirectory(spillDirectory->path) - .assertResults(sql); - - auto taskStats = exec::toPlanStats(task->taskStats()); - const auto& stats = taskStats.at(topNRowNumberId); - - ASSERT_GT(stats.spilledBytes, 0); - ASSERT_GT(stats.spilledRows, 0); - ASSERT_GT(stats.spilledFiles, 0); - ASSERT_GT(stats.spilledPartitions, 0); + { + TestScopedSpillInjection scopedSpillInjection(100); + auto task = + AssertQueryBuilder(plan, duckDbQueryRunner_) + .config(core::QueryConfig::kPreferredOutputBatchBytes, "1024") + .config(core::QueryConfig::kSpillEnabled, "true") + .config(core::QueryConfig::kTopNRowNumberSpillEnabled, "true") + .spillDirectory(spillDirectory->path) + .assertResults(sql); + + auto taskStats = exec::toPlanStats(task->taskStats()); + const auto& stats = taskStats.at(topNRowNumberId); + + ASSERT_GT(stats.spilledBytes, 0); + ASSERT_GT(stats.spilledRows, 0); + ASSERT_GT(stats.spilledFiles, 0); + ASSERT_GT(stats.spilledPartitions, 0); + } // No partitioning keys. plan = PlanBuilder() @@ -212,24 +214,26 @@ TEST_F(TopNRowNumberTest, manyPartitions) { assertQuery(plan, sql); // Spilling. - auto task = - AssertQueryBuilder(plan, duckDbQueryRunner_) - .config( - core::QueryConfig::kPreferredOutputBatchBytes, - fmt::format("{}", outputBatchBytes)) - .config(core::QueryConfig::kTestingSpillPct, "100") - .config(core::QueryConfig::kSpillEnabled, "true") - .config(core::QueryConfig::kTopNRowNumberSpillEnabled, "true") - .spillDirectory(spillDirectory->path) - .assertResults(sql); - - auto taskStats = exec::toPlanStats(task->taskStats()); - const auto& stats = taskStats.at(topNRowNumberId); - - ASSERT_GT(stats.spilledBytes, 0); - ASSERT_GT(stats.spilledRows, 0); - ASSERT_GT(stats.spilledFiles, 0); - ASSERT_GT(stats.spilledPartitions, 0); + { + TestScopedSpillInjection scopedSpillInjection(100); + auto task = + AssertQueryBuilder(plan, duckDbQueryRunner_) + .config( + core::QueryConfig::kPreferredOutputBatchBytes, + fmt::format("{}", outputBatchBytes)) + .config(core::QueryConfig::kSpillEnabled, "true") + .config(core::QueryConfig::kTopNRowNumberSpillEnabled, "true") + .spillDirectory(spillDirectory->path) + .assertResults(sql); + + auto taskStats = exec::toPlanStats(task->taskStats()); + const auto& stats = taskStats.at(topNRowNumberId); + + ASSERT_GT(stats.spilledBytes, 0); + ASSERT_GT(stats.spilledRows, 0); + ASSERT_GT(stats.spilledFiles, 0); + ASSERT_GT(stats.spilledPartitions, 0); + } }; testLimit(1); @@ -353,12 +357,12 @@ TEST_F(TopNRowNumberTest, maxSpillBytes) { for (const auto& testData : testSettings) { SCOPED_TRACE(testData.debugString()); try { + TestScopedSpillInjection scopedSpillInjection(100); AssertQueryBuilder(plan) .spillDirectory(spillDirectory->path) .queryCtx(queryCtx) .config(core::QueryConfig::kSpillEnabled, "true") .config(core::QueryConfig::kTopNRowNumberSpillEnabled, "true") - .config(core::QueryConfig::kTestingSpillPct, "100") .config( core::QueryConfig::kMaxSpillBytes, std::to_string(testData.maxSpilledBytes)) diff --git a/velox/exec/tests/WindowTest.cpp b/velox/exec/tests/WindowTest.cpp index 9a759ea770e0e..cfc45ee5afd65 100644 --- a/velox/exec/tests/WindowTest.cpp +++ b/velox/exec/tests/WindowTest.cpp @@ -60,10 +60,10 @@ TEST_F(WindowTest, spill) { .planNode(); auto spillDirectory = TempDirectoryPath::create(); + TestScopedSpillInjection scopedSpillInjection(100); auto task = AssertQueryBuilder(plan, duckDbQueryRunner_) .config(core::QueryConfig::kPreferredOutputBatchBytes, "1024") - .config(core::QueryConfig::kTestingSpillPct, "100") .config(core::QueryConfig::kSpillEnabled, "true") .config(core::QueryConfig::kWindowSpillEnabled, "true") .spillDirectory(spillDirectory->path)