Skip to content

Commit

Permalink
change partition
Browse files Browse the repository at this point in the history
  • Loading branch information
Ubuntu committed Sep 23, 2024
2 parents 45ddaa6 + 5ae6400 commit 97d000b
Show file tree
Hide file tree
Showing 21 changed files with 3,036 additions and 310 deletions.
485 changes: 485 additions & 0 deletions examples/graphbolt/pyg/multigpu/node_classification.py

Large diffs are not rendered by default.

23 changes: 20 additions & 3 deletions graphbolt/src/cuda/cooperative_minibatching_utils.cu
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
* implementations in CUDA.
*/
#include <graphbolt/cuda_ops.h>
#include <thrust/scatter.h>
#include <thrust/transform.h>

#include <cub/cub.cuh>
#include <cuda/functional>

#include "../utils.h"
#include "./common.h"
#include "./cooperative_minibatching_utils.cuh"
#include "./cooperative_minibatching_utils.h"
Expand Down Expand Up @@ -62,8 +64,7 @@ RankSortImpl(
auto part_ids2 = part_ids.clone();
auto part_ids2_sorted = torch::empty_like(part_ids2);
auto nodes_sorted = torch::empty_like(nodes);
auto index = ops::IndptrEdgeIdsImpl(
offsets_dev, nodes.scalar_type(), torch::nullopt, nodes.numel());
auto index = torch::arange(nodes.numel(), nodes.options());
auto index_sorted = torch::empty_like(index);
return AT_DISPATCH_INDEX_TYPES(
nodes.scalar_type(), "RankSortImpl", ([&] {
Expand Down Expand Up @@ -100,8 +101,14 @@ RankSortImpl(
index.data_ptr<index_t>(), index_sorted.data_ptr<index_t>(),
nodes.numel(), num_batches, offsets_dev_ptr, offsets_dev_ptr + 1, 0,
num_bits);
auto values = ops::IndptrEdgeIdsImpl(
offsets_dev, nodes.scalar_type(), torch::nullopt, nodes.numel());
THRUST_CALL(
scatter, values.data_ptr<index_t>(),
values.data_ptr<index_t>() + values.numel(),
index_sorted.data_ptr<index_t>(), index.data_ptr<index_t>());
return std::make_tuple(
nodes_sorted, index_sorted, offsets, std::move(offsets_event));
nodes_sorted, index, offsets, std::move(offsets_event));
}));
}

Expand Down Expand Up @@ -138,5 +145,15 @@ std::vector<std::tuple<torch::Tensor, torch::Tensor, torch::Tensor>> RankSort(
return results;
}

c10::intrusive_ptr<Future<
    std::vector<std::tuple<torch::Tensor, torch::Tensor, torch::Tensor>>>>
RankSortAsync(
    const std::vector<torch::Tensor>& nodes_list, const int64_t rank,
    const int64_t world_size) {
  // Dispatch the work to the GPU-aware executor iff the first tensor lives on
  // a CUDA device; RankSort itself performs the per-rank sorting.
  const bool run_on_gpu = utils::is_on_gpu(nodes_list.at(0));
  // Capture by value: the lambda outlives this frame, so capturing the
  // argument list by reference would dangle. Tensor copies are refcounted
  // and cheap.
  return async(
      [nodes_list, rank, world_size] {
        return RankSort(nodes_list, rank, world_size);
      },
      run_on_gpu);
}

} // namespace cuda
} // namespace graphbolt
36 changes: 22 additions & 14 deletions graphbolt/src/cuda/cooperative_minibatching_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#define GRAPHBOLT_CUDA_COOPERATIVE_MINIBATCHING_UTILS_H_

#include <ATen/cuda/CUDAEvent.h>
#include <graphbolt/async.h>
#include <torch/script.h>

namespace graphbolt {
Expand All @@ -42,21 +43,21 @@ torch::Tensor RankAssignment(

/**
* @brief Given node ids, the ranks they belong, the offsets to separate
* different node types and num_bits indicating the world size is <= 2^num_bits,
* returns node ids sorted w.r.t. the ranks that the given ids belong along with
* the original positions.
* different node types and world size, returns node ids sorted w.r.t. the ranks
* that the given ids belong along with their new positions.
*
* @param nodes Node id tensor to be mapped to a rank in [0, world_size).
* @param part_ids Rank tensor the nodes belong to.
* @param offsets_dev Offsets to separate different node types.
* @param world_size World size, the total number of cooperating GPUs.
*
* @return (sorted_nodes, original_positions, rank_offsets, rank_offsets_event),
* where the first one includes sorted nodes, the second contains original
* positions of the sorted nodes and the third contains the offsets of the
* sorted_nodes indicating sorted_nodes[rank_offsets[i]: rank_offsets[i + 1]]
* contains nodes that belongs to the `i`th rank. Before accessing rank_offsets
* on the CPU, `rank_offsets_event.synchronize()` is required.
* @return (sorted_nodes, new_positions, rank_offsets, rank_offsets_event),
* where the first one includes sorted nodes, the second contains new positions
* of the given nodes, so that sorted_nodes[new_positions] == nodes, and the
* third contains the offsets of the sorted_nodes indicating
* sorted_nodes[rank_offsets[i]: rank_offsets[i + 1]] contains nodes that
* belongs to the `i`th rank. Before accessing rank_offsets on the CPU,
* `rank_offsets_event.synchronize()` is required.
*/
std::tuple<torch::Tensor, torch::Tensor, torch::Tensor, at::cuda::CUDAEvent>
RankSortImpl(
Expand All @@ -72,16 +73,23 @@ RankSortImpl(
* @param rank Rank of the current GPU.
* @param world_size World size, the total number of cooperating GPUs.
*
* @return vector of (sorted_nodes, original_positions, rank_offsets), where the
* first one includes sorted nodes, the second contains original positions of
* the sorted nodes and the third contains the offsets of the sorted_nodes
* indicating sorted_nodes[rank_offsets[i]: rank_offsets[i + 1]] contains nodes
* that belongs to the `i`th rank.
* @return vector of (sorted_nodes, new_positions, rank_offsets), where the
* first one includes sorted nodes, the second contains new positions of the
* given nodes, so that sorted_nodes[new_positions] == nodes, and the third
* contains the offsets of the sorted_nodes indicating
* sorted_nodes[rank_offsets[i]: rank_offsets[i + 1]] contains nodes that
* belongs to the `i`th rank.
*/
std::vector<std::tuple<torch::Tensor, torch::Tensor, torch::Tensor>> RankSort(
const std::vector<torch::Tensor>& nodes_list, int64_t rank,
int64_t world_size);

/**
 * @brief Asynchronous version of RankSort. Runs RankSort as a background task
 * and immediately returns a Future; calling Wait on the Future yields the same
 * vector of (sorted_nodes, new_positions, rank_offsets) tuples that RankSort
 * returns.
 *
 * @param nodes_list List of node id tensors mapped to a rank in
 * [0, world_size).
 * @param rank Rank of the current GPU.
 * @param world_size World size, the total number of cooperating GPUs.
 *
 * @return A Future wrapping the result of
 * RankSort(nodes_list, rank, world_size).
 */
c10::intrusive_ptr<Future<
    std::vector<std::tuple<torch::Tensor, torch::Tensor, torch::Tensor>>>>
RankSortAsync(
    const std::vector<torch::Tensor>& nodes_list, const int64_t rank,
    const int64_t world_size);

} // namespace cuda
} // namespace graphbolt

Expand Down
16 changes: 12 additions & 4 deletions graphbolt/src/cuda/extension/unique_and_compact_map.cu
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,12 @@ __global__ void _MapIdsBatched(

auto slot = map.find(key);
auto new_id = slot->second;
if (index) new_id = index[new_id];
mapped_ids[i] = new_id - unique_ids_offsets[batch_index];
if (index) {
new_id = index[new_id];
} else {
new_id -= unique_ids_offsets[batch_index];
}
mapped_ids[i] = new_id;
}

i += stride;
Expand Down Expand Up @@ -284,14 +288,18 @@ UniqueAndCompactBatchedHashMapBased(
unique_ids_offsets_dev.data_ptr<int64_t>();
}
at::cuda::CUDAEvent unique_ids_offsets_event;
unique_ids_offsets_event.record();
torch::optional<torch::Tensor> index;
if (part_ids) {
unique_ids_offsets_event.synchronize();
const auto num_unique =
unique_ids_offsets.data_ptr<int64_t>()[num_batches];
unique_ids = unique_ids.slice(0, 0, num_unique);
part_ids = part_ids->slice(0, 0, num_unique);
std::tie(
unique_ids, index, unique_ids_offsets, unique_ids_offsets_event) =
cuda::RankSortImpl(
unique_ids, *part_ids, unique_ids_offsets_dev, world_size);
} else {
unique_ids_offsets_event.record();
}
auto mapped_ids =
torch::empty(offsets_ptr[3 * num_batches], unique_ids.options());
Expand Down
8 changes: 8 additions & 0 deletions graphbolt/src/python_binding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ TORCH_LIBRARY(graphbolt, m) {
&Future<std::vector<std::tuple<
torch::Tensor, torch::Tensor, torch::Tensor, torch::Tensor>>>::
Wait);
m.class_<Future<
std::vector<std::tuple<torch::Tensor, torch::Tensor, torch::Tensor>>>>(
"RankSortFuture")
.def(
"wait",
&Future<std::vector<
std::tuple<torch::Tensor, torch::Tensor, torch::Tensor>>>::Wait);
m.class_<Future<std::tuple<torch::Tensor, torch::Tensor, int64_t, int64_t>>>(
"GpuGraphCacheQueryFuture")
.def(
Expand Down Expand Up @@ -198,6 +205,7 @@ TORCH_LIBRARY(graphbolt, m) {
#ifdef GRAPHBOLT_USE_CUDA
m.def("set_max_uva_threads", &cuda::set_max_uva_threads);
m.def("rank_sort", &cuda::RankSort);
m.def("rank_sort_async", &cuda::RankSortAsync);
#endif
#ifdef HAS_IMPL_ABSTRACT_PYSTUB
m.impl_abstract_pystub("dgl.graphbolt.base", "//dgl.graphbolt.base");
Expand Down
Loading

0 comments on commit 97d000b

Please sign in to comment.