Skip to content

Commit

Permalink
Add default layouts.
Browse files Browse the repository at this point in the history
Signed-off-by: Michał Zientkiewicz <[email protected]>
  • Loading branch information
mzient committed Jul 18, 2024
1 parent 1bb0383 commit ecdbb55
Show file tree
Hide file tree
Showing 10 changed files with 250 additions and 83 deletions.
7 changes: 4 additions & 3 deletions dali/pipeline/executor/executor2/exec2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ void LimitBackendConcurrency(ExecGraph &graph, OpType backend, int max_concurren
if (n.backend == backend)
n.concurrency = sem;
}
graph.Invalidate();
}

void ApplyConcurrencyLimit(ExecGraph &graph, const Executor2::Config &config) {
switch (config.concurrency) {
void ApplyConcurrencyLimit(ExecGraph &graph, OperatorConcurrency concurrency) {
switch (concurrency) {
case OperatorConcurrency::Full:
// TODO(michalz): Fix ThreadPool.
LimitBackendConcurrency(graph, OpType::CPU);
Expand Down Expand Up @@ -94,7 +95,7 @@ class Executor2::Impl {
AnalyzeGraph();
CheckNodeTypes();
CalculatePrefetchDepth();
ApplyConcurrencyLimit(graph_, config_);
ApplyConcurrencyLimit(graph_, config_.concurrency);
SetupStreams();
SetupThreadPool();

Expand Down
3 changes: 3 additions & 0 deletions dali/pipeline/executor/executor2/exec2_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class DummyOpCPU : public Operator<CPUBackend> {

bool SetupImpl(std::vector<OutputDesc> &outs, const Workspace &ws) override {
int N = ws.GetRequestedBatchSize(0);
outs.resize(ws.NumOutput());
outs[0].shape = uniform_list_shape(N, TensorShape<>{});
outs[0].type = DALI_INT32;
return true;
Expand Down Expand Up @@ -77,6 +78,7 @@ class DummyOpGPU : public Operator<GPUBackend> {

bool SetupImpl(std::vector<OutputDesc> &outs, const Workspace &ws) override {
int N = ws.GetRequestedBatchSize(0);
outs.resize(ws.NumOutput());
outs[0].shape = uniform_list_shape(N, TensorShape<>{});
outs[0].type = DALI_INT32;
return true;
Expand All @@ -102,6 +104,7 @@ class CounterOp : public Operator<CPUBackend> {

bool SetupImpl(std::vector<OutputDesc> &outs, const Workspace &ws) override {
int N = ws.GetRequestedBatchSize(0);
outs.resize(ws.NumOutput());
outs[0].shape = uniform_list_shape(N, TensorShape<>{});
outs[0].type = DALI_INT32;
return true;
Expand Down
2 changes: 2 additions & 0 deletions dali/pipeline/executor/executor2/exec_graph.cc
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ void ExecGraph::PrepareIteration(
const std::shared_ptr<IterationData> &iter_data,
const WorkspaceParams &params) {
Validate();
Analyze();
for (auto &n : nodes_) {
n.NextIter();
n.CreateMainTask(iter_data, params);
Expand All @@ -277,6 +278,7 @@ void ExecGraph::PrepareIteration(

tasking::TaskFuture ExecGraph::Launch(tasking::Scheduler &sched) {
Validate();
Analyze();
std::optional<tasking::TaskFuture> ret;
for (auto &n : nodes_) {
auto maybe_future = n.Launch(sched);
Expand Down
19 changes: 15 additions & 4 deletions dali/pipeline/executor/executor2/exec_graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@ struct ExecOutputDesc {
SmallVector<ExecEdge *, 4> consumers;

StorageDevice device = StorageDevice::CPU;
bool pinned = false;
bool pinned = false;

bool parallel_consumers = true;
};

/** An execution node.
Expand Down Expand Up @@ -276,7 +278,7 @@ class DLL_PUBLIC ExecGraph {

template <typename... Args>
ExecNode *AddNode(Args &&...args) {
validated_ = false;
Invalidate();
ExecNode *node = &nodes_.emplace_back(std::forward<Args>(args)...);
if (!node->instance_name.empty()) {
if (!name2node_.emplace(node->instance_name, node).second) {
Expand All @@ -289,7 +291,7 @@ class DLL_PUBLIC ExecGraph {
}

ExecNode *AddOutputNode() {
validated_ = false;
Invalidate();
ExecNode *node = &nodes_.emplace_back(PipelineOutputTag());
if (!node->instance_name.empty()) {
if (!name2node_.emplace(node->instance_name, node).second) {
Expand All @@ -302,6 +304,7 @@ class DLL_PUBLIC ExecGraph {
}

ExecEdge *Link(ExecNode *producer, int out_idx, ExecNode *consumer, int in_idx) {
Invalidate();
auto &edge = edges_.emplace_back();
edge.producer = producer;
edge.producer_output_idx = out_idx;
Expand All @@ -319,6 +322,11 @@ class DLL_PUBLIC ExecGraph {
return &edge;
}

void Invalidate() {
validated_ = false;
analyzed_ = false;
}

void PrepareIteration(const std::shared_ptr<IterationData> &iter_data,
const WorkspaceParams &params);

Expand All @@ -327,14 +335,17 @@ class DLL_PUBLIC ExecGraph {
/** Populates the graph based on a pipeline definiton graph. */
void Lower(const graph::OpGraph &def);

void FindPinnedBuffers();
void Analyze();

private:
class Analyzer;

std::list<ExecNode> nodes_;
std::list<ExecEdge> edges_;
std::unordered_map<std::string_view, ExecNode *> name2node_;

bool validated_ = false;
bool analyzed_ = false;
/** A bugcheck for graph inconsitency. It throws upon detecting misconneted nodes. */
void Validate();
};
Expand Down
138 changes: 138 additions & 0 deletions dali/pipeline/executor/executor2/exec_graph_analysis.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <cassert>
#include <unordered_map>
#include <utility>
#include "dali/pipeline/executor/executor2/exec_graph.h"
#include "dali/pipeline/graph/op_graph2.h"

namespace dali {
namespace exec2 {

namespace {

/** Sets pinnedness of the input sources
*
* The function goes over the inputs of the node. If the node is non-CPU, then all of its
* CPU _regular_ inputs are marked as pinned.
* If the node is a CPU node but passes through an input `i` directly to a pinned output `o`,
* then the source of input `i` is also marked as pinned.
*/
void SetPinnedInputs(ExecNode *node) {
assert(node->op != nullptr);

// TODO(michalz): Update if/when we have passthrough for argument inputs
int ninp = node->op->GetSpec().NumRegularInput();
assert(static_cast<size_t>(ninp) <= node->inputs.size());

if (node->backend != OpType::CPU) {
for (int i = 0; i < ninp; i++) {
auto *inp = node->inputs[i];
inp->producer->outputs[inp->producer_output_idx].pinned = true;
}
} else if (node->op->GetSpec().GetSchema().HasPassThrough()) {
auto &schema = node->op->GetSpec().GetSchema();
int nout = node->outputs.size();
for (int i = 0; i < ninp; i++) {
auto *input = node->inputs[i];
if (input->device != StorageDevice::CPU) // we're not interested in non-CPU buffers
continue;

auto &source_output = input->producer->outputs[input->producer_output_idx];
if (source_output.pinned) // already pinned
continue;

for (int o = 0; o < nout; o++) {
// If input `i` passes to a pinned output `o`, then the input should also be marked
// as pinned. This will be followed in reverse topological order.
if (node->outputs[o].pinned && schema.IsPassThrough(i, o, false)) {
source_output.pinned = true;
break;
}
}
}
}
}

} // namespace

class ExecGraph::Analyzer {
public:
void FindPinnedBuffers(ExecGraph &g) {
// No non-cpu ops? Just mark everything as non-pinned and we're done.
auto is_gpu_edge = [](const ExecEdge &e) { return e.device == StorageDevice::GPU; };
bool has_gpu_buffers = std::find_if(g.edges_.begin(), g.edges_.end(), is_gpu_edge)
!= g.edges_.end();
if (!has_gpu_buffers) {
for (auto &n : g.nodes_)
for (auto &o : n.outputs)
o.pinned = false;
return;
}

// go in reverse topological order, from outputs to inputs
for (auto it = g.nodes_.rbegin(); it != g.nodes_.rend(); ++it) {
ExecNode &n = *it;
if (n.is_pipeline_output)
continue;
SetPinnedInputs(&n);
}
}

bool HasParallelConsumers(const ExecOutputDesc &out) {
int ncons = out.consumers.size();
// If there's just one outgoing edge from that input, we're safe.
if (ncons <= 1)
return false;

// If there are multiple edges, but they point to different inputs of the same
// consumer, then the input is effectively consumed in parallel.
for (int i = 1; i < ncons; i++)
if (out.consumers[i]->consumer == out.consumers[0]->consumer)
return true;

// Finally, let's go over all the consumers and check if they're guarded with one
// semaphore with MaxCount() == 1. If so, then the access to the node is sequential.
auto sem = out.consumers[0]->consumer->concurrency;
if (!sem)
return true;
if (sem->MaxCount() > 1)
return true;
for (size_t i = 1; i < out.consumers.size(); i++)
if (out.consumers[i]->consumer->concurrency != sem)
return true;
return false;
}

void FindParallelConsumers(ExecGraph &g) {
for (auto &n : g.nodes_) {
for (auto &o : n.outputs)
o.parallel_consumers = HasParallelConsumers(o);
}
}
};

void ExecGraph::Analyze() {
if (analyzed_)
return;
Analyzer a;
a.FindPinnedBuffers(*this);
// a.FindParallelConsumers(*this);
analyzed_ = true;
}


} // namespace exec2
} // namespace dali
70 changes: 2 additions & 68 deletions dali/pipeline/executor/executor2/exec_graph_lowering.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ namespace dali {
namespace exec2 {

void ExecGraph::Lower(const graph::OpGraph &def) {
validated_ = false;
analyzed_ = false;
std::unordered_map<const graph::OpNode *, ExecNode *> def2exec(def.OpNodes().size());
for (const graph::OpNode &op_node : def.OpNodes()) {
ExecNode *exec_node = AddNode(InstantiateOperator(op_node.spec), &op_node);
Expand Down Expand Up @@ -59,76 +61,8 @@ void ExecGraph::Lower(const graph::OpGraph &def) {
edge->device = data_node->device;
}

FindPinnedBuffers();
Validate();
}

namespace {

/** Sets pinnedness of the input sources
*
* The function goes over the inputs of the node. If the node is non-CPU, then all of its
* CPU _regular_ inputs are marked as pinned.
* If the node is a CPU node but passes through an input `i` directly to a pinned output `o`,
* then the source of input `i` is also marked as pinned.
*/
void SetPinnedInputs(ExecNode *node) {
assert(node->op != nullptr);

// TODO(michalz): Update if/when we have passthrough for argument inputs
int ninp = node->op->GetSpec().NumRegularInput();
assert(static_cast<size_t>(ninp) <= node->inputs.size());

if (node->backend != OpType::CPU) {
for (int i = 0; i < ninp; i++) {
auto *inp = node->inputs[i];
inp->producer->outputs[inp->producer_output_idx].pinned = true;
}
} else if (node->op->GetSpec().GetSchema().HasPassThrough()) {
auto &schema = node->op->GetSpec().GetSchema();
int nout = node->outputs.size();
for (int i = 0; i < ninp; i++) {
auto *input = node->inputs[i];
if (input->device != StorageDevice::CPU) // we're not interested in non-CPU buffers
continue;

auto &source_output = input->producer->outputs[input->producer_output_idx];
if (source_output.pinned) // already pinned
continue;

for (int o = 0; o < nout; o++) {
// If input `i` passes to a pinned output `o`, then the input should also be marked
// as pinned. This will be followed in reverse topological order.
if (node->outputs[o].pinned && schema.IsPassThrough(i, o, false)) {
source_output.pinned = true;
break;
}
}
}
}
}

} // namespace

void ExecGraph::FindPinnedBuffers() {
// No non-cpu ops? Just mark everything as non-pinned and we're done.
auto is_gpu_edge = [](const ExecEdge &e) { return e.device == StorageDevice::GPU; };
bool has_gpu_buffers = std::find_if(edges_.begin(), edges_.end(), is_gpu_edge) != edges_.end();
if (!has_gpu_buffers) {
for (auto &n : nodes_)
for (auto &o : n.outputs)
o.pinned = false;
return;
}

// go in reverse topological order, from outputs to inputs
for (auto it = nodes_.rbegin(); it != nodes_.rend(); ++it) {
ExecNode &n = *it;
if (n.is_pipeline_output)
continue;
SetPinnedInputs(&n);
}
}

} // namespace exec2
} // namespace dali
2 changes: 2 additions & 0 deletions dali/pipeline/executor/executor2/exec_graph_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ namespace exec2 {
namespace test {

namespace {
// TODO(michalz): Avoid this code duplication without messing up encapsulation
void LimitBackendConcurrency(ExecGraph &graph, OpType backend, int max_concurrency = 1) {
auto sem = std::make_shared<tasking::Semaphore>(max_concurrency);
for (auto &n : graph.Nodes()) {
if (n.backend == backend)
n.concurrency = sem;
}
graph.Invalidate();
}
} // namespace

Expand Down
Loading

0 comments on commit ecdbb55

Please sign in to comment.