diff --git a/velox/examples/ScanAndSort.cpp b/velox/examples/ScanAndSort.cpp index ff8bf0e2cc183..83650a959c749 100644 --- a/velox/examples/ScanAndSort.cpp +++ b/velox/examples/ScanAndSort.cpp @@ -130,7 +130,8 @@ int main(int argc, char** argv) { "my_write_task", writerPlanFragment, /*destination=*/0, - std::make_shared(executor.get())); + std::make_shared(executor.get()), + exec::Task::ExecutionMode::kParallel); // next() starts execution using the client thread. The loop pumps output // vectors out of the task (there are none in this query fragment). @@ -159,7 +160,8 @@ int main(int argc, char** argv) { "my_read_task", readPlanFragment, /*destination=*/0, - std::make_shared(executor.get())); + std::make_shared(executor.get()), + exec::Task::ExecutionMode::kParallel); // Now that we have the query fragment and Task structure set up, we will // add data to it via `splits`. diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index 4ce96369d4da5..a42e524417ae9 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -175,9 +175,23 @@ void movePromisesOut( } from.clear(); } - } // namespace +std::string executionModeString(Task::ExecutionMode mode) { + switch (mode) { + case Task::ExecutionMode::kSerial: + return "Serial"; + case Task::ExecutionMode::kParallel: + return "Parallel"; + default: + return fmt::format("Unknown {}", static_cast(mode)); + } +} + +std::ostream& operator<<(std::ostream& out, const Task::ExecutionMode& mode) { + return out << executionModeString(mode); +} + std::string taskStateString(TaskState state) { switch (state) { case TaskState::kRunning: @@ -231,6 +245,7 @@ std::shared_ptr Task::create( core::PlanFragment planFragment, int destination, std::shared_ptr queryCtx, + ExecutionMode mode, Consumer consumer, std::function onError) { return Task::create( @@ -238,6 +253,7 @@ std::shared_ptr Task::create( std::move(planFragment), destination, std::move(queryCtx), + mode, (consumer ? [c = std::move(consumer)]() { return c; } : ConsumerSupplier{}), std::move(onError)); @@ -248,6 +264,7 @@ std::shared_ptr Task::create( core::PlanFragment planFragment, int destination, std::shared_ptr queryCtx, + ExecutionMode mode, ConsumerSupplier consumerSupplier, std::function onError) { auto task = std::shared_ptr(new Task( @@ -255,6 +272,7 @@ std::shared_ptr Task::create( std::move(planFragment), destination, std::move(queryCtx), + mode, std::move(consumerSupplier), std::move(onError))); task->initTaskPool(); @@ -266,6 +284,7 @@ Task::Task( core::PlanFragment planFragment, int destination, std::shared_ptr queryCtx, + ExecutionMode mode, ConsumerSupplier consumerSupplier, std::function onError) : uuid_{makeUuid()}, @@ -273,6 +292,7 @@ Task::Task( planFragment_(std::move(planFragment)), destination_(destination), queryCtx_(std::move(queryCtx)), + mode_(mode), consumerSupplier_(std::move(consumerSupplier)), onError_(onError), splitsStates_(buildSplitStates(planFragment_.planNode)), @@ -525,6 +545,7 @@ bool Task::supportsSingleThreadedExecution() const { } RowVectorPtr Task::next(ContinueFuture* future) { + checkExecutionMode(ExecutionMode::kSerial); // NOTE: Task::next() is single-threaded execution so locking is not required // to access Task object. VELOX_CHECK_EQ( @@ -645,6 +666,7 @@ void Task::start(uint32_t maxDrivers, uint32_t concurrentSplitGroups) { facebook::velox::process::ThreadDebugInfo threadDebugInfo{ queryCtx()->queryId(), taskId_, nullptr}; facebook::velox::process::ScopedThreadDebugInfo scopedInfo(threadDebugInfo); + checkExecutionMode(ExecutionMode::kParallel); try { VELOX_CHECK_GE( @@ -688,6 +710,16 @@ void Task::start(uint32_t maxDrivers, uint32_t concurrentSplitGroups) { } } +void Task::checkExecutionMode(ExecutionMode mode) { + VELOX_CHECK_EQ( + mode, + mode_, + "Inconsistent task execution mode. Cannot execute in '{}' mode " + "for a '{}' mode task", + mode, + mode_) +} + void Task::createDriverFactoriesLocked(uint32_t maxDrivers) { VELOX_CHECK(isRunningLocked()); VELOX_CHECK(driverFactories_.empty()); diff --git a/velox/exec/Task.h b/velox/exec/Task.h index c6aa572334102..9a2b3d8c9af58 100644 --- a/velox/exec/Task.h +++ b/velox/exec/Task.h @@ -37,6 +37,16 @@ using ConnectorSplitPreloadFunc = class Task : public std::enable_shared_from_this { public: + /// Threading mode the task is executed. + enum class ExecutionMode { + /// Mode that executes the query serially (single-threaded) on the calling + /// thread. + kSerial, + /// Mode that executes the query in parallel (multi-threaded) using provided + /// executor. + kParallel, + }; + /// Creates a task to execute a plan fragment, but doesn't start execution /// until Task::start() method is called. /// @param taskId Unique task identifier. @@ -48,8 +58,10 @@ class Task : public std::enable_shared_from_this { /// @param queryCtx Query context containing MemoryPool and MemoryAllocator /// instances to use for memory allocations during execution, executor to /// schedule operators on, and session properties. + /// @param mode Execution mode for this task. The task can be executed in + /// Serial and Parallel mode. /// @param consumer Optional factory function to get callbacks to pass the - /// results of the execution. In a multi-threaded execution, results from each + /// results of the execution. In a parallel execution mode, results from each /// thread are passed on to a separate consumer. /// @param onError Optional callback to receive an exception if task /// execution fails. @@ -58,6 +70,7 @@ class Task : public std::enable_shared_from_this { core::PlanFragment planFragment, int destination, std::shared_ptr queryCtx, + ExecutionMode mode, Consumer consumer = nullptr, std::function onError = nullptr); @@ -66,6 +79,7 @@ class Task : public std::enable_shared_from_this { core::PlanFragment planFragment, int destination, std::shared_ptr queryCtx, + ExecutionMode mode, ConsumerSupplier consumerSupplier, std::function onError = nullptr); @@ -646,9 +660,14 @@ class Task : public std::enable_shared_from_this { core::PlanFragment planFragment, int destination, std::shared_ptr queryCtx, + ExecutionMode mode, ConsumerSupplier consumerSupplier, std::function onError = nullptr); + // Consistency check of the task execution to make sure the execution mode + // stays the same. + void checkExecutionMode(ExecutionMode mode); + // Creates driver factories. void createDriverFactoriesLocked(uint32_t maxDrivers); @@ -953,6 +972,10 @@ class Task : public std::enable_shared_from_this { const int destination_; const std::shared_ptr queryCtx_; + // The execution mode of the task. It is enforced that a task can only be + // executed in a single mode throughout its lifetime + const ExecutionMode mode_; + // Root MemoryPool for this Task. All member variables that hold references // to pool_ must be defined after pool_, childPools_. std::shared_ptr pool_; @@ -1135,4 +1158,19 @@ bool registerTaskListener(std::shared_ptr listener); /// unregistered successfuly, false if listener was not found. bool unregisterTaskListener(const std::shared_ptr& listener); +std::string executionModeString(Task::ExecutionMode mode); + +std::ostream& operator<<(std::ostream& out, const Task::ExecutionMode& mode); + } // namespace facebook::velox::exec + +template <> +struct fmt::formatter + : formatter { + auto format( + facebook::velox::exec::Task::ExecutionMode m, + format_context& ctx) { + return formatter::format( + facebook::velox::exec::executionModeString(m), ctx); + } +}; diff --git a/velox/exec/benchmarks/ExchangeBenchmark.cpp b/velox/exec/benchmarks/ExchangeBenchmark.cpp index 1108d4613c986..d0bf7f5d903af 100644 --- a/velox/exec/benchmarks/ExchangeBenchmark.cpp +++ b/velox/exec/benchmarks/ExchangeBenchmark.cpp @@ -299,6 +299,7 @@ class ExchangeBenchmark : public VectorTestBase { std::move(planFragment), destination, std::move(queryCtx), + Task::ExecutionMode::kParallel, std::move(consumer)); } diff --git a/velox/exec/tests/DriverTest.cpp b/velox/exec/tests/DriverTest.cpp index 10b06d8f9562c..6ef5edd1261af 100644 --- a/velox/exec/tests/DriverTest.cpp +++ b/velox/exec/tests/DriverTest.cpp @@ -266,6 +266,7 @@ class DriverTest : public OperatorTestBase { plan, 0, std::make_shared(driverExecutor_.get()), + Task::ExecutionMode::kParallel, [](RowVectorPtr /*unused*/, ContinueFuture* /*unused*/) { return exec::BlockingReason::kNotBlocked; }); @@ -1418,6 +1419,7 @@ DEBUG_ONLY_TEST_F(DriverTest, driverCpuTimeSlicingCheck) { 0, std::make_shared( driverExecutor_.get(), std::move(queryConfig)), + Task::ExecutionMode::kParallel, [](RowVectorPtr /*unused*/, ContinueFuture* /*unused*/) { return exec::BlockingReason::kNotBlocked; }); @@ -1452,6 +1454,7 @@ TEST_F(OpCallStatusTest, basic) { 0, std::make_shared( driverExecutor_.get(), std::move(queryConfig)), + Task::ExecutionMode::kParallel, [](RowVectorPtr /*unused*/, ContinueFuture* /*unused*/) { return exec::BlockingReason::kNotBlocked; }); diff --git a/velox/exec/tests/ExchangeClientTest.cpp b/velox/exec/tests/ExchangeClientTest.cpp index a00bcb0fa3b52..68021897b8ae6 100644 --- a/velox/exec/tests/ExchangeClientTest.cpp +++ b/velox/exec/tests/ExchangeClientTest.cpp @@ -64,7 +64,11 @@ class ExchangeClientTest : public testing::Test, queryCtx->testingOverrideMemoryPool( memory::memoryManager()->addRootPool(queryCtx->queryId())); return Task::create( - taskId, core::PlanFragment{planNode}, 0, std::move(queryCtx)); + taskId, + core::PlanFragment{planNode}, + 0, + std::move(queryCtx), + Task::ExecutionMode::kParallel); } int32_t enqueue( diff --git a/velox/exec/tests/ExchangeFuzzer.cpp b/velox/exec/tests/ExchangeFuzzer.cpp index c6076778a3273..4adff83e0b7d0 100644 --- a/velox/exec/tests/ExchangeFuzzer.cpp +++ b/velox/exec/tests/ExchangeFuzzer.cpp @@ -494,6 +494,7 @@ class ExchangeFuzzer : public VectorTestBase { std::move(planFragment), destination, std::move(queryCtx), + Task::ExecutionMode::kParallel, std::move(consumer)); } diff --git a/velox/exec/tests/GroupedExecutionTest.cpp b/velox/exec/tests/GroupedExecutionTest.cpp index 19c503d928390..3dc0811d7918b 100644 --- a/velox/exec/tests/GroupedExecutionTest.cpp +++ b/velox/exec/tests/GroupedExecutionTest.cpp @@ -120,7 +120,12 @@ TEST_F(GroupedExecutionTest, groupedExecutionErrors) { planFragment.groupedExecutionLeafNodeIds.clear(); planFragment.groupedExecutionLeafNodeIds.emplace(tableScanNodeId); queryCtx = std::make_shared(executor_.get()); - task = exec::Task::create("0", planFragment, 0, std::move(queryCtx)); + task = exec::Task::create( + "0", + planFragment, + 0, + std::move(queryCtx), + Task::ExecutionMode::kParallel); VELOX_ASSERT_THROW( task->start(3, 1), "groupedExecutionLeafNodeIds must be empty in ungrouped execution mode"); @@ -129,7 +134,12 @@ TEST_F(GroupedExecutionTest, groupedExecutionErrors) { planFragment.executionStrategy = core::ExecutionStrategy::kGrouped; planFragment.groupedExecutionLeafNodeIds.clear(); queryCtx = std::make_shared(executor_.get()); - task = exec::Task::create("0", planFragment, 0, std::move(queryCtx)); + task = exec::Task::create( + "0", + planFragment, + 0, + std::move(queryCtx), + Task::ExecutionMode::kParallel); VELOX_ASSERT_THROW( task->start(3, 1), "groupedExecutionLeafNodeIds must not be empty in " @@ -140,7 +150,12 @@ TEST_F(GroupedExecutionTest, groupedExecutionErrors) { planFragment.groupedExecutionLeafNodeIds.clear(); planFragment.groupedExecutionLeafNodeIds.emplace(projectNodeId); queryCtx = std::make_shared(executor_.get()); - task = exec::Task::create("0", planFragment, 0, std::move(queryCtx)); + task = exec::Task::create( + "0", + planFragment, + 0, + std::move(queryCtx), + Task::ExecutionMode::kParallel); VELOX_ASSERT_THROW( task->start(3, 1), fmt::format( @@ -153,7 +168,12 @@ TEST_F(GroupedExecutionTest, groupedExecutionErrors) { planFragment.groupedExecutionLeafNodeIds.emplace(tableScanNodeId); planFragment.groupedExecutionLeafNodeIds.emplace(projectNodeId); queryCtx = std::make_shared(executor_.get()); - task = exec::Task::create("0", planFragment, 0, std::move(queryCtx)); + task = exec::Task::create( + "0", + planFragment, + 0, + std::move(queryCtx), + Task::ExecutionMode::kParallel); VELOX_ASSERT_THROW( task->start(3, 1), fmt::format( @@ -166,7 +186,12 @@ TEST_F(GroupedExecutionTest, groupedExecutionErrors) { planFragment.groupedExecutionLeafNodeIds.clear(); planFragment.groupedExecutionLeafNodeIds.emplace(localPartitionNodeId); queryCtx = std::make_shared(executor_.get()); - task = exec::Task::create("0", planFragment, 0, std::move(queryCtx)); + task = exec::Task::create( + "0", + planFragment, + 0, + std::move(queryCtx), + Task::ExecutionMode::kParallel); VELOX_ASSERT_THROW( task->start(3, 1), fmt::format( @@ -202,8 +227,12 @@ TEST_F(GroupedExecutionTest, groupedExecutionWithOutputBuffer) { planFragment.groupedExecutionLeafNodeIds.emplace(tableScanNodeId); planFragment.numSplitGroups = 10; auto queryCtx = std::make_shared(executor_.get()); - auto task = - exec::Task::create("0", std::move(planFragment), 0, std::move(queryCtx)); + auto task = exec::Task::create( + "0", + std::move(planFragment), + 0, + std::move(queryCtx), + Task::ExecutionMode::kParallel); // 3 drivers max and 1 concurrent split group. task->start(3, 1); @@ -397,7 +426,11 @@ DEBUG_ONLY_TEST_F( })); auto task = exec::Task::create( - "0", std::move(planFragment), 0, std::move(queryCtx)); + "0", + std::move(planFragment), + 0, + std::move(queryCtx), + Task::ExecutionMode::kParallel); const auto spillDirectory = exec::test::TempDirectoryPath::create(); if (testData.enableSpill) { task->setSpillDirectory(spillDirectory->getPath()); @@ -513,7 +546,11 @@ TEST_F(GroupedExecutionTest, groupedExecutionWithHashAndNestedLoopJoin) { planFragment.numSplitGroups = 10; auto queryCtx = std::make_shared(executor_.get()); auto task = exec::Task::create( - "0", std::move(planFragment), 0, std::move(queryCtx)); + "0", + std::move(planFragment), + 0, + std::move(queryCtx), + Task::ExecutionMode::kParallel); // 3 drivers max and 1 concurrent split group. task->start(3, 1); diff --git a/velox/exec/tests/MemoryReclaimerTest.cpp b/velox/exec/tests/MemoryReclaimerTest.cpp index 8c288fcc0fab3..3c2b78ed6283f 100644 --- a/velox/exec/tests/MemoryReclaimerTest.cpp +++ b/velox/exec/tests/MemoryReclaimerTest.cpp @@ -44,7 +44,8 @@ class MemoryReclaimerTest : public OperatorTestBase { "MemoryReclaimerTest", std::move(fakePlanFragment), 0, - std::make_shared()); + std::make_shared(), + Task::ExecutionMode::kParallel); } void SetUp() override {} diff --git a/velox/exec/tests/MultiFragmentTest.cpp b/velox/exec/tests/MultiFragmentTest.cpp index 8465e1417d27e..ba061d4cf250f 100644 --- a/velox/exec/tests/MultiFragmentTest.cpp +++ b/velox/exec/tests/MultiFragmentTest.cpp @@ -86,6 +86,7 @@ class MultiFragmentTest : public HiveConnectorTestBase { std::move(planFragment), destination, std::move(queryCtx), + Task::ExecutionMode::kParallel, std::move(consumer)); } diff --git a/velox/exec/tests/OperatorUtilsTest.cpp b/velox/exec/tests/OperatorUtilsTest.cpp index c0abaa5f22263..af3021058fbd8 100644 --- a/velox/exec/tests/OperatorUtilsTest.cpp +++ b/velox/exec/tests/OperatorUtilsTest.cpp @@ -46,7 +46,8 @@ class OperatorUtilsTest : public OperatorTestBase { "SpillOperatorGroupTest_task", std::move(planFragment), 0, - std::make_shared()); + std::make_shared(), + Task::ExecutionMode::kParallel); driver_ = Driver::testingCreate(); driverCtx_ = std::make_unique(task_, 0, 0, 0, 0); driverCtx_->driver = driver_.get(); diff --git a/velox/exec/tests/OutputBufferManagerTest.cpp b/velox/exec/tests/OutputBufferManagerTest.cpp index 97053d5bf072f..527e2d50bab3f 100644 --- a/velox/exec/tests/OutputBufferManagerTest.cpp +++ b/velox/exec/tests/OutputBufferManagerTest.cpp @@ -74,8 +74,12 @@ class OutputBufferManagerTest : public testing::Test { auto queryCtx = std::make_shared( executor_.get(), core::QueryConfig(std::move(configSettings))); - auto task = - Task::create(taskId, std::move(planFragment), 0, std::move(queryCtx)); + auto task = Task::create( + taskId, + std::move(planFragment), + 0, + std::move(queryCtx), + Task::ExecutionMode::kParallel); bufferManager_->initializeTask(task, kind, numDestinations, numDrivers); return task; diff --git a/velox/exec/tests/PartitionedOutputTest.cpp b/velox/exec/tests/PartitionedOutputTest.cpp index 3d2ca6f834f81..1adb55bf4e71c 100644 --- a/velox/exec/tests/PartitionedOutputTest.cpp +++ b/velox/exec/tests/PartitionedOutputTest.cpp @@ -116,7 +116,8 @@ TEST_F(PartitionedOutputTest, flush) { 0, createQueryContext( {{core::QueryConfig::kMaxPartitionedOutputBufferSize, - std::to_string(PartitionedOutput::kMinDestinationSize * 2)}})); + std::to_string(PartitionedOutput::kMinDestinationSize * 2)}}), + Task::ExecutionMode::kParallel); task->start(1); const auto partition0 = getAllData(taskId, 0); diff --git a/velox/exec/tests/TableScanTest.cpp b/velox/exec/tests/TableScanTest.cpp index 09f856e20072c..f97f4fdbaed39 100644 --- a/velox/exec/tests/TableScanTest.cpp +++ b/velox/exec/tests/TableScanTest.cpp @@ -1425,6 +1425,7 @@ DEBUG_ONLY_TEST_F(TableScanTest, tableScanSplitsAndWeights) { core::PlanFragment{leafPlan}, 0, std::move(queryCtx), + Task::ExecutionMode::kParallel, std::move(consumer)); leafTask->start(4); diff --git a/velox/exec/tests/TaskTest.cpp b/velox/exec/tests/TaskTest.cpp index af99c323bc08e..06ad2c0d7b656 100644 --- a/velox/exec/tests/TaskTest.cpp +++ b/velox/exec/tests/TaskTest.cpp @@ -466,7 +466,11 @@ class TaskTest : public HiveConnectorTestBase { const std::unordered_map>& filePaths = {}) { auto task = Task::create( - "single.execution.task.0", plan, 0, std::make_shared()); + "single.execution.task.0", + plan, + 0, + std::make_shared(), + Task::ExecutionMode::kSerial); for (const auto& [nodeId, paths] : filePaths) { for (const auto& path : paths) { @@ -514,7 +518,8 @@ TEST_F(TaskTest, toJson) { "task-1", std::move(plan), 0, - std::make_shared(driverExecutor_.get())); + std::make_shared(driverExecutor_.get()), + Task::ExecutionMode::kParallel); ASSERT_EQ( task->toString(), "{Task task-1 (task-1)Plan: -- Project\n\n drivers:\n"); @@ -554,7 +559,8 @@ TEST_F(TaskTest, wrongPlanNodeForSplit) { "task-1", std::move(plan), 0, - std::make_shared(driverExecutor_.get())); + std::make_shared(driverExecutor_.get()), + Task::ExecutionMode::kParallel); // Add split for the source node. task->addSplit("0", exec::Split(folly::copy(connectorSplit))); @@ -609,7 +615,8 @@ TEST_F(TaskTest, wrongPlanNodeForSplit) { "task-2", std::move(plan), 0, - std::make_shared(driverExecutor_.get())); + std::make_shared(driverExecutor_.get()), + Task::ExecutionMode::kParallel); errorMessage = "Splits can be associated only with leaf plan nodes which require splits. Plan node ID 0 doesn't refer to such plan node."; VELOX_ASSERT_THROW( @@ -635,7 +642,8 @@ TEST_F(TaskTest, duplicatePlanNodeIds) { "task-1", std::move(plan), 0, - std::make_shared(driverExecutor_.get())), + std::make_shared(driverExecutor_.get()), + Task::ExecutionMode::kParallel), "Plan node IDs must be unique. Found duplicate ID: 0.") } @@ -893,7 +901,11 @@ TEST_F(TaskTest, singleThreadedExecutionExternalBlockable) { // First pass, we don't activate the external blocker, expect the task to run // without being blocked. auto nonBlockingTask = Task::create( - "single.execution.task.0", plan, 0, std::make_shared()); + "single.execution.task.0", + plan, + 0, + std::make_shared(), + Task::ExecutionMode::kSerial); std::vector results; for (;;) { auto result = nonBlockingTask->next(&continueFuture); @@ -909,7 +921,11 @@ TEST_F(TaskTest, singleThreadedExecutionExternalBlockable) { continueFuture = ContinueFuture::makeEmpty(); // Second pass, we will now use external blockers to block the task. auto blockingTask = Task::create( - "single.execution.task.1", plan, 0, std::make_shared()); + "single.execution.task.1", + plan, + 0, + std::make_shared(), + Task::ExecutionMode::kSerial); // Before we block, we expect `next` to get data normally. results.push_back(blockingTask->next(&continueFuture)); EXPECT_TRUE(results.back() != nullptr); @@ -941,7 +957,11 @@ TEST_F(TaskTest, supportsSingleThreadedExecution) { .partitionedOutput({}, 1, std::vector{"p0"}) .planFragment(); auto task = Task::create( - "single.execution.task.0", plan, 0, std::make_shared()); + "single.execution.task.0", + plan, + 0, + std::make_shared(), + Task::ExecutionMode::kSerial); // PartitionedOutput does not support single threaded execution, therefore the // task doesn't support it either. @@ -957,7 +977,11 @@ TEST_F(TaskTest, updateBroadCastOutputBuffers) { auto bufferManager = OutputBufferManager::getInstance().lock(); { auto task = Task::create( - "t0", plan, 0, std::make_shared(driverExecutor_.get())); + "t0", + plan, + 0, + std::make_shared(driverExecutor_.get()), + Task::ExecutionMode::kParallel); task->start(1, 1); @@ -971,7 +995,11 @@ TEST_F(TaskTest, updateBroadCastOutputBuffers) { { auto task = Task::create( - "t1", plan, 0, std::make_shared(driverExecutor_.get())); + "t1", + plan, + 0, + std::make_shared(driverExecutor_.get()), + Task::ExecutionMode::kParallel); task->start(1, 1); @@ -1199,6 +1227,62 @@ TEST_F(TaskTest, outputBufferSize) { task->requestCancel(); } +DEBUG_ONLY_TEST_F(TaskTest, inconsistentExecutionMode) { + { + // Scenario 1: Parallel execution starts first then kicks in Serial + // execution. + + // Let parallel execution pause a bit so that we can call serial API on Task + // to trigger inconsistent execution mode failure. + folly::EventCount getOutputWait; + std::atomic_bool getOutputWaitFlag{false}; + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::Values::getOutput", + std::function([&](Values* /*unused*/) { + getOutputWait.await([&]() { return getOutputWaitFlag.load(); }); + })); + auto data = makeRowVector({ + makeFlatVector(1'000, [](auto row) { return row; }), + }); + + CursorParameters params; + params.planNode = + PlanBuilder().values({data, data, data}).project({"c0"}).planNode(); + params.queryCtx = std::make_shared(driverExecutor_.get()); + params.maxDrivers = 4; + + auto cursor = TaskCursor::create(params); + auto* task = cursor->task().get(); + + cursor->start(); + VELOX_ASSERT_THROW(task->next(), "Inconsistent task execution mode."); + getOutputWaitFlag = true; + getOutputWait.notify(); + while (cursor->hasNext()) { + cursor->moveNext(); + } + } + + { + // Scenario 2: Serial execution starts first then kicks in Parallel + // execution. + + auto data = makeRowVector({ + makeFlatVector(1'000, [](auto row) { return row; }), + }); + auto plan = + PlanBuilder().values({data, data, data}).project({"c0"}).planFragment(); + auto queryCtx = std::make_shared(driverExecutor_.get()); + auto task = + Task::create("task.0", plan, 0, queryCtx, Task::ExecutionMode::kSerial); + + task->next(); + VELOX_ASSERT_THROW(task->start(4, 1), "Inconsistent task execution mode."); + while (task->next() != nullptr) { + } + } +} + DEBUG_ONLY_TEST_F(TaskTest, findPeerOperators) { const std::vector probeVectors = {makeRowVector( {"t_c0", "t_c1"}, @@ -1462,12 +1546,14 @@ TEST_F(TaskTest, driverCreationMemoryAllocationCheck) { "driverCreationMemoryAllocationCheck", plan, 0, - std::make_shared()); + std::make_shared(), + singleThreadExecution ? Task::ExecutionMode::kSerial + : Task::ExecutionMode::kParallel); if (singleThreadExecution) { + VELOX_ASSERT_THROW(badTask->next(), "Unexpected memory pool allocations"); + } else { VELOX_ASSERT_THROW( badTask->start(1), "Unexpected memory pool allocations"); - } else { - VELOX_ASSERT_THROW(badTask->next(), "Unexpected memory pool allocations"); } } } @@ -1599,7 +1685,8 @@ DEBUG_ONLY_TEST_F(TaskTest, resumeAfterTaskFinish) { "task", std::move(plan), 0, - std::make_shared(driverExecutor_.get())); + std::make_shared(driverExecutor_.get()), + Task::ExecutionMode::kParallel); task->start(4, 1); // Request pause and then unblock operators to proceed. @@ -1631,7 +1718,8 @@ DEBUG_ONLY_TEST_F( auto queryCtx = std::make_shared(driverExecutor_.get()); - auto blockingTask = Task::create("blocking.task.0", plan, 0, queryCtx); + auto blockingTask = Task::create( + "blocking.task.0", plan, 0, queryCtx, Task::ExecutionMode::kSerial); // Before we block, we expect `next` to get data normally. EXPECT_NE(nullptr, blockingTask->next()); @@ -1705,7 +1793,8 @@ DEBUG_ONLY_TEST_F(TaskTest, longRunningOperatorInTaskReclaimerAbort) { auto queryCtx = std::make_shared(driverExecutor_.get()); - auto blockingTask = Task::create("blocking.task.0", plan, 0, queryCtx); + auto blockingTask = Task::create( + "blocking.task.0", plan, 0, queryCtx, Task::ExecutionMode::kParallel); blockingTask->start(4, 1); const std::string abortErrorMessage("Synthetic Exception"); @@ -1766,7 +1855,12 @@ DEBUG_ONLY_TEST_F(TaskTest, taskReclaimStats) { nullptr, std::move(queryPool), nullptr); - auto task = Task::create("task", std::move(plan), 0, std::move(queryCtx)); + auto task = Task::create( + "task", + std::move(plan), + 0, + std::move(queryCtx), + Task::ExecutionMode::kParallel); task->start(4, 1); const int numReclaims{10}; @@ -1832,7 +1926,12 @@ DEBUG_ONLY_TEST_F(TaskTest, taskPauseTime) { nullptr, std::move(queryPool), nullptr); - auto task = Task::create("task", std::move(plan), 0, std::move(queryCtx)); + auto task = Task::create( + "task", + std::move(plan), + 0, + std::move(queryCtx), + Task::ExecutionMode::kParallel); task->start(4, 1); // Wait for the task driver starts to run. @@ -1880,7 +1979,8 @@ TEST_F(TaskTest, updateStatsWhileCloseOffThreadDriver) { "task", std::move(plan), 0, - std::make_shared(driverExecutor_.get())); + std::make_shared(driverExecutor_.get()), + Task::ExecutionMode::kParallel); task->start(4, 1); std::this_thread::sleep_for(std::chrono::milliseconds(100)); task->testingVisitDrivers( @@ -1924,7 +2024,8 @@ DEBUG_ONLY_TEST_F(TaskTest, driverEnqueAfterFailedAndPausedTask) { "task", std::move(plan), 0, - std::make_shared(driverExecutor_.get())); + std::make_shared(driverExecutor_.get()), + Task::ExecutionMode::kParallel); task->start(4, 1); // Request pause. diff --git a/velox/exec/tests/ThreadDebugInfoTest.cpp b/velox/exec/tests/ThreadDebugInfoTest.cpp index b3696a0fb448e..d5cbb15c1d343 100644 --- a/velox/exec/tests/ThreadDebugInfoTest.cpp +++ b/velox/exec/tests/ThreadDebugInfoTest.cpp @@ -99,7 +99,11 @@ DEBUG_ONLY_TEST_F(ThreadDebugInfoDeathTest, withinTheCallingThread) { nullptr, "TaskCursorQuery_0"); auto task = exec::Task::create( - "single.execution.task.0", std::move(plan), 0, queryCtx); + "single.execution.task.0", + std::move(plan), + 0, + queryCtx, + exec::Task::ExecutionMode::kSerial); #if IS_BUILDING_WITH_ASAN() == 0 ASSERT_DEATH( diff --git a/velox/exec/tests/utils/Cursor.cpp b/velox/exec/tests/utils/Cursor.cpp index a49a27ddfb0dc..9ad81ba8844a7 100644 --- a/velox/exec/tests/utils/Cursor.cpp +++ b/velox/exec/tests/utils/Cursor.cpp @@ -226,6 +226,7 @@ class MultiThreadedTaskCursor : public TaskCursorBase { std::move(planFragment_), params.destination, std::move(queryCtx_), + Task::ExecutionMode::kParallel, // consumer [queue, copyResult = params.copyResult]( const RowVectorPtr& vector, velox::ContinueFuture* future) { @@ -329,7 +330,8 @@ class SingleThreadedTaskCursor : public TaskCursorBase { taskId_, std::move(planFragment_), params.destination, - std::move(queryCtx_)); + std::move(queryCtx_), + Task::ExecutionMode::kSerial); if (!taskSpillDirectory_.empty()) { task_->setSpillDirectory(taskSpillDirectory_); diff --git a/velox/functions/prestosql/aggregates/benchmarks/ReduceAgg.cpp b/velox/functions/prestosql/aggregates/benchmarks/ReduceAgg.cpp index 249145f343a36..87cb0cc46d7d1 100644 --- a/velox/functions/prestosql/aggregates/benchmarks/ReduceAgg.cpp +++ b/velox/functions/prestosql/aggregates/benchmarks/ReduceAgg.cpp @@ -189,7 +189,8 @@ class ReduceAggBenchmark : public HiveConnectorTestBase { "t", std::move(plan), 0, - std::make_shared(executor_.get())); + std::make_shared(executor_.get()), + exec::Task::ExecutionMode::kParallel); task->addSplit( "0", exec::Split(makeHiveConnectorSplit(filePath_->getPath()))); diff --git a/velox/functions/prestosql/aggregates/benchmarks/SimpleAggregates.cpp b/velox/functions/prestosql/aggregates/benchmarks/SimpleAggregates.cpp index 36388cea051bf..81a4821b2477b 100644 --- a/velox/functions/prestosql/aggregates/benchmarks/SimpleAggregates.cpp +++ b/velox/functions/prestosql/aggregates/benchmarks/SimpleAggregates.cpp @@ -165,7 +165,8 @@ class SimpleAggregatesBenchmark : public HiveConnectorTestBase { "t", std::move(plan), 0, - std::make_shared(executor_.get())); + std::make_shared(executor_.get()), + exec::Task::ExecutionMode::kParallel); } private: diff --git a/velox/functions/prestosql/aggregates/benchmarks/TwoStringKeys.cpp b/velox/functions/prestosql/aggregates/benchmarks/TwoStringKeys.cpp index 83e6bbe7a4e8b..7ce96450c4434 100644 --- a/velox/functions/prestosql/aggregates/benchmarks/TwoStringKeys.cpp +++ b/velox/functions/prestosql/aggregates/benchmarks/TwoStringKeys.cpp @@ -113,7 +113,8 @@ class TwoStringKeysBenchmark : public HiveConnectorTestBase { "t", std::move(plan), 0, - std::make_shared(executor_.get())); + std::make_shared(executor_.get()), + exec::Task::ExecutionMode::kParallel); task->addSplit( "0", exec::Split(makeHiveConnectorSplit((filePath_->getPath()))));