Skip to content

Commit

Permalink
Add execution mode enforcement
Browse files Browse the repository at this point in the history
  • Loading branch information
tanjialiang committed Apr 17, 2024
1 parent d796cfc commit 0bd26bb
Show file tree
Hide file tree
Showing 20 changed files with 281 additions and 44 deletions.
6 changes: 4 additions & 2 deletions velox/examples/ScanAndSort.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ int main(int argc, char** argv) {
"my_write_task",
writerPlanFragment,
/*destination=*/0,
std::make_shared<core::QueryCtx>(executor.get()));
std::make_shared<core::QueryCtx>(executor.get()),
exec::Task::ExecutionMode::kParallel);

// next() starts execution using the client thread. The loop pumps output
// vectors out of the task (there are none in this query fragment).
Expand Down Expand Up @@ -159,7 +160,8 @@ int main(int argc, char** argv) {
"my_read_task",
readPlanFragment,
/*destination=*/0,
std::make_shared<core::QueryCtx>(executor.get()));
std::make_shared<core::QueryCtx>(executor.get()),
exec::Task::ExecutionMode::kParallel);

// Now that we have the query fragment and Task structure set up, we will
// add data to it via `splits`.
Expand Down
34 changes: 33 additions & 1 deletion velox/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,23 @@ void movePromisesOut(
}
from.clear();
}

} // namespace

std::string executionModeString(Task::ExecutionMode mode) {
switch (mode) {
case Task::ExecutionMode::kSerial:
return "Serial";
case Task::ExecutionMode::kParallel:
return "Parallel";
default:
return fmt::format("Unknown {}", static_cast<int>(mode));
}
}

std::ostream& operator<<(std::ostream& out, const Task::ExecutionMode& mode) {
return out << executionModeString(mode);
}

std::string taskStateString(TaskState state) {
switch (state) {
case TaskState::kRunning:
Expand Down Expand Up @@ -231,13 +245,15 @@ std::shared_ptr<Task> Task::create(
core::PlanFragment planFragment,
int destination,
std::shared_ptr<core::QueryCtx> queryCtx,
ExecutionMode mode,
Consumer consumer,
std::function<void(std::exception_ptr)> onError) {
return Task::create(
taskId,
std::move(planFragment),
destination,
std::move(queryCtx),
mode,
(consumer ? [c = std::move(consumer)]() { return c; }
: ConsumerSupplier{}),
std::move(onError));
Expand All @@ -248,13 +264,15 @@ std::shared_ptr<Task> Task::create(
core::PlanFragment planFragment,
int destination,
std::shared_ptr<core::QueryCtx> queryCtx,
ExecutionMode mode,
ConsumerSupplier consumerSupplier,
std::function<void(std::exception_ptr)> onError) {
auto task = std::shared_ptr<Task>(new Task(
taskId,
std::move(planFragment),
destination,
std::move(queryCtx),
mode,
std::move(consumerSupplier),
std::move(onError)));
task->initTaskPool();
Expand All @@ -266,13 +284,15 @@ Task::Task(
core::PlanFragment planFragment,
int destination,
std::shared_ptr<core::QueryCtx> queryCtx,
ExecutionMode mode,
ConsumerSupplier consumerSupplier,
std::function<void(std::exception_ptr)> onError)
: uuid_{makeUuid()},
taskId_(taskId),
planFragment_(std::move(planFragment)),
destination_(destination),
queryCtx_(std::move(queryCtx)),
mode_(mode),
consumerSupplier_(std::move(consumerSupplier)),
onError_(onError),
splitsStates_(buildSplitStates(planFragment_.planNode)),
Expand Down Expand Up @@ -525,6 +545,7 @@ bool Task::supportsSingleThreadedExecution() const {
}

RowVectorPtr Task::next(ContinueFuture* future) {
checkExecutionMode(ExecutionMode::kSerial);
// NOTE: Task::next() is single-threaded execution so locking is not required
// to access Task object.
VELOX_CHECK_EQ(
Expand Down Expand Up @@ -645,6 +666,7 @@ void Task::start(uint32_t maxDrivers, uint32_t concurrentSplitGroups) {
facebook::velox::process::ThreadDebugInfo threadDebugInfo{
queryCtx()->queryId(), taskId_, nullptr};
facebook::velox::process::ScopedThreadDebugInfo scopedInfo(threadDebugInfo);
checkExecutionMode(ExecutionMode::kParallel);

try {
VELOX_CHECK_GE(
Expand Down Expand Up @@ -688,6 +710,16 @@ void Task::start(uint32_t maxDrivers, uint32_t concurrentSplitGroups) {
}
}

void Task::checkExecutionMode(ExecutionMode mode) {
VELOX_CHECK_EQ(
mode,
mode_,
"Inconsistent task execution mode. Cannot execute in '{}' mode "
"for a '{}' mode task",
mode,
mode_)
}

void Task::createDriverFactoriesLocked(uint32_t maxDrivers) {
VELOX_CHECK(isRunningLocked());
VELOX_CHECK(driverFactories_.empty());
Expand Down
40 changes: 39 additions & 1 deletion velox/exec/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,16 @@ using ConnectorSplitPreloadFunc =

class Task : public std::enable_shared_from_this<Task> {
public:
/// Threading mode the task is executed.
enum class ExecutionMode {
/// Mode that executes the query serially (single-threaded) on the calling
/// thread.
kSerial,
/// Mode that executes the query in parallel (multi-threaded) using provided
/// executor.
kParallel,
};

/// Creates a task to execute a plan fragment, but doesn't start execution
/// until Task::start() method is called.
/// @param taskId Unique task identifier.
Expand All @@ -48,8 +58,10 @@ class Task : public std::enable_shared_from_this<Task> {
/// @param queryCtx Query context containing MemoryPool and MemoryAllocator
/// instances to use for memory allocations during execution, executor to
/// schedule operators on, and session properties.
/// @param mode Execution mode for this task. The task can be executed in
/// Serial and Parallel mode.
/// @param consumer Optional factory function to get callbacks to pass the
/// results of the execution. In a multi-threaded execution, results from each
/// results of the execution. In a parallel execution mode, results from each
/// thread are passed on to a separate consumer.
/// @param onError Optional callback to receive an exception if task
/// execution fails.
Expand All @@ -58,6 +70,7 @@ class Task : public std::enable_shared_from_this<Task> {
core::PlanFragment planFragment,
int destination,
std::shared_ptr<core::QueryCtx> queryCtx,
ExecutionMode mode,
Consumer consumer = nullptr,
std::function<void(std::exception_ptr)> onError = nullptr);

Expand All @@ -66,6 +79,7 @@ class Task : public std::enable_shared_from_this<Task> {
core::PlanFragment planFragment,
int destination,
std::shared_ptr<core::QueryCtx> queryCtx,
ExecutionMode mode,
ConsumerSupplier consumerSupplier,
std::function<void(std::exception_ptr)> onError = nullptr);

Expand Down Expand Up @@ -646,9 +660,14 @@ class Task : public std::enable_shared_from_this<Task> {
core::PlanFragment planFragment,
int destination,
std::shared_ptr<core::QueryCtx> queryCtx,
ExecutionMode mode,
ConsumerSupplier consumerSupplier,
std::function<void(std::exception_ptr)> onError = nullptr);

// Consistency check of the task execution to make sure the execution mode
// stays the same.
void checkExecutionMode(ExecutionMode mode);

// Creates driver factories.
void createDriverFactoriesLocked(uint32_t maxDrivers);

Expand Down Expand Up @@ -953,6 +972,10 @@ class Task : public std::enable_shared_from_this<Task> {
const int destination_;
const std::shared_ptr<core::QueryCtx> queryCtx_;

// The execution mode of the task. It is enforced that a task can only be
// executed in a single mode throughout its lifetime
const ExecutionMode mode_;

// Root MemoryPool for this Task. All member variables that hold references
// to pool_ must be defined after pool_, childPools_.
std::shared_ptr<memory::MemoryPool> pool_;
Expand Down Expand Up @@ -1135,4 +1158,19 @@ bool registerTaskListener(std::shared_ptr<TaskListener> listener);
/// unregistered successfuly, false if listener was not found.
bool unregisterTaskListener(const std::shared_ptr<TaskListener>& listener);

std::string executionModeString(Task::ExecutionMode mode);

std::ostream& operator<<(std::ostream& out, const Task::ExecutionMode& mode);

} // namespace facebook::velox::exec

template <>
struct fmt::formatter<facebook::velox::exec::Task::ExecutionMode>
: formatter<std::string> {
auto format(
facebook::velox::exec::Task::ExecutionMode m,
format_context& ctx) {
return formatter<std::string>::format(
facebook::velox::exec::executionModeString(m), ctx);
}
};
1 change: 1 addition & 0 deletions velox/exec/benchmarks/ExchangeBenchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ class ExchangeBenchmark : public VectorTestBase {
std::move(planFragment),
destination,
std::move(queryCtx),
Task::ExecutionMode::kParallel,
std::move(consumer));
}

Expand Down
3 changes: 3 additions & 0 deletions velox/exec/tests/DriverTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ class DriverTest : public OperatorTestBase {
plan,
0,
std::make_shared<core::QueryCtx>(driverExecutor_.get()),
Task::ExecutionMode::kParallel,
[](RowVectorPtr /*unused*/, ContinueFuture* /*unused*/) {
return exec::BlockingReason::kNotBlocked;
});
Expand Down Expand Up @@ -1418,6 +1419,7 @@ DEBUG_ONLY_TEST_F(DriverTest, driverCpuTimeSlicingCheck) {
0,
std::make_shared<core::QueryCtx>(
driverExecutor_.get(), std::move(queryConfig)),
Task::ExecutionMode::kParallel,
[](RowVectorPtr /*unused*/, ContinueFuture* /*unused*/) {
return exec::BlockingReason::kNotBlocked;
});
Expand Down Expand Up @@ -1452,6 +1454,7 @@ TEST_F(OpCallStatusTest, basic) {
0,
std::make_shared<core::QueryCtx>(
driverExecutor_.get(), std::move(queryConfig)),
Task::ExecutionMode::kParallel,
[](RowVectorPtr /*unused*/, ContinueFuture* /*unused*/) {
return exec::BlockingReason::kNotBlocked;
});
Expand Down
6 changes: 5 additions & 1 deletion velox/exec/tests/ExchangeClientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ class ExchangeClientTest : public testing::Test,
queryCtx->testingOverrideMemoryPool(
memory::memoryManager()->addRootPool(queryCtx->queryId()));
return Task::create(
taskId, core::PlanFragment{planNode}, 0, std::move(queryCtx));
taskId,
core::PlanFragment{planNode},
0,
std::move(queryCtx),
Task::ExecutionMode::kParallel);
}

int32_t enqueue(
Expand Down
1 change: 1 addition & 0 deletions velox/exec/tests/ExchangeFuzzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,7 @@ class ExchangeFuzzer : public VectorTestBase {
std::move(planFragment),
destination,
std::move(queryCtx),
Task::ExecutionMode::kParallel,
std::move(consumer));
}

Expand Down
55 changes: 46 additions & 9 deletions velox/exec/tests/GroupedExecutionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,12 @@ TEST_F(GroupedExecutionTest, groupedExecutionErrors) {
planFragment.groupedExecutionLeafNodeIds.clear();
planFragment.groupedExecutionLeafNodeIds.emplace(tableScanNodeId);
queryCtx = std::make_shared<core::QueryCtx>(executor_.get());
task = exec::Task::create("0", planFragment, 0, std::move(queryCtx));
task = exec::Task::create(
"0",
planFragment,
0,
std::move(queryCtx),
Task::ExecutionMode::kParallel);
VELOX_ASSERT_THROW(
task->start(3, 1),
"groupedExecutionLeafNodeIds must be empty in ungrouped execution mode");
Expand All @@ -129,7 +134,12 @@ TEST_F(GroupedExecutionTest, groupedExecutionErrors) {
planFragment.executionStrategy = core::ExecutionStrategy::kGrouped;
planFragment.groupedExecutionLeafNodeIds.clear();
queryCtx = std::make_shared<core::QueryCtx>(executor_.get());
task = exec::Task::create("0", planFragment, 0, std::move(queryCtx));
task = exec::Task::create(
"0",
planFragment,
0,
std::move(queryCtx),
Task::ExecutionMode::kParallel);
VELOX_ASSERT_THROW(
task->start(3, 1),
"groupedExecutionLeafNodeIds must not be empty in "
Expand All @@ -140,7 +150,12 @@ TEST_F(GroupedExecutionTest, groupedExecutionErrors) {
planFragment.groupedExecutionLeafNodeIds.clear();
planFragment.groupedExecutionLeafNodeIds.emplace(projectNodeId);
queryCtx = std::make_shared<core::QueryCtx>(executor_.get());
task = exec::Task::create("0", planFragment, 0, std::move(queryCtx));
task = exec::Task::create(
"0",
planFragment,
0,
std::move(queryCtx),
Task::ExecutionMode::kParallel);
VELOX_ASSERT_THROW(
task->start(3, 1),
fmt::format(
Expand All @@ -153,7 +168,12 @@ TEST_F(GroupedExecutionTest, groupedExecutionErrors) {
planFragment.groupedExecutionLeafNodeIds.emplace(tableScanNodeId);
planFragment.groupedExecutionLeafNodeIds.emplace(projectNodeId);
queryCtx = std::make_shared<core::QueryCtx>(executor_.get());
task = exec::Task::create("0", planFragment, 0, std::move(queryCtx));
task = exec::Task::create(
"0",
planFragment,
0,
std::move(queryCtx),
Task::ExecutionMode::kParallel);
VELOX_ASSERT_THROW(
task->start(3, 1),
fmt::format(
Expand All @@ -166,7 +186,12 @@ TEST_F(GroupedExecutionTest, groupedExecutionErrors) {
planFragment.groupedExecutionLeafNodeIds.clear();
planFragment.groupedExecutionLeafNodeIds.emplace(localPartitionNodeId);
queryCtx = std::make_shared<core::QueryCtx>(executor_.get());
task = exec::Task::create("0", planFragment, 0, std::move(queryCtx));
task = exec::Task::create(
"0",
planFragment,
0,
std::move(queryCtx),
Task::ExecutionMode::kParallel);
VELOX_ASSERT_THROW(
task->start(3, 1),
fmt::format(
Expand Down Expand Up @@ -202,8 +227,12 @@ TEST_F(GroupedExecutionTest, groupedExecutionWithOutputBuffer) {
planFragment.groupedExecutionLeafNodeIds.emplace(tableScanNodeId);
planFragment.numSplitGroups = 10;
auto queryCtx = std::make_shared<core::QueryCtx>(executor_.get());
auto task =
exec::Task::create("0", std::move(planFragment), 0, std::move(queryCtx));
auto task = exec::Task::create(
"0",
std::move(planFragment),
0,
std::move(queryCtx),
Task::ExecutionMode::kParallel);
// 3 drivers max and 1 concurrent split group.
task->start(3, 1);

Expand Down Expand Up @@ -397,7 +426,11 @@ DEBUG_ONLY_TEST_F(
}));

auto task = exec::Task::create(
"0", std::move(planFragment), 0, std::move(queryCtx));
"0",
std::move(planFragment),
0,
std::move(queryCtx),
Task::ExecutionMode::kParallel);
const auto spillDirectory = exec::test::TempDirectoryPath::create();
if (testData.enableSpill) {
task->setSpillDirectory(spillDirectory->getPath());
Expand Down Expand Up @@ -513,7 +546,11 @@ TEST_F(GroupedExecutionTest, groupedExecutionWithHashAndNestedLoopJoin) {
planFragment.numSplitGroups = 10;
auto queryCtx = std::make_shared<core::QueryCtx>(executor_.get());
auto task = exec::Task::create(
"0", std::move(planFragment), 0, std::move(queryCtx));
"0",
std::move(planFragment),
0,
std::move(queryCtx),
Task::ExecutionMode::kParallel);
// 3 drivers max and 1 concurrent split group.
task->start(3, 1);

Expand Down
3 changes: 2 additions & 1 deletion velox/exec/tests/MemoryReclaimerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ class MemoryReclaimerTest : public OperatorTestBase {
"MemoryReclaimerTest",
std::move(fakePlanFragment),
0,
std::make_shared<core::QueryCtx>());
std::make_shared<core::QueryCtx>(),
Task::ExecutionMode::kParallel);
}

void SetUp() override {}
Expand Down
1 change: 1 addition & 0 deletions velox/exec/tests/MultiFragmentTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ class MultiFragmentTest : public HiveConnectorTestBase {
std::move(planFragment),
destination,
std::move(queryCtx),
Task::ExecutionMode::kParallel,
std::move(consumer));
}

Expand Down
Loading

0 comments on commit 0bd26bb

Please sign in to comment.