diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp
index 4ce96369d4da5..dd98f860fdbb2 100644
--- a/velox/exec/Task.cpp
+++ b/velox/exec/Task.cpp
@@ -176,6 +176,16 @@ void movePromisesOut(
   from.clear();
 }
 
+std::string executionModeToString(const Task::ExecutionMode& mode) {
+  switch (mode) {
+    case Task::ExecutionMode::kSerial:
+      return "Serial";
+    case Task::ExecutionMode::kParallel:
+      return "Parallel";
+    default:
+      return "Unknown";
+  }
+}
 } // namespace
 
 std::string taskStateString(TaskState state) {
@@ -231,6 +241,7 @@ std::shared_ptr<Task> Task::create(
     core::PlanFragment planFragment,
     int destination,
     std::shared_ptr<core::QueryCtx> queryCtx,
+    ExecutionMode mode,
     Consumer consumer,
     std::function<void(std::exception_ptr)> onError) {
   return Task::create(
@@ -238,6 +249,7 @@ std::shared_ptr<Task> Task::create(
       std::move(planFragment),
       destination,
       std::move(queryCtx),
+      mode,
       (consumer ? [c = std::move(consumer)]() { return c; }
                 : ConsumerSupplier{}),
       std::move(onError));
@@ -248,6 +260,7 @@ std::shared_ptr<Task> Task::create(
     core::PlanFragment planFragment,
     int destination,
     std::shared_ptr<core::QueryCtx> queryCtx,
+    ExecutionMode mode,
     ConsumerSupplier consumerSupplier,
     std::function<void(std::exception_ptr)> onError) {
   auto task = std::shared_ptr<Task>(new Task(
@@ -255,6 +268,7 @@ std::shared_ptr<Task> Task::create(
       std::move(planFragment),
       destination,
       std::move(queryCtx),
+      mode,
       std::move(consumerSupplier),
       std::move(onError)));
   task->initTaskPool();
@@ -266,6 +280,7 @@ Task::Task(
    core::PlanFragment planFragment,
    int destination,
    std::shared_ptr<core::QueryCtx> queryCtx,
+   ExecutionMode mode,
    ConsumerSupplier consumerSupplier,
    std::function<void(std::exception_ptr)> onError)
    : uuid_{makeUuid()},
@@ -273,6 +288,7 @@ Task::Task(
      planFragment_(std::move(planFragment)),
      destination_(destination),
      queryCtx_(std::move(queryCtx)),
+     mode_(mode),
      consumerSupplier_(std::move(consumerSupplier)),
      onError_(onError),
      splitsStates_(buildSplitStates(planFragment_.planNode)),
@@ -525,6 +541,7 @@ bool Task::supportsSingleThreadedExecution() const {
 }
 
 RowVectorPtr Task::next(ContinueFuture* future) {
+  checkExecutionMode(ExecutionMode::kSerial);
   // NOTE: Task::next() is single-threaded execution so locking is not required
   // to access Task object.
   VELOX_CHECK_EQ(
@@ -645,6 +662,7 @@ void Task::start(uint32_t maxDrivers, uint32_t concurrentSplitGroups) {
   facebook::velox::process::ThreadDebugInfo threadDebugInfo{
       queryCtx()->queryId(), taskId_, nullptr};
   facebook::velox::process::ScopedThreadDebugInfo scopedInfo(threadDebugInfo);
+  checkExecutionMode(ExecutionMode::kParallel);
 
   try {
     VELOX_CHECK_GE(
@@ -688,6 +706,16 @@ void Task::start(uint32_t maxDrivers, uint32_t concurrentSplitGroups) {
   }
 }
 
+void Task::checkExecutionMode(ExecutionMode mode) {
+  VELOX_CHECK(
+      mode == mode_,
+      fmt::format(
+          "Inconsistent task execution mode. Cannot execute in '{}' mode "
+          "for a '{}' mode task",
+          executionModeToString(mode),
+          executionModeToString(mode_)));
+}
+
 void Task::createDriverFactoriesLocked(uint32_t maxDrivers) {
   VELOX_CHECK(isRunningLocked());
   VELOX_CHECK(driverFactories_.empty());
diff --git a/velox/exec/Task.h b/velox/exec/Task.h
index c6aa572334102..da8d7853c5776 100644
--- a/velox/exec/Task.h
+++ b/velox/exec/Task.h
@@ -37,6 +37,14 @@ using ConnectorSplitPreloadFunc =
 
 class Task : public std::enable_shared_from_this<Task> {
  public:
+  /// Threading mode the query uses to execute. 'kSerial' mode executes the
+  /// query serially on the calling thread. 'kParallel' mode executes the
+  /// query in parallel using the provided executor.
+  enum ExecutionMode {
+    kSerial,
+    kParallel,
+  };
+
   /// Creates a task to execute a plan fragment, but doesn't start execution
   /// until Task::start() method is called.
   /// @param taskId Unique task identifier.
@@ -48,8 +56,10 @@ class Task : public std::enable_shared_from_this<Task> {
   /// @param queryCtx Query context containing MemoryPool and MemoryAllocator
   /// instances to use for memory allocations during execution, executor to
   /// schedule operators on, and session properties.
+  /// @param mode Execution mode for this task. The task can be executed in
+  /// either serial or parallel mode.
   /// @param consumer Optional factory function to get callbacks to pass the
-  /// results of the execution. In a multi-threaded execution, results from each
+  /// results of the execution. In parallel execution mode, results from each
   /// thread are passed on to a separate consumer.
   /// @param onError Optional callback to receive an exception if task
   /// execution fails.
@@ -58,6 +68,7 @@ class Task : public std::enable_shared_from_this<Task> {
       core::PlanFragment planFragment,
       int destination,
       std::shared_ptr<core::QueryCtx> queryCtx,
+      ExecutionMode mode,
       Consumer consumer = nullptr,
       std::function<void(std::exception_ptr)> onError = nullptr);
 
@@ -66,6 +77,7 @@ class Task : public std::enable_shared_from_this<Task> {
       core::PlanFragment planFragment,
       int destination,
       std::shared_ptr<core::QueryCtx> queryCtx,
+      ExecutionMode mode,
       ConsumerSupplier consumerSupplier,
       std::function<void(std::exception_ptr)> onError = nullptr);
 
@@ -646,9 +658,14 @@ class Task : public std::enable_shared_from_this<Task> {
       core::PlanFragment planFragment,
       int destination,
       std::shared_ptr<core::QueryCtx> queryCtx,
+      ExecutionMode mode,
       ConsumerSupplier consumerSupplier,
       std::function<void(std::exception_ptr)> onError = nullptr);
 
+  // Checks that the execution mode requested by the caller is consistent with
+  // the mode this task was created with.
+  void checkExecutionMode(ExecutionMode mode);
+
   // Creates driver factories.
   void createDriverFactoriesLocked(uint32_t maxDrivers);
 
@@ -953,6 +970,10 @@ class Task : public std::enable_shared_from_this<Task> {
   const int destination_;
   const std::shared_ptr<core::QueryCtx> queryCtx_;
 
+  // The execution mode of the task. It is enforced that a task can only be
+  // executed in a single mode throughout its lifetime.
+  const ExecutionMode mode_;
+
   // Root MemoryPool for this Task. All member variables that hold references
   // to pool_ must be defined after pool_, childPools_.
   std::shared_ptr<memory::MemoryPool> pool_;
diff --git a/velox/exec/tests/TaskTest.cpp b/velox/exec/tests/TaskTest.cpp
index af99c323bc08e..5a3dff5bea70a 100644
--- a/velox/exec/tests/TaskTest.cpp
+++ b/velox/exec/tests/TaskTest.cpp
@@ -466,7 +466,11 @@ class TaskTest : public HiveConnectorTestBase {
       const std::unordered_map<core::PlanNodeId, std::vector<std::string>>&
           filePaths = {}) {
     auto task = Task::create(
-        "single.execution.task.0", plan, 0, std::make_shared<core::QueryCtx>());
+        "single.execution.task.0",
+        plan,
+        0,
+        std::make_shared<core::QueryCtx>(),
+        Task::ExecutionMode::kSerial);
 
     for (const auto& [nodeId, paths] : filePaths) {
       for (const auto& path : paths) {
@@ -1199,6 +1203,63 @@ TEST_F(TaskTest, outputBufferSize) {
   task->requestCancel();
 }
 
+DEBUG_ONLY_TEST_F(TaskTest, inconsistentExecutionMode) {
+  {
+    // Scenario 1: Parallel execution starts first, then serial execution
+    // kicks in.
+
+    // Block parallel execution so that we can call the serial API on the task
+    // to trigger an inconsistent execution mode failure.
+    folly::EventCount getOutputWait;
+    std::atomic_bool getOutputWaitFlag{false};
+    SCOPED_TESTVALUE_SET(
+        "facebook::velox::exec::Values::getOutput",
+        std::function<void(Values*)>([&](Values* /*unused*/) {
+          getOutputWait.await([&]() { return getOutputWaitFlag.load(); });
+        }));
+    auto data = makeRowVector({
+        makeFlatVector<int64_t>(1'000, [](auto row) { return row; }),
+    });
+
+    CursorParameters params;
+    params.planNode =
+        PlanBuilder().values({data, data, data}).project({"c0"}).planNode();
+    params.queryCtx = std::make_shared<core::QueryCtx>(driverExecutor_.get());
+    params.maxDrivers = 4;
+
+    auto cursor = TaskCursor::create(params);
+    auto* task = cursor->task().get();
+
+    cursor->start();
+    VELOX_ASSERT_THROW(
+        task->next(), "Inconsistent task execution mode.");
+    getOutputWaitFlag = true;
+    getOutputWait.notifyAll();
+    while (cursor->hasNext()) {
+      cursor->moveNext();
+    }
+  }
+
+  {
+    // Scenario 2: Serial execution starts first, then parallel execution
+    // kicks in.
+
+    auto data = makeRowVector({
+        makeFlatVector<int64_t>(1'000, [](auto row) { return row; }),
+    });
+    auto plan =
+        PlanBuilder().values({data, data, data}).project({"c0"}).planFragment();
+    auto queryCtx = std::make_shared<core::QueryCtx>(driverExecutor_.get());
+    auto task = Task::create(
+        "task.0", plan, 0, queryCtx, Task::ExecutionMode::kSerial);
+
+    task->next();
+    VELOX_ASSERT_THROW(
+        task->start(4, 1), "Inconsistent task execution mode.");
+    while (task->next() != nullptr) {
+    }
+  }
+}
+
 DEBUG_ONLY_TEST_F(TaskTest, findPeerOperators) {
   const std::vector<RowVectorPtr> probeVectors = {makeRowVector(
       {"t_c0", "t_c1"},
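
For reviewers, a minimal usage sketch of the new ExecutionMode argument is below. It assumes a Velox build environment and an already-built core::PlanFragment; the runSerial/runParallel helpers and the "demo.*" task IDs are illustrative only and not part of this change.

// Sketch only: assumes a Velox build environment and a pre-built
// core::PlanFragment. Helper names and task IDs are made up for illustration.
#include <folly/executors/CPUThreadPoolExecutor.h>
#include "velox/exec/Task.h"

using namespace facebook::velox;

// Serial mode: the caller drives execution on its own thread via Task::next().
void runSerial(core::PlanFragment fragment) {
  auto queryCtx = std::make_shared<core::QueryCtx>();
  auto task = exec::Task::create(
      "demo.serial.0",
      std::move(fragment),
      /*destination=*/0,
      queryCtx,
      exec::Task::ExecutionMode::kSerial);
  // Calling start() on this task would now throw
  // "Inconsistent task execution mode".
  while (auto result = task->next()) {
    // Consume 'result' (a RowVectorPtr).
  }
}

// Parallel mode: Task::start() schedules drivers on the QueryCtx executor, so
// the QueryCtx must be constructed with one. The fragment is assumed to end in
// a node whose output goes to the task's output buffer (or a consumer is
// supplied to Task::create).
void runParallel(core::PlanFragment fragment) {
  folly::CPUThreadPoolExecutor executor{4};
  auto queryCtx = std::make_shared<core::QueryCtx>(&executor);
  auto task = exec::Task::create(
      "demo.parallel.0",
      std::move(fragment),
      /*destination=*/0,
      queryCtx,
      exec::Task::ExecutionMode::kParallel);
  task->start(/*maxDrivers=*/4, /*concurrentSplitGroups=*/1);
  // Calling task->next() here would throw, since the task was created in
  // kParallel mode; the mode only fixes which driving API is legal.
  // NOTE: in real code the executor and task must outlive task execution;
  // waiting for completion is omitted here for brevity.
}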