Skip to content

Commit

Permalink
[WIP]
Browse files Browse the repository at this point in the history
Signed-off-by: Michał Zientkiewicz <[email protected]>
  • Loading branch information
mzient committed Jul 4, 2024
1 parent 20819f7 commit 48c39e2
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 33 deletions.
129 changes: 120 additions & 9 deletions dali/pipeline/executor/executor2/exec2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,114 @@ class Executor2::Impl {
/** Builds the executor from a lowered operator graph.
 *
 * Lowers the definition graph into an executable graph, analyzes node types,
 * computes the prefetch depth and initializes streams, the thread pool and
 * the task executor.
 */
void Build(const graph::OpGraph &graph) {
  // All initialization must happen on the configured device (or in CPU-only mode).
  DeviceGuard dg(config_.device.value_or(CPU_ONLY_DEVICE_ID));
  graph_.Lower(graph);
  AnalyzeGraph();
  CheckNodeTypes();
  CalculatePrefetchDepth();
  SetupStreams();
  InitThreadPool();  // removed stale commented-out duplicate of this call
  InitExecutor();
}

/** Launches one iteration of the pipeline on the task executor. */
void Run() {
  if (!exec_) {
    throw std::runtime_error("The executor is not initialized.");
  }
  InitIteration();
  graph_.Launch(*exec_);
}

/** Runs the pipeline `prefetch_depth_` times to pre-fill the output queues. */
void Prefetch() {
  if (!exec_) {
    throw std::runtime_error("The executor is not initialized.");
  }
  for (int iter = 0; iter < prefetch_depth_; iter++) {
    Run();
  }
}

/** Removes the oldest pending output and waits for its workspace.
 *
 * @throws std::out_of_range when no pending outputs remain.
 */
Workspace PopOutputs() {
  if (pending_outputs_.empty()) {
    throw std::out_of_range("All pending outputs were already popped.");
  }
  tasking::TaskFuture future = std::move(pending_outputs_.front());
  pending_outputs_.pop();
  return future.Value<Workspace>();
}

void InitIteration() {
WorkspaceParams params{};
params.batch_size = InferBatchSize();
graph_.PrepareIteration(InitIterationData(), params);
}

private:
/** Creates the shared iteration data and assigns the next iteration index. */
auto InitIterationData() {
  auto iter_data = std::make_shared<IterationData>();
  iter_data->iteration_index = iter_index++;
  // BUG FIX: the return was missing; with `auto` the return type deduced to
  // void and the result is passed to graph_.PrepareIteration.
  return iter_data;
}

/** Infers the batch size for the upcoming iteration.
 *
 * TODO(review): not implemented in this WIP commit. Throwing here is
 * preferable to falling off the end of a non-void function, which is UB.
 */
int InferBatchSize() {
  throw std::logic_error("InferBatchSize is not implemented.");
}

// NOTE: removed stray re-declarations of Run, Prefetch and PopOutputs —
// they are already defined above and redeclaring a member is ill-formed.
/** Counts the nodes (and root nodes) of each backend type in the graph. */
void AnalyzeGraph() {
  for (auto &node : graph_.nodes) {
    const bool is_root = node.inputs.empty();
    switch (NodeType(&node)) {
      case OpType::CPU:
        graph_info_.num_cpu++;
        if (is_root)
          graph_info_.num_cpu_roots++;
        break;
      case OpType::GPU:
        graph_info_.num_gpu++;
        if (is_root)
          graph_info_.num_gpu_roots++;
        break;
      case OpType::MIXED:
        graph_info_.num_mixed++;
        if (is_root)
          graph_info_.num_mixed_roots++;
        break;
      default:
        break;  // other node kinds are not counted
    }
  }
}

/** Verifies that GPU/mixed nodes are only present when a device is configured. */
void CheckNodeTypes() {
  const bool needs_device = graph_info_.num_gpu + graph_info_.num_mixed > 0;
  if (needs_device && !config_.device.has_value()) {
    throw std::invalid_argument("The graph contains nodes that require a GPU but the config "
                                "doesn't specify a device id.");
  }
}

/** Computes the number of iterations to prefetch.
 *
 * The depth is the maximum of the configured CPU/GPU queue depths (for the
 * root-node backends present in the graph) and any per-operator
 * "queue_depth" argument specified on a root node.
 *
 * BUG FIX: the method was named CalculatePrefechDepth (typo), while Build()
 * calls CalculatePrefetchDepth().
 */
void CalculatePrefetchDepth() {
  int depth = 1;
  if (graph_info_.num_cpu_roots > 0)
    depth = std::max(depth, config_.cpu_queue_depth);
  if (graph_info_.num_mixed_roots + graph_info_.num_gpu_roots > 0)
    depth = std::max(depth, config_.gpu_queue_depth);
  for (auto &node : graph_.nodes) {
    if (node.inputs.empty() && node.def) {
      int op_depth;
      // BUG FIX: read the argument into op_depth — previously the call read
      // into `depth`, clobbering the accumulator, and then took the max with
      // an uninitialized op_depth (UB).
      if (node.def->spec.TryGetArgument(op_depth, "queue_depth"))
        depth = std::max(depth, op_depth);
    }
  }
  prefetch_depth_ = depth;
}

/** Creates the operator thread pool, or destroys it if no CPU nodes exist. */
void InitThreadPool() {
  if (graph_info_.num_cpu > 0) {
    tp_ = std::make_unique<ThreadPool>(
      config_.thread_pool_threads,
      config_.device.value_or(CPU_ONLY_DEVICE_ID),
      config_.set_affinity,
      "Executor_v2");  // BUG FIX: was "Executorv_v2" (typo in the pool name)
  } else {
    tp_.reset();
  }
}

/** Creates the task executor with the configured thread count and starts it. */
void InitExecutor() {
  exec_ = std::make_unique<tasking::Executor>(config_.operator_threads);
  exec_->Start();
}

void SetupStreams() {
switch (config_.stream_policy) {
Expand All @@ -54,29 +153,41 @@ class Executor2::Impl {
}
}

private:
/** Assigns CUDA streams to graph nodes according to the stream policy.
 *
 * Leases `NumStreams()` streams from the global pool and sets each node's
 * execution order to its assigned stream, or to host order for CPU nodes.
 *
 * BUG FIX: the diff artifact left both the old and new versions of the
 * pool call and the ternary in place (duplicate push_back, two `?` branches,
 * and a reference to a nonexistent `streams` variable) — kept only the
 * corrected lines.
 */
template <StreamPolicy policy>
void SetupStreamsImpl() {
  StreamAssignment<policy> assignment(graph_);
  int num_streams = assignment.NumStreams();
  if (num_streams == 0)
    return;
  for (int i = 0; i < num_streams; i++)
    streams_.push_back(CUDAStreamPool::instance().Get());
  for (auto &node : graph_.nodes) {
    auto stream_idx = assignment.GetStreamIdx(&node);
    // Nodes without a stream (CPU) execute in host order.
    node.env.order = stream_idx.has_value()
                   ? AccessOrder(streams_[*stream_idx])
                   : AccessOrder::host();
  }
}

// Executor configuration and derived prefetch depth.
Config config_;
int prefetch_depth_ = 1;

/** Node counts per backend, gathered by AnalyzeGraph(). */
struct GraphInfo {
  int num_cpu = 0;
  int num_mixed = 0;
  int num_gpu = 0;
  int num_cpu_roots = 0;
  int num_mixed_roots = 0;
  int num_gpu_roots = 0;
} graph_info_;

// BUG FIX: tp_ and graph_ were each declared twice (leftover old diff
// lines); a member may be declared only once.
std::unique_ptr<ThreadPool> tp_;               // operator thread pool (CPU work)
std::queue<tasking::TaskFuture> pending_outputs_;  // futures of not-yet-popped outputs
std::vector<CUDAStreamLease> streams_;         // streams leased for this executor

ExecGraph graph_;                              // the lowered, executable graph
std::unique_ptr<tasking::Executor> exec_;      // task executor running the graph
};


Expand Down
15 changes: 11 additions & 4 deletions dali/pipeline/executor/executor2/exec2.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ namespace dali {
namespace exec2 {

/** Determines which operators keep multi-buffered result queues. */
enum class QueueDepthPolicy : int {
  FullyBuffered,   //< All operators maintain a queue
  BackendChange,   //< Only operators followed by one with a different backend have a queue
  OutputOnly,      //< Only the pipeline output has multiple buffers
  // BUG FIX: `Legacy = BackendChange` appeared twice (old + new diff lines),
  // which is an enumerator redefinition; the stale `Uniform` enumerator from
  // the old version was also still present.
  Legacy = BackendChange,
};

enum class OperatorParallelism : int {
Expand All @@ -52,8 +52,15 @@ class DLL_PUBLIC Executor2 : public ExecutorBase {
struct Config {
/** Device identifier */
std::optional<int> device;
/** The number of threads */
int num_threads = 0;
/** The number of threads used for running the operators' Run functions
*
* TODO(michalz): Consider unification of the threading engines.
*/
int operator_threads = 0;
/** The number of threads in the thread pool passed to the operators */
int thread_pool_threads = 0;
/** Whether the thread pool should set thread affinity with NVML */
bool set_affinity = false;
/** The number of pending results CPU operators produce */
int cpu_queue_depth = 2;
/** The number of pending results GPU (and mixed) operators produce */
Expand Down
59 changes: 39 additions & 20 deletions dali/pipeline/executor/executor2/stream_assignment.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,25 @@ inline bool NeedsStream(const ExecNode *node) {
return false;
}

/** Determines the backend type of an execution node.
 *
 * For regular nodes this is simply the operator's backend. The pipeline
 * output node reports the "strongest" backend among producers of its GPU
 * inputs: GPU beats MIXED beats CPU.
 */
inline OpType NodeType(const ExecNode *node) {
  if (!node->is_pipeline_output) {
    assert(node->def);
    return node->def->op_type;
  }
  OpType result = OpType::CPU;
  for (auto &pipe_out : node->inputs) {
    if (pipe_out->device != StorageDevice::GPU)
      continue;
    OpType producer_type = pipe_out->producer->def->op_type;
    if (producer_type == OpType::GPU)
      return OpType::GPU;  // GPU dominates — no need to look further
    if (producer_type == OpType::MIXED)
      result = OpType::MIXED;
  }
  return result;
}

template <>
class StreamAssignment<StreamPolicy::Single> {
public:
Expand Down Expand Up @@ -73,31 +92,31 @@ class StreamAssignment<StreamPolicy::PerBackend> {
public:
/** Scans the graph to record which stream-using backends are present.
 *
 * BUG FIX: the diff artifact left the old `if (NeedsStream(&node)) {`
 * wrapper and `needs_stream_ = true;` line in place without their closing
 * brace, leaving the constructor with unbalanced braces; kept only the
 * new switch-based body.
 */
StreamAssignment(ExecGraph &graph) {
  for (auto &node : graph.nodes) {
    switch (NodeType(&node)) {
      case OpType::GPU:
        has_gpu_ = true;
        if (has_mixed_)
          return;  // we already have both, nothing more can happen
        break;
      case OpType::MIXED:
        has_mixed_ = true;
        if (has_gpu_)
          return;  // we already have both, nothing more can happen
        break;
      default:
        break;
    }
  }
}

/** Returns a stream index for a non-CPU operator.
*
* If the node is a Mixed node, it gets stream index 0.
* If the node is a GPU node it gets stream index 1 if there are any mixed nodes, otherwise
* the only stream is the GPU stream and the returned index is 0.
*/
std::optional<int> GetStreamIdx(const ExecNode *node) {
OpType type = OpType::CPU;
if (node->is_pipeline_output) {
for (auto &pipe_out : node->inputs) {
if (pipe_out->device == StorageDevice::GPU) {
auto producer_type = pipe_out->producer->def->op_type;
if (producer_type == OpType::GPU) {
type = OpType::GPU;
break;
} else if (producer_type == OpType::MIXED) {
type = OpType::MIXED;
}
}
}
} else {
type = node->def->op_type;
}

switch (type) {
switch (NodeType(node)) {
case OpType::CPU:
return std::nullopt;
case OpType::GPU:
Expand Down

0 comments on commit 48c39e2

Please sign in to comment.