From c46577ec94f954593a4afd4bad2a777f3330faaf Mon Sep 17 00:00:00 2001
From: Michal Zientkiewicz
Date: Wed, 17 Jul 2024 17:20:46 +0200
Subject: [PATCH] Fix pinnedness.

Signed-off-by: Michal Zientkiewicz
---
 dali/pipeline/data/buffer.cc                  |  7 +-
 .../pipeline/executor/executor2/exec_graph.cc |  8 +-
 dali/pipeline/executor/executor2/exec_graph.h | 13 +++-
 .../executor/executor2/exec_graph_lowering.cc | 73 ++++++++++++++++++-
 .../executor/executor2/exec_graph_test.cc     | 41 ++++++-----
 dali/pipeline/executor/executor2/op_task.cc   |  3 +-
 .../executor/executor2/stream_assignment.h    |  4 +-
 7 files changed, 118 insertions(+), 31 deletions(-)

diff --git a/dali/pipeline/data/buffer.cc b/dali/pipeline/data/buffer.cc
index 5d056407fdf..9097d2ed93d 100644
--- a/dali/pipeline/data/buffer.cc
+++ b/dali/pipeline/data/buffer.cc
@@ -1,4 +1,4 @@
-// Copyright (c) 2020-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+// Copyright (c) 2020-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -64,6 +64,11 @@ DLL_PUBLIC shared_ptr<uint8_t> AllocBuffer(size_t bytes, bool pinned,
 
 DLL_PUBLIC bool RestrictPinnedMemUsage() {
   static bool val = []() {
+    /* int n = 0;
+    if (cudaGetDeviceCount(&n) != CUDA_SUCCESS)
+      return true;
+    if (n == 0)
+      return true; */
     const char *env = getenv("DALI_RESTRICT_PINNED_MEM");
    return env && atoi(env);
   }();
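The function above decides whether DALI should avoid allocating pinned (page-locked) host memory. With the device-count check still commented out, the decision rests solely on the DALI_RESTRICT_PINNED_MEM environment variable, evaluated once through a static local. The standalone sketch below combines the active code with the disabled guard; it assumes the CUDA runtime API, and the function name is illustrative rather than DALI's actual implementation.

// pinned_gate_sketch.cc -- illustrative only; not part of the patch.
#include <cstdio>
#include <cstdlib>
#include <cuda_runtime.h>

// Lazily computed once, like the static local in RestrictPinnedMemUsage().
bool RestrictPinnedMemUsageSketch() {
  static bool val = []() {
    // The part the patch keeps commented out: without a usable GPU,
    // pinned memory brings no benefit, so restrict it unconditionally.
    int n = 0;
    if (cudaGetDeviceCount(&n) != cudaSuccess || n == 0)
      return true;
    // Otherwise restrict only when the user explicitly asks for it.
    const char *env = std::getenv("DALI_RESTRICT_PINNED_MEM");
    return env && std::atoi(env) != 0;
  }();
  return val;
}

int main() {
  std::printf("restrict pinned memory: %s\n",
              RestrictPinnedMemUsageSketch() ? "yes" : "no");
}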
diff --git a/dali/pipeline/executor/executor2/exec_graph.cc b/dali/pipeline/executor/executor2/exec_graph.cc
index 6538aa97a38..da2849a526f 100644
--- a/dali/pipeline/executor/executor2/exec_graph.cc
+++ b/dali/pipeline/executor/executor2/exec_graph.cc
@@ -111,8 +111,8 @@ void ExecNode::CreateAuxTasks() {
     release_outputs = Task::Create([]() {});
     release_outputs->ReleaseAfterRun(output_queue_limit);
     release_outputs->Succeed(main_task);
-    for (auto &consumers : outputs) {
-      for (auto *edge : consumers) {
+    for (auto &output : outputs) {
+      for (auto *edge : output.consumers) {
         if (edge->consumer->main_task)
           release_outputs->Succeed(edge->consumer->main_task);
       }
@@ -215,7 +215,7 @@ void ExecGraph::Validate() {
 
       if (e.producer_output_idx >= static_cast<int>(e.producer->outputs.size()))
        err("producer output index is out of range.");
-      auto &consumer_edges = e.producer->outputs[e.producer_output_idx];
+      auto &consumer_edges = e.producer->outputs[e.producer_output_idx].consumers;
       if (std::count(consumer_edges.begin(), consumer_edges.end(), &e) != 1)
        err("the relevant producer's output doesn't have this edge as one of the consumers.");
 
@@ -233,7 +233,7 @@ void ExecGraph::Validate() {
 
     }
     for (int o = 0, nout = n.outputs.size(); o < nout; o++) {
-      auto &consumers = n.outputs[o];
+      auto &consumers = n.outputs[o].consumers;
       for (auto &e : consumers) {
         if (!known_edges.count(e))
          err("a node's output is not a known edge pointer.");
diff --git a/dali/pipeline/executor/executor2/exec_graph.h b/dali/pipeline/executor/executor2/exec_graph.h
index f654f40a115..f0c72b3bc1b 100644
--- a/dali/pipeline/executor/executor2/exec_graph.h
+++ b/dali/pipeline/executor/executor2/exec_graph.h
@@ -91,6 +91,13 @@
 /** A tag type for constructing output ExecNode */
 struct PipelineOutputTag {};
 
+struct ExecOutputDesc {
+  SmallVector<ExecEdge *, 4> consumers;
+
+  StorageDevice device = StorageDevice::CPU;
+  bool pinned = false;
+};
+
 /** An execution node.
  *
  * An execution node corresponds to an operator node or an output node in the pipeline
@@ -119,7 +126,7 @@ class DLL_PUBLIC ExecNode {
    * The outputs must appear in the same order as they're defined in the operator's OpSpec.
    * The order of consumer edges in each output is not important.
    */
-  SmallVector<SmallVector<ExecEdge *>, 4> outputs;
+  SmallVector<ExecOutputDesc, 4> outputs;
 
   /** A semaphore limiting the concurrency of the operator.
    *
@@ -303,7 +310,7 @@ class DLL_PUBLIC ExecGraph {
     if (producer) {
       producer->outputs.resize(std::max(producer->outputs.size(), out_idx + 1));
-      producer->outputs[out_idx].push_back(&edge);
+      producer->outputs[out_idx].consumers.push_back(&edge);
     }
     if (consumer) {
       consumer->inputs.resize(std::max(consumer->inputs.size(), in_idx + 1));
 
@@ -320,6 +327,8 @@ class DLL_PUBLIC ExecGraph {
   /** Populates the graph based on a pipeline definition graph. */
   void Lower(const graph::OpGraph &def);
 
+  void FindPinnedBuffers();
+
  private:
   std::list<ExecNode> nodes_;
   std::list<ExecEdge> edges_;
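Each entry of ExecNode::outputs is now an ExecOutputDesc that bundles the consumer edges with the storage device and the pinned flag of that output, instead of being a bare list of edges. The simplified model below shows how the new layout is navigated; it uses std::vector in place of DALI's SmallVector and suffixed type names, so it is an illustration rather than the actual header.

#include <cstdio>
#include <vector>

enum class StorageDevice { CPU, GPU };

struct ExecNodeSketch;  // forward declaration for the edge type below

// One link from a producer's output to a consumer's input.
struct ExecEdgeSketch {
  ExecNodeSketch *producer = nullptr;
  ExecNodeSketch *consumer = nullptr;
  int producer_output_idx = 0;
};

// Per-output bookkeeping: consumer edges plus device/pinnedness metadata.
struct ExecOutputDescSketch {
  std::vector<ExecEdgeSketch *> consumers;
  StorageDevice device = StorageDevice::CPU;
  bool pinned = false;   // filled in by the pinnedness pass during lowering
};

struct ExecNodeSketch {
  std::vector<ExecOutputDescSketch> outputs;   // was: a vector of edge lists
};

// Consumer traversal now goes through the .consumers member of each output.
void PrintOutputs(const ExecNodeSketch &n) {
  for (size_t o = 0; o < n.outputs.size(); o++)
    std::printf("output %zu: %zu consumer(s), pinned=%d\n", o,
                n.outputs[o].consumers.size(), n.outputs[o].pinned ? 1 : 0);
}

int main() {
  ExecNodeSketch producer;
  producer.outputs.resize(1);
  ExecNodeSketch consumer;
  ExecEdgeSketch e{&producer, &consumer, 0};
  producer.outputs[0].consumers.push_back(&e);
  producer.outputs[0].pinned = true;
  PrintOutputs(producer);
}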
diff --git a/dali/pipeline/executor/executor2/exec_graph_lowering.cc b/dali/pipeline/executor/executor2/exec_graph_lowering.cc
index 182e7cac5c9..4861a418c23 100644
--- a/dali/pipeline/executor/executor2/exec_graph_lowering.cc
+++ b/dali/pipeline/executor/executor2/exec_graph_lowering.cc
@@ -32,13 +32,16 @@ void ExecGraph::Lower(const graph::OpGraph &def) {
   for (ExecNode &exec_node : nodes_) {
     assert(it_def != def.OpNodes().end());
     auto op_node = *it_def++;
+    assert(exec_node.outputs.size() == op_node.outputs.size());
     for (int o = 0, nout = op_node.outputs.size(); o < nout; o++) {
       const auto &out = op_node.outputs[o];
+      auto dev = out->device;
       for (auto &consumer : out->consumers) {
         auto *exec_con = def2exec[consumer.op];
         assert(exec_con != nullptr);
-        Link(&exec_node, o, exec_con, consumer.idx);
+        Link(&exec_node, o, exec_con, consumer.idx)->device = dev;
       }
+      exec_node.outputs[o].device = dev;
     }
   }
 
@@ -56,8 +59,76 @@ void ExecGraph::Lower(const graph::OpGraph &def) {
     edge->device = data_node->device;
   }
 
+  FindPinnedBuffers();
   Validate();
 }
 
+namespace {
+
+/** Sets pinnedness of the input sources
+ *
+ * The function goes over the inputs of the node. If the node is non-CPU, then all of its
+ * CPU _regular_ inputs are marked as pinned.
+ * If the node is a CPU node but passes through an input `i` directly to a pinned output `o`,
+ * then the source of input `i` is also marked as pinned.
+ */
+void SetPinnedInputs(ExecNode *node) {
+  assert(node->op != nullptr);
+
+  // TODO(michalz): Update if/when we have passthrough for argument inputs
+  int ninp = node->op->GetSpec().NumRegularInput();
+  assert(static_cast<size_t>(ninp) <= node->inputs.size());
+
+  if (node->backend != OpType::CPU) {
+    for (int i = 0; i < ninp; i++) {
+      auto *inp = node->inputs[i];
+      inp->producer->outputs[inp->producer_output_idx].pinned = true;
+    }
+  } else if (node->op->GetSpec().GetSchema().HasPassThrough()) {
+    auto &schema = node->op->GetSpec().GetSchema();
+    int nout = node->outputs.size();
+    for (int i = 0; i < ninp; i++) {
+      auto *input = node->inputs[i];
+      if (input->device != StorageDevice::CPU)  // we're not interested in non-CPU buffers
+        continue;
+
+      auto &source_output = input->producer->outputs[input->producer_output_idx];
+      if (source_output.pinned)  // already pinned
+        continue;
+
+      for (int o = 0; o < nout; o++) {
+        // If input `i` passes to a pinned output `o`, then the input should also be marked
+        // as pinned. This will be followed in reverse topological order.
+        if (node->outputs[o].pinned && schema.IsPassThrough(i, o, false)) {
+          source_output.pinned = true;
+          break;
+        }
+      }
+    }
+  }
+}
+
+}  // namespace
+
+void ExecGraph::FindPinnedBuffers() {
+  // No non-cpu ops? Just mark everything as non-pinned and we're done.
+  auto is_gpu_edge = [](const ExecEdge &e) { return e.device == StorageDevice::GPU; };
+  bool has_gpu_buffers = std::find_if(edges_.begin(), edges_.end(), is_gpu_edge) != edges_.end();
+  if (!has_gpu_buffers) {
+    for (auto &n : nodes_)
+      for (auto &o : n.outputs)
+        o.pinned = false;
+    return;
+  }
+
+  // go in reverse topological order, from outputs to inputs
+  for (auto it = nodes_.rbegin(); it != nodes_.rend(); ++it) {
+    ExecNode &n = *it;
+    if (n.is_pipeline_output)
+      continue;
+    SetPinnedInputs(&n);
+  }
+}
+
 }  // namespace exec2
 }  // namespace dali
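The pass added above walks the graph in reverse topological order: a non-CPU consumer pins its CPU inputs, and a CPU operator with pass-through outputs propagates pinnedness from an already-pinned output back to the corresponding input's source. The toy program below demonstrates the same propagation rule on a simplified graph representation; it omits the CPU-device check and argument inputs and does not use DALI's types.

#include <cstdio>
#include <utility>
#include <vector>

struct ToyOutput { bool pinned = false; };

struct ToyNode {
  bool is_gpu = false;                        // stands in for backend != OpType::CPU
  std::vector<std::pair<int, int>> inputs;    // (producer node index, producer output index)
  std::vector<ToyOutput> outputs;
  std::vector<int> pass_through;              // pass_through[i] = output fed by input i, or -1
};

// Visit nodes from consumers to producers and mark source outputs as pinned.
void PropagatePinned(std::vector<ToyNode> &nodes) {   // nodes are in topological order
  for (int i = static_cast<int>(nodes.size()) - 1; i >= 0; i--) {
    ToyNode &n = nodes[i];
    for (size_t in = 0; in < n.inputs.size(); in++) {
      ToyOutput &src = nodes[n.inputs[in].first].outputs[n.inputs[in].second];
      if (n.is_gpu) {
        src.pinned = true;    // a GPU consumer pins the host buffer it reads from
      } else if (in < n.pass_through.size() && n.pass_through[in] >= 0 &&
                 n.outputs[n.pass_through[in]].pinned) {
        src.pinned = true;    // pass-through of an output that is already pinned
      }
    }
  }
}

int main() {
  // reader (CPU) -> pass-through op (CPU) -> GPU op
  std::vector<ToyNode> nodes(3);
  nodes[0].outputs.resize(1);
  nodes[1].inputs = {{0, 0}};
  nodes[1].outputs.resize(1);
  nodes[1].pass_through = {0};
  nodes[2].is_gpu = true;
  nodes[2].inputs = {{1, 0}};

  PropagatePinned(nodes);
  // The pass-through op's output and, transitively, the reader's output end up pinned.
  std::printf("reader output pinned: %d\n", nodes[0].outputs[0].pinned ? 1 : 0);
}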
diff --git a/dali/pipeline/executor/executor2/exec_graph_test.cc b/dali/pipeline/executor/executor2/exec_graph_test.cc
index a285edb7dac..7e0b6a89ee1 100644
--- a/dali/pipeline/executor/executor2/exec_graph_test.cc
+++ b/dali/pipeline/executor/executor2/exec_graph_test.cc
@@ -258,14 +258,15 @@ TEST(ExecGraphTest, Exception) {
       .AddOutput("op2e0", "cpu")
       .AddArg("name", "op2");
   auto op2 = std::make_unique(spec2);
-  ExecGraph def;
-  ExecNode *n2 = def.AddNode(std::move(op2));
-  ExecNode *n1 = def.AddNode(std::move(op1));
-  ExecNode *n0 = def.AddNode(std::move(op0));
-  ExecNode *no = def.AddOutputNode();
-  def.Link(n0, 0, n2, 0);
-  def.Link(n1, 0, n2, 1);
-  def.Link(n2, 0, no, 0);
+  ExecGraph g;
+  ExecNode *n2 = g.AddNode(std::move(op2));
+  ExecNode *n1 = g.AddNode(std::move(op1));
+  ExecNode *n0 = g.AddNode(std::move(op0));
+  ExecNode *no = g.AddOutputNode();
+  g.Link(n0, 0, n2, 0);
+  g.Link(n1, 0, n2, 1);
+  g.Link(n2, 0, no, 0);
+  LimitBackendConcurrency(g, OpType::CPU);
   ThreadPool tp(std::thread::hardware_concurrency(), 0, false, "test");
   WorkspaceParams params = {};
   ExecEnv env;
@@ -276,8 +277,8 @@
   tasking::Executor ex(4);
   ex.Start();
   for (int i = 0; i < 10; i++) {
-    def.PrepareIteration(std::make_shared<IterationData>(), params);
-    auto fut = def.Launch(ex);
+    g.PrepareIteration(std::make_shared<IterationData>(), params);
+    auto fut = g.Launch(ex);
     EXPECT_THROW(fut.Value(), DALIException);
   }
 }
@@ -313,25 +314,25 @@ TEST(ExecGraphTest, LoweredStructureMatch) {
   auto &ex_out = g.Nodes().back();
 
   ASSERT_EQ(ex0.outputs.size(), 1_uz);
-  ASSERT_EQ(ex0.outputs[0].size(), 2_uz);
-  EXPECT_EQ(ex0.outputs[0][0]->consumer, &ex2);
-  EXPECT_EQ(ex0.outputs[0][1]->consumer, &ex3);
+  ASSERT_EQ(ex0.outputs[0].consumers.size(), 2_uz);
+  EXPECT_EQ(ex0.outputs[0].consumers[0]->consumer, &ex2);
+  EXPECT_EQ(ex0.outputs[0].consumers[1]->consumer, &ex3);
 
   ASSERT_EQ(ex1.outputs.size(), 1_uz);
-  EXPECT_EQ(ex1.outputs[0][0]->consumer, &ex2);
-  ASSERT_EQ(ex1.outputs[0].size(), 2_uz);
-  EXPECT_EQ(ex1.outputs[0][1]->consumer, &ex3);
+  EXPECT_EQ(ex1.outputs[0].consumers[0]->consumer, &ex2);
+  ASSERT_EQ(ex1.outputs[0].consumers.size(), 2_uz);
+  EXPECT_EQ(ex1.outputs[0].consumers[1]->consumer, &ex3);
 
   ASSERT_EQ(ex2.outputs.size(), 1_uz);
-  ASSERT_EQ(ex2.outputs[0].size(), 1_uz);
-  EXPECT_EQ(ex2.outputs[0][0]->consumer, &ex_out);
+  ASSERT_EQ(ex2.outputs[0].consumers.size(), 1_uz);
+  EXPECT_EQ(ex2.outputs[0].consumers[0]->consumer, &ex_out);
   ASSERT_EQ(ex2.inputs.size(), 2_uz);
   EXPECT_EQ(ex2.inputs[0]->producer, &ex0);
   EXPECT_EQ(ex2.inputs[1]->producer, &ex1);
 
   ASSERT_EQ(ex3.outputs.size(), 1_uz);
-  ASSERT_EQ(ex3.outputs[0].size(), 1_uz);
-  EXPECT_EQ(ex3.outputs[0][0]->consumer, &ex_out);
+  ASSERT_EQ(ex3.outputs[0].consumers.size(), 1_uz);
+  EXPECT_EQ(ex3.outputs[0].consumers[0]->consumer, &ex_out);
   EXPECT_EQ(ex3.inputs[0]->producer, &ex0);
   EXPECT_EQ(ex3.inputs[1]->producer, &ex1);
 
diff --git a/dali/pipeline/executor/executor2/op_task.cc b/dali/pipeline/executor/executor2/op_task.cc
index 4ad8c21f841..0905c537a3b 100644
--- a/dali/pipeline/executor/executor2/op_task.cc
+++ b/dali/pipeline/executor/executor2/op_task.cc
@@ -125,6 +125,7 @@ void OpTask::SetupOp() {
     if (ws.OutputIsType<CPUBackend>(i)) {
       if (!ws.OutputPtr<CPUBackend>(i)) {
         auto tl = std::make_shared<TensorList<CPUBackend>>(output_descs[i].shape.num_samples());
+        tl->set_pinned(node_->outputs[i].pinned);
         ws.SetOutput(i, tl);
       }
       if (should_resize)
@@ -190,7 +191,7 @@ void OpTask::SetWorkspaceInputs() {
 AccessOrder OpTask::OutputConsumerOrder(int output_idx) {
   assert(static_cast<size_t>(output_idx) < node_->outputs.size());
   // Return the common stream.
-  auto &consumers = node_->outputs[output_idx];
+  auto &consumers = node_->outputs[output_idx].consumers;
   if (consumers.empty())
     return {};  // definitely no consumer
   AccessOrder order = consumers[0]->consumer->env.order;
diff --git a/dali/pipeline/executor/executor2/stream_assignment.h b/dali/pipeline/executor/executor2/stream_assignment.h
index a36166cb04b..e1a2567c25e 100644
--- a/dali/pipeline/executor/executor2/stream_assignment.h
+++ b/dali/pipeline/executor/executor2/stream_assignment.h
@@ -229,8 +229,8 @@
     if (stream_id.has_value())
       free_stream_ids_.insert(*stream_id);
-    for (auto &consumers : node->outputs) {
-      for (auto *out : consumers) {
+    for (auto &output_desc : node->outputs) {
+      for (auto *out : output_desc.consumers) {
        auto out_stream_id = NextStreamId(out->consumer, stream_id);
        if (out_stream_id.has_value())
          keep_stream_id = false;
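In op_task.cc, the flag computed during lowering is applied to the freshly created CPU output (tl->set_pinned(...)) before the buffer is resized, so CPU outputs that feed GPU operators end up in page-locked memory. The practical benefit is the standard CUDA behavior that host-to-device copies can run asynchronously and overlap with other work only when the host buffer is pinned. The small standalone example below shows that usage with plain CUDA runtime calls; it has nothing DALI-specific in it and omits error checking for brevity.

#include <cstdio>
#include <cuda_runtime.h>

int main() {
  constexpr size_t kBytes = 1 << 20;
  void *pinned_host = nullptr;
  void *device = nullptr;
  cudaStream_t stream;

  cudaStreamCreate(&stream);
  cudaMallocHost(&pinned_host, kBytes);   // page-locked (pinned) host allocation
  cudaMalloc(&device, kBytes);

  // Copies from pinned memory can be truly asynchronous and overlap with other
  // work on the stream; copies from pageable memory are staged and synchronize.
  cudaMemcpyAsync(device, pinned_host, kBytes, cudaMemcpyHostToDevice, stream);
  cudaStreamSynchronize(stream);

  cudaFree(device);
  cudaFreeHost(pinned_host);
  cudaStreamDestroy(stream);
  std::printf("async H2D copy from pinned memory finished\n");
  return 0;
}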