Skip to content

Commit

Permalink
.cpu() works with experimental executor.
Browse files Browse the repository at this point in the history
Signed-off-by: Michał Zientkiewicz <[email protected]>
  • Loading branch information
mzient committed Sep 2, 2024
1 parent 93d592e commit 99b44cd
Show file tree
Hide file tree
Showing 9 changed files with 141 additions and 197 deletions.
33 changes: 31 additions & 2 deletions dali/pipeline/operator/builtin/copy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,30 @@
namespace dali {

template <>
void Copy<CPUBackend>::RunCopies(Workspace &ws) {
scatter_gather_.Run(ws.GetThreadPool(), true);
// Host-to-host copy: each sample is copied as a separate thread-pool task,
// weighted by the sample's volume so large samples are scheduled first.
void Copy<CPUBackend, CPUBackend>::RunImpl(Workspace &ws) {
  auto &in = ws.Input<CPUBackend>(0);
  auto &out = ws.Output<CPUBackend>(0);

  out.SetLayout(in.GetLayout());
  auto in_shapes = in.shape();
  int nsamples = in.num_samples();

  auto &tp = ws.GetThreadPool();
  for (int i = 0; i < nsamples; i++) {
    // Capture the sample index by value; the copy itself runs on a worker thread.
    tp.AddWork(
        [i, &in, &out](int) {
          out.CopySample(i, in, i, AccessOrder::host());
        },
        in_shapes.tensor_size(i));
  }
  tp.RunAll();
}

DALI_REGISTER_OPERATOR(Copy, Copy<CPUBackend>, CPU);
DALI_REGISTER_OPERATOR(Copy, Copy<GPUBackend>, GPU);

using CopyD2H = Copy<CPUBackend, GPUBackend>;
DALI_REGISTER_OPERATOR(CopyD2H, CopyD2H, CPU);

DALI_SCHEMA(Copy)
.DocStr("Creates a copy of the input tensor.")
Expand All @@ -30,4 +49,14 @@ DALI_SCHEMA(Copy)
.AllowSequences()
.SupportVolumetric();


// Schema for the internal device-to-host copy operator inserted by the
// pipeline (see Pipeline::ToCPU) when a CPU consumer needs a GPU data node.
DALI_SCHEMA(CopyD2H)
    // Fixed copy-pasted DocStr: unlike plain Copy, this op transfers the
    // input from GPU to host (CPU) memory.
    .DocStr("Copies the GPU input to host (CPU) memory.")
    .NumInput(1)
    .InputDevice(0, InputDevice::GPU)  // the sole input must reside on the GPU
    .NumOutput(1)
    .AllowSequences()
    .SupportVolumetric();


} // namespace dali
27 changes: 0 additions & 27 deletions dali/pipeline/operator/builtin/copy.cu

This file was deleted.

37 changes: 9 additions & 28 deletions dali/pipeline/operator/builtin/copy.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,11 @@

namespace dali {

template <typename Backend>
class Copy : public StatelessOperator<Backend> {
template <typename DstBackend, typename SrcBackend = DstBackend>
class Copy : public StatelessOperator<DstBackend> {
public:
inline explicit Copy(const OpSpec &spec) :
StatelessOperator<Backend>(spec), scatter_gather_(kMaxSizePerBlock) {}

inline ~Copy() override = default;
explicit Copy(const OpSpec &spec) :
StatelessOperator<DstBackend>(spec) {}

DISABLE_COPY_MOVE_ASSIGN(Copy);

Expand All @@ -42,37 +40,20 @@ class Copy : public StatelessOperator<Backend> {

bool SetupImpl(std::vector<OutputDesc> &output_desc, const Workspace &ws) override {
output_desc.resize(1);
const auto &input = ws.Input<Backend>(0);
const auto &input = ws.Input<SrcBackend>(0);
output_desc[0].type = input.type();
output_desc[0].shape = input.shape();
return true;
}

void RunImpl(Workspace &ws) override {
auto &input = ws.Input<Backend>(0);
auto data_type_size = input.type_info().size();
auto &output = ws.Output<Backend>(0);
output.SetLayout(input.GetLayout());
for (int i = 0; i < input.num_samples(); i++) {
auto tensor_shape = input.tensor_shape(i);
auto tensor_size = volume(tensor_shape);
scatter_gather_.AddCopy(output.raw_mutable_tensor(i), input.raw_tensor(i),
tensor_size * data_type_size);
}
RunCopies(ws);
auto &input = ws.Input<SrcBackend>(0);
auto &output = ws.Output<DstBackend>(0);
output.Copy(input, ws.output_order());
}

void RunCopies(Workspace &ws);

std::conditional_t<
std::is_same<Backend, CPUBackend>::value,
kernels::ScatterGatherCPU,
kernels::ScatterGatherGPU> scatter_gather_;
// 256 kB per block for GPU
static constexpr size_t kMaxSizePerBlock =
std::is_same<Backend, CPUBackend>::value ? kernels::ScatterGatherCPU::kAnyBlockSize : 1 << 18;
};


} // namespace dali

#endif // DALI_PIPELINE_OPERATOR_BUILTIN_COPY_H_
151 changes: 60 additions & 91 deletions dali/pipeline/pipeline.cc
Original file line number Diff line number Diff line change
Expand Up @@ -329,43 +329,14 @@ int Pipeline::AddOperatorImpl(const OpSpec &const_spec, const std::string &inst_
make_string("Data node \"", input_name, "\" requested as ", FormatInput(spec, i),
" to operator \"", inst_name, "\" is not known to the pipeline."));

// Table of possible scenarios:
// Op location / requested input type / data location
// cpu / cpu / cpu -> everything is fine
// cpu / cpu / gpu -> error, data does not exist on cpu
// cpu / gpu / cpu -> error, cpu op not allowed to have gpu inputs
// cpu / gpu / gpu -> both of above errors
// gpu / cpu / cpu -> need to use contiguous version
// gpu / cpu / gpu -> error, data not in specified location
// gpu / gpu / cpu -> need to insert copy to device
// gpu / gpu / gpu -> everything is fine
// mixed / cpu / cpu -> everything is fine
// mixed / cpu / gpu -> error, data does not exist on cpu
// mixed / gpu / cpu -> error, mixed op not allowed to have gpu inputs
// mixed / gpu / gpu -> both of above errors
if (device == "cpu" || device == "mixed") {
DALI_ENFORCE(input_device == "cpu",
make_string("Error while specifying ", FormatInput(spec, i),
". CPU/Mixed ops can only take CPU inputs. CPU operator cannot "
"follow GPU operator. "));
DALI_ENFORCE(it->second.has_cpu,
make_string("Error while specifying ", FormatInput(spec, i),
". CPU input requested by operator exists only on GPU. CPU "
"operator cannot follow GPU operator."));
DALI_ENFORCE(device_id_ != CPU_ONLY_DEVICE_ID || device == "cpu",
"Cannot add a Mixed operator with a GPU output, 'device_id' "
"should not be `CPU_ONLY_DEVICE_ID`.");
}
DALI_ENFORCE(device_id_ != CPU_ONLY_DEVICE_ID || device == "cpu",
"Cannot add a Mixed operator with a GPU output, 'device_id' "
"should not be `CPU_ONLY_DEVICE_ID`.");

if (input_device == "gpu") {
SetupGPUInput(it);
ToGPU(it);
} else {
// device == gpu
// TODO(michalz): Add a D2H copy instead
DALI_ENFORCE(it->second.has_cpu,
make_string("Error while specifying ", FormatInput(spec, i),
". CPU input requested by operator exists only on GPU. CPU "
"operator cannot follow GPU operator."));
ToCPU(it);
}
}

Expand All @@ -379,12 +350,7 @@ int Pipeline::AddOperatorImpl(const OpSpec &const_spec, const std::string &inst_
make_string("Data node \"", input_name, "\" requested as ", FormatArgument(spec, arg_name),
" to operator \"", inst_name, "\" is not known to the pipeline."));

if (!it->second.has_cpu) {
assert(it->second.has_gpu);
DALI_FAIL(make_string("Error while specifying ", FormatArgument(spec, arg_name),
". Named argument inputs to operators must be CPU data nodes. "
"However, a GPU data node was provided."));
}
ToCPU(it);
}

// Verify and record the outputs of the op
Expand All @@ -398,30 +364,14 @@ int Pipeline::AddOperatorImpl(const OpSpec &const_spec, const std::string &inst_
make_string("Error while specifying ", FormatOutput(spec, i), ". Output name \"",
output_name, "\" conflicts with an existing intermediate result name."));

// Validate output data conforms to graph constraints
// Note: DALI CPU -> GPU flow is enforced, when the operators are added via the Python layer
// in `generate_outputs` - the output_device is calculated and assigned to DataNode.
// TODO(michalz): Remove this constraint! Insert GPU->CPU copy instead.
bool mark_explicitly_contiguous = false;
if (device == "cpu") {
DALI_ENFORCE(output_device == "cpu",
make_string("Error while specifying ", FormatOutput(spec, i),
". Only CPU operators can produce CPU outputs."));
} else if (device == "gpu") {
if (output_device == "cpu") {
mark_explicitly_contiguous = true;
}
}

// The edge describes that the named output of this operator produces the CPU or GPU data,
// the former for "cpu" ops, the latter for "mixed" and "gpu" (see Note above about the DALI
// CPU -> GPU flow).
// There are exceptions: we can have CPU output from Mixed MakeContiguous - see
// [cpu output of mixed] where we break the constraints from Python frontend.
// the former for "cpu" ops, the latter for "mixed" and "gpu".
EdgeMeta meta = NewEdge(output_device);
if (mark_explicitly_contiguous) {
meta.has_contiguous = true;
}

DALI_ENFORCE(edge_names_.insert({output_name, meta}).second,
make_string("Error while specifying ", FormatOutput(spec, i), "node name: \"",
Expand Down Expand Up @@ -518,21 +468,6 @@ void Pipeline::Build(std::vector<PipelineOutputDesc> output_descs) {
executor_->EnableCheckpointing(checkpointing_);
executor_->Init();

// Creating the graph

for (auto& name_op_spec : op_specs_) {
string& inst_name = name_op_spec.instance_name;
OpSpec op_spec = name_op_spec.spec;
PrepareOpSpec(&op_spec, name_op_spec.logical_id);
try {
graph_builder_.Add(inst_name, op_spec);
} catch (...) {
PropagateError({std::current_exception(),
"Critical error when building pipeline:\n" + GetErrorContextMessage(op_spec),
"\nCurrent pipeline object is no longer valid."});
}
}

// Validate the output tensors names
vector<string> outputs;
for (const auto &out_desc : output_descs_) {
Expand All @@ -543,14 +478,17 @@ void Pipeline::Build(std::vector<PipelineOutputDesc> output_descs) {
name + "' is not known to the pipeline.");

if (device == "cpu") {
DALI_ENFORCE(it->second.has_cpu, "Requested cpu output '" +
name + "' only exists on gpu.");
// Add a make contiguous op to produce this output - we need pipeline outputs to be dense.
auto output_name = AddMakeContiguousNode(it->second, name, "cpu", "cpu", "cpu");
if (!it->second.has_contiguous) {
it->second.has_contiguous = true;
if (!it->second.has_cpu)
ToCPU(it);

if (!it->second.has_contiguous_cpu) {
// Add a make contiguous op to produce this output - we need pipeline outputs to be dense.
auto output_name = AddMakeContiguousNode(it->second, name, "cpu", "cpu", "cpu");
outputs.push_back(output_name);
} else {
outputs.push_back(it->first + "_cpu");
}
outputs.push_back(output_name);

} else if (device == "gpu") {
DALI_ENFORCE(device_id_ != CPU_ONLY_DEVICE_ID,
make_string(
Expand All @@ -559,23 +497,36 @@ void Pipeline::Build(std::vector<PipelineOutputDesc> output_descs) {
"is set to `CPU_ONLY_DEVICE_ID`. Set 'device_id' "
"to a valid GPU identifier to enable GPU features "
"in the pipeline."));
if (!it->second.has_gpu) {
DALI_ENFORCE(it->second.has_cpu, "Output '" + name +
"' exists on neither cpu or gpu, internal error");
// Add a copy to device to create the gpu output
auto output_name = AddMakeContiguousNode(it->second, name, "cpu", "mixed", "gpu");
outputs.push_back(output_name);
} else {
// Add an optional copy/pass through to normalize the output.
if (!it->second.has_gpu)
ToGPU(it);

if (!it->second.has_contiguous_gpu) {
auto output_name = AddMakeContiguousNode(it->second, name, "gpu", "gpu", "gpu");
outputs.push_back(output_name);
} else {
outputs.push_back(it->first + "_gpu");
}
} else {
DALI_FAIL("Invalid device argument \"" + device +
"\". Valid options are \"cpu\" or \"gpu\"");
}
}

// Creating the graph

for (auto& name_op_spec : op_specs_) {
string& inst_name = name_op_spec.instance_name;
OpSpec op_spec = name_op_spec.spec;
PrepareOpSpec(&op_spec, name_op_spec.logical_id);
try {
graph_builder_.Add(inst_name, op_spec);
} catch (...) {
PropagateError({std::current_exception(),
"Critical error when building pipeline:\n" + GetErrorContextMessage(op_spec),
"\nCurrent pipeline object is no longer valid."});
}
}

for (auto &out : outputs)
graph_builder_.AddOutput(out);

Expand Down Expand Up @@ -677,8 +628,25 @@ void Pipeline::ReleaseOutputs() {
}
}

void Pipeline::SetupGPUInput(std::map<string, EdgeMeta>::iterator it) {
if (it->second.has_gpu) return;
void Pipeline::ToCPU(std::map<string, EdgeMeta>::iterator it) {
// Insert a D2H copy, if needed
if (it->second.has_cpu)
return;
OpSpec copy_to_host_spec =
OpSpec("CopyD2H")
.AddArg("device", "cpu")
.AddInput(it->first, "gpu")
.AddOutput(it->first, "cpu");
// don't put it into op_specs_for_serialization_, only op_specs_
AddToOpSpecs("__Copy_GpuToCpu_" + it->first, copy_to_host_spec, GetNextInternalLogicalId());
it->second.has_cpu = true;
it->second.has_contiguous_cpu = true; // the result is always contiguous
}

void Pipeline::ToGPU(std::map<string, EdgeMeta>::iterator it) {
// Insert a H2D copy, if needed
if (it->second.has_gpu)
return;
OpSpec copy_to_dev_spec =
OpSpec("MakeContiguous")
.AddArg("device", "mixed")
Expand All @@ -687,6 +655,7 @@ void Pipeline::SetupGPUInput(std::map<string, EdgeMeta>::iterator it) {
// don't put it into op_specs_for_serialization_, only op_specs_
AddToOpSpecs("__Copy_CpuToGpu_" + it->first, copy_to_dev_spec, GetNextInternalLogicalId());
it->second.has_gpu = true;
it->second.has_contiguous_gpu = true; // the result is always contiguous
}

void Pipeline::PrepareOpSpec(OpSpec *spec, int logical_id) {
Expand Down Expand Up @@ -1048,8 +1017,8 @@ std::string Pipeline::AddMakeContiguousNode(EdgeMeta &meta, const std::string &i
}

// Add a make contiguous op to produce this output
PrepareOpSpec(&spec, GetNextInternalLogicalId());
graph_builder_.Add(op_name, std::move(spec));
auto id = GetNextInternalLogicalId();
AddToOpSpecs(op_name, std::move(spec), id);

if (output_dev == "cpu") {
meta.has_make_contiguous_cpu = true;
Expand Down
Loading

0 comments on commit 99b44cd

Please sign in to comment.