Skip to content

Commit

Permalink
Add execution mode enforcement
Browse files Browse the repository at this point in the history
  • Loading branch information
tanjialiang committed Apr 16, 2024
1 parent d796cfc commit 7158bbb
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 2 deletions.
28 changes: 28 additions & 0 deletions velox/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,16 @@ void movePromisesOut(
from.clear();
}

std::string executionModeToString(const Task::ExecutionMode& mode) {
switch (mode) {
case Task::ExecutionMode::kSerial:
return "Serial";
case Task::ExecutionMode::kParallel:
return "Parallel";
default:
return "Unknown";
}
}
} // namespace

std::string taskStateString(TaskState state) {
Expand Down Expand Up @@ -231,13 +241,15 @@ std::shared_ptr<Task> Task::create(
core::PlanFragment planFragment,
int destination,
std::shared_ptr<core::QueryCtx> queryCtx,
ExecutionMode mode,
Consumer consumer,
std::function<void(std::exception_ptr)> onError) {
return Task::create(
taskId,
std::move(planFragment),
destination,
std::move(queryCtx),
mode,
(consumer ? [c = std::move(consumer)]() { return c; }
: ConsumerSupplier{}),
std::move(onError));
Expand All @@ -248,13 +260,15 @@ std::shared_ptr<Task> Task::create(
core::PlanFragment planFragment,
int destination,
std::shared_ptr<core::QueryCtx> queryCtx,
ExecutionMode mode,
ConsumerSupplier consumerSupplier,
std::function<void(std::exception_ptr)> onError) {
auto task = std::shared_ptr<Task>(new Task(
taskId,
std::move(planFragment),
destination,
std::move(queryCtx),
mode,
std::move(consumerSupplier),
std::move(onError)));
task->initTaskPool();
Expand All @@ -266,13 +280,15 @@ Task::Task(
core::PlanFragment planFragment,
int destination,
std::shared_ptr<core::QueryCtx> queryCtx,
ExecutionMode mode,
ConsumerSupplier consumerSupplier,
std::function<void(std::exception_ptr)> onError)
: uuid_{makeUuid()},
taskId_(taskId),
planFragment_(std::move(planFragment)),
destination_(destination),
queryCtx_(std::move(queryCtx)),
mode_(mode),
consumerSupplier_(std::move(consumerSupplier)),
onError_(onError),
splitsStates_(buildSplitStates(planFragment_.planNode)),
Expand Down Expand Up @@ -525,6 +541,7 @@ bool Task::supportsSingleThreadedExecution() const {
}

RowVectorPtr Task::next(ContinueFuture* future) {
checkExecutionMode(ExecutionMode::kSerial);
// NOTE: Task::next() is single-threaded execution so locking is not required
// to access Task object.
VELOX_CHECK_EQ(
Expand Down Expand Up @@ -645,6 +662,7 @@ void Task::start(uint32_t maxDrivers, uint32_t concurrentSplitGroups) {
facebook::velox::process::ThreadDebugInfo threadDebugInfo{
queryCtx()->queryId(), taskId_, nullptr};
facebook::velox::process::ScopedThreadDebugInfo scopedInfo(threadDebugInfo);
checkExecutionMode(ExecutionMode::kParallel);

try {
VELOX_CHECK_GE(
Expand Down Expand Up @@ -688,6 +706,16 @@ void Task::start(uint32_t maxDrivers, uint32_t concurrentSplitGroups) {
}
}

void Task::checkExecutionMode(ExecutionMode mode) {
VELOX_CHECK(
mode == mode_,
fmt::format(
"Inconsistent task execution mode. Cannot execute in '{}' mode "
"for a '{}' mode task",
executionModeToString(mode),
executionModeToString(mode_)));
}

void Task::createDriverFactoriesLocked(uint32_t maxDrivers) {
VELOX_CHECK(isRunningLocked());
VELOX_CHECK(driverFactories_.empty());
Expand Down
23 changes: 22 additions & 1 deletion velox/exec/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ using ConnectorSplitPreloadFunc =

class Task : public std::enable_shared_from_this<Task> {
public:
/// Threading mode the query uses to execute. 'kSerial' mode executes the
/// query serially on the call thread. 'kUninitialized' mode executes the
/// query parallelly using provided executor.
enum ExecutionMode {
kSerial,
kParallel,
};

/// Creates a task to execute a plan fragment, but doesn't start execution
/// until Task::start() method is called.
/// @param taskId Unique task identifier.
Expand All @@ -48,8 +56,10 @@ class Task : public std::enable_shared_from_this<Task> {
/// @param queryCtx Query context containing MemoryPool and MemoryAllocator
/// instances to use for memory allocations during execution, executor to
/// schedule operators on, and session properties.
/// @param mode Execution mode for this task. The task can be executed in
/// Serial and Parallel mode.
/// @param consumer Optional factory function to get callbacks to pass the
/// results of the execution. In a multi-threaded execution, results from each
/// results of the execution. In a parallel execution mode, results from each
/// thread are passed on to a separate consumer.
/// @param onError Optional callback to receive an exception if task
/// execution fails.
Expand All @@ -58,6 +68,7 @@ class Task : public std::enable_shared_from_this<Task> {
core::PlanFragment planFragment,
int destination,
std::shared_ptr<core::QueryCtx> queryCtx,
ExecutionMode mode,
Consumer consumer = nullptr,
std::function<void(std::exception_ptr)> onError = nullptr);

Expand All @@ -66,6 +77,7 @@ class Task : public std::enable_shared_from_this<Task> {
core::PlanFragment planFragment,
int destination,
std::shared_ptr<core::QueryCtx> queryCtx,
ExecutionMode mode,
ConsumerSupplier consumerSupplier,
std::function<void(std::exception_ptr)> onError = nullptr);

Expand Down Expand Up @@ -646,9 +658,14 @@ class Task : public std::enable_shared_from_this<Task> {
core::PlanFragment planFragment,
int destination,
std::shared_ptr<core::QueryCtx> queryCtx,
ExecutionMode mode,
ConsumerSupplier consumerSupplier,
std::function<void(std::exception_ptr)> onError = nullptr);

// Consistency check of the task execution to make sure the execution mode
// stays the same.
void checkExecutionMode(ExecutionMode mode);

// Creates driver factories.
void createDriverFactoriesLocked(uint32_t maxDrivers);

Expand Down Expand Up @@ -953,6 +970,10 @@ class Task : public std::enable_shared_from_this<Task> {
const int destination_;
const std::shared_ptr<core::QueryCtx> queryCtx_;

// The execution mode of the task. It is enforced that a task can only be
// executed in a single mode throughout its lifetime
const ExecutionMode mode_;

// Root MemoryPool for this Task. All member variables that hold references
// to pool_ must be defined after pool_, childPools_.
std::shared_ptr<memory::MemoryPool> pool_;
Expand Down
62 changes: 61 additions & 1 deletion velox/exec/tests/TaskTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,11 @@ class TaskTest : public HiveConnectorTestBase {
const std::unordered_map<std::string, std::vector<std::string>>&
filePaths = {}) {
auto task = Task::create(
"single.execution.task.0", plan, 0, std::make_shared<core::QueryCtx>());
"single.execution.task.0",
plan,
0,
std::make_shared<core::QueryCtx>(),
Task::ExecutionMode::kSerial);

for (const auto& [nodeId, paths] : filePaths) {
for (const auto& path : paths) {
Expand Down Expand Up @@ -1199,6 +1203,62 @@ TEST_F(TaskTest, outputBufferSize) {
task->requestCancel();
}

DEBUG_ONLY_TEST_F(TaskTest, inconsistentExecutionMode) {
{
// Scenario 1: Parallel execution starts first then kicks in Serial
// execution.

// Let parallel execution pause a bit so that we can call serial API on Task
// to trigger inconsistent execution mode failure.
folly::EventCount getOutputWait;
std::atomic_bool getOutputWaitFlag{false};
SCOPED_TESTVALUE_SET(
"facebook::velox::exec::Values::getOutput",
std::function<void(Values*)>([&](Values* /*unused*/) {
getOutputWait.await([&]() { return getOutputWaitFlag.load(); });
}));
auto data = makeRowVector({
makeFlatVector<int64_t>(1'000, [](auto row) { return row; }),
});

CursorParameters params;
params.planNode =
PlanBuilder().values({data, data, data}).project({"c0"}).planNode();
params.queryCtx = std::make_shared<core::QueryCtx>(driverExecutor_.get());
params.maxDrivers = 4;

auto cursor = TaskCursor::create(params);
auto* task = cursor->task().get();

cursor->start();
VELOX_ASSERT_THROW(
task->next(), "Inconsistent task execution mode.");
getOutputWaitFlag = true;
getOutputWait.notify();
while (cursor->hasNext()) {
cursor->moveNext();
}
}

{
// Scenario 2: Serial execution starts first then kicks in Parallel
// execution.

auto data = makeRowVector({
makeFlatVector<int64_t>(1'000, [](auto row) { return row; }),
});
auto plan =
PlanBuilder().values({data, data, data}).project({"c0"}).planFragment();
auto queryCtx = std::make_shared<core::QueryCtx>(driverExecutor_.get());
auto task = Task::create("task.0", plan, 0, queryCtx);

task->next();
VELOX_ASSERT_THROW(
task->start(4, 1), "inconsistent with previous execution mode");
while (task->next() != nullptr) {}
}
}

DEBUG_ONLY_TEST_F(TaskTest, findPeerOperators) {
const std::vector<RowVectorPtr> probeVectors = {makeRowVector(
{"t_c0", "t_c1"},
Expand Down

0 comments on commit 7158bbb

Please sign in to comment.