Skip to content

Commit

Permalink
Add execution mode enforcement
Browse files Browse the repository at this point in the history
  • Loading branch information
tanjialiang committed Apr 15, 2024
1 parent d796cfc commit 2d9ca9c
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 0 deletions.
44 changes: 44 additions & 0 deletions velox/core/QueryCtx.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,15 @@ namespace facebook::velox::core {

class QueryCtx {
public:
/// Threading mode the query uses to execute. 'kSerial' mode executes the
/// query serially on the call thread. 'kUninitialized' mode executes the
/// query parallelly using provided executor.
enum ExecutionMode {
kSerial,
kParallel,
kUninitialized,
};

/// QueryCtx is used in different places. When used with `Task::start()`, it's
/// required that the caller supplies the executor and ensure its lifetime
/// outlives the tasks that use it. In contrast, when used in expression
Expand Down Expand Up @@ -133,6 +142,20 @@ class QueryCtx {
/// exceeds the max spill bytes limit.
void updateSpilledBytesAndCheckLimit(uint64_t bytes);

void checkExecutionMode(ExecutionMode mode) {
std::lock_guard l(modeMu_);
if (executionMode_ == ExecutionMode::kUninitialized) {
executionMode_ = mode;
return;
}
VELOX_CHECK(
mode == executionMode_,
fmt::format(
"Execution mode {} is inconsistent with previous execution mode {}",
executionModeToString(mode),
executionModeToString(executionMode_)));
}

private:
static Config* getEmptyConfig() {
static const std::unique_ptr<Config> kEmptyConfig =
Expand All @@ -149,11 +172,32 @@ class QueryCtx {
}
}

std::string executionModeToString(const ExecutionMode& mode) const {
switch (mode) {
case ExecutionMode::kSerial:
return "Serial";
case ExecutionMode::kParallel:
return "Parallel";
case ExecutionMode::kUninitialized:
return "Uninitialized";
default:
return "Unknown";
}
}

const std::string queryId_;
folly::Executor* const executor_{nullptr};
folly::Executor* const spillExecutor_{nullptr};
cache::AsyncDataCache* const cache_;

// The execution mode of the query. The value is not set until execution
// starts. The first call to start the query sets this value depending on the
// API used from 'Task' or 'Driver'. If serial API is used, 'executionMode_'
// will be set 'kSerial', otherwise 'kParallel'. The mode is then enforced
// throughout the query's lifetime.
ExecutionMode executionMode_{
ExecutionMode::kUninitialized};
std::mutex modeMu_;
std::unordered_map<std::string, std::shared_ptr<Config>>
connectorSessionProperties_;
std::shared_ptr<memory::MemoryPool> pool_;
Expand Down
6 changes: 6 additions & 0 deletions velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ void Driver::pushdownFilters(int operatorIndex) {
}

RowVectorPtr Driver::next(std::shared_ptr<BlockingState>& blockingState) {
checkExecutionMode(core::QueryCtx::ExecutionMode::kSerial);
enqueueInternal();
auto self = shared_from_this();
facebook::velox::process::ScopedThreadDebugInfo scopedInfo(
Expand Down Expand Up @@ -752,6 +753,7 @@ void Driver::recordYieldCount() {

// static
void Driver::run(std::shared_ptr<Driver> self) {
self->checkExecutionMode(core::QueryCtx::ExecutionMode::kParallel);
process::TraceContext trace("Driver::run");
facebook::velox::process::ScopedThreadDebugInfo scopedInfo(
self->driverCtx()->threadDebugInfo);
Expand Down Expand Up @@ -854,6 +856,10 @@ void Driver::close() {
Task::removeDriver(ctx_->task, this);
}

void Driver::checkExecutionMode(core::QueryCtx::ExecutionMode mode) {
task()->queryCtx()->checkExecutionMode(mode);
}

void Driver::closeByTask() {
VELOX_CHECK(isOnThread());
VELOX_CHECK(isTerminated());
Expand Down
4 changes: 4 additions & 0 deletions velox/exec/Driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,10 @@ class Driver : public std::enable_shared_from_this<Driver> {

void close();

// Consistency check of the driver execution to make sure the execution mode
// stays the same.
void checkExecutionMode(core::QueryCtx::ExecutionMode mode);

// Push down dynamic filters produced by the operator at the specified
// position in the pipeline.
void pushdownFilters(int operatorIndex);
Expand Down
6 changes: 6 additions & 0 deletions velox/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,7 @@ bool Task::supportsSingleThreadedExecution() const {
}

RowVectorPtr Task::next(ContinueFuture* future) {
checkExecutionMode(core::QueryCtx::ExecutionMode::kSerial);
// NOTE: Task::next() is single-threaded execution so locking is not required
// to access Task object.
VELOX_CHECK_EQ(
Expand Down Expand Up @@ -645,6 +646,7 @@ void Task::start(uint32_t maxDrivers, uint32_t concurrentSplitGroups) {
facebook::velox::process::ThreadDebugInfo threadDebugInfo{
queryCtx()->queryId(), taskId_, nullptr};
facebook::velox::process::ScopedThreadDebugInfo scopedInfo(threadDebugInfo);
checkExecutionMode(core::QueryCtx::ExecutionMode::kParallel);

try {
VELOX_CHECK_GE(
Expand Down Expand Up @@ -688,6 +690,10 @@ void Task::start(uint32_t maxDrivers, uint32_t concurrentSplitGroups) {
}
}

void Task::checkExecutionMode(core::QueryCtx::ExecutionMode mode) {
queryCtx_->checkExecutionMode(mode);
}

void Task::createDriverFactoriesLocked(uint32_t maxDrivers) {
VELOX_CHECK(isRunningLocked());
VELOX_CHECK(driverFactories_.empty());
Expand Down
4 changes: 4 additions & 0 deletions velox/exec/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,10 @@ class Task : public std::enable_shared_from_this<Task> {
ConsumerSupplier consumerSupplier,
std::function<void(std::exception_ptr)> onError = nullptr);

// Consistency check of the task execution to make sure the execution mode
// stays the same.
void checkExecutionMode(core::QueryCtx::ExecutionMode mode);

// Creates driver factories.
void createDriverFactoriesLocked(uint32_t maxDrivers);

Expand Down
56 changes: 56 additions & 0 deletions velox/exec/tests/TaskTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1199,6 +1199,62 @@ TEST_F(TaskTest, outputBufferSize) {
task->requestCancel();
}

DEBUG_ONLY_TEST_F(TaskTest, inconsistentExecutionMode) {
{
// Scenario 1: Parallel execution starts first then kicks in Serial
// execution.

// Let parallel execution pause a bit so that we can call serial API on Task
// to trigger inconsistent execution mode failure.
folly::EventCount getOutputWait;
std::atomic_bool getOutputWaitFlag{false};
SCOPED_TESTVALUE_SET(
"facebook::velox::exec::Values::getOutput",
std::function<void(Values*)>([&](Values* /*unused*/) {
getOutputWait.await([&]() { return getOutputWaitFlag.load(); });
}));
auto data = makeRowVector({
makeFlatVector<int64_t>(1'000, [](auto row) { return row; }),
});

CursorParameters params;
params.planNode =
PlanBuilder().values({data, data, data}).project({"c0"}).planNode();
params.queryCtx = std::make_shared<core::QueryCtx>(driverExecutor_.get());
params.maxDrivers = 4;

auto cursor = TaskCursor::create(params);
auto* task = cursor->task().get();

cursor->start();
VELOX_ASSERT_THROW(
task->next(), "inconsistent with previous execution mode");
getOutputWaitFlag = true;
getOutputWait.notify();
while (cursor->hasNext()) {
cursor->moveNext();
}
}

{
// Scenario 2: Serial execution starts first then kicks in Parallel
// execution.

auto data = makeRowVector({
makeFlatVector<int64_t>(1'000, [](auto row) { return row; }),
});
auto plan =
PlanBuilder().values({data, data, data}).project({"c0"}).planFragment();
auto queryCtx = std::make_shared<core::QueryCtx>(driverExecutor_.get());
auto task = Task::create("task.0", plan, 0, queryCtx);

task->next();
VELOX_ASSERT_THROW(
task->start(4, 1), "inconsistent with previous execution mode");
while (task->next() != nullptr) {}
}
}

DEBUG_ONLY_TEST_F(TaskTest, findPeerOperators) {
const std::vector<RowVectorPtr> probeVectors = {makeRowVector(
{"t_c0", "t_c1"},
Expand Down

0 comments on commit 2d9ca9c

Please sign in to comment.