diff --git a/dali/operators/generic/cast.cc b/dali/operators/generic/cast.cc index 7a9c24ab73..c9d15fff23 100644 --- a/dali/operators/generic/cast.cc +++ b/dali/operators/generic/cast.cc @@ -76,7 +76,7 @@ DALI_SCHEMA(Cast) DALI_SCHEMA(CastLike) .DocStr("Cast the first tensor to the type of the second tensor.") .NumInput(2) - .InputDevice(1, InputDevice::Any) + .InputDevice(1, InputDevice::Metadata) .NumOutput(1) .AllowSequences() .SupportVolumetric(); diff --git a/dali/operators/generic/constant_value.cc b/dali/operators/generic/constant_value.cc index cdacd15bde..bb82e8d1f9 100644 --- a/dali/operators/generic/constant_value.cc +++ b/dali/operators/generic/constant_value.cc @@ -84,7 +84,7 @@ DALI_SCHEMA(FullLike) .DocStr(R"code(Returns new data with the same shape and type as the input data, filled with a `fill_value`.)code") .NumInput(2) .InputDox(0, "data_like", "TensorList", R"code(The input data value to copy the shape and type from.)code") - .InputDevice(0, InputDevice::Any) + .InputDevice(0, InputDevice::Metadata) .InputDox(1, "fill_value", "TensorList", R"code(The fill value.)code") .NumOutput(1); DALI_REGISTER_OPERATOR(FullLike, FullLike, CPU); @@ -102,7 +102,7 @@ DALI_SCHEMA(ZerosLike) .DocStr(R"code(Returns new data with the same shape and type as the input array, filled with zeros.)code") .NumInput(1) .InputDox(0, "data_like", "TensorList", R"code(The input data value to copy the shape and type from.)code") - .InputDevice(0, InputDevice::Any) + .InputDevice(0, InputDevice::Metadata) .NumOutput(1) .AddOptionalTypeArg("dtype", R"code(Overrides the output data type.)code", DALI_INT32); DALI_REGISTER_OPERATOR(ZerosLike, ZerosLike, CPU); @@ -120,7 +120,7 @@ DALI_SCHEMA(OnesLike) .DocStr(R"code(Returns new data with the same shape and type as the input array, filled with ones.)code") .NumInput(1) .InputDox(0, "data_like", "TensorList", R"code(The input data value to copy the shape and type from.)code") - .InputDevice(0, InputDevice::Any) + .InputDevice(0, InputDevice::Metadata) .NumOutput(1) .AddOptionalTypeArg("dtype", R"code(Overrides the output data type.)code", DALI_INT32); DALI_REGISTER_OPERATOR(OnesLike, OnesLike, CPU); diff --git a/dali/operators/generic/shapes.cc b/dali/operators/generic/shapes.cc index 02a4282517..4fd85193de 100644 --- a/dali/operators/generic/shapes.cc +++ b/dali/operators/generic/shapes.cc @@ -19,7 +19,7 @@ namespace dali { DALI_SCHEMA(Shapes) .DocStr(R"code(Returns the shapes of inputs.)code") .NumInput(1) - .InputDevice(0, InputDevice::Any) + .InputDevice(0, InputDevice::Metadata) .NumOutput(1) .AllowSequences() .SupportVolumetric() diff --git a/dali/operators/random/beta_distribution_cpu.cc b/dali/operators/random/beta_distribution_cpu.cc index d30dbcf2ac..16c1d245ec 100644 --- a/dali/operators/random/beta_distribution_cpu.cc +++ b/dali/operators/random/beta_distribution_cpu.cc @@ -38,6 +38,7 @@ a single value per sample is generated. .NumInput(0, 1) .InputDox(0, "shape_like", "TensorList", "Shape of this input will be used to infer the shape of the output, if provided.") + .InputDevice(0, InputDevice::Metadata) .NumOutput(1) .AddOptionalArg("alpha", R"code(The alpha parameter, a positive ``float32`` scalar.)code", 1.0f, true) diff --git a/dali/operators/random/choice_cpu.cc b/dali/operators/random/choice_cpu.cc index c1530cb332..d996655206 100644 --- a/dali/operators/random/choice_cpu.cc +++ b/dali/operators/random/choice_cpu.cc @@ -42,6 +42,7 @@ that is: :meth:`nvidia.dali.types.DALIDataType`, :meth:`nvidia.dali.types.DALIIm "Otherwise ``__a`` is treated as 1D array of input samples.") .InputDox(1, "shape_like", "TensorList", "Shape of this input will be used to infer the shape of the output, if provided.") + .InputDevice(1, InputDevice::Metadata) .NumOutput(1) .AddOptionalArg>("p", "Distribution of the probabilities. " diff --git a/dali/operators/random/coin_flip_cpu.cc b/dali/operators/random/coin_flip_cpu.cc index 10a6db9670..1012d19466 100644 --- a/dali/operators/random/coin_flip_cpu.cc +++ b/dali/operators/random/coin_flip_cpu.cc @@ -30,6 +30,7 @@ a single value per sample is generated. .NumInput(0, 1) .InputDox(0, "shape_like", "TensorList", "Shape of this input will be used to infer the shape of the output, if provided.") + .InputDevice(0, InputDevice::Metadata) .NumOutput(1) .AddOptionalArg("probability", R"code(Probability of value 1.)code", @@ -51,6 +52,7 @@ sample is generated. .NumInput(0, 1) .InputDox(0, "shape_like", "TensorList", "Shape of this input will be used to infer the shape of the output, if provided.") + .InputDevice(0, InputDevice::Metadata) .NumOutput(1) .AddParent("random__CoinFlip") .Deprecate("random__CoinFlip"); // Deprecated in 0.30 diff --git a/dali/operators/random/normal_distribution_cpu.cc b/dali/operators/random/normal_distribution_cpu.cc index 98def24f88..48d1ca87d9 100644 --- a/dali/operators/random/normal_distribution_cpu.cc +++ b/dali/operators/random/normal_distribution_cpu.cc @@ -28,7 +28,7 @@ a single value per sample is generated. .NumInput(0, 1) .InputDox(0, "shape_like", "TensorList", "Shape of this input will be used to infer the shape of the output, if provided.") - .InputDevice(0, InputDevice::Any) + .InputDevice(0, InputDevice::Metadata) .NumOutput(1) .AddOptionalArg("mean", R"code(Mean of the distribution.)code", @@ -51,7 +51,7 @@ a single value per sample is generated. .NumInput(0, 1) .InputDox(0, "shape_like", "TensorList", "Shape of this input will be used to infer the shape of the output, if provided.") - .InputDevice(0, InputDevice::Any) + .InputDevice(0, InputDevice::Metadata) .NumOutput(1) .AddParent("random__Normal") .Deprecate("random__Normal"); // Deprecated in 0.30 diff --git a/dali/operators/random/uniform_distribution_cpu.cc b/dali/operators/random/uniform_distribution_cpu.cc index c56ad8e569..91c6ca40bf 100644 --- a/dali/operators/random/uniform_distribution_cpu.cc +++ b/dali/operators/random/uniform_distribution_cpu.cc @@ -32,6 +32,7 @@ a single value per sample is generated. .NumInput(0, 1) .InputDox(0, "shape_like", "TensorList", "Shape of this input will be used to infer the shape of the output, if provided.") + .InputDevice(0, InputDevice::Metadata) .NumOutput(1) .AddOptionalArg("range", R"code(Range ``[min, max)`` of a continuous uniform distribution. @@ -67,6 +68,7 @@ a single value per sample is generated. .NumInput(0, 1) .InputDox(0, "shape_like", "TensorList", "Shape of this input will be used to infer the shape of the output, if provided.") + .InputDevice(0, InputDevice::Metadata) .NumOutput(1) .AddOptionalArg("range", R"code(Range ``[min, max)`` of a continuous uniform distribution. diff --git a/dali/pipeline/executor/executor2/exec_graph.h b/dali/pipeline/executor/executor2/exec_graph.h index b8d2b03fc4..1f891f5411 100644 --- a/dali/pipeline/executor/executor2/exec_graph.h +++ b/dali/pipeline/executor/executor2/exec_graph.h @@ -90,13 +90,16 @@ struct ExecEdge { /** The index of the input in OpSpec. It matches the edge's index in consumer->inputs. */ int consumer_input_idx = 0; StorageDevice device = {}; + /** The input passes only the metadata, skipping stream synchronization. */ + bool metadata = false; constexpr bool operator==(const ExecEdge &other) const { return producer == other.producer && consumer == other.consumer && producer_output_idx == other.producer_output_idx && consumer_input_idx == other.consumer_input_idx && - device == other.device; + device == other.device && + metadata == other.metadata; } constexpr bool operator!=(const ExecEdge &other) const { diff --git a/dali/pipeline/executor/executor2/exec_graph_lowering.cc b/dali/pipeline/executor/executor2/exec_graph_lowering.cc index 9efff40a3e..bdb20adc68 100644 --- a/dali/pipeline/executor/executor2/exec_graph_lowering.cc +++ b/dali/pipeline/executor/executor2/exec_graph_lowering.cc @@ -50,7 +50,16 @@ void ExecGraph::Lower(const graph::OpGraph &def) { for (auto &consumer : out->consumers) { auto *exec_con = def2exec[consumer.op]; assert(exec_con != nullptr); - Link(&exec_node, o, exec_con, consumer.idx)->device = dev; + auto *edge = Link(&exec_node, o, exec_con, consumer.idx); + edge->device = dev; + if (consumer.op) { + auto &consumer_spec = consumer.op->spec; + auto &schema = consumer_spec.GetSchemaOrDefault(); + if (edge->consumer_input_idx < schema.MaxNumInput()) { // only regular inputs + if (schema.GetInputDevice(edge->consumer_input_idx) == InputDevice::Metadata) + edge->metadata = true; + } + } } exec_node.outputs[o].device = dev; } diff --git a/dali/pipeline/executor/executor2/exec_node_task.cc b/dali/pipeline/executor/executor2/exec_node_task.cc index bb4919944e..36360ce47b 100644 --- a/dali/pipeline/executor/executor2/exec_node_task.cc +++ b/dali/pipeline/executor/executor2/exec_node_task.cc @@ -278,16 +278,13 @@ void OpTask::SetWorkspaceInputs() { auto process_input = [&](int i, auto backend) { using Backend = decltype(backend); const auto &inp = TaskInput(ti); - // If the output order of the operator is `host` then we don't wait for GPU - // inputs - they can't be accessed directly on host and the operator will - // have to issue some sort of synchronization if and when necessary. - // This optimization is essential to avoid oversynchronization - // when the operator needs to access the metadata only (e.g. getting the shape). - if ((order.is_device() || std::is_same_v) /*see comment above */ && - inp.event && inp.order != order) + bool is_meta = node_->inputs[i]->metadata; + // metadata-only inputs don't need to be synchronized + if (!is_meta && inp.event && inp.order != order) events.insert(inp.event); - if (inp.order == order) { // use the input directly + // metadata-only inputs don't need a proper stream + if (inp.order == order || is_meta) { // use the input directly ws_->SetInput(i, inp.data); } else { // create another TL and set its order (and layout, while we're at it) auto tl = std::make_shared>(); @@ -477,7 +474,7 @@ tasking::SharedTask ExecNodeTask::CreateTask(ExecNode *node, const WorkspacePara } } -void ClearWorkspacePayload(Workspace &ws) { +void ClearWorkspacePayload(Workspace &ws, ExecNode &node) { auto event = ws.has_event() ? ws.event() : nullptr; for (int i = 0; i < ws.NumInput(); i++) { // TODO(michalz): Some smarter deletion management @@ -492,14 +489,16 @@ void ClearWorkspacePayload(Workspace &ws) { if (ws.InputIsType(i)) { if (auto &pinp = ws.InputPtr(i)) { auto &inp = *pinp; - if (inp.is_pinned() && event && inp.order() != ws.output_order()) + if (event && + !node.inputs[i]->metadata && + inp.is_pinned() && inp.order() != ws.output_order()) inp.order().wait(event); ws.SetInput(i, nullptr); } } else if (ws.InputIsType(i)) { if (auto &pinp = ws.InputPtr(i)) { auto &inp = *pinp; - if (event && inp.order() != ws.output_order()) + if (event && !node.inputs[i]->metadata && inp.order() != ws.output_order()) inp.order().wait(event); ws.SetInput(i, nullptr); } @@ -525,7 +524,7 @@ void ClearWorkspacePayload(Workspace &ws) { void ExecNodeTask::ClearWorkspace() { assert(ws_); - ClearWorkspacePayload(*ws_); + ClearWorkspacePayload(*ws_, *node_); } } // namespace exec2 diff --git a/dali/pipeline/executor/executor2/stream_assignment.h b/dali/pipeline/executor/executor2/stream_assignment.h index d5cfb43b1b..0c8a6a93bf 100644 --- a/dali/pipeline/executor/executor2/stream_assignment.h +++ b/dali/pipeline/executor/executor2/stream_assignment.h @@ -35,15 +35,15 @@ template class StreamAssignment; inline bool NeedsStream(const ExecNode *node) { - if (node->is_pipeline_output) { - for (auto &pipe_out : node->inputs) { - if (pipe_out->device == StorageDevice::GPU) + if (node->is_pipeline_output || node->backend == OpType::CPU) { + for (auto &input : node->inputs) { + if (input->device == StorageDevice::GPU && !input->metadata) return true; } + return false; } else { - return node->backend != OpType::CPU; + return true; } - return false; } inline OpType NodeType(const ExecNode *node) { @@ -117,6 +117,12 @@ class StreamAssignment { if (has_gpu_) return; // we already have both, nothing more can happen break; + case OpType::CPU: + if (NeedsStream(&node)) { // treat CPU nodes with GPU inputs as GPU + has_gpu_ = true; + if (has_mixed_) + return; + } default: break; } @@ -128,11 +134,14 @@ class StreamAssignment { * If the node is a Mixed node, it gets stream index 0. * If the node is a GPU node it gets stream index 1 if there are any mixed nodes, otherwise * the only stream is the GPU stream and the returned index is 0. + * CPU nodes get GPU stream if they need one (i.e. they have a GPU input) */ std::optional operator[](const ExecNode *node) const { switch (NodeType(node)) { case OpType::CPU: - return std::nullopt; + if (!NeedsStream(node)) + return std::nullopt; + // fall-through to GPU case OpType::GPU: return has_mixed_ ? 1 : 0; case OpType::MIXED: diff --git a/dali/pipeline/executor/executor2/stream_assignment_test.cc b/dali/pipeline/executor/executor2/stream_assignment_test.cc index b369a7284e..a202162648 100644 --- a/dali/pipeline/executor/executor2/stream_assignment_test.cc +++ b/dali/pipeline/executor/executor2/stream_assignment_test.cc @@ -48,6 +48,31 @@ DALI_REGISTER_OPERATOR(StreamAssignmentDummyOp, StreamAssignmentDummyOp, Mixed); DALI_REGISTER_OPERATOR(StreamAssignmentDummyOp, StreamAssignmentDummyOp, GPU); + +template +class StreamAssignmentMetaOp : public Operator { + public: + using Operator::Operator; + USE_OPERATOR_MEMBERS(); + + void RunImpl(Workspace &ws) override {} + bool SetupImpl(std::vector &output_desc, const Workspace &ws) override { + return false; + } +}; + +DALI_SCHEMA(StreamAssignmentMetaOp) + .NumInput(0, 999) + .InputDevice(0, 999, InputDevice::Metadata) + .NumOutput(0) + .AdditionalOutputsFn([](const OpSpec &spec) { + return spec.NumOutput(); + }); + +DALI_REGISTER_OPERATOR(StreamAssignmentMetaOp, StreamAssignmentMetaOp, CPU); +DALI_REGISTER_OPERATOR(StreamAssignmentMetaOp, StreamAssignmentMetaOp, Mixed); +DALI_REGISTER_OPERATOR(StreamAssignmentMetaOp, StreamAssignmentMetaOp, GPU); + namespace exec2 { namespace { @@ -71,6 +96,27 @@ OpSpec SpecMixed() { return SpecDev("mixed"); } + +OpSpec SpecMetaDev(const std::string &device) { + return OpSpec("StreamAssignmentMetaOp") + .AddArg("device", device) + .AddArg("num_threads", 1) + .AddArg("max_batch_size", 1); +} + +OpSpec SpecMetaGPU() { + return SpecMetaDev("gpu"); +} + +OpSpec SpecMetaCPU() { + return SpecMetaDev("cpu"); +} + +OpSpec SpecMetaMixed() { + return SpecMetaDev("mixed"); +} + + auto MakeNodeMap(const ExecGraph &graph) { std::map> map; for (auto &n : graph.Nodes()) @@ -122,6 +168,38 @@ TEST(Exec2Test, StreamAssignment_Single_CPUMixedGPU) { EXPECT_EQ(assignment[map["c"]], 0); } +template +void TestGPU2CPUAssignment() { + graph::OpGraph::Builder b; + b.Add("a", + SpecGPU() + .AddOutput("a->b", "gpu") + .AddOutput("a->c", "gpu")); + b.Add("b", + SpecCPU() + .AddInput("a->b", "gpu") + .AddOutput("b->out", "cpu")); + b.Add("c", + SpecMetaCPU() + .AddInput("a->c", "gpu") + .AddOutput("c->out", "cpu")); + b.AddOutput("b->out_cpu"); + b.AddOutput("c->out_cpu"); + auto g = std::move(b).GetGraph(true); + ExecGraph eg; + eg.Lower(g); + + StreamAssignment assignment(eg); + auto map = MakeNodeMap(eg); + EXPECT_EQ(assignment[map["a"]], 0); + EXPECT_EQ(assignment[map["b"]], 0); // CPU operator with GPU input + EXPECT_EQ(assignment[map["c"]], std::nullopt); // metadata-only +} + +TEST(Exec2Test, StreamAssignment_Single_GPU2CPU) { + TestGPU2CPUAssignment(); +} + TEST(Exec2Test, StreamAssignment_PerBackend_OnlyCPU) { graph::OpGraph::Builder b; @@ -194,6 +272,13 @@ TEST(Exec2Test, StreamAssignment_PerBackend_CPUMixedGPU) { } +TEST(Exec2Test, StreamAssignment_PerBackend_GPU2CPU) { + TestGPU2CPUAssignment(); +} + +TEST(Exec2Test, StreamAssignment_OperOperator_GPU2CPU) { + TestGPU2CPUAssignment(); +} TEST(Exec2Test, StreamAssignment_PerOperator_1) { ExecGraph eg; @@ -272,7 +357,7 @@ TEST(Exec2Test, StreamAssignment_PerOperator_2) { SpecGPU() .AddOutput("i->j", "gpu")); b.Add("j", - SpecCPU() + SpecMetaCPU() .AddInput("i->j", "gpu") .AddOutput("j->h", "cpu")); b.Add("b", @@ -320,7 +405,7 @@ TEST(Exec2Test, StreamAssignment_PerOperator_2) { StreamAssignment assignment(eg); auto map = MakeNodeMap(eg); EXPECT_EQ(assignment[map["a"]], 0); - EXPECT_EQ(assignment[map["b"]], std::nullopt); + EXPECT_EQ(assignment[map["b"]], 0); // CPU operator with a GPU input needs a stream EXPECT_EQ(assignment[map["c"]], 0); EXPECT_EQ(assignment[map["d"]], 0); EXPECT_EQ(assignment[map["e"]], 1); @@ -328,7 +413,7 @@ TEST(Exec2Test, StreamAssignment_PerOperator_2) { EXPECT_EQ(assignment[map["g"]], 0); EXPECT_EQ(assignment[map["h"]], 0); EXPECT_EQ(assignment[map["i"]], 2); - EXPECT_EQ(assignment[map["j"]], std::nullopt); + EXPECT_EQ(assignment[map["j"]], std::nullopt); // metadata only EXPECT_EQ(assignment[map["k"]], 3); } diff --git a/dali/pipeline/operator/op_schema.h b/dali/pipeline/operator/op_schema.h index 19de8cb132..609e86762e 100644 --- a/dali/pipeline/operator/op_schema.h +++ b/dali/pipeline/operator/op_schema.h @@ -63,11 +63,34 @@ struct TensorArgDesc { }; enum class InputDevice : uint8_t { + /** CPU for CPU and Mixed operators; GPU for GPU operators. */ MatchBackend = 0, - CPU = 1, - GPU = 2, - Any = 3, - MatchBackendOrCPU = 4 + + /** Always CPU */ + CPU, + + /** Always GPU */ + GPU, + + /** Any kind of input device, regardless of operator's backend */ + Any, + + /** CPU for CPU and Mixed; anything for GPU */ + MatchBackendOrCPU, + + /** Any backend, but the operator will not access the actual data. + * + * This is useful for operators that only look at the metadata of the input - things like shape, + * source info, dtype, etc. + * + * Specifying this flag allows the executor to skip synchronization or even to provide + * a tensor without actual payload. + * It is forbidden to: + * - look at the data inside the operator (`data`, `raw_data`, etc) + * - copy the input data (`TensorList::Copy` or calling copy on the samples) + * - forward the input or its parts to the output with ShareData, SetSample or similar. + */ + Metadata, }; class DLL_PUBLIC OpSchema { diff --git a/dali/pipeline/operator/op_spec.cc b/dali/pipeline/operator/op_spec.cc index 398bb274f3..d88eb818ab 100644 --- a/dali/pipeline/operator/op_spec.cc +++ b/dali/pipeline/operator/op_spec.cc @@ -31,6 +31,7 @@ inline bool IsCompatibleDevice(StorageDevice provided, InputDevice required, OpT case InputDevice::MatchBackendOrCPU: return op_device == OpType::CPU ? provided == StorageDevice::CPU : true; case InputDevice::Any: + case InputDevice::Metadata: return true; default: return false; @@ -48,6 +49,7 @@ inline std::string ValidDevices(InputDevice required, OpType op_device) { case InputDevice::MatchBackendOrCPU: return op_device == OpType::GPU ? "\"gpu\" or \"cpu\"" : "\"cpu\""; case InputDevice::Any: + case InputDevice::Metadata: return "\"gpu\" or \"cpu\""; default: assert(!"Unrechable");