131 changes: 130 additions & 1 deletion tests/tools/pytest_utils.py
@@ -339,7 +339,136 @@ def chunk_graph(
    )


def create_chunked_dataset(
def create_homo_chunked_dataset(
    root_dir,
    num_chunks,
    data_fmt="numpy",
    edges_fmt="csv",
    vector_rows=False,
    **kwargs,
):
    """
    Creates a sample homogeneous chunked dataset.

    Parameters
    ----------
    root_dir : string
        Directory in which all the files for the chunked dataset will be stored.
    num_chunks : int
        Number of chunks to split the dataset into.
    data_fmt : string, optional
        Storage format for the node/edge feature files, e.g. "numpy" or "parquet".
    edges_fmt : string, optional
        Storage format for the edge files.
    vector_rows : bool, optional
        Forwarded to ``chunk_graph``, together with any extra keyword arguments.
    """
    # Step 0: prepare chunked graph data format.
    # A synthetic mini MAG240.
    num_N = 1200

    def rand_edges(num_src, num_dst, num_edges):
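        # Encode each candidate edge (src, dst) as src * num_dst + dst and
        # sample without replacement so every generated edge is unique.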
        eids = np.random.choice(num_src * num_dst, num_edges, replace=False)
        src = torch.from_numpy(eids // num_dst)
        dst = torch.from_numpy(eids % num_dst)

        return src, dst

    num_E = 24 * 1000

    # Structure.
    data_dict = {("_N", "_E", "_N"): rand_edges(num_N, num_N, num_E)}
    src, dst = data_dict[("_N", "_E", "_N")]
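    # Re-store the edges with their endpoints swapped (dst -> src); only the
    # direction convention changes, the sampled pairs stay the same.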
    data_dict[("_N", "_E", "_N")] = (dst, src)
    g = dgl.heterograph(data_dict)

    # Node feat, label, year.
    num_paper_feats = 3
    _N_feat = np.random.randn(num_N, num_paper_feats)
    num_classes = 4
    _N_label = np.random.choice(num_classes, num_N)
    _N_year = np.random.choice(2022, num_N)
    _N_orig_ids = np.arange(0, num_N)

    # Masks.
    _N_train_mask = np.random.choice([True, False], num_N)
    _N_test_mask = np.random.choice([True, False], num_N)
    _N_val_mask = np.random.choice([True, False], num_N)

    # Edge features.
    _E_count = np.random.choice(10, num_E)

    # Save features.
    input_dir = os.path.join(root_dir, "data_test")
    os.makedirs(input_dir)
    for sub_d in ["_N", "_E"]:
        os.makedirs(os.path.join(input_dir, sub_d))

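    # For each tensor below: persist it as a .npy file (the on-disk input for
    # chunking) and also attach it to the in-memory graph so tests can compare
    # the chunked output against this ground truth.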
    _N_feat_path = os.path.join(input_dir, "_N/feat.npy")
    with open(_N_feat_path, "wb") as f:
        np.save(f, _N_feat)
    g.nodes["_N"].data["feat"] = torch.from_numpy(_N_feat)

    _N_label_path = os.path.join(input_dir, "_N/label.npy")
    with open(_N_label_path, "wb") as f:
        np.save(f, _N_label)
    g.nodes["_N"].data["label"] = torch.from_numpy(_N_label)

    _N_year_path = os.path.join(input_dir, "_N/year.npy")
    with open(_N_year_path, "wb") as f:
        np.save(f, _N_year)
    g.nodes["_N"].data["year"] = torch.from_numpy(_N_year)

    _N_orig_ids_path = os.path.join(input_dir, "_N/orig_ids.npy")
    with open(_N_orig_ids_path, "wb") as f:
        np.save(f, _N_orig_ids)
    g.nodes["_N"].data["orig_ids"] = torch.from_numpy(_N_orig_ids)

    _E_count_path = os.path.join(input_dir, "_E/count.npy")
    with open(_E_count_path, "wb") as f:
        np.save(f, _E_count)
    g.edges["_E"].data["count"] = torch.from_numpy(_E_count)

    _N_train_mask_path = os.path.join(input_dir, "_N/train_mask.npy")
    with open(_N_train_mask_path, "wb") as f:
        np.save(f, _N_train_mask)
    g.nodes["_N"].data["train_mask"] = torch.from_numpy(_N_train_mask)

    _N_test_mask_path = os.path.join(input_dir, "_N/test_mask.npy")
    with open(_N_test_mask_path, "wb") as f:
        np.save(f, _N_test_mask)
    g.nodes["_N"].data["test_mask"] = torch.from_numpy(_N_test_mask)

    _N_val_mask_path = os.path.join(input_dir, "_N/val_mask.npy")
    with open(_N_val_mask_path, "wb") as f:
        np.save(f, _N_val_mask)
    g.nodes["_N"].data["val_mask"] = torch.from_numpy(_N_val_mask)

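    # Map node/edge type -> feature name -> path of the file holding that
    # feature; these mappings are handed to chunk_graph below.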
    node_data = {
        "_N": {
            "feat": _N_feat_path,
            "train_mask": _N_train_mask_path,
            "test_mask": _N_test_mask_path,
            "val_mask": _N_val_mask_path,
            "label": _N_label_path,
            "year": _N_year_path,
            "orig_ids": _N_orig_ids_path,
        }
    }

    edge_data = {"_E": {"count": _E_count_path}}

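    # Write the chunked representation (graph structure plus the features
    # registered above) under root_dir/chunked-data.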
    output_dir = os.path.join(root_dir, "chunked-data")
    chunk_graph(
        g,
        "mag240m",
        node_data,
        edge_data,
        num_chunks=num_chunks,
        output_path=output_dir,
        data_fmt=data_fmt,
        edges_fmt=edges_fmt,
        vector_rows=vector_rows,
        **kwargs,
    )
    logging.debug("Done creating the chunked graph.")

    return g

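# A minimal usage sketch for the helper above (hypothetical call site, not
# part of the test suite); the chunked output lands in
# os.path.join(root_dir, "chunked-data"):
#
#     with tempfile.TemporaryDirectory() as root_dir:
#         g = create_homo_chunked_dataset(root_dir, num_chunks=4)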

def create_hetero_chunked_dataset(
    root_dir,
    num_chunks,
    data_fmt="numpy",
4 changes: 2 additions & 2 deletions tests/tools/test_dist_lookup.py
@@ -13,7 +13,7 @@
import torch.distributed as dist
import torch.multiprocessing as mp

from pytest_utils import create_chunked_dataset
from pytest_utils import create_hetero_chunked_dataset
from tools.distpartitioning import constants, dist_lookup
from tools.distpartitioning.gloo_wrapper import allgather_sizes
from tools.distpartitioning.utils import (
@@ -210,7 +210,7 @@ def test_lookup_service(
):

    with tempfile.TemporaryDirectory() as root_dir:
        g = create_chunked_dataset(
        g = create_hetero_chunked_dataset(
            root_dir,
            num_chunks,
            data_fmt="numpy",
6 changes: 3 additions & 3 deletions tests/tools/test_dist_part.py
@@ -20,7 +20,7 @@

from distpartitioning import array_readwriter
from distpartitioning.utils import generate_read_list
from pytest_utils import chunk_graph, create_chunked_dataset
from pytest_utils import chunk_graph, create_hetero_chunked_dataset
from scipy import sparse as spsp

from tools.verification_utils import (
@@ -41,7 +41,7 @@ def _test_chunk_graph(
    num_chunks_edge_data=None,
):
    with tempfile.TemporaryDirectory() as root_dir:
        g = create_chunked_dataset(
        g = create_hetero_chunked_dataset(
            root_dir,
            num_chunks,
            data_fmt=data_fmt,
@@ -319,7 +319,7 @@ def _test_pipeline(
        return

    with tempfile.TemporaryDirectory() as root_dir:
        g = create_chunked_dataset(
        g = create_hetero_chunked_dataset(
            root_dir,
            num_chunks,
            data_fmt=data_fmt,
68 changes: 50 additions & 18 deletions tests/tools/test_dist_partition_graphbolt.py
@@ -22,7 +22,10 @@

from distpartitioning import array_readwriter
from distpartitioning.utils import generate_read_list
from pytest_utils import create_chunked_dataset
from pytest_utils import (
    create_hetero_chunked_dataset,
    create_homo_chunked_dataset,
)


def _verify_metadata_gb(gpb, g, num_parts, part_id, part_sizes):
@@ -829,24 +832,41 @@ def _test_pipeline_graphbolt(
    store_eids=True,
    store_inner_edge=True,
    store_inner_node=True,
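    # When True, exercise the pipeline on the homogeneous test dataset
    # instead of the heterogeneous one.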
    is_homogeneous=False,
):
    if num_parts % world_size != 0:
        # num_parts should be a multiple of world_size
        return

    with tempfile.TemporaryDirectory() as root_dir:
        g = create_chunked_dataset(
            root_dir,
            num_chunks,
            data_fmt=data_fmt,
            num_chunks_nodes=num_chunks_nodes,
            num_chunks_edges=num_chunks_edges,
            num_chunks_node_data=num_chunks_node_data,
            num_chunks_edge_data=num_chunks_edge_data,
        )
        graph_name = "test"
        test_ntype = "paper"
        test_etype = ("paper", "cites", "paper")
        if is_homogeneous:
            g = create_homo_chunked_dataset(
                root_dir,
                num_chunks,
                data_fmt=data_fmt,
                num_chunks_nodes=num_chunks_nodes,
                num_chunks_edges=num_chunks_edges,
                num_chunks_node_data=num_chunks_node_data,
                num_chunks_edge_data=num_chunks_edge_data,
            )
            graph_name = "test"
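            # "_N" / "_E" are DGL's default node/edge type names for
            # homogeneous graphs.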
            test_ntype = "_N"
            test_etype = ("_N", "_E", "_N")
            ntypes = ["_N"]
        else:
            g = create_hetero_chunked_dataset(
                root_dir,
                num_chunks,
                data_fmt=data_fmt,
                num_chunks_nodes=num_chunks_nodes,
                num_chunks_edges=num_chunks_edges,
                num_chunks_node_data=num_chunks_node_data,
                num_chunks_edge_data=num_chunks_edge_data,
            )
            graph_name = "test"
            test_ntype = "paper"
            test_etype = ("paper", "cites", "paper")
            ntypes = ["author", "institution", "paper"]

        # Step 1: graph partition.
        in_dir = os.path.join(root_dir, "chunked-data")
@@ -857,7 +877,7 @@ def _test_pipeline_graphbolt(
                in_dir, output_dir, num_parts
            )
        )
        for ntype in ["author", "institution", "paper"]:
        for ntype in ntypes:
            fname = os.path.join(output_dir, "{}.txt".format(ntype))
            with open(fname, "r") as f:
                header = f.readline().rstrip()
@@ -952,14 +972,20 @@ def read_orig_ids(fname):
"num_chunks, num_parts, world_size",
[[4, 4, 4], [8, 4, 2], [8, 4, 4], [9, 6, 3], [11, 11, 1], [11, 4, 1]],
)
def test_pipeline_basics(num_chunks, num_parts, world_size):
@pytest.mark.parametrize("is_homogeneous", [False, True])
def test_pipeline_basics(num_chunks, num_parts, world_size, is_homogeneous):
    _test_pipeline_graphbolt(
        num_chunks,
        num_parts,
        world_size,
        is_homogeneous=is_homogeneous,
    )
    _test_pipeline_graphbolt(
        num_chunks, num_parts, world_size, use_verify_partitions=False
        num_chunks,
        num_parts,
        world_size,
        use_verify_partitions=False,
        is_homogeneous=is_homogeneous,
    )


@@ -1001,12 +1027,14 @@ def test_pipeline_attributes(store_inner_node, store_inner_edge, store_eids):
        [1, 5, 3, 1, 1],
    ],
)
@pytest.mark.parametrize("is_homogeneous", [False, True])
def test_pipeline_arbitrary_chunks(
    num_chunks,
    num_parts,
    world_size,
    num_chunks_node_data,
    num_chunks_edge_data,
    is_homogeneous,
):

_test_pipeline_graphbolt(
Expand All @@ -1015,9 +1043,13 @@ def test_pipeline_arbitrary_chunks(
world_size,
num_chunks_node_data=num_chunks_node_data,
num_chunks_edge_data=num_chunks_edge_data,
is_homogeneous=is_homogeneous,
)


@pytest.mark.parametrize("data_fmt", ["numpy", "parquet"])
def test_pipeline_feature_format(data_fmt):
    _test_pipeline_graphbolt(4, 4, 4, data_fmt=data_fmt)
@pytest.mark.parametrize("is_homogeneous", [False, True])
def test_pipeline_feature_format(data_fmt, is_homogeneous):
    _test_pipeline_graphbolt(
        4, 4, 4, data_fmt=data_fmt, is_homogeneous=is_homogeneous
    )
8 changes: 4 additions & 4 deletions tests/tools/test_parmetis.py
@@ -11,7 +11,7 @@
from dgl.data.utils import load_graphs, load_tensors
from partition_algo.base import load_partition_meta

from pytest_utils import create_chunked_dataset
from pytest_utils import create_hetero_chunked_dataset

"""
TODO: skipping this test case since the dependency, mpirun, is
@@ -23,7 +23,7 @@
def test_parmetis_preprocessing():
    with tempfile.TemporaryDirectory() as root_dir:
        num_chunks = 2
        g = create_chunked_dataset(root_dir, num_chunks)
        g = create_hetero_chunked_dataset(root_dir, num_chunks)

        # Trigger ParMETIS pre-processing here.
        input_dir = os.path.join(root_dir, "chunked-data")
@@ -117,7 +117,7 @@ def test_parmetis_preprocessing():
def test_parmetis_postprocessing():
    with tempfile.TemporaryDirectory() as root_dir:
        num_chunks = 2
        g = create_chunked_dataset(root_dir, num_chunks)
        g = create_hetero_chunked_dataset(root_dir, num_chunks)

        num_nodes = g.num_nodes()
        num_institutions = g.num_nodes("institution")
@@ -188,7 +188,7 @@ def test_parmetis_wrapper():
    with tempfile.TemporaryDirectory() as root_dir:
        num_chunks = 2
        graph_name = "mag240m"
        g = create_chunked_dataset(root_dir, num_chunks)
        g = create_hetero_chunked_dataset(root_dir, num_chunks)
        all_ntypes = g.ntypes
        all_etypes = g.etypes
        num_constraints = len(all_ntypes) + 3
2 changes: 2 additions & 0 deletions tools/distpartitioning/convert_partition.py
@@ -352,6 +352,8 @@ def _process_partition_gb(
        sorted_idx = (
            th.repeat_interleave(indptr[:-1], split_size, dim=0) + sorted_idx
        )
    else:
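        # No sort order was computed in this branch, so fall back to the
        # identity permutation; the indexing below then leaves edges in place.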
        sorted_idx = th.arange(len(edge_ids))

    return indptr, indices[sorted_idx], edge_ids[sorted_idx]
