diff --git a/dali/pipeline/executor/executor2/exec2.cc b/dali/pipeline/executor/executor2/exec2.cc
index 9821e086a7a..2f4e71f3624 100644
--- a/dali/pipeline/executor/executor2/exec2.cc
+++ b/dali/pipeline/executor/executor2/exec2.cc
@@ -48,6 +48,7 @@ void ApplyConcurrencyLimit(ExecGraph &graph, const Executor2::Config &config) {
       LimitBackendConcurrency(graph, OpType::CPU);
       break;  // other operators have no restrictions
     case OperatorConcurrency::Backend:
+      LimitBackendConcurrency(graph, OpType::CPU);
       LimitBackendConcurrency(graph, OpType::GPU);
       LimitBackendConcurrency(graph, OpType::MIXED);
       break;
@@ -70,6 +71,9 @@ class Executor2::Impl {
  public:
   explicit Impl(const Config &config) : config_(config) {
   }
+  ~Impl() {
+    Shutdown();
+  }
 
   enum class State {
     New = 0,
@@ -83,7 +87,6 @@ class Executor2::Impl {
   void Build(const graph::OpGraph &graph) {
     if (state_ != State::New)
       throw std::logic_error("Already built.");
-    std::cerr << "Building " << (void*)this << std::endl;
     state_ = State::Building;
     DeviceGuard dg(config_.device.value_or(CPU_ONLY_DEVICE_ID));
     graph_.Lower(graph);
@@ -154,7 +157,6 @@ class Executor2::Impl {
   void Shutdown() {
     if (state_ != State::Running)
       return;
-    std::cerr << "Shutting down " << (void*)this << std::endl;
     state_ = State::ShutdownRequested;
     if (exec_)
       exec_->Shutdown();
diff --git a/dali/pipeline/executor/executor2/exec2_test.cc b/dali/pipeline/executor/executor2/exec2_test.cc
index cdc9e894bbb..495612bbfcc 100644
--- a/dali/pipeline/executor/executor2/exec2_test.cc
+++ b/dali/pipeline/executor/executor2/exec2_test.cc
@@ -115,28 +115,30 @@ TEST_P(Exec2Test, Graph1_CPUOnly) {
   Executor2 exec(config_);
   graph::OpGraph graph = GetTestGraph1();
   exec.Build(graph);
-  exec.Run();
-  exec.Run();
+  for (int i = 0; i < 10; i++) {
+    exec.Run();
+  }
 
   Workspace ws;
-  exec.Outputs(&ws);
-  CheckTestGraph1Results(ws, config_.max_batch_size);
-  ws.Clear();
-  exec.Outputs(&ws);
-  CheckTestGraph1Results(ws, config_.max_batch_size);
+  for (int i = 0; i < 10; i++) {
+    ws.Clear();
+    exec.Outputs(&ws);
+    CheckTestGraph1Results(ws, config_.max_batch_size);
+  }
 }
 
 TEST_P(Exec2Test, Graph2_CPU2GPU) {
   Executor2 exec(config_);
   graph::OpGraph graph = GetTestGraph2();
   exec.Build(graph);
-  exec.Run();
-  exec.Run();
+  for (int i = 0; i < 10; i++) {
+    exec.Run();
+  }
 
   Workspace ws;
-  exec.Outputs(&ws);
-  CheckTestGraph2Results(ws, config_.max_batch_size);
-  ws.Clear();
-  exec.Outputs(&ws);
-  CheckTestGraph2Results(ws, config_.max_batch_size);
+  for (int i = 0; i < 10; i++) {
+    ws.Clear();
+    exec.Outputs(&ws);
+    CheckTestGraph2Results(ws, config_.max_batch_size);
+  }
 }
diff --git a/dali/pipeline/executor/executor2/exec2_test.h b/dali/pipeline/executor/executor2/exec2_test.h
index a1e01ade128..ca6efaef69c 100644
--- a/dali/pipeline/executor/executor2/exec2_test.h
+++ b/dali/pipeline/executor/executor2/exec2_test.h
@@ -45,18 +45,27 @@ class DummyOpCPU : public Operator<CPUBackend> {
   void RunImpl(Workspace &ws) override {
     int N = ws.GetRequestedBatchSize(0);
     addend_.Acquire(spec_, ws, N);
+    sample_sums_.resize(N);
+    auto &tp = ws.GetThreadPool();
     for (int s = 0; s < N; s++) {
-      int sum = *addend_[s].data + s;
-      for (int i = 0; i < ws.NumInput(); i++) {
-        sum += *ws.Input<CPUBackend>(i)[s].data<int>();
-      }
-      *ws.Output<CPUBackend>(0)[s].mutable_data<int>() = sum;
+      auto sample_sum = [&, s](int) {
+        int sum = *addend_[s].data + s;
+        for (int i = 0; i < ws.NumInput(); i++) {
+          sum += *ws.Input<CPUBackend>(i)[s].data<int>();
+        }
+        sample_sums_[s] = sum;
+      };
+      tp.AddWork(sample_sum);
     }
+    tp.RunAll(true);
+    for (int s = 0; s < N; s++)
+      *ws.Output<CPUBackend>(0)[s].mutable_data<int>() = sample_sums_[s];
   }
 
   bool CanInferOutputs() const override { return true; }
   ArgValue<int> addend_{"addend", spec_};
 
+  std::vector<int> sample_sums_;
   std::string instance_name_;
 };
 
diff --git a/dali/pipeline/executor/executor2/exec_graph.h b/dali/pipeline/executor/executor2/exec_graph.h
index b4d2a401361..f654f40a115 100644
--- a/dali/pipeline/executor/executor2/exec_graph.h
+++ b/dali/pipeline/executor/executor2/exec_graph.h
@@ -129,6 +129,8 @@ class DLL_PUBLIC ExecNode {
    * from being executed concurrently. */
   std::shared_ptr<tasking::Semaphore> concurrency;
 
+  std::atomic_int actual_concurrency{0};
+
   /** Limits the number of output buffers for the operator.
    *
    * If the graph is scheduled multiple times ahead, the operator would produce multiple results,
diff --git a/dali/pipeline/executor/executor2/exec_graph_test.cc b/dali/pipeline/executor/executor2/exec_graph_test.cc
index 000c5667624..a285edb7dac 100644
--- a/dali/pipeline/executor/executor2/exec_graph_test.cc
+++ b/dali/pipeline/executor/executor2/exec_graph_test.cc
@@ -24,6 +24,16 @@ namespace dali {
 namespace exec2 {
 namespace test {
 
+namespace {
+void LimitBackendConcurrency(ExecGraph &graph, OpType backend, int max_concurrency = 1) {
+  auto sem = std::make_shared<tasking::Semaphore>(max_concurrency);
+  for (auto &n : graph.Nodes()) {
+    if (n.backend == backend)
+      n.concurrency = sem;
+  }
+}
+}  // namespace
+
 TEST(ExecGraphTest, SimpleGraph) {
   int batch_size = 32;
   OpSpec spec0(kTestOpName);
@@ -62,6 +72,7 @@
   g.Link(n0, 0, n2, 0);
   g.Link(n1, 0, n2, 1);
   g.Link(n2, 0, no, 0);
+  LimitBackendConcurrency(g, OpType::CPU);
 
   WorkspaceParams params = {};
   auto tp = std::make_unique<ThreadPool>(std::thread::hardware_concurrency(), 0, false, "test");
@@ -114,14 +125,15 @@
        .AddOutput("op2e0", "cpu")
        .AddArg("name", "op2");
   auto op2 = std::make_unique<DummyOpCPU>(spec2);
-  ExecGraph def;
-  ExecNode *n2 = def.AddNode(std::move(op2));
-  ExecNode *n1 = def.AddNode(std::move(op1));
-  ExecNode *n0 = def.AddNode(std::move(op0));
-  ExecNode *no = def.AddOutputNode();
-  def.Link(n0, 0, n2, 0);
-  def.Link(n1, 0, n2, 1);
-  def.Link(n2, 0, no, 0);
+  ExecGraph g;
+  ExecNode *n2 = g.AddNode(std::move(op2));
+  ExecNode *n1 = g.AddNode(std::move(op1));
+  ExecNode *n0 = g.AddNode(std::move(op0));
+  ExecNode *no = g.AddOutputNode();
+  g.Link(n0, 0, n2, 0);
+  g.Link(n1, 0, n2, 1);
+  g.Link(n2, 0, no, 0);
+  LimitBackendConcurrency(g, OpType::CPU);
   ThreadPool tp(4, 0, false, "test");
   WorkspaceParams params = {};
   ExecEnv env;
@@ -135,8 +147,8 @@
   ex.Start();
   auto start = dali::test::perf_timer::now();
   for (int i = 0; i < N; i++) {
-    def.PrepareIteration(std::make_shared<IterationData>(), params);
-    auto fut = def.Launch(ex);
+    g.PrepareIteration(std::make_shared<IterationData>(), params);
+    auto fut = g.Launch(ex);
     auto &pipe_out = fut.Value<const PipelineOutput &>();
     auto &ws = pipe_out.workspace;
     auto &out = ws.Output<CPUBackend>(0);
@@ -179,14 +191,16 @@
        .AddOutput("op2e0", "cpu")
        .AddArg("name", "op2");
   auto op2 = std::make_unique<DummyOpCPU>(spec2);
-  ExecGraph def;
-  ExecNode *n2 = def.AddNode(std::move(op2));
-  ExecNode *n1 = def.AddNode(std::move(op1));
-  ExecNode *n0 = def.AddNode(std::move(op0));
-  ExecNode *no = def.AddOutputNode();
-  def.Link(n0, 0, n2, 0);
-  def.Link(n1, 0, n2, 1);
-  def.Link(n2, 0, no, 0);
+  ExecGraph g;
+  ExecNode *n2 = g.AddNode(std::move(op2));
+  ExecNode *n1 = g.AddNode(std::move(op1));
+  ExecNode *n0 = g.AddNode(std::move(op0));
+  ExecNode *no = g.AddOutputNode();
+  g.Link(n0, 0, n2, 0);
+  g.Link(n1, 0, n2, 1);
+  g.Link(n2, 0, no, 0);
+  LimitBackendConcurrency(g, OpType::CPU);
+
   ThreadPool tp(4, 0, false, "test");
   WorkspaceParams params = {};
   ExecEnv env;
@@ -200,8 +214,8 @@
   std::vector<tasking::TaskFuture> fut;
   fut.reserve(N);
   for (int i = 0; i < N; i++) {
-    def.PrepareIteration(std::make_shared<IterationData>(), params);
-    fut.push_back(def.Launch(ex));
+    g.PrepareIteration(std::make_shared<IterationData>(), params);
+    fut.push_back(g.Launch(ex));
   }
 
   int ctr = 0;
@@ -330,6 +344,7 @@ TEST(ExecGraphTest, LoweredExec) {
   graph::OpGraph def = GetTestGraph1();
   ExecGraph g;
   g.Lower(def);
+  LimitBackendConcurrency(g, OpType::CPU);
 
   ThreadPool tp(std::thread::hardware_concurrency(), 0, false, "test");
   WorkspaceParams params = {};
diff --git a/dali/pipeline/executor/executor2/op_task.cc b/dali/pipeline/executor/executor2/op_task.cc
index 0a5561962b0..4ad8c21f841 100644
--- a/dali/pipeline/executor/executor2/op_task.cc
+++ b/dali/pipeline/executor/executor2/op_task.cc
@@ -22,6 +22,7 @@
 #include "dali/core/nvtx.h"
 #include "dali/pipeline/operator/operator.h"
 #include "dali/pipeline/operator/checkpointing/checkpoint.h"
+#include "dali/core/call_at_exit.h"
 
 namespace dali {
 namespace exec2 {
@@ -39,6 +40,11 @@ tasking::SharedTask OpTask::CreateTask(ExecNode *node, CachedWorkspace ws) {
 }
 
 OpTask::OpTaskOutputs OpTask::Run() {
+  auto c = ++node_->actual_concurrency;
+  AtScopeExit([&]() { --node_->actual_concurrency; });
+  if (node_->concurrency) {
+    assert(c <= node_->concurrency->MaxCount());
+  }
   SetWorkspaceInputs();
   SetupOp();
   RunOp();
diff --git a/include/dali/core/exec/tasking/sync.h b/include/dali/core/exec/tasking/sync.h
index 90158e1a3dc..16c04d0ee8a 100644
--- a/include/dali/core/exec/tasking/sync.h
+++ b/include/dali/core/exec/tasking/sync.h
@@ -158,6 +158,8 @@ class Semaphore : public Releasable {
   explicit Semaphore(int max_count) : Semaphore(max_count, max_count) {}
   Semaphore(int max_count, int initial_count) : count(initial_count), max_count(max_count) {}
 
+  constexpr int MaxCount() const { return max_count; }
+
 protected:
   mutable spinlock lock_;
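
For readers unfamiliar with the pattern this patch introduces: operators that must not run concurrently share a `tasking::Semaphore` per backend, and `OpTask::Run()` additionally tracks the observed concurrency in an atomic counter and asserts that it never exceeds `Semaphore::MaxCount()`. The snippet below is a minimal, self-contained sketch of that idea only; it deliberately uses `std::counting_semaphore` and `std::thread` (C++20) rather than DALI's tasking API, and the names `kMaxConcurrency`, `limit`, and `RunTask` are hypothetical.

```cpp
// Illustrative sketch only -- not DALI code. Mirrors the "shared semaphore +
// actual_concurrency assertion" pattern from the patch above.
#include <atomic>
#include <cassert>
#include <semaphore>
#include <thread>
#include <vector>

constexpr int kMaxConcurrency = 1;  // plays the role of Semaphore::MaxCount()
std::counting_semaphore<kMaxConcurrency> limit(kMaxConcurrency);
std::atomic<int> actual_concurrency{0};

void RunTask() {
  limit.acquire();  // counterpart of the node's shared concurrency semaphore
  [[maybe_unused]] int c = ++actual_concurrency;
  assert(c <= kMaxConcurrency);  // mirrors: assert(c <= node_->concurrency->MaxCount());
  // ... the operator's work would run here ...
  --actual_concurrency;
  limit.release();
}

int main() {
  std::vector<std::thread> workers;
  for (int i = 0; i < 8; i++)
    workers.emplace_back(RunTask);
  for (auto &t : workers)
    t.join();
}
```

In the actual change, the decrement is registered through `AtScopeExit`, so the counter is restored even if the operator throws; the sketch above omits that detail for brevity.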