No hang.
Signed-off-by: Michał Zientkiewicz <[email protected]>
mzient committed Jul 18, 2024
1 parent f3308b5 commit 3dfbac0
Showing 7 changed files with 79 additions and 41 deletions.
6 changes: 4 additions & 2 deletions dali/pipeline/executor/executor2/exec2.cc
@@ -48,6 +48,7 @@ void ApplyConcurrencyLimit(ExecGraph &graph, const Executor2::Config &config) {
LimitBackendConcurrency(graph, OpType::CPU);
break; // other operators have no restrictions
case OperatorConcurrency::Backend:
LimitBackendConcurrency(graph, OpType::CPU);
LimitBackendConcurrency(graph, OpType::GPU);
LimitBackendConcurrency(graph, OpType::MIXED);
break;
@@ -70,6 +71,9 @@ class Executor2::Impl {
public:
explicit Impl(const Config &config) : config_(config) {
}
~Impl() {
Shutdown();
}

enum class State {
New = 0,
@@ -83,7 +87,6 @@ class Executor2::Impl {
void Build(const graph::OpGraph &graph) {
if (state_ != State::New)
throw std::logic_error("Already built.");
std::cerr << "Building " << (void*)this << std::endl;
state_ = State::Building;
DeviceGuard dg(config_.device.value_or(CPU_ONLY_DEVICE_ID));
graph_.Lower(graph);
@@ -154,7 +157,6 @@ class Executor2::Impl {
void Shutdown() {
if (state_ != State::Running)
return;
std::cerr << "Shutting down " << (void*)this << std::endl;
state_ = State::ShutdownRequested;
if (exec_)
exec_->Shutdown();
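Note on the destructor added above: ~Impl() now guarantees that Shutdown() runs even if the owner never calls it explicitly, and Shutdown() itself is a no-op unless the executor is running. A self-contained sketch of that RAII idiom (the Worker class below is purely illustrative, not DALI code):

    #include <atomic>
    #include <thread>

    // Illustrative only: a destructor that guarantees an orderly stop,
    // in the same spirit as Executor2::Impl::~Impl() calling Shutdown().
    class Worker {
     public:
      Worker() : thread_([this] { while (!stop_) { /* do work */ } }) {}
      ~Worker() { Shutdown(); }            // runs even if the owner forgot to stop us
      void Shutdown() {
        if (stop_.exchange(true)) return;  // idempotent, like the state check in Impl
        if (thread_.joinable()) thread_.join();
      }
     private:
      std::atomic<bool> stop_{false};      // declared before thread_ so it is ready
      std::thread thread_;                 // before the worker thread starts reading it
    };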
30 changes: 16 additions & 14 deletions dali/pipeline/executor/executor2/exec2_test.cc
@@ -115,28 +115,30 @@ TEST_P(Exec2Test, Graph1_CPUOnly) {
Executor2 exec(config_);
graph::OpGraph graph = GetTestGraph1();
exec.Build(graph);
exec.Run();
exec.Run();
for (int i = 0; i < 10; i++) {
exec.Run();
}
Workspace ws;
exec.Outputs(&ws);
CheckTestGraph1Results(ws, config_.max_batch_size);
ws.Clear();
exec.Outputs(&ws);
CheckTestGraph1Results(ws, config_.max_batch_size);
for (int i = 0; i < 10; i++) {
ws.Clear();
exec.Outputs(&ws);
CheckTestGraph1Results(ws, config_.max_batch_size);
}
}

TEST_P(Exec2Test, Graph2_CPU2GPU) {
Executor2 exec(config_);
graph::OpGraph graph = GetTestGraph2();
exec.Build(graph);
exec.Run();
exec.Run();
for (int i = 0; i < 10; i++) {
exec.Run();
}
Workspace ws;
exec.Outputs(&ws);
CheckTestGraph2Results(ws, config_.max_batch_size);
ws.Clear();
exec.Outputs(&ws);
CheckTestGraph2Results(ws, config_.max_batch_size);
for (int i = 0; i < 10; i++) {
ws.Clear();
exec.Outputs(&ws);
CheckTestGraph2Results(ws, config_.max_batch_size);
}
}


19 changes: 14 additions & 5 deletions dali/pipeline/executor/executor2/exec2_test.h
@@ -45,18 +45,27 @@ class DummyOpCPU : public Operator<CPUBackend> {
void RunImpl(Workspace &ws) override {
int N = ws.GetRequestedBatchSize(0);
addend_.Acquire(spec_, ws, N);
sample_sums_.resize(N);
auto &tp = ws.GetThreadPool();
for (int s = 0; s < N; s++) {
int sum = *addend_[s].data + s;
for (int i = 0; i < ws.NumInput(); i++) {
sum += *ws.Input<CPUBackend>(i)[s].data<int>();
}
*ws.Output<CPUBackend>(0)[s].mutable_data<int>() = sum;
auto sample_sum = [&, s](int) {
int sum = *addend_[s].data + s;
for (int i = 0; i < ws.NumInput(); i++) {
sum += *ws.Input<CPUBackend>(i)[s].data<int>();
}
sample_sums_[s] = sum;
};
tp.AddWork(sample_sum);
}
tp.RunAll(true);
for (int s = 0; s < N; s++)
*ws.Output<CPUBackend>(0)[s].mutable_data<int>() = sample_sums_[s];
}

bool CanInferOutputs() const override { return true; }
ArgValue<int> addend_{"addend", spec_};

std::vector<int> sample_sums_;
std::string instance_name_;
};

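The reworked DummyOpCPU::RunImpl above fans per-sample work out to the workspace thread pool and collects results in sample_sums_, so the output is written from a single thread only after tp.RunAll(true) returns. The same fan-out/collect pattern in a self-contained form (plain std::thread stands in for DALI's ThreadPool here; that substitution is an assumption made for brevity):

    #include <thread>
    #include <vector>

    // Each worker writes only its own slot of `sums`, so no locking is needed;
    // the results are committed sequentially once all workers have joined.
    std::vector<int> ComputeSampleSums(const std::vector<int> &addends) {
      int N = static_cast<int>(addends.size());
      std::vector<int> sums(N);
      std::vector<std::thread> workers;
      workers.reserve(N);
      for (int s = 0; s < N; s++)
        workers.emplace_back([&, s] { sums[s] = addends[s] + s; });
      for (auto &t : workers)
        t.join();
      return sums;
    }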
2 changes: 2 additions & 0 deletions dali/pipeline/executor/executor2/exec_graph.h
@@ -129,6 +129,8 @@ class DLL_PUBLIC ExecNode {
* from being executed concurrently.
*/
std::shared_ptr<tasking::Semaphore> concurrency;
std::atomic_int actual_concurrency{0};

/** Limits the number of output buffers for the operator.
*
* If the graph is scheduled multiple times ahead, the operator would produce multiple results,
55 changes: 35 additions & 20 deletions dali/pipeline/executor/executor2/exec_graph_test.cc
@@ -24,6 +24,16 @@ namespace dali {
namespace exec2 {
namespace test {

namespace {
void LimitBackendConcurrency(ExecGraph &graph, OpType backend, int max_concurrency = 1) {
auto sem = std::make_shared<tasking::Semaphore>(max_concurrency);
for (auto &n : graph.Nodes()) {
if (n.backend == backend)
n.concurrency = sem;
}
}
} // namespace

TEST(ExecGraphTest, SimpleGraph) {
int batch_size = 32;
OpSpec spec0(kTestOpName);
@@ -62,6 +72,7 @@ TEST(ExecGraphTest, SimpleGraph) {
g.Link(n0, 0, n2, 0);
g.Link(n1, 0, n2, 1);
g.Link(n2, 0, no, 0);
LimitBackendConcurrency(g, OpType::CPU);

WorkspaceParams params = {};
auto tp = std::make_unique<ThreadPool>(std::thread::hardware_concurrency(), 0, false, "test");
@@ -114,14 +125,15 @@ TEST(ExecGraphTest, SimpleGraphRepeat) {
.AddOutput("op2e0", "cpu")
.AddArg("name", "op2");
auto op2 = std::make_unique<DummyOpCPU>(spec2);
ExecGraph def;
ExecNode *n2 = def.AddNode(std::move(op2));
ExecNode *n1 = def.AddNode(std::move(op1));
ExecNode *n0 = def.AddNode(std::move(op0));
ExecNode *no = def.AddOutputNode();
def.Link(n0, 0, n2, 0);
def.Link(n1, 0, n2, 1);
def.Link(n2, 0, no, 0);
ExecGraph g;
ExecNode *n2 = g.AddNode(std::move(op2));
ExecNode *n1 = g.AddNode(std::move(op1));
ExecNode *n0 = g.AddNode(std::move(op0));
ExecNode *no = g.AddOutputNode();
g.Link(n0, 0, n2, 0);
g.Link(n1, 0, n2, 1);
g.Link(n2, 0, no, 0);
LimitBackendConcurrency(g, OpType::CPU);
ThreadPool tp(4, 0, false, "test");
WorkspaceParams params = {};
ExecEnv env;
@@ -135,8 +147,8 @@
ex.Start();
auto start = dali::test::perf_timer::now();
for (int i = 0; i < N; i++) {
def.PrepareIteration(std::make_shared<IterationData>(), params);
auto fut = def.Launch(ex);
g.PrepareIteration(std::make_shared<IterationData>(), params);
auto fut = g.Launch(ex);
auto &pipe_out = fut.Value<const PipelineOutput &>();
auto &ws = pipe_out.workspace;
auto &out = ws.Output<CPUBackend>(0);
@@ -179,14 +191,16 @@ TEST(ExecGraphTest, SimpleGraphScheduleAhead) {
.AddOutput("op2e0", "cpu")
.AddArg("name", "op2");
auto op2 = std::make_unique<DummyOpCPU>(spec2);
ExecGraph def;
ExecNode *n2 = def.AddNode(std::move(op2));
ExecNode *n1 = def.AddNode(std::move(op1));
ExecNode *n0 = def.AddNode(std::move(op0));
ExecNode *no = def.AddOutputNode();
def.Link(n0, 0, n2, 0);
def.Link(n1, 0, n2, 1);
def.Link(n2, 0, no, 0);
ExecGraph g;
ExecNode *n2 = g.AddNode(std::move(op2));
ExecNode *n1 = g.AddNode(std::move(op1));
ExecNode *n0 = g.AddNode(std::move(op0));
ExecNode *no = g.AddOutputNode();
g.Link(n0, 0, n2, 0);
g.Link(n1, 0, n2, 1);
g.Link(n2, 0, no, 0);
LimitBackendConcurrency(g, OpType::CPU);

ThreadPool tp(4, 0, false, "test");
WorkspaceParams params = {};
ExecEnv env;
@@ -200,8 +214,8 @@
std::vector<tasking::TaskFuture> fut;
fut.reserve(N);
for (int i = 0; i < N; i++) {
def.PrepareIteration(std::make_shared<IterationData>(), params);
fut.push_back(def.Launch(ex));
g.PrepareIteration(std::make_shared<IterationData>(), params);
fut.push_back(g.Launch(ex));
}

int ctr = 0;
@@ -330,6 +344,7 @@ TEST(ExecGraphTest, LoweredExec) {
graph::OpGraph def = GetTestGraph1();
ExecGraph g;
g.Lower(def);
LimitBackendConcurrency(g, OpType::CPU);

ThreadPool tp(std::thread::hardware_concurrency(), 0, false, "test");
WorkspaceParams params = {};
6 changes: 6 additions & 0 deletions dali/pipeline/executor/executor2/op_task.cc
@@ -22,6 +22,7 @@
#include "dali/core/nvtx.h"
#include "dali/pipeline/operator/operator.h"
#include "dali/pipeline/operator/checkpointing/checkpoint.h"
#include "dali/core/call_at_exit.h"

namespace dali {
namespace exec2 {
@@ -39,6 +40,11 @@ tasking::SharedTask OpTask::CreateTask(ExecNode *node, CachedWorkspace ws) {
}

OpTask::OpTaskOutputs OpTask::Run() {
auto c = ++node_->actual_concurrency;
AtScopeExit([&]() { --node_->actual_concurrency; });
if (node_->concurrency) {
assert(c <= node_->concurrency->MaxCount());
}
SetWorkspaceInputs();
SetupOp();
RunOp();
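The OpTask::Run change above pairs the increment of actual_concurrency with AtScopeExit (from the newly included dali/core/call_at_exit.h), so the counter is decremented on every exit path, exceptions included, and the assert can verify that the semaphore bound is honored. A minimal sketch of the scope-guard idiom this relies on (the real DALI helper may differ in its exact form):

    #include <utility>

    // Runs the stored callable when the guard goes out of scope, whether the
    // enclosing function returns normally or throws.
    template <typename Callable>
    class ScopeGuard {
     public:
      explicit ScopeGuard(Callable c) : callable_(std::move(c)) {}
      ~ScopeGuard() { callable_(); }
      ScopeGuard(const ScopeGuard &) = delete;
      ScopeGuard &operator=(const ScopeGuard &) = delete;
     private:
      Callable callable_;
    };

    // Usage analogous to the task body above:
    //   ++counter;
    //   ScopeGuard guard([&] { --counter; });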
2 changes: 2 additions & 0 deletions include/dali/core/exec/tasking/sync.h
@@ -158,6 +158,8 @@ class Semaphore : public Releasable {
explicit Semaphore(int max_count) : Semaphore(max_count, max_count) {}
Semaphore(int max_count, int initial_count) : count(initial_count), max_count(max_count) {}

constexpr int MaxCount() const { return max_count; }

protected:
mutable spinlock lock_;

