Skip to content

Commit

Permalink
Driver should have slicing diabled in serial mode
Browse files Browse the repository at this point in the history
  • Loading branch information
tanjialiang committed May 12, 2024
1 parent 2c98308 commit 4c3610c
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 20 deletions.
2 changes: 1 addition & 1 deletion velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ void Driver::init(
std::vector<std::unique_ptr<Operator>> operators) {
VELOX_CHECK_NULL(ctx_);
ctx_ = std::move(ctx);
cpuSliceMs_ = ctx_->queryConfig().driverCpuTimeSliceLimitMs();
cpuSliceMs_ = task()->driverCpuTimeSliceLimitMs();
VELOX_CHECK(operators_.empty());
operators_ = std::move(operators);
curOperatorId_ = operators_.size() - 1;
Expand Down
6 changes: 6 additions & 0 deletions velox/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,12 @@ void Task::removeSpillDirectoryIfExists() {
}
}

uint64_t Task::driverCpuTimeSliceLimitMs() const {
return mode_ == Task::ExecutionMode::kSerial
? 0
: queryCtx_->queryConfig().driverCpuTimeSliceLimitMs();
}

void Task::initTaskPool() {
VELOX_CHECK_NULL(pool_);
pool_ = queryCtx_->pool()->addAggregateChild(
Expand Down
10 changes: 7 additions & 3 deletions velox/exec/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ class Task : public std::enable_shared_from_this<Task> {
ConsumerSupplier consumerSupplier,
std::function<void(std::exception_ptr)> onError = nullptr);

/// Convenience function for shortening a Presto taskId. To be used
/// in debugging messages and listings.
static std::string shortId(const std::string& id);

~Task();

/// Specify directory to which data will be spilled if spilling is enabled and
Expand Down Expand Up @@ -116,9 +120,9 @@ class Task : public std::enable_shared_from_this<Task> {
return destination_;
}

// Convenience function for shortening a Presto taskId. To be used
// in debugging messages and listings.
static std::string shortId(const std::string& id);
/// Configured cpu slice time limit for drivers. 0 (meaning slicing/yield
/// disabled) when task is under serial mode.
uint64_t driverCpuTimeSliceLimitMs() const;

/// Returns QueryCtx specified in the constructor.
const std::shared_ptr<core::QueryCtx>& queryCtx() const {
Expand Down
59 changes: 43 additions & 16 deletions velox/exec/tests/DriverTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1474,8 +1474,19 @@ DEBUG_ONLY_TEST_F(DriverTest, driverCpuTimeSlicingCheck) {
makeRowVector({"c0"}, {makeFlatVector<int32_t>({1, 2, 3})}));
}

for (const auto& hasCpuTimeSliceLimit : {false, true}) {
SCOPED_TRACE(fmt::format("hasCpuSliceLimit: {}", hasCpuTimeSliceLimit));
struct TestParam {
bool hasCpuTimeSliceLimit;
Task::ExecutionMode executionMode;
};
std::vector<TestParam> testParams{
{true, Task::ExecutionMode::kParallel},
{false, Task::ExecutionMode::kParallel},
{true, Task::ExecutionMode::kSerial},
{false, Task::ExecutionMode::kSerial}};

for (const auto& testParam : testParams) {
SCOPED_TRACE(
fmt::format("hasCpuSliceLimit: {}", testParam.hasCpuTimeSliceLimit));
SCOPED_TESTVALUE_SET(
"facebook::velox::exec::Values::getOutput",
std::function<void(const exec::Values*)>([&](const exec::Values*
Expand All @@ -1485,7 +1496,7 @@ DEBUG_ONLY_TEST_F(DriverTest, driverCpuTimeSlicingCheck) {
ASSERT_NE(
values->testingOperatorCtx()->driver()->state().startExecTimeMs,
0);
if (hasCpuTimeSliceLimit) {
if (testParam.hasCpuTimeSliceLimit) {
std::this_thread::sleep_for(std::chrono::seconds(1)); // NOLINT
ASSERT_GT(
values->testingOperatorCtx()->driver()->state().execTimeMs(),
Expand All @@ -1496,23 +1507,39 @@ DEBUG_ONLY_TEST_F(DriverTest, driverCpuTimeSlicingCheck) {
auto fragment =
PlanBuilder(planNodeIdGenerator).values(batches).planFragment();
std::unordered_map<std::string, std::string> queryConfig;
if (hasCpuTimeSliceLimit) {
if (testParam.hasCpuTimeSliceLimit) {
queryConfig.emplace(core::QueryConfig::kDriverCpuTimeSliceLimitMs, "500");
}
const uint64_t oldYieldCount = Driver::yieldCount();
auto task = Task::create(
"t0",
fragment,
0,
std::make_shared<core::QueryCtx>(
driverExecutor_.get(), std::move(queryConfig)),
Task::ExecutionMode::kParallel,
[](RowVectorPtr /*unused*/, ContinueFuture* /*unused*/) {
return exec::BlockingReason::kNotBlocked;
});
task->start(1, 1);

std::shared_ptr<Task> task;
if (testParam.executionMode == Task::ExecutionMode::kParallel) {
task = Task::create(
"t0",
fragment,
0,
std::make_shared<core::QueryCtx>(
driverExecutor_.get(), std::move(queryConfig)),
testParam.executionMode,
[](RowVectorPtr /*unused*/, ContinueFuture* /*unused*/) {
return exec::BlockingReason::kNotBlocked;
});
task->start(1, 1);
} else {
task = Task::create(
"t0",
fragment,
0,
std::make_shared<core::QueryCtx>(
driverExecutor_.get(), std::move(queryConfig)),
testParam.executionMode);
while (task->next() != nullptr) {
}
}

ASSERT_TRUE(waitForTaskCompletion(task.get(), 600'000'000));
if (hasCpuTimeSliceLimit) {
if (testParam.hasCpuTimeSliceLimit &&
testParam.executionMode == Task::ExecutionMode::kParallel) {
// NOTE: there is one additional yield for the empty output.
ASSERT_GE(Driver::yieldCount(), oldYieldCount + numBatches + 1);
} else {
Expand Down

0 comments on commit 4c3610c

Please sign in to comment.