diff --git a/dali/pipeline/executor/executor2/stream_assignment.h b/dali/pipeline/executor/executor2/stream_assignment.h new file mode 100644 index 0000000000..6e8b25fbea --- /dev/null +++ b/dali/pipeline/executor/executor2/stream_assignment.h @@ -0,0 +1,344 @@ +// Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef DALI_PIPELINE_EXECUTOR_EXECUTOR2_STREAM_ASSIGNMENT_H_ +#define DALI_PIPELINE_EXECUTOR_EXECUTOR2_STREAM_ASSIGNMENT_H_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "dali/pipeline/graph/graph_util.h" +#include "dali/pipeline/executor/executor2/exec_graph.h" +#include "dali/pipeline/executor/executor2/exec2.h" + +namespace dali { +namespace exec2 { + +template +class StreamAssignment; + +inline bool NeedsStream(const ExecNode *node) { + if (node->is_pipeline_output) { + for (auto &pipe_out : node->inputs) { + if (pipe_out->device == StorageDevice::GPU) + return true; + } + } else { + return node->backend != OpType::CPU; + } + return false; +} + +inline OpType NodeType(const ExecNode *node) { + if (node->is_pipeline_output) { + OpType type = OpType::CPU; + for (auto &pipe_out : node->inputs) { + if (pipe_out->device == StorageDevice::GPU) { + auto producer_type = pipe_out->producer->backend; + if (producer_type == OpType::GPU) { + return OpType::GPU; + } else if (producer_type == OpType::MIXED) { + type = OpType::MIXED; + } + } + } + return type; + } else { + return node->backend; + } +} + +template <> +class StreamAssignment { + public: + explicit StreamAssignment(ExecGraph &graph) { + for (auto &node : graph.Nodes()) { + if (NeedsStream(&node)) { + needs_stream_ = true; + } + } + } + + std::optional operator[](const ExecNode *node) const { + if (NeedsStream(node)) + return 0; + else + return std::nullopt; + } + + int NumStreams() const { + return needs_stream_ ? 1 : 0; + } + + private: + bool needs_stream_ = false; +}; + + +template <> +class StreamAssignment { + public: + explicit StreamAssignment(ExecGraph &graph) { + for (auto &node : graph.Nodes()) { + switch (NodeType(&node)) { + case OpType::GPU: + has_gpu_ = true; + if (has_mixed_) + return; // we already have both, nothing more can happen + break; + case OpType::MIXED: + has_mixed_ = true; + if (has_gpu_) + return; // we already have both, nothing more can happen + break; + default: + break; + } + } + } + + /** Returns a stream index for a non-CPU operator. + * + * If the node is a Mixed node, it gets stream index 0. + * If the node is a GPU node it gets stream index 1 if there are any mixed nodes, otherwise + * the only stream is the GPU stream and the returned index is 0. + */ + std::optional operator[](const ExecNode *node) const { + switch (NodeType(node)) { + case OpType::CPU: + return std::nullopt; + case OpType::GPU: + return has_mixed_ ? 1 : 0; + case OpType::MIXED: + return 0; + default: + assert(false && "Unreachable"); + return std::nullopt; + } + } + + int NumStreams() const { + return has_gpu_ + has_mixed_; + } + + private: + bool has_gpu_ = false; + bool has_mixed_ = false; +}; + +/** Implements per-operator stream assignment. + * + * This policy implements stream assingment such that independent GPU/Mixed operators get + * separate streams. When there's a dependency then one dependent operator shares the stream of + * its predecessor. + * + * Example - numbers are stream indices, "X" means no stream, "s" means synchronization + * ``` + * CPU(X) ---- GPU(0) --- GPU(0) -- GPU(0) -- output 0 + * \ s + * \ / + * ----- GPU(1) ---- + * \ + * \ + * CPU(X) --- GPU(2) ----s GPU(1) ----------s output 1 + * ``` + */ +template <> +class StreamAssignment { + public: + explicit StreamAssignment(ExecGraph &graph) { + Assign(graph); + } + + std::optional operator[](const ExecNode *node) const { + auto it = node_ids_.find(node); + assert(it != node_ids_.end()); + return stream_assignment_[it->second]; + } + + /** Gets the total number of streams required to run independent operators on separate streams. */ + int NumStreams() const { + return total_streams_; + } + + private: + void Assign(ExecGraph &graph) { + // pre-fill the id pool with sequential numbers + for (int i = 0, n = graph.Nodes().size(); i < n; i++) { + free_stream_ids_.insert(i); + } + + // the nodes in the graph must be sorted topologically + sorted_nodes_.reserve(graph.Nodes().size()); + for (auto &node : graph.Nodes()) { + int idx = sorted_nodes_.size(); + sorted_nodes_.push_back(&node); + node_ids_[&node] = idx; + if (node.inputs.empty()) { + queue_.push({ node_ids_[&node], NextStreamId(&node).value_or(kInvalidStreamIdx) }); + } else { + for (auto &inp : node.inputs) { + assert(node_ids_.count(inp->producer) >= 0 && "Nodes must be topologically sorted."); + } + } + } + + assert(graph.Nodes().size() == sorted_nodes_.size()); + stream_assignment_.resize(sorted_nodes_.size()); + + FindGPUContributors(graph); + + graph::ClearVisitMarkers(graph.Nodes()); + Traverse(); + ClearCPUStreams(); + total_streams_ = CalcNumStreams(); + } + + void Traverse() { + while (!queue_.empty()) { + // PrintQueue(); /* uncomment for debugging */ + auto [idx, stream_idx] = queue_.top(); + std::optional stream_id; + if (stream_idx != kInvalidStreamIdx) + stream_id = stream_idx; + + queue_.pop(); + auto *node = sorted_nodes_[idx]; + // This will be true for nodes which has no outputs or which doesn't contribute to any + // GPU nodes. + bool keep_stream_id = stream_id.has_value(); + + graph::Visit v(node); + if (!v) { + assert(stream_assignment_[idx].value_or(kInvalidStreamIdx) <= stream_idx); + continue; // we've been here already - skip + } + + stream_assignment_[idx] = stream_id; + + if (stream_id.has_value()) + free_stream_ids_.insert(*stream_id); + for (auto &output_desc : node->outputs) { + for (auto *out : output_desc.consumers) { + auto out_stream_id = NextStreamId(out->consumer, stream_id); + if (out_stream_id.has_value()) + keep_stream_id = false; + queue_.push({node_ids_[out->consumer], out_stream_id.value_or(kInvalidStreamIdx)}); + } + } + if (keep_stream_id) + free_stream_ids_.erase(*stream_id); + } + } + + void ClearCPUStreams() { + for (int i = 0, n = sorted_nodes_.size(); i < n; i++) { + if (!NeedsStream(sorted_nodes_[i])) + stream_assignment_[i] = std::nullopt; + } + } + + int CalcNumStreams() { + int max = -1; + for (auto a : stream_assignment_) { + if (a.has_value()) + max = std::max(max, *a); + } + return max + 1; + } + + void PrintQueue(std::ostream &os = std::cout) { + auto q2 = queue_; + while (!q2.empty()) { + auto [idx, stream_idx] = q2.top(); + q2.pop(); + auto *node = sorted_nodes_[idx]; + if (!node->instance_name.empty()) + os << node->instance_name; + else if (node->is_pipeline_output) + os << ""; + else + os << "[" << idx << "]"; + os << "("; + if (stream_idx != kInvalidStreamIdx) + os << stream_idx; + else + os << "none"; + os << ") "; + } + os << "\n"; + } + + std::optional NextStreamId(const ExecNode *node, + std::optional prev_stream_id = std::nullopt) { + // If the preceding node had a stream, then we have to pass it on through CPU nodes + // if there are any GPU nodes down the graph. + // If the preceding node didn't have a stream, then we only need a stream if current + // node needs a stram. + bool needs_stream = prev_stream_id.has_value() + ? gpu_contributors_.count(node) != 0 + : NeedsStream(node); + if (needs_stream) { + assert(!free_stream_ids_.empty()); + auto b = free_stream_ids_.begin(); + int ret = *b; + free_stream_ids_.erase(b); + return ret; + } else { + return std::nullopt; + } + } + + void FindGPUContributors(ExecGraph &graph) { + // Run DFS, output to input, and find nodes which contribute to any node that requires a stream + graph::ClearVisitMarkers(graph.Nodes()); + for (auto &node : graph.Nodes()) { + if (node.outputs.empty()) + FindGPUContributors(&node, false); + } + } + + void FindGPUContributors(const ExecNode *node, bool is_gpu_contributor) { + graph::Visit v(node); + if (!v) + return; + if (!is_gpu_contributor) + is_gpu_contributor = NeedsStream(node); + if (is_gpu_contributor) + gpu_contributors_.insert(node); + for (auto *inp : node->inputs) + FindGPUContributors(inp->producer, is_gpu_contributor); + } + + + static constexpr int kInvalidStreamIdx = 0x7fffffff; + std::vector> stream_assignment_; + int total_streams_ = 0; + std::unordered_map node_ids_; // topologically sorted nodes + std::set gpu_contributors_; + std::vector sorted_nodes_; + std::set free_stream_ids_; + std::priority_queue, std::vector>, std::greater<>> queue_; +}; + +} // namespace exec2 +} // namespace dali + +#endif // DALI_PIPELINE_EXECUTOR_EXECUTOR2_STREAM_ASSIGNMENT_H_ diff --git a/dali/pipeline/executor/executor2/stream_assignment_test.cc b/dali/pipeline/executor/executor2/stream_assignment_test.cc new file mode 100644 index 0000000000..61aa546b02 --- /dev/null +++ b/dali/pipeline/executor/executor2/stream_assignment_test.cc @@ -0,0 +1,223 @@ +// Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include "dali/pipeline/executor/executor2/stream_assignment.h" +#include "dali/pipeline/operator/operator.h" +#include "dali/pipeline/operator/operator_factory.h" + +namespace dali { + +template +class StreamAssignmentDummyOp : public Operator { + public: + using Operator::Operator; + USE_OPERATOR_MEMBERS(); + + void RunImpl(Workspace &ws) override {} + bool SetupImpl(std::vector &output_desc, const Workspace &ws) override { + return false; + } +}; + +DALI_SCHEMA(StreamAssignmentDummyOp) + .NumInput(0, 999) + .NumOutput(0) + .AdditionalOutputsFn([](const OpSpec &spec) { + return spec.NumOutput(); + }); + +DALI_REGISTER_OPERATOR(StreamAssignmentDummyOp, StreamAssignmentDummyOp, CPU); +DALI_REGISTER_OPERATOR(StreamAssignmentDummyOp, StreamAssignmentDummyOp, Mixed); +DALI_REGISTER_OPERATOR(StreamAssignmentDummyOp, StreamAssignmentDummyOp, GPU); + +namespace exec2 { + +namespace { + +OpSpec SpecDev(const std::string &device) { + return OpSpec("StreamAssignmentDummyOp") + .AddArg("device", device) + .AddArg("num_threads", 1) + .AddArg("max_batch_size", 1); +} + +OpSpec SpecGPU() { + return SpecDev("gpu"); +} + +OpSpec SpecCPU() { + return SpecDev("cpu"); +} + +OpSpec SpecMixed() { + return SpecDev("mixed"); +} + +auto MakeNodeMap(const ExecGraph &graph) { + std::map> map; + for (auto &n : graph.Nodes()) + if (!n.instance_name.empty()) { + map[n.instance_name] = &n; + } + return map; +} + +} // namespace + + +TEST(Exec2Test, StreamAssignment_PerOperator_1) { + ExecGraph eg; + /* + a -- b ----- c --------- g -- out + \ / + ---d -- e (cpu)---f (mixed) + + */ + graph::OpGraph::Builder b; + b.Add("a", + SpecGPU() + .AddOutput("a->b", "gpu") + .AddOutput("a->d", "gpu")); + b.Add("b", + SpecGPU() + .AddInput("a->b", "gpu") + .AddOutput("b->c", "gpu")); + b.Add("c", + SpecGPU() + .AddInput("b->c", "gpu") + .AddOutput("c->g", "gpu")); + b.Add("d", + SpecGPU() + .AddInput("a->d", "gpu") + .AddOutput("d->e", "cpu")); + b.Add("e", + SpecCPU() + .AddInput("d->e", "cpu") + .AddOutput("e->f", "cpu")); + b.Add("f", + SpecMixed() + .AddInput("e->f", "cpu") + .AddOutput("f->g", "gpu")); + b.Add("g", + SpecGPU() + .AddInput("c->g", "gpu") + .AddInput("f->g", "gpu") + .AddOutput("g->o", "gpu")); + b.AddOutput("g->o_gpu"); + auto g = std::move(b).GetGraph(true); + eg.Lower(g); + + StreamAssignment assignment(eg); + auto map = MakeNodeMap(eg); + EXPECT_EQ(assignment[map["a"]], 0); + EXPECT_EQ(assignment[map["b"]], 0); + EXPECT_EQ(assignment[map["c"]], 0); + EXPECT_EQ(assignment[map["d"]], 1); + EXPECT_EQ(assignment[map["e"]], std::nullopt); + EXPECT_EQ(assignment[map["f"]], 1); + EXPECT_EQ(assignment[map["g"]], 0); +} + + +TEST(Exec2Test, StreamAssignment_PerOperator_2) { + ExecGraph eg; + /* + --c-- g + / \ / \ + a -- b ----- d ----- f ---- h ---> out + \ (cpu) / / + --------------e / + / + i ----------------- j(cpu) + + k ----------------------------> out + + */ + graph::OpGraph::Builder b; + b.Add("a", + SpecGPU() + .AddOutput("a->b", "gpu") + .AddOutput("a->e", "gpu")); + b.Add("i", + SpecGPU() + .AddOutput("i->j", "gpu")); + b.Add("j", + SpecCPU() + .AddInput("i->j", "gpu") + .AddOutput("j->h", "cpu")); + b.Add("b", + SpecCPU() + .AddInput("a->b", "gpu") + .AddOutput("b->c", "cpu") + .AddOutput("b->d", "cpu")); + b.Add("c", + SpecGPU() + .AddInput("b->c", "cpu") + .AddOutput("c->d", "gpu")); + b.Add("d", + SpecGPU() + .AddInput("b->d", "cpu") + .AddInput("c->d", "gpu") + .AddOutput("d->f", "gpu")); + b.Add("e", + SpecGPU() + .AddInput("a->e", "gpu") + .AddOutput("e->f", "gpu")); + b.Add("f", + SpecGPU() + .AddInput("d->f", "gpu") + .AddInput("e->f", "gpu") + .AddOutput("f->g", "gpu") + .AddOutput("f->h", "gpu")); + b.Add("g", + SpecGPU() + .AddInput("f->g", "gpu") + .AddOutput("g->h", "gpu")); + b.Add("h", + SpecGPU() + .AddInput("f->h", "gpu") + .AddInput("g->h", "gpu") + .AddInput("j->h", "cpu") + .AddOutput("h->o", "gpu")); + b.Add("k", + SpecGPU() + .AddOutput("k->o", "gpu")); // directly to output + b.AddOutput("h->o_gpu"); + b.AddOutput("k->o_gpu"); + auto g = std::move(b).GetGraph(true); + eg.Lower(g); + + StreamAssignment assignment(eg); + auto map = MakeNodeMap(eg); + EXPECT_EQ(assignment[map["a"]], 0); + EXPECT_EQ(assignment[map["b"]], std::nullopt); + EXPECT_EQ(assignment[map["c"]], 0); + EXPECT_EQ(assignment[map["d"]], 0); + EXPECT_EQ(assignment[map["e"]], 3); + EXPECT_EQ(assignment[map["f"]], 0); + EXPECT_EQ(assignment[map["g"]], 0); + EXPECT_EQ(assignment[map["h"]], 0); + EXPECT_EQ(assignment[map["i"]], 1); + EXPECT_EQ(assignment[map["j"]], std::nullopt); + EXPECT_EQ(assignment[map["k"]], 2); +} + +} // namespace exec2 +} // namespace dali