Skip to content

Commit

Permalink
Followup: Deprecate spill pct completely
Browse files Browse the repository at this point in the history
  • Loading branch information
tanjialiang committed Mar 3, 2024
1 parent e4f33f4 commit 74cd024
Show file tree
Hide file tree
Showing 16 changed files with 20 additions and 49 deletions.
2 changes: 0 additions & 2 deletions velox/common/base/SpillConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ SpillConfig::SpillConfig(
int32_t _maxSpillLevel,
uint64_t _maxSpillRunRows,
uint64_t _writerFlushThresholdSize,
int32_t _testSpillPct,
const std::string& _compressionKind,
const std::string& _fileCreateConfig)
: getSpillDirPathCb(std::move(_getSpillDirPathCb)),
Expand All @@ -52,7 +51,6 @@ SpillConfig::SpillConfig(
maxSpillLevel(_maxSpillLevel),
maxSpillRunRows(_maxSpillRunRows),
writerFlushThresholdSize(_writerFlushThresholdSize),
testSpillPct(_testSpillPct),
compressionKind(common::stringToCompressionKind(_compressionKind)),
fileCreateConfig(_fileCreateConfig) {
VELOX_USER_CHECK_GE(
Expand Down
5 changes: 0 additions & 5 deletions velox/common/base/SpillConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ struct SpillConfig {
int32_t _maxSpillLevel,
uint64_t _maxSpillRunRows,
uint64_t _writerFlushThresholdSize,
int32_t _testSpillPct,
const std::string& _compressionKind,
const std::string& _fileCreateConfig = {});

Expand Down Expand Up @@ -137,10 +136,6 @@ struct SpillConfig {
/// writer by flushing its buffered data to disk.
uint64_t writerFlushThresholdSize;

/// Percentage of input batches to be spilled for testing. 0 means no
/// spilling for test.
int32_t testSpillPct;

/// CompressionKind when spilling, CompressionKind_NONE means no compression.
common::CompressionKind compressionKind;

Expand Down
3 changes: 0 additions & 3 deletions velox/common/base/tests/SpillConfigTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ TEST(SpillConfig, spillLevel) {
0,
0,
0,
0,
"none");
struct {
uint8_t bitOffset;
Expand Down Expand Up @@ -127,7 +126,6 @@ TEST(SpillConfig, spillLevelLimit) {
testData.maxSpillLevel,
0,
0,
0,
"none");

ASSERT_EQ(
Expand Down Expand Up @@ -175,7 +173,6 @@ TEST(SpillConfig, spillableReservationPercentages) {
0,
1'000'000,
0,
0,
"none");
};

Expand Down
1 change: 0 additions & 1 deletion velox/connectors/hive/tests/HiveDataSinkTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ class HiveDataSinkTest : public exec::test::HiveConnectorTestBase {
0,
0,
writerFlushThreshold,
0,
"none");
}

Expand Down
8 changes: 0 additions & 8 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,6 @@ class QueryConfig {
/// value is set to 100 GB.
static constexpr const char* kMaxSpillBytes = "max_spill_bytes";

static constexpr const char* kTestingSpillPct = "testing.spill_pct";

/// The max allowed spilling level with zero being the initial spilling level.
/// This only applies for hash build spilling which might trigger recursive
/// spilling when the build table is too big. If it is set to -1, then there
Expand Down Expand Up @@ -563,12 +561,6 @@ class QueryConfig {
return get<bool>(kTopNRowNumberSpillEnabled, true);
}

/// Returns a percentage of aggregation or join input batches that will be
/// forced to spill for testing. 0 means no extra spilling.
int32_t testingSpillPct() const {
return get<int32_t>(kTestingSpillPct, 0);
}

int32_t maxSpillLevel() const {
return get<int32_t>(kMaxSpillLevel, 4);
}
Expand Down
1 change: 0 additions & 1 deletion velox/dwio/dwrf/test/E2EWriterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,6 @@ class E2EWriterTest : public testing::Test {
0,
0,
writerFlushThresholdSize,
0,
"none");
}

Expand Down
1 change: 0 additions & 1 deletion velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ std::optional<common::SpillConfig> DriverCtx::makeSpillConfig(
queryConfig.maxSpillLevel(),
queryConfig.maxSpillRunRows(),
queryConfig.writerFlushThresholdBytes(),
queryConfig.testingSpillPct(),
queryConfig.spillCompressionKind(),
queryConfig.spillFileCreateConfig());
}
Expand Down
8 changes: 2 additions & 6 deletions velox/exec/GroupingSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -823,9 +823,7 @@ void GroupingSet::ensureInputFits(const RowVectorPtr& input) {
const int64_t flatBytes = input->estimateFlatSize();

// Test-only spill path.
if (spillConfig_->testSpillPct > 0 &&
(folly::hasher<uint64_t>()(++spillTestCounter_)) % 100 <=
spillConfig_->testSpillPct) {
if (testingTriggerSpill()) {
spill();
return;
}
Expand Down Expand Up @@ -895,9 +893,7 @@ void GroupingSet::ensureOutputFits() {
}

// Test-only spill path.
if (spillConfig_->testSpillPct > 0 &&
(folly::hasher<uint64_t>()(++spillTestCounter_)) % 100 <=
spillConfig_->testSpillPct) {
if (testingTriggerSpill()) {
spill(RowContainerIterator{});
return;
}
Expand Down
4 changes: 0 additions & 4 deletions velox/exec/GroupingSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -348,10 +348,6 @@ class GroupingSet {
// Pool of the OperatorCtx. Used for spilling.
memory::MemoryPool& pool_;

// Counts input batches and triggers spilling if folly hash of this % 100 <=
// 'spillConfig_->testSpillPct'.
uint64_t spillTestCounter_{0};

// True if partial aggregation has been given up as non-productive.
bool abandonedPartialAggregation_{false};

Expand Down
6 changes: 4 additions & 2 deletions velox/exec/fuzzer/AggregationFuzzerBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -373,12 +373,13 @@ velox::test::ResultOrError AggregationFuzzerBase::execute(

builder.configs(queryConfigs_);

int32_t spillPct{0};
if (injectSpill) {
spillDirectory = exec::test::TempDirectoryPath::create();
builder.spillDirectory(spillDirectory->path)
.config(core::QueryConfig::kSpillEnabled, "true")
.config(core::QueryConfig::kAggregationSpillEnabled, "true")
.config(core::QueryConfig::kTestingSpillPct, "100");
.config(core::QueryConfig::kAggregationSpillEnabled, "true");
spillPct = 100;
}

if (abandonPartial) {
Expand All @@ -392,6 +393,7 @@ velox::test::ResultOrError AggregationFuzzerBase::execute(
builder.splits(splits);
}

TestScopedSpillInjection scopedSpillInjection(spillPct);
resultOrError.result =
builder.maxDrivers(maxDrivers).copyResults(pool_.get());
} catch (VeloxUserError& e) {
Expand Down
11 changes: 5 additions & 6 deletions velox/exec/tests/AggregationTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1650,12 +1650,12 @@ TEST_F(AggregationTest, spillDuringOutputProcessing) {
const int numOutputRows = 5;
auto tempDirectory = exec::test::TempDirectoryPath::create();
core::PlanNodeId aggrNodeId;
TestScopedSpillInjection scopedSpillInjection(100);
auto task =
AssertQueryBuilder(duckDbQueryRunner_)
.spillDirectory(tempDirectory->path)
.config(QueryConfig::kSpillEnabled, true)
.config(QueryConfig::kAggregationSpillEnabled, true)
.config(QueryConfig::kTestingSpillPct, "100")
// Set very large output buffer size, the number of output rows is
// effectively controlled by 'kPreferredOutputBatchBytes'.
.config(
Expand Down Expand Up @@ -1818,11 +1818,11 @@ TEST_F(AggregationTest, distinctWithSpilling) {
createDuckDbTable(vectors);
auto spillDirectory = exec::test::TempDirectoryPath::create();
core::PlanNodeId aggrNodeId;
TestScopedSpillInjection scopedSpillInjection(100);
auto task = AssertQueryBuilder(duckDbQueryRunner_)
.spillDirectory(spillDirectory->path)
.config(QueryConfig::kSpillEnabled, true)
.config(QueryConfig::kAggregationSpillEnabled, true)
.config(QueryConfig::kTestingSpillPct, "100")
.plan(PlanBuilder()
.values(vectors)
.singleAggregation({"c0"}, {}, {})
Expand All @@ -1841,12 +1841,12 @@ TEST_F(AggregationTest, spillingForAggrsWithDistinct) {
createDuckDbTable(vectors);
auto spillDirectory = exec::test::TempDirectoryPath::create();
core::PlanNodeId aggrNodeId;
TestScopedSpillInjection scopedSpillInjection(100);
auto task =
AssertQueryBuilder(duckDbQueryRunner_)
.spillDirectory(spillDirectory->path)
.config(QueryConfig::kSpillEnabled, true)
.config(QueryConfig::kAggregationSpillEnabled, true)
.config(QueryConfig::kTestingSpillPct, "100")
.plan(PlanBuilder()
.values(vectors)
.singleAggregation({"c1"}, {"count(DISTINCT c0)"}, {})
Expand All @@ -1857,7 +1857,6 @@ TEST_F(AggregationTest, spillingForAggrsWithDistinct) {
const auto& queryConfig = task->queryCtx()->queryConfig();
ASSERT_TRUE(queryConfig.spillEnabled());
ASSERT_TRUE(queryConfig.aggregationSpillEnabled());
ASSERT_EQ(100, queryConfig.testingSpillPct());
ASSERT_EQ(toPlanStats(task->taskStats()).at(aggrNodeId).spilledBytes, 0);
OperatorTestBase::deleteTaskAndCheckSpillDirectory(task);
}
Expand All @@ -1871,11 +1870,11 @@ TEST_F(AggregationTest, spillingForAggrsWithSorting) {

auto testPlan = [&](const core::PlanNodePtr& plan, const std::string& sql) {
SCOPED_TRACE(sql);
TestScopedSpillInjection scopedSpillInjection(100);
auto task = AssertQueryBuilder(duckDbQueryRunner_)
.spillDirectory(spillDirectory->path)
.config(QueryConfig::kSpillEnabled, true)
.config(QueryConfig::kAggregationSpillEnabled, true)
.config(QueryConfig::kTestingSpillPct, "100")
.plan(plan)
.assertResults(sql);

Expand Down Expand Up @@ -1970,12 +1969,12 @@ TEST_F(AggregationTest, preGroupedAggregationWithSpilling) {
createDuckDbTable(vectors);
auto spillDirectory = exec::test::TempDirectoryPath::create();
core::PlanNodeId aggrNodeId;
TestScopedSpillInjection scopedSpillInjection(100);
auto task =
AssertQueryBuilder(duckDbQueryRunner_)
.spillDirectory(spillDirectory->path)
.config(QueryConfig::kSpillEnabled, true)
.config(QueryConfig::kAggregationSpillEnabled, true)
.config(QueryConfig::kTestingSpillPct, "100")
.plan(PlanBuilder()
.values(vectors)
.aggregation(
Expand Down
3 changes: 0 additions & 3 deletions velox/exec/tests/SortBufferTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ class SortBufferTest : public OperatorTestBase {
0,
0,
0,
0,
"none");
}

Expand Down Expand Up @@ -301,7 +300,6 @@ TEST_F(SortBufferTest, batchOutput) {
0,
0,
0,
100, // testSpillPct
"none");
auto sortBuffer = std::make_unique<SortBuffer>(
inputType_,
Expand Down Expand Up @@ -399,7 +397,6 @@ TEST_F(SortBufferTest, spill) {
0,
0,
0,
0,
"none");
auto sortBuffer = std::make_unique<SortBuffer>(
inputType_,
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/tests/TableWriteTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ class TableWriteTest : public HiveConnectorTestBase {
}

const auto spillDirectory = exec::test::TempDirectoryPath::create();
TestScopedSpillInjection scopedSpillInjection(100);
return AssertQueryBuilder(plan, duckDbQueryRunner_)
.spillDirectory(spillDirectory->path)
.maxDrivers(
Expand All @@ -357,7 +358,6 @@ class TableWriteTest : public HiveConnectorTestBase {
std::to_string(numPartitionedTableWriterCount_))
.config(core::QueryConfig::kSpillEnabled, "true")
.config(QueryConfig::kWriterSpillEnabled, "true")
.config(QueryConfig::kTestingSpillPct, "100")
.copyResults(pool());
}

Expand Down
4 changes: 2 additions & 2 deletions velox/exec/tests/TaskTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1383,8 +1383,7 @@ TEST_F(TaskTest, spillDirectoryLifecycleManagement) {
params.queryCtx = std::make_shared<core::QueryCtx>(driverExecutor_.get());
params.queryCtx->testingOverrideConfigUnsafe(
{{core::QueryConfig::kSpillEnabled, "true"},
{core::QueryConfig::kAggregationSpillEnabled, "true"},
{core::QueryConfig::kTestingSpillPct, "100"}});
{core::QueryConfig::kAggregationSpillEnabled, "true"}});
params.maxDrivers = 1;

auto cursor = TaskCursor::create(params);
Expand All @@ -1394,6 +1393,7 @@ TEST_F(TaskTest, spillDirectoryLifecycleManagement) {
rootTempDir->path + "/spillDirectoryLifecycleManagement";
task->setSpillDirectory(tmpDirectoryPath, false);

TestScopedSpillInjection scopedSpillInjection(100);
while (cursor->moveNext()) {
}
ASSERT_TRUE(waitForTaskCompletion(task.get(), 5'000'000));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "velox/dwio/dwrf/writer/Writer.h"
#include "velox/exec/AggregateCompanionSignatures.h"
#include "velox/exec/PlanNodeStats.h"
#include "velox/exec/Spill.h"
#include "velox/exec/tests/utils/TempDirectoryPath.h"
#include "velox/exec/tests/utils/TempFilePath.h"
#include "velox/expression/Expr.h"
Expand Down Expand Up @@ -376,12 +377,12 @@ void AggregationTestBase::testAggregationsWithCompanion(

AssertQueryBuilder queryBuilder(builder.planNode(), duckDbQueryRunner_);
queryBuilder.configs(config)
.config(core::QueryConfig::kTestingSpillPct, 100)
.config(core::QueryConfig::kSpillEnabled, true)
.config(core::QueryConfig::kAggregationSpillEnabled, true)
.spillDirectory(spillDirectory->path)
.maxDrivers(4);

exec::TestScopedSpillInjection scopedSpillInjection(100);
auto task = assertResults(queryBuilder);

// Expect > 0 spilled bytes unless there was no input.
Expand Down Expand Up @@ -785,12 +786,12 @@ void AggregationTestBase::testAggregationsImpl(
memory::spillMemoryPool()->stats().peakBytes;
AssertQueryBuilder queryBuilder(builder.planNode(), duckDbQueryRunner_);
queryBuilder.configs(config)
.config(core::QueryConfig::kTestingSpillPct, 100)
.config(core::QueryConfig::kSpillEnabled, true)
.config(core::QueryConfig::kAggregationSpillEnabled, true)
.spillDirectory(spillDirectory->path)
.maxDrivers(4);

exec::TestScopedSpillInjection scopedSpillInjection(100);
auto task = assertResults(queryBuilder);

// Expect > 0 spilled bytes unless there was no input.
Expand Down Expand Up @@ -871,11 +872,12 @@ void AggregationTestBase::testAggregationsImpl(
auto spillDirectory = exec::test::TempDirectoryPath::create();

AssertQueryBuilder queryBuilder(builder.planNode(), duckDbQueryRunner_);
queryBuilder.configs(config).config(core::QueryConfig::kTestingSpillPct, "100")
queryBuilder.configs(config)
.config(core::QueryConfig::kSpillEnabled, "true")
.config(core::QueryConfig::kAggregationSpillEnabled, "true")
.spillDirectory(spillDirectory->path);

TestScopedSpillInjection scopedSpillInjection(100);
auto task = assertResults(queryBuilder);

// Expect > 0 spilled bytes unless there was no input.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -590,8 +590,8 @@ TEST_F(FirstAggregateTest, spillingAndSorting) {

auto spillDirectory = exec::test::TempDirectoryPath::create();

exec::TestScopedSpillInjection scopedSpillInjection(100);
results = AssertQueryBuilder(plan)
.config(core::QueryConfig::kTestingSpillPct, "100")
.config(core::QueryConfig::kSpillEnabled, "true")
.config(core::QueryConfig::kAggregationSpillEnabled, "true")
.spillDirectory(spillDirectory->path)
Expand Down

0 comments on commit 74cd024

Please sign in to comment.