
Commit

Add metadata-only inputs. (#5635)
* Assign a stream to CPU operators that have GPU (non-metadata) inputs
* Add the Metadata input device - it declares that the input is used only for metadata (shape, dtype, etc.) access (see the schema sketch below)
* Don't synchronize metadata inputs in the executor.
* Don't prolong the lifetime of metadata-only inputs.
* Add an InputDevice specifier to the random number generators' shape_like input.

---------

Signed-off-by: Michał Zientkiewicz <[email protected]>
mzient committed Sep 18, 2024
1 parent 8904209 commit 94f02ad
Showing 15 changed files with 170 additions and 34 deletions.
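For reference, a minimal sketch of how an operator schema opts an input into the new metadata-only handling. The operator name and docstring below are made up for illustration; the InputDevice(index, InputDevice::Metadata) call mirrors the schema changes in the files that follow and tells the executor that only the input's shape and dtype are read, so stream synchronization for that input can be skipped and its buffer released early.

    // Hypothetical operator, used only to illustrate the new specifier.
    DALI_SCHEMA(MyShapeLikeOp)
        .DocStr("Produces output shaped like input 1; never reads its contents.")
        .NumInput(2)
        .InputDevice(1, InputDevice::Metadata)  // shape/dtype access only
        .NumOutput(1);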
dali/operators/generic/cast.cc: 1 addition & 1 deletion
@@ -76,7 +76,7 @@ DALI_SCHEMA(Cast)
DALI_SCHEMA(CastLike)
.DocStr("Cast the first tensor to the type of the second tensor.")
.NumInput(2)
- .InputDevice(1, InputDevice::Any)
+ .InputDevice(1, InputDevice::Metadata)
.NumOutput(1)
.AllowSequences()
.SupportVolumetric();
dali/operators/generic/constant_value.cc: 3 additions & 3 deletions
@@ -84,7 +84,7 @@ DALI_SCHEMA(FullLike)
.DocStr(R"code(Returns new data with the same shape and type as the input data, filled with a `fill_value`.)code")
.NumInput(2)
.InputDox(0, "data_like", "TensorList", R"code(The input data value to copy the shape and type from.)code")
- .InputDevice(0, InputDevice::Any)
+ .InputDevice(0, InputDevice::Metadata)
.InputDox(1, "fill_value", "TensorList", R"code(The fill value.)code")
.NumOutput(1);
DALI_REGISTER_OPERATOR(FullLike, FullLike<CPUBackend>, CPU);
@@ -102,7 +102,7 @@ DALI_SCHEMA(ZerosLike)
.DocStr(R"code(Returns new data with the same shape and type as the input array, filled with zeros.)code")
.NumInput(1)
.InputDox(0, "data_like", "TensorList", R"code(The input data value to copy the shape and type from.)code")
- .InputDevice(0, InputDevice::Any)
+ .InputDevice(0, InputDevice::Metadata)
.NumOutput(1)
.AddOptionalTypeArg("dtype", R"code(Overrides the output data type.)code", DALI_INT32);
DALI_REGISTER_OPERATOR(ZerosLike, ZerosLike<CPUBackend>, CPU);
@@ -120,7 +120,7 @@ DALI_SCHEMA(OnesLike)
.DocStr(R"code(Returns new data with the same shape and type as the input array, filled with ones.)code")
.NumInput(1)
.InputDox(0, "data_like", "TensorList", R"code(The input data value to copy the shape and type from.)code")
- .InputDevice(0, InputDevice::Any)
+ .InputDevice(0, InputDevice::Metadata)
.NumOutput(1)
.AddOptionalTypeArg("dtype", R"code(Overrides the output data type.)code", DALI_INT32);
DALI_REGISTER_OPERATOR(OnesLike, OnesLike<CPUBackend>, CPU);
dali/operators/generic/shapes.cc: 1 addition & 1 deletion
@@ -19,7 +19,7 @@ namespace dali {
DALI_SCHEMA(Shapes)
.DocStr(R"code(Returns the shapes of inputs.)code")
.NumInput(1)
- .InputDevice(0, InputDevice::Any)
+ .InputDevice(0, InputDevice::Metadata)
.NumOutput(1)
.AllowSequences()
.SupportVolumetric()
dali/operators/random/beta_distribution_cpu.cc: 1 addition & 0 deletions
@@ -38,6 +38,7 @@ a single value per sample is generated.
.NumInput(0, 1)
.InputDox(0, "shape_like", "TensorList",
"Shape of this input will be used to infer the shape of the output, if provided.")
+ .InputDevice(0, InputDevice::Metadata)
.NumOutput(1)
.AddOptionalArg("alpha", R"code(The alpha parameter, a positive ``float32`` scalar.)code", 1.0f,
true)
dali/operators/random/choice_cpu.cc: 1 addition & 0 deletions
@@ -42,6 +42,7 @@ that is: :meth:`nvidia.dali.types.DALIDataType`, :meth:`nvidia.dali.types.DALIIm
"Otherwise ``__a`` is treated as 1D array of input samples.")
.InputDox(1, "shape_like", "TensorList",
"Shape of this input will be used to infer the shape of the output, if provided.")
+ .InputDevice(1, InputDevice::Metadata)
.NumOutput(1)
.AddOptionalArg<std::vector<float>>("p",
"Distribution of the probabilities. "
dali/operators/random/coin_flip_cpu.cc: 2 additions & 0 deletions
@@ -30,6 +30,7 @@ a single value per sample is generated.
.NumInput(0, 1)
.InputDox(0, "shape_like", "TensorList",
"Shape of this input will be used to infer the shape of the output, if provided.")
+ .InputDevice(0, InputDevice::Metadata)
.NumOutput(1)
.AddOptionalArg<float>("probability",
R"code(Probability of value 1.)code",
@@ -51,6 +52,7 @@ sample is generated.
.NumInput(0, 1)
.InputDox(0, "shape_like", "TensorList",
"Shape of this input will be used to infer the shape of the output, if provided.")
+ .InputDevice(0, InputDevice::Metadata)
.NumOutput(1)
.AddParent("random__CoinFlip")
.Deprecate("random__CoinFlip"); // Deprecated in 0.30
dali/operators/random/normal_distribution_cpu.cc: 2 additions & 2 deletions
@@ -28,7 +28,7 @@ a single value per sample is generated.
.NumInput(0, 1)
.InputDox(0, "shape_like", "TensorList",
"Shape of this input will be used to infer the shape of the output, if provided.")
- .InputDevice(0, InputDevice::Any)
+ .InputDevice(0, InputDevice::Metadata)
.NumOutput(1)
.AddOptionalArg<float>("mean",
R"code(Mean of the distribution.)code",
@@ -51,7 +51,7 @@ a single value per sample is generated.
.NumInput(0, 1)
.InputDox(0, "shape_like", "TensorList",
"Shape of this input will be used to infer the shape of the output, if provided.")
- .InputDevice(0, InputDevice::Any)
+ .InputDevice(0, InputDevice::Metadata)
.NumOutput(1)
.AddParent("random__Normal")
.Deprecate("random__Normal"); // Deprecated in 0.30
dali/operators/random/uniform_distribution_cpu.cc: 2 additions & 0 deletions
@@ -32,6 +32,7 @@ a single value per sample is generated.
.NumInput(0, 1)
.InputDox(0, "shape_like", "TensorList",
"Shape of this input will be used to infer the shape of the output, if provided.")
+ .InputDevice(0, InputDevice::Metadata)
.NumOutput(1)
.AddOptionalArg("range",
R"code(Range ``[min, max)`` of a continuous uniform distribution.
@@ -67,6 +68,7 @@ a single value per sample is generated.
.NumInput(0, 1)
.InputDox(0, "shape_like", "TensorList",
"Shape of this input will be used to infer the shape of the output, if provided.")
+ .InputDevice(0, InputDevice::Metadata)
.NumOutput(1)
.AddOptionalArg("range",
R"code(Range ``[min, max)`` of a continuous uniform distribution.
dali/pipeline/executor/executor2/exec_graph.h: 4 additions & 1 deletion
@@ -90,13 +90,16 @@ struct ExecEdge {
/** The index of the input in OpSpec. It matches the edge's index in consumer->inputs. */
int consumer_input_idx = 0;
StorageDevice device = {};
+ /** The input passes only the metadata, skipping stream synchronization. */
+ bool metadata = false;

constexpr bool operator==(const ExecEdge &other) const {
return producer == other.producer &&
consumer == other.consumer &&
producer_output_idx == other.producer_output_idx &&
consumer_input_idx == other.consumer_input_idx &&
- device == other.device;
+ device == other.device &&
+ metadata == other.metadata;
}

constexpr bool operator!=(const ExecEdge &other) const {
dali/pipeline/executor/executor2/exec_graph_lowering.cc: 10 additions & 1 deletion
@@ -50,7 +50,16 @@ void ExecGraph::Lower(const graph::OpGraph &def) {
for (auto &consumer : out->consumers) {
auto *exec_con = def2exec[consumer.op];
assert(exec_con != nullptr);
- Link(&exec_node, o, exec_con, consumer.idx)->device = dev;
+ auto *edge = Link(&exec_node, o, exec_con, consumer.idx);
+ edge->device = dev;
+ if (consumer.op) {
+   auto &consumer_spec = consumer.op->spec;
+   auto &schema = consumer_spec.GetSchemaOrDefault();
+   if (edge->consumer_input_idx < schema.MaxNumInput()) {  // only regular inputs
+     if (schema.GetInputDevice(edge->consumer_input_idx) == InputDevice::Metadata)
+       edge->metadata = true;
+   }
+ }
}
exec_node.outputs[o].device = dev;
}
dali/pipeline/executor/executor2/exec_node_task.cc: 11 additions & 12 deletions
@@ -278,16 +278,13 @@ void OpTask::SetWorkspaceInputs() {
auto process_input = [&](int i, auto backend) {
using Backend = decltype(backend);
const auto &inp = TaskInput<Backend>(ti);
- // If the output order of the operator is `host` then we don't wait for GPU
- // inputs - they can't be accessed directly on host and the operator will
- // have to issue some sort of synchronization if and when necessary.
- // This optimization is essential to avoid oversynchronization
- // when the operator needs to access the metadata only (e.g. getting the shape).
- if ((order.is_device() || std::is_same_v<Backend, CPUBackend>) /*see comment above */ &&
-     inp.event && inp.order != order)
+ bool is_meta = node_->inputs[i]->metadata;
+ // metadata-only inputs don't need to be synchronized
+ if (!is_meta && inp.event && inp.order != order)
events.insert(inp.event);

- if (inp.order == order) {  // use the input directly
+ // metadata-only inputs don't need a proper stream
+ if (inp.order == order || is_meta) {  // use the input directly
ws_->SetInput(i, inp.data);
} else { // create another TL and set its order (and layout, while we're at it)
auto tl = std::make_shared<TensorList<Backend>>();
@@ -477,7 +474,7 @@ tasking::SharedTask ExecNodeTask::CreateTask(ExecNode *node, const WorkspacePara
}
}

- void ClearWorkspacePayload(Workspace &ws) {
+ void ClearWorkspacePayload(Workspace &ws, ExecNode &node) {
auto event = ws.has_event() ? ws.event() : nullptr;
for (int i = 0; i < ws.NumInput(); i++) {
// TODO(michalz): Some smarter deletion management
@@ -492,14 +489,16 @@ void ClearWorkspacePayload(Workspace &ws) {
if (ws.InputIsType<CPUBackend>(i)) {
if (auto &pinp = ws.InputPtr<CPUBackend>(i)) {
auto &inp = *pinp;
- if (inp.is_pinned() && event && inp.order() != ws.output_order())
+ if (event &&
+     !node.inputs[i]->metadata &&
+     inp.is_pinned() && inp.order() != ws.output_order())
inp.order().wait(event);
ws.SetInput<CPUBackend>(i, nullptr);
}
} else if (ws.InputIsType<GPUBackend>(i)) {
if (auto &pinp = ws.InputPtr<GPUBackend>(i)) {
auto &inp = *pinp;
- if (event && inp.order() != ws.output_order())
+ if (event && !node.inputs[i]->metadata && inp.order() != ws.output_order())
inp.order().wait(event);
ws.SetInput<GPUBackend>(i, nullptr);
}
@@ -525,7 +524,7 @@ void ClearWorkspacePayload(Workspace &ws) {

void ExecNodeTask::ClearWorkspace() {
assert(ws_);
- ClearWorkspacePayload(*ws_);
+ ClearWorkspacePayload(*ws_, *node_);
}

} // namespace exec2
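To summarize the executor change above in one place: an input is waited on only when it is not metadata-only, it carries an event, and it was produced on a different stream/order. A minimal illustrative sketch of that rule, with plain booleans standing in for DALI's AccessOrder and CUDA-event machinery (not the real types):

    // Illustrative only; the flags stand in for node_->inputs[i]->metadata,
    // inp.event, and the inp.order vs. order comparison used above.
    bool ShouldWaitForInput(bool metadata_only, bool has_event, bool same_order) {
      return !metadata_only && has_event && !same_order;
    }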
dali/pipeline/executor/executor2/stream_assignment.h: 15 additions & 6 deletions
@@ -35,15 +35,15 @@ template <StreamPolicy policy>
class StreamAssignment;

inline bool NeedsStream(const ExecNode *node) {
- if (node->is_pipeline_output) {
-   for (auto &pipe_out : node->inputs) {
-     if (pipe_out->device == StorageDevice::GPU)
+ if (node->is_pipeline_output || node->backend == OpType::CPU) {
+   for (auto &input : node->inputs) {
+     if (input->device == StorageDevice::GPU && !input->metadata)
return true;
}
- return false;
} else {
- return node->backend != OpType::CPU;
+ return true;
}
+ return false;
}

inline OpType NodeType(const ExecNode *node) {
@@ -117,6 +117,12 @@ class StreamAssignment<StreamPolicy::PerBackend> {
if (has_gpu_)
return; // we already have both, nothing more can happen
break;
+ case OpType::CPU:
+   if (NeedsStream(&node)) {  // treat CPU nodes with GPU inputs as GPU
+     has_gpu_ = true;
+     if (has_mixed_)
+       return;
+   }
default:
break;
}
@@ -128,11 +134,14 @@
* If the node is a Mixed node, it gets stream index 0.
* If the node is a GPU node it gets stream index 1 if there are any mixed nodes, otherwise
* the only stream is the GPU stream and the returned index is 0.
+ * CPU nodes get GPU stream if they need one (i.e. they have a GPU input)
*/
std::optional<int> operator[](const ExecNode *node) const {
switch (NodeType(node)) {
case OpType::CPU:
- return std::nullopt;
+ if (!NeedsStream(node))
+   return std::nullopt;
+ // fall-through to GPU
case OpType::GPU:
return has_mixed_ ? 1 : 0;
case OpType::MIXED:
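A condensed sketch of the PerBackend stream-index rule after this change, with plain enums standing in for DALI's ExecNode and OpType (illustrative only, not the real API): mixed nodes get stream 0, GPU nodes get stream 1 when mixed nodes exist (otherwise 0), and a CPU node is treated like a GPU node only if it consumes real (non-metadata) GPU data.

    #include <optional>

    enum class Kind { CPU, Mixed, GPU };

    // consumes_real_gpu_data: the node has at least one GPU input that is not metadata-only.
    std::optional<int> StreamIndexPerBackend(Kind kind, bool has_mixed,
                                             bool consumes_real_gpu_data) {
      switch (kind) {
        case Kind::CPU:
          if (!consumes_real_gpu_data)
            return std::nullopt;   // CPU-only or metadata-only inputs: no stream
          [[fallthrough]];         // otherwise treated like a GPU node
        case Kind::GPU:
          return has_mixed ? 1 : 0;
        case Kind::Mixed:
          return 0;
      }
      return std::nullopt;
    }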
dali/pipeline/executor/executor2/stream_assignment_test.cc: 88 additions & 3 deletions
@@ -48,6 +48,31 @@ DALI_REGISTER_OPERATOR(StreamAssignmentDummyOp, StreamAssignmentDummyOp<CPUBacke
DALI_REGISTER_OPERATOR(StreamAssignmentDummyOp, StreamAssignmentDummyOp<MixedBackend>, Mixed);
DALI_REGISTER_OPERATOR(StreamAssignmentDummyOp, StreamAssignmentDummyOp<GPUBackend>, GPU);

+
+ template <typename Backend>
+ class StreamAssignmentMetaOp : public Operator<Backend> {
+  public:
+   using Operator<Backend>::Operator;
+   USE_OPERATOR_MEMBERS();
+
+   void RunImpl(Workspace &ws) override {}
+   bool SetupImpl(std::vector<OutputDesc> &output_desc, const Workspace &ws) override {
+     return false;
+   }
+ };
+
+ DALI_SCHEMA(StreamAssignmentMetaOp)
+   .NumInput(0, 999)
+   .InputDevice(0, 999, InputDevice::Metadata)
+   .NumOutput(0)
+   .AdditionalOutputsFn([](const OpSpec &spec) {
+     return spec.NumOutput();
+   });
+
+ DALI_REGISTER_OPERATOR(StreamAssignmentMetaOp, StreamAssignmentMetaOp<CPUBackend>, CPU);
+ DALI_REGISTER_OPERATOR(StreamAssignmentMetaOp, StreamAssignmentMetaOp<MixedBackend>, Mixed);
+ DALI_REGISTER_OPERATOR(StreamAssignmentMetaOp, StreamAssignmentMetaOp<GPUBackend>, GPU);
+
namespace exec2 {

namespace {
@@ -71,6 +96,27 @@ OpSpec SpecMixed() {
return SpecDev("mixed");
}

+
+ OpSpec SpecMetaDev(const std::string &device) {
+   return OpSpec("StreamAssignmentMetaOp")
+          .AddArg("device", device)
+          .AddArg("num_threads", 1)
+          .AddArg("max_batch_size", 1);
+ }
+
+ OpSpec SpecMetaGPU() {
+   return SpecMetaDev("gpu");
+ }
+
+ OpSpec SpecMetaCPU() {
+   return SpecMetaDev("cpu");
+ }
+
+ OpSpec SpecMetaMixed() {
+   return SpecMetaDev("mixed");
+ }
+
+
auto MakeNodeMap(const ExecGraph &graph) {
std::map<std::string_view, const ExecNode *, std::less<>> map;
for (auto &n : graph.Nodes())
Expand Down Expand Up @@ -122,6 +168,38 @@ TEST(Exec2Test, StreamAssignment_Single_CPUMixedGPU) {
EXPECT_EQ(assignment[map["c"]], 0);
}

template <StreamPolicy policy>
void TestGPU2CPUAssignment() {
graph::OpGraph::Builder b;
b.Add("a",
SpecGPU()
.AddOutput("a->b", "gpu")
.AddOutput("a->c", "gpu"));
b.Add("b",
SpecCPU()
.AddInput("a->b", "gpu")
.AddOutput("b->out", "cpu"));
b.Add("c",
SpecMetaCPU()
.AddInput("a->c", "gpu")
.AddOutput("c->out", "cpu"));
b.AddOutput("b->out_cpu");
b.AddOutput("c->out_cpu");
auto g = std::move(b).GetGraph(true);
ExecGraph eg;
eg.Lower(g);

StreamAssignment<policy> assignment(eg);
auto map = MakeNodeMap(eg);
EXPECT_EQ(assignment[map["a"]], 0);
EXPECT_EQ(assignment[map["b"]], 0); // CPU operator with GPU input
EXPECT_EQ(assignment[map["c"]], std::nullopt); // metadata-only
}

TEST(Exec2Test, StreamAssignment_Single_GPU2CPU) {
TestGPU2CPUAssignment<StreamPolicy::Single>();
}


TEST(Exec2Test, StreamAssignment_PerBackend_OnlyCPU) {
graph::OpGraph::Builder b;
@@ -194,6 +272,13 @@ TEST(Exec2Test, StreamAssignment_PerBackend_CPUMixedGPU) {
}


+ TEST(Exec2Test, StreamAssignment_PerBackend_GPU2CPU) {
+   TestGPU2CPUAssignment<StreamPolicy::PerBackend>();
+ }
+
+ TEST(Exec2Test, StreamAssignment_OperOperator_GPU2CPU) {
+   TestGPU2CPUAssignment<StreamPolicy::PerOperator>();
+ }

TEST(Exec2Test, StreamAssignment_PerOperator_1) {
ExecGraph eg;
@@ -272,7 +357,7 @@ TEST(Exec2Test, StreamAssignment_PerOperator_2) {
SpecGPU()
.AddOutput("i->j", "gpu"));
b.Add("j",
- SpecCPU()
+ SpecMetaCPU()
.AddInput("i->j", "gpu")
.AddOutput("j->h", "cpu"));
b.Add("b",
@@ -320,15 +405,15 @@ TEST(Exec2Test, StreamAssignment_PerOperator_2) {
StreamAssignment<StreamPolicy::PerOperator> assignment(eg);
auto map = MakeNodeMap(eg);
EXPECT_EQ(assignment[map["a"]], 0);
EXPECT_EQ(assignment[map["b"]], std::nullopt);
EXPECT_EQ(assignment[map["b"]], 0); // CPU operator with a GPU input needs a stream
EXPECT_EQ(assignment[map["c"]], 0);
EXPECT_EQ(assignment[map["d"]], 0);
EXPECT_EQ(assignment[map["e"]], 1);
EXPECT_EQ(assignment[map["f"]], 0);
EXPECT_EQ(assignment[map["g"]], 0);
EXPECT_EQ(assignment[map["h"]], 0);
EXPECT_EQ(assignment[map["i"]], 2);
EXPECT_EQ(assignment[map["j"]], std::nullopt);
EXPECT_EQ(assignment[map["j"]], std::nullopt); // metadata only
EXPECT_EQ(assignment[map["k"]], 3);
}

(The remaining changed file did not load and is not shown here.)

0 comments on commit 94f02ad
