diff --git a/velox/core/QueryCtx.h b/velox/core/QueryCtx.h index d27370bcb9ae9..568ba4fda1e32 100644 --- a/velox/core/QueryCtx.h +++ b/velox/core/QueryCtx.h @@ -28,6 +28,15 @@ namespace facebook::velox::core { class QueryCtx { public: + /// Threading mode the query uses to execute. 'kSerial' mode executes the + /// query serially on the call thread. 'kUninitialized' mode executes the + /// query parallelly using provided executor. + enum ExecutionMode { + kSerial, + kParallel, + kUninitialized, + }; + /// QueryCtx is used in different places. When used with `Task::start()`, it's /// required that the caller supplies the executor and ensure its lifetime /// outlives the tasks that use it. In contrast, when used in expression @@ -133,6 +142,20 @@ class QueryCtx { /// exceeds the max spill bytes limit. void updateSpilledBytesAndCheckLimit(uint64_t bytes); + void checkExecutionMode(ExecutionMode mode) { + std::lock_guard l(modeMu_); + if (executionMode_ == ExecutionMode::kUninitialized) { + executionMode_ = mode; + return; + } + VELOX_CHECK( + mode == executionMode_, + fmt::format( + "Execution mode {} is inconsistent with previous execution mode {}", + executionModeToString(mode), + executionModeToString(executionMode_))); + } + private: static Config* getEmptyConfig() { static const std::unique_ptr kEmptyConfig = @@ -149,11 +172,32 @@ class QueryCtx { } } + std::string executionModeToString(const ExecutionMode& mode) const { + switch (mode) { + case ExecutionMode::kSerial: + return "Serial"; + case ExecutionMode::kParallel: + return "Parallel"; + case ExecutionMode::kUninitialized: + return "Uninitialized"; + default: + return "Unknown"; + } + } + const std::string queryId_; folly::Executor* const executor_{nullptr}; folly::Executor* const spillExecutor_{nullptr}; cache::AsyncDataCache* const cache_; + // The execution mode of the query. The value is not set until execution + // starts. The first call to start the query sets this value depending on the + // API used from 'Task' or 'Driver'. If serial API is used, 'executionMode_' + // will be set 'kSerial', otherwise 'kParallel'. The mode is then enforced + // throughout the query's lifetime. + ExecutionMode executionMode_{ + ExecutionMode::kUninitialized}; + std::mutex modeMu_; std::unordered_map> connectorSessionProperties_; std::shared_ptr pool_; diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp index a7949bc3d74d0..833e20c1234e2 100644 --- a/velox/exec/Driver.cpp +++ b/velox/exec/Driver.cpp @@ -330,6 +330,7 @@ void Driver::pushdownFilters(int operatorIndex) { } RowVectorPtr Driver::next(std::shared_ptr& blockingState) { + checkExecutionMode(core::QueryCtx::ExecutionMode::kSerial); enqueueInternal(); auto self = shared_from_this(); facebook::velox::process::ScopedThreadDebugInfo scopedInfo( @@ -752,6 +753,7 @@ void Driver::recordYieldCount() { // static void Driver::run(std::shared_ptr self) { + self->checkExecutionMode(core::QueryCtx::ExecutionMode::kParallel); process::TraceContext trace("Driver::run"); facebook::velox::process::ScopedThreadDebugInfo scopedInfo( self->driverCtx()->threadDebugInfo); @@ -854,6 +856,10 @@ void Driver::close() { Task::removeDriver(ctx_->task, this); } +void Driver::checkExecutionMode(core::QueryCtx::ExecutionMode mode) { + task()->queryCtx()->checkExecutionMode(mode); +} + void Driver::closeByTask() { VELOX_CHECK(isOnThread()); VELOX_CHECK(isTerminated()); diff --git a/velox/exec/Driver.h b/velox/exec/Driver.h index 9c76ae0f319ec..6747ff29e212d 100644 --- a/velox/exec/Driver.h +++ b/velox/exec/Driver.h @@ -469,6 +469,10 @@ class Driver : public std::enable_shared_from_this { void close(); + // Consistency check of the driver execution to make sure the execution mode + // stays the same. + void checkExecutionMode(core::QueryCtx::ExecutionMode mode); + // Push down dynamic filters produced by the operator at the specified // position in the pipeline. void pushdownFilters(int operatorIndex); diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index 4ce96369d4da5..cbcbd3c476fb7 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -525,6 +525,7 @@ bool Task::supportsSingleThreadedExecution() const { } RowVectorPtr Task::next(ContinueFuture* future) { + checkExecutionMode(core::QueryCtx::ExecutionMode::kSerial); // NOTE: Task::next() is single-threaded execution so locking is not required // to access Task object. VELOX_CHECK_EQ( @@ -645,6 +646,7 @@ void Task::start(uint32_t maxDrivers, uint32_t concurrentSplitGroups) { facebook::velox::process::ThreadDebugInfo threadDebugInfo{ queryCtx()->queryId(), taskId_, nullptr}; facebook::velox::process::ScopedThreadDebugInfo scopedInfo(threadDebugInfo); + checkExecutionMode(core::QueryCtx::ExecutionMode::kParallel); try { VELOX_CHECK_GE( @@ -688,6 +690,10 @@ void Task::start(uint32_t maxDrivers, uint32_t concurrentSplitGroups) { } } +void Task::checkExecutionMode(core::QueryCtx::ExecutionMode mode) { + queryCtx_->checkExecutionMode(mode); +} + void Task::createDriverFactoriesLocked(uint32_t maxDrivers) { VELOX_CHECK(isRunningLocked()); VELOX_CHECK(driverFactories_.empty()); diff --git a/velox/exec/Task.h b/velox/exec/Task.h index c6aa572334102..ed6a1a0ff413f 100644 --- a/velox/exec/Task.h +++ b/velox/exec/Task.h @@ -649,6 +649,10 @@ class Task : public std::enable_shared_from_this { ConsumerSupplier consumerSupplier, std::function onError = nullptr); + // Consistency check of the task execution to make sure the execution mode + // stays the same. + void checkExecutionMode(core::QueryCtx::ExecutionMode mode); + // Creates driver factories. void createDriverFactoriesLocked(uint32_t maxDrivers); diff --git a/velox/exec/tests/TaskTest.cpp b/velox/exec/tests/TaskTest.cpp index af99c323bc08e..f783d16bcf978 100644 --- a/velox/exec/tests/TaskTest.cpp +++ b/velox/exec/tests/TaskTest.cpp @@ -1199,6 +1199,62 @@ TEST_F(TaskTest, outputBufferSize) { task->requestCancel(); } +DEBUG_ONLY_TEST_F(TaskTest, inconsistentExecutionMode) { + { + // Scenario 1: Parallel execution starts first then kicks in Serial + // execution. + + // Let parallel execution pause a bit so that we can call serial API on Task + // to trigger inconsistent execution mode failure. + folly::EventCount getOutputWait; + std::atomic_bool getOutputWaitFlag{false}; + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::Values::getOutput", + std::function([&](Values* /*unused*/) { + getOutputWait.await([&]() { return getOutputWaitFlag.load(); }); + })); + auto data = makeRowVector({ + makeFlatVector(1'000, [](auto row) { return row; }), + }); + + CursorParameters params; + params.planNode = + PlanBuilder().values({data, data, data}).project({"c0"}).planNode(); + params.queryCtx = std::make_shared(driverExecutor_.get()); + params.maxDrivers = 4; + + auto cursor = TaskCursor::create(params); + auto* task = cursor->task().get(); + + cursor->start(); + VELOX_ASSERT_THROW( + task->next(), "inconsistent with previous execution mode"); + getOutputWaitFlag = true; + getOutputWait.notify(); + while (cursor->hasNext()) { + cursor->moveNext(); + } + } + + { + // Scenario 2: Serial execution starts first then kicks in Parallel + // execution. + + auto data = makeRowVector({ + makeFlatVector(1'000, [](auto row) { return row; }), + }); + auto plan = + PlanBuilder().values({data, data, data}).project({"c0"}).planFragment(); + auto queryCtx = std::make_shared(driverExecutor_.get()); + auto task = Task::create("task.0", plan, 0, queryCtx); + + task->next(); + VELOX_ASSERT_THROW( + task->start(4, 1), "inconsistent with previous execution mode"); + while (task->next() != nullptr) {} + } +} + DEBUG_ONLY_TEST_F(TaskTest, findPeerOperators) { const std::vector probeVectors = {makeRowVector( {"t_c0", "t_c1"},