[distGB] fix local variable 'sorted_idx' referenced before assignment in convert_partition.py #7830

Open · wants to merge 15 commits into base: master
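Context for the fix: in convert_partition.py, sorted_idx was assigned only inside a conditional branch and then used unconditionally, so the code path that skipped the branch raised UnboundLocalError ("local variable 'sorted_idx' referenced before assignment"). The sketch below reproduces the error class and the shape of the fix with hypothetical names; it is not the exact convert_partition.py code. The actual change in this PR adds an else branch that falls back to the identity ordering (see the tools/distpartitioning/convert_partition.py hunk at the end of the diff).

import torch as th

def reorder_edges(indices, edge_ids, sort_requested):
    if sort_requested:
        sorted_idx = th.argsort(indices)
    else:
        # Fallback in the spirit of the fix: identity permutation, so
        # `sorted_idx` is defined on every code path and order is kept.
        sorted_idx = th.arange(len(edge_ids))
    return indices[sorted_idx], edge_ids[sorted_idx]

# Before the fix, the sort_requested=False path raised:
# UnboundLocalError: local variable 'sorted_idx' referenced before assignment
print(reorder_edges(th.tensor([2, 0, 1]), th.tensor([10, 11, 12]), False))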
131 changes: 130 additions & 1 deletion tests/tools/pytest_utils.py
@@ -339,7 +339,136 @@ def chunk_graph(
)


def create_chunked_dataset(
def create_homo_chunked_dataset(
root_dir,
num_chunks,
data_fmt="numpy",
edges_fmt="csv",
vector_rows=False,
**kwargs,
):
"""
This function creates a sample homo dataset.

Parameters:
-----------
root_dir : string
directory in which all the files for the chunked dataset will be stored.
"""
# Step0: prepare chunked graph data format.
# A small synthetic homogeneous graph with a single node type and edge type.
num_N = 1200

def rand_edges(num_src, num_dst, num_edges):
eids = np.random.choice(num_src * num_dst, num_edges, replace=False)
src = torch.from_numpy(eids // num_dst)
dst = torch.from_numpy(eids % num_dst)

return src, dst

num_E = 24 * 1000

# Structure.
data_dict = {("_N", "_E", "_N"): rand_edges(num_N, num_N, num_E)}
src, dst = data_dict[("_N", "_E", "_N")]
data_dict[("_N", "_E", "_N")] = (dst, src)
g = dgl.heterograph(data_dict)

# Node feat, label, year.
num_paper_feats = 3
_N_feat = np.random.randn(num_N, num_paper_feats)
num_classes = 4
_N_label = np.random.choice(num_classes, num_N)
_N_year = np.random.choice(2022, num_N)
_N_orig_ids = np.arange(0, num_N)

# masks.
_N_train_mask = np.random.choice([True, False], num_N)
_N_test_mask = np.random.choice([True, False], num_N)
_N_val_mask = np.random.choice([True, False], num_N)

# Edge features.
_E_count = np.random.choice(10, num_E)

# Save features.
input_dir = os.path.join(root_dir, "data_test")
os.makedirs(input_dir)
for sub_d in ["_N", "_E"]:
os.makedirs(os.path.join(input_dir, sub_d))

_N_feat_path = os.path.join(input_dir, "_N/feat.npy")
with open(_N_feat_path, "wb") as f:
np.save(f, _N_feat)
g.nodes["_N"].data["feat"] = torch.from_numpy(_N_feat)

_N_label_path = os.path.join(input_dir, "_N/label.npy")
with open(_N_label_path, "wb") as f:
np.save(f, _N_label)
g.nodes["_N"].data["label"] = torch.from_numpy(_N_label)

_N_year_path = os.path.join(input_dir, "_N/year.npy")
with open(_N_year_path, "wb") as f:
np.save(f, _N_year)
g.nodes["_N"].data["year"] = torch.from_numpy(_N_year)

_N_orig_ids_path = os.path.join(input_dir, "_N/orig_ids.npy")
with open(_N_orig_ids_path, "wb") as f:
np.save(f, _N_orig_ids)
g.nodes["_N"].data["orig_ids"] = torch.from_numpy(_N_orig_ids)

_E_count_path = os.path.join(input_dir, "_E/count.npy")
with open(_E_count_path, "wb") as f:
np.save(f, _E_count)
g.edges["_E"].data["count"] = torch.from_numpy(_E_count)

_N_train_mask_path = os.path.join(input_dir, "_N/train_mask.npy")
with open(_N_train_mask_path, "wb") as f:
np.save(f, _N_train_mask)
g.nodes["_N"].data["train_mask"] = torch.from_numpy(_N_train_mask)

_N_test_mask_path = os.path.join(input_dir, "_N/test_mask.npy")
with open(_N_test_mask_path, "wb") as f:
np.save(f, _N_test_mask)
g.nodes["_N"].data["test_mask"] = torch.from_numpy(_N_test_mask)

_N_val_mask_path = os.path.join(input_dir, "_N/val_mask.npy")
with open(_N_val_mask_path, "wb") as f:
np.save(f, _N_val_mask)
g.nodes["_N"].data["val_mask"] = torch.from_numpy(_N_val_mask)

node_data = {
"_N": {
"feat": _N_feat_path,
"train_mask": _N_train_mask_path,
"test_mask": _N_test_mask_path,
"val_mask": _N_val_mask_path,
"label": _N_label_path,
"year": _N_year_path,
"orig_ids": _N_orig_ids_path,
}
}

edge_data = {"_E": {"count": _E_count_path}}

output_dir = os.path.join(root_dir, "chunked-data")
chunk_graph(
g,
"mag240m",
node_data,
edge_data,
num_chunks=num_chunks,
output_path=output_dir,
data_fmt=data_fmt,
edges_fmt=edges_fmt,
vector_rows=vector_rows,
**kwargs,
)
logging.debug("Done with creating chunked graph")

return g


def create_hetero_chunked_dataset(
root_dir,
num_chunks,
data_fmt="numpy",
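For orientation, here is a usage sketch of the new helper (an editorial illustration, not part of the diff). It imports from pytest_utils the same way the updated tests below do; num_chunks=4 is an arbitrary example value, and the assertions mirror the num_N and num_E constants defined inside the helper.

import tempfile

from pytest_utils import create_homo_chunked_dataset

with tempfile.TemporaryDirectory() as root_dir:
    # Writes the chunked files under root_dir/chunked-data and returns
    # the in-memory DGLGraph for verification.
    g = create_homo_chunked_dataset(root_dir, num_chunks=4, data_fmt="numpy")
    assert g.num_nodes("_N") == 1200
    assert g.num_edges("_E") == 24 * 1000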
4 changes: 2 additions & 2 deletions tests/tools/test_dist_lookup.py
@@ -13,7 +13,7 @@
import torch.distributed as dist
import torch.multiprocessing as mp

from pytest_utils import create_chunked_dataset
from pytest_utils import create_hetero_chunked_dataset
from tools.distpartitioning import constants, dist_lookup
from tools.distpartitioning.gloo_wrapper import allgather_sizes
from tools.distpartitioning.utils import (
@@ -210,7 +210,7 @@ def test_lookup_service(
):

with tempfile.TemporaryDirectory() as root_dir:
g = create_chunked_dataset(
g = create_hetero_chunked_dataset(
root_dir,
num_chunks,
data_fmt="numpy",
6 changes: 3 additions & 3 deletions tests/tools/test_dist_part.py
@@ -20,7 +20,7 @@

from distpartitioning import array_readwriter
from distpartitioning.utils import generate_read_list
from pytest_utils import chunk_graph, create_chunked_dataset
from pytest_utils import chunk_graph, create_hetero_chunked_dataset
from scipy import sparse as spsp

from tools.verification_utils import (
@@ -41,7 +41,7 @@ def _test_chunk_graph(
num_chunks_edge_data=None,
):
with tempfile.TemporaryDirectory() as root_dir:
g = create_chunked_dataset(
g = create_hetero_chunked_dataset(
root_dir,
num_chunks,
data_fmt=data_fmt,
@@ -319,7 +319,7 @@ def _test_pipeline(
return

with tempfile.TemporaryDirectory() as root_dir:
g = create_chunked_dataset(
g = create_hetero_chunked_dataset(
root_dir,
num_chunks,
data_fmt=data_fmt,
68 changes: 50 additions & 18 deletions tests/tools/test_dist_partition_graphbolt.py
@@ -22,7 +22,10 @@

from distpartitioning import array_readwriter
from distpartitioning.utils import generate_read_list
from pytest_utils import create_chunked_dataset
from pytest_utils import (
create_hetero_chunked_dataset,
create_homo_chunked_dataset,
)


def _verify_metadata_gb(gpb, g, num_parts, part_id, part_sizes):
@@ -829,24 +832,41 @@ def _test_pipeline_graphbolt(
store_eids=True,
store_inner_edge=True,
store_inner_node=True,
is_homogeneous=False,
):
if num_parts % world_size != 0:
# num_parts should be a multiple of world_size
return

with tempfile.TemporaryDirectory() as root_dir:
g = create_chunked_dataset(
root_dir,
num_chunks,
data_fmt=data_fmt,
num_chunks_nodes=num_chunks_nodes,
num_chunks_edges=num_chunks_edges,
num_chunks_node_data=num_chunks_node_data,
num_chunks_edge_data=num_chunks_edge_data,
)
graph_name = "test"
test_ntype = "paper"
test_etype = ("paper", "cites", "paper")
if is_homogeneous:
g = create_homo_chunked_dataset(
root_dir,
num_chunks,
data_fmt=data_fmt,
num_chunks_nodes=num_chunks_nodes,
num_chunks_edges=num_chunks_edges,
num_chunks_node_data=num_chunks_node_data,
num_chunks_edge_data=num_chunks_edge_data,
)
graph_name = "test"
test_ntype = "_N"
test_etype = ("_N", "_E", "_N")
ntypes = ["_N"]
else:
g = create_hetero_chunked_dataset(
root_dir,
num_chunks,
data_fmt=data_fmt,
num_chunks_nodes=num_chunks_nodes,
num_chunks_edges=num_chunks_edges,
num_chunks_node_data=num_chunks_node_data,
num_chunks_edge_data=num_chunks_edge_data,
)
graph_name = "test"
test_ntype = "paper"
test_etype = ("paper", "cites", "paper")
ntypes = ["author", "institution", "paper"]

# Step1: graph partition
in_dir = os.path.join(root_dir, "chunked-data")
@@ -857,7 +877,7 @@ def _test_pipeline_graphbolt(
in_dir, output_dir, num_parts
)
)
for ntype in ["author", "institution", "paper"]:
for ntype in ntypes:
fname = os.path.join(output_dir, "{}.txt".format(ntype))
with open(fname, "r") as f:
header = f.readline().rstrip()
@@ -952,14 +972,20 @@ def read_orig_ids(fname):
"num_chunks, num_parts, world_size",
[[4, 4, 4], [8, 4, 2], [8, 4, 4], [9, 6, 3], [11, 11, 1], [11, 4, 1]],
)
def test_pipeline_basics(num_chunks, num_parts, world_size):
@pytest.mark.parametrize("is_homogeneous", [False, True])
def test_pipeline_basics(num_chunks, num_parts, world_size, is_homogeneous):
_test_pipeline_graphbolt(
num_chunks,
num_parts,
world_size,
is_homogeneous=is_homogeneous,
)
_test_pipeline_graphbolt(
num_chunks, num_parts, world_size, use_verify_partitions=False
num_chunks,
num_parts,
world_size,
use_verify_partitions=False,
is_homogeneous=is_homogeneous,
)


@@ -1001,12 +1027,14 @@ def test_pipeline_attributes(store_inner_node, store_inner_edge, store_eids):
[1, 5, 3, 1, 1],
],
)
@pytest.mark.parametrize("is_homogeneous", [False, True])
def test_pipeline_arbitrary_chunks(
num_chunks,
num_parts,
world_size,
num_chunks_node_data,
num_chunks_edge_data,
is_homogeneous,
):

_test_pipeline_graphbolt(
@@ -1015,9 +1043,13 @@ def test_pipeline_arbitrary_chunks(
world_size,
num_chunks_node_data=num_chunks_node_data,
num_chunks_edge_data=num_chunks_edge_data,
is_homogeneous=is_homogeneous,
)


@pytest.mark.parametrize("data_fmt", ["numpy", "parquet"])
def test_pipeline_feature_format(data_fmt):
_test_pipeline_graphbolt(4, 4, 4, data_fmt=data_fmt)
@pytest.mark.parametrize("is_homogeneous", [False, True])
def test_pipeline_feature_format(data_fmt, is_homogeneous):
_test_pipeline_graphbolt(
4, 4, 4, data_fmt=data_fmt, is_homogeneous=is_homogeneous
)
8 changes: 4 additions & 4 deletions tests/tools/test_parmetis.py
@@ -11,7 +11,7 @@
from dgl.data.utils import load_graphs, load_tensors
from partition_algo.base import load_partition_meta

from pytest_utils import create_chunked_dataset
from pytest_utils import create_hetero_chunked_dataset

"""
TODO: skipping this test case since the dependency, mpirun, is
@@ -23,7 +23,7 @@
def test_parmetis_preprocessing():
with tempfile.TemporaryDirectory() as root_dir:
num_chunks = 2
g = create_chunked_dataset(root_dir, num_chunks)
g = create_hetero_chunked_dataset(root_dir, num_chunks)

# Trigger ParMETIS pre-processing here.
input_dir = os.path.join(root_dir, "chunked-data")
@@ -117,7 +117,7 @@ def test_parmetis_preprocessing():
def test_parmetis_postprocessing():
with tempfile.TemporaryDirectory() as root_dir:
num_chunks = 2
g = create_chunked_dataset(root_dir, num_chunks)
g = create_hetero_chunked_dataset(root_dir, num_chunks)

num_nodes = g.num_nodes()
num_institutions = g.num_nodes("institution")
@@ -188,7 +188,7 @@ def test_parmetis_wrapper():
with tempfile.TemporaryDirectory() as root_dir:
num_chunks = 2
graph_name = "mag240m"
g = create_chunked_dataset(root_dir, num_chunks)
g = create_hetero_chunked_dataset(root_dir, num_chunks)
all_ntypes = g.ntypes
all_etypes = g.etypes
num_constraints = len(all_ntypes) + 3
2 changes: 2 additions & 0 deletions tools/distpartitioning/convert_partition.py
@@ -352,6 +352,8 @@ def _process_partition_gb(
sorted_idx = (
th.repeat_interleave(indptr[:-1], split_size, dim=0) + sorted_idx
)
else:
sorted_idx = th.arange(len(edge_ids))

return indptr, indices[sorted_idx], edge_ids[sorted_idx]
