Skip to content
Open
Show file tree
Hide file tree
Changes from 41 commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
dd8b301
Scale native allreduce/allgather algos for MNNVL/MNNVLS
Binyang2014 Apr 27, 2026
893a08e
Enable MNNVL allreduce tuning
Binyang2014 Apr 28, 2026
dded5e0
Improve MNNVL allreduce tuning performance
Binyang2014 Apr 28, 2026
865c2bc
Optimize MNNVL allreduce without symmetric memory
Binyang2014 Apr 28, 2026
3bc00cb
Enable NVLS zero-copy without symmetric memory flag
Binyang2014 Apr 28, 2026
533f329
Tune no-sym MNNVL with RSAG zero-copy
Binyang2014 Apr 28, 2026
45a651b
Decouple IPC-domain hint from bootstrap nRanksPerNode
Binyang2014 May 1, 2026
2a2fca8
Rename collective ctx/kernel param nRanksPerNode to ipcDomainNranks
Binyang2014 May 1, 2026
2efda4d
Restore compile-time templated NRanksPerNode for rsag_zero_copy
Binyang2014 May 1, 2026
1c29817
Revert AllreduceRsAgZeroCopy non-symmetric ctx key tag back to ++tag
Binyang2014 May 1, 2026
7bc5e04
Reset GPU tokens before reuse
Binyang2014 May 2, 2026
9a36884
Rename gpuMemset wrapper and zero TokenPool slots in deleter
Binyang2014 May 2, 2026
987f800
Merge remote-tracking branch 'origin/main' into binyli/mnnvl
Binyang2014 May 4, 2026
6296803
Make NVLS non-zero-copy allreduce algorithms MNNVL-ready
Binyang2014 May 5, 2026
9aeeaf0
Simplify torch-integration tuning example for MPI-only multi-node tes…
Binyang2014 May 6, 2026
905b23d
Drop non-MNNVL multi_node regime from torch-integration example
Binyang2014 May 6, 2026
4a0d5b2
Simplify torch-integration tuning example
Binyang2014 May 6, 2026
307a471
Shorten verbose comments and use THROW in validateIpcDomainSpansWorld
Binyang2014 May 6, 2026
f0c6ac0
Fold validateIpcDomainSpansWorld into getIpcDomainNranks
Binyang2014 May 6, 2026
bde23ce
Revert verbose RSAG zero-copy comment; rename NRanksPerNode template …
Binyang2014 May 6, 2026
095cfff
Revert RSAG nBlocks default to 64
Binyang2014 May 6, 2026
639b80d
Tie AllreduceAllpairPacket maxBlockNum_ to MAX_IPC_DOMAIN_NRANKS - 1
Binyang2014 May 6, 2026
e8caab7
Strip preflight validation blocks from NVLS pipeline allreduce kernels
Binyang2014 May 6, 2026
7d80a33
Default torch example SYMMETRIC_MEMORY env to 1
Binyang2014 May 6, 2026
d1b04a3
NVLS zero-copy allreduce: support FP16 accumulator for FP8 inputs
Binyang2014 May 7, 2026
113d859
fix
Binyang2014 May 8, 2026
9ff7e1c
update
Binyang2014 May 8, 2026
654bcfa
update
Binyang2014 May 8, 2026
5516bdb
fix
Binyang2014 May 8, 2026
e208cc3
WIP
Binyang2014 May 8, 2026
825fc12
address hang issue
Binyang2014 May 9, 2026
224b3de
Clean up completed communicator receives
Binyang2014 May 13, 2026
0c09239
Merge branch 'main' into binyli/mnnvl
Binyang2014 May 13, 2026
7724e49
Fix lint and ROCm error alias
Copilot May 13, 2026
24850ef
Merge branch 'main' into binyli/mnnvl
Binyang2014 May 15, 2026
ee82cc4
Merge branch 'main' into binyli/mnnvl
Binyang2014 May 15, 2026
dbebde2
Configure IPC domain per communicator
Binyang2014 May 15, 2026
93b4354
temp solution
Binyang2014 May 15, 2026
0744e80
detect ipc domain automatically
Binyang2014 May 16, 2026
94af88d
Fix tuning example hang
Binyang2014 May 16, 2026
f32cfb1
update
Binyang2014 May 16, 2026
594dc79
Address NVLS review feedback
seagater May 16, 2026
18d3737
Tighten NVML IPC domain hash lookup
seagater May 16, 2026
4db71b9
Move barrier into setupNvlsChannels and clean up NVLS pipeline state
Binyang2014 May 18, 2026
35331cf
Fix collective topology sizing
Binyang2014 May 20, 2026
ac44e98
update
Binyang2014 May 20, 2026
7308c32
merge main
Binyang2014 May 22, 2026
42ece40
Fix memory leak
Binyang2014 May 24, 2026
641420d
increase nvls memory size to 64 GB
Binyang2014 May 26, 2026
ea73a1e
WIP
Binyang2014 May 26, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ if(MSCCLPP_USE_CUDA)
else()
set(GPU_LIBRARIES CUDA::cudart CUDA::cuda_driver)
endif()
list(APPEND GPU_LIBRARIES CUDA::nvml)
else()
set(CMAKE_HIP_STANDARD 17)
set(CMAKE_HIP_FLAGS "${CMAKE_HIP_FLAGS} -Wall -Wextra")
Expand Down
139 changes: 68 additions & 71 deletions examples/torch-integration/customized_comm_with_tuning.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

# torchrun --nnodes=1 --nproc_per_node=8 examples/torch-integration/customized_comm_with_tuning.py
# mpirun -np 8 python3 examples/torch-integration/customized_comm_with_tuning.py
# mpirun -np 16 --hostfile <hostfile> python3 examples/torch-integration/customized_comm_with_tuning.py

import os
import ipaddress

import netifaces as ni
from mpi4py import MPI

import torch
import mscclpp
import mscclpp.ext
Expand Down Expand Up @@ -35,17 +36,6 @@ def _load_algorithms(scratch: torch.Tensor, rank: int):
)


def _interfaces_for_ip(ip: str):
target = ipaddress.ip_address(ip)
for iface in ni.interfaces():
addrs = ni.ifaddresses(iface)
if ni.AF_INET in addrs:
for link in addrs[ni.AF_INET]:
if "addr" in link and ipaddress.ip_address(link["addr"]) == target:
return iface
return None


def _to_mscclpp_op(op) -> mscclpp.ReduceOp:
if op == torch.distributed.ReduceOp.SUM:
return mscclpp.ReduceOp.SUM
Expand All @@ -67,23 +57,48 @@ def _round_pow2(size: int) -> int:
class CustomizedComm:
"""Exposes all_reduce, all_gather, barrier with lazy per-size tuning."""

_TUNE_N_WARMUP = 5
_TUNE_N_GRAPH_LAUNCHES = 10
_TUNE_N_OPS_PER_GRAPH = 100
_CANDIDATE_NBLOCKS = [4, 8, 16, 24, 32, 48, 64, 128]
_TUNE_N_WARMUP = 3
_TUNE_N_GRAPH_LAUNCHES = 5
_TUNE_N_OPS_PER_GRAPH = 50
_CANDIDATE_NBLOCKS = [4, 8, 16, 24, 32, 48, 64, 112, 128]
_CANDIDATE_NTHREADS = [512, 768, 1024]
_NBLOCKS_LIMIT = {
"default_allreduce_nvls_packet": 16,
"default_allreduce_packet": 56,
"default_allreduce_nvls_zero_copy": 32,
"default_allreduce_packet": 112,
"default_allreduce_allpair_packet": 56,
"default_allreduce_rsag": 128,
"default_allreduce_rsag_zero_copy": 128,
"default_allreduce_fullmesh": 64,
"default_allgather_fullmesh2": 32,
}
# (algo_name, min_size, max_size, predicate)
# Boundaries are inclusive on both ends. max_size=None means unbounded.
# predicate=None means always applicable; otherwise a callable taking `self`.
_AR_CANDIDATES_MNNVL = [
("default_allreduce_allpair_packet", 0, 128 << 10, None),
("default_allreduce_nvls_packet", 0, 64 << 10, lambda c: c._nvls),
("default_allreduce_packet", 128 << 10, 512 << 10, None),
("default_allreduce_nvls_zero_copy", 512 << 10, None, lambda c: c._nvls and c.symmetric_memory),
("default_allreduce_rsag_zero_copy", 512 << 10, None, None),
("default_allreduce_rsag", 512 << 10, None, None),
]
_AR_CANDIDATES_SINGLE = [
("default_allreduce_packet", 0, 4 << 20, None),
("default_allreduce_allpair_packet", 0, 512 << 10, None),
("default_allreduce_nvls_packet", 0, 512 << 10, lambda c: c._nvls),
("default_allreduce_rsag_zero_copy", 512 << 10, None, None),
("default_allreduce_nvls_zero_copy", 512 << 10, None, lambda c: c._nvls and c.symmetric_memory),
("default_allreduce_fullmesh", 0, None, lambda c: torch.version.hip is not None),
]

def __init__(self, comm: mscclpp.CommGroup, symmetric_memory: bool = False):
self.comm = comm
self.rank = comm.my_rank
self.world_size = comm.nranks
self.nranks_per_node = comm.nranks_per_node
self.ipc_domain_n_ranks = comm.ipc_domain_n_ranks
self.multi_host_mnnvl = self.ipc_domain_n_ranks >= self.world_size and self.world_size > self.nranks_per_node
self.symmetric_memory = symmetric_memory
self._nvls = mscclpp.is_nvls_supported()

Expand All @@ -99,13 +114,12 @@ def __init__(self, comm: mscclpp.CommGroup, symmetric_memory: bool = False):
self._time_buf = None

def _algo(self, collective: str, name: str):
return self._algos.get((collective, name))
return self._algos[(collective, name)]

def _default_ar_config(self):
"""Fallback allreduce config for barrier / timing sync."""
pkt = self._algo("allreduce", "default_allreduce_nvls_packet")
if self._nvls and pkt:
return (pkt, 0, 0)
if self._nvls:
return (self._algo("allreduce", "default_allreduce_nvls_packet"), 0, 0)
return (self._algo("allreduce", "default_allreduce_packet"), 0, 0)

# -- low-level execute --
Expand Down Expand Up @@ -165,33 +179,17 @@ def _ensure_tune_bufs(self):
return self._tune_buf

def _ar_candidates(self, size: int):
out = []
if size <= 4 << 20:
a = self._algo("allreduce", "default_allreduce_nvls_packet")
if self._nvls and a:
out.append(a)
a = self._algo("allreduce", "default_allreduce_packet")
if a:
out.append(a)
a = self._algo("allreduce", "default_allreduce_allpair_packet")
if a:
out.append(a)
if size >= 512 << 10:
a = self._algo("allreduce", "default_allreduce_nvls_zero_copy")
if self._nvls and self.symmetric_memory and a:
out.append(a)
a = self._algo("allreduce", "default_allreduce_rsag_zero_copy")
if a:
out.append(a)
if torch.version.hip is not None:
a = self._algo("allreduce", "default_allreduce_fullmesh")
if a:
out.append(a)
return out
table = self._AR_CANDIDATES_MNNVL if self.multi_host_mnnvl else self._AR_CANDIDATES_SINGLE
return [
self._algo("allreduce", name)
for name, lo, hi, pred in table
if size >= lo and (hi is None or size <= hi) and (pred is None or pred(self))
]

def _ag_candidates(self):
a = self._algo("allgather", "default_allgather_fullmesh2")
return [a] if a else []
if self.multi_host_mnnvl:
return []
return [self._algo("allgather", "default_allgather_fullmesh2")]

def _run_tune(self, collective, algo, buf, size, nb, nt):
"""Single tune invocation for either collective."""
Expand All @@ -207,7 +205,7 @@ def _run_tune(self, collective, algo, buf, size, nb, nt):
stream=torch.cuda.current_stream().cuda_stream,
nblocks=nb,
nthreads_per_block=nt,
symmetric_memory=True,
symmetric_memory=self.symmetric_memory,
)
else:
total = size * self.world_size
Expand Down Expand Up @@ -245,7 +243,7 @@ def _tune_size(self, collective: str, target_size: int):
ret = run(algo, nb, nt)
torch.cuda.synchronize()
self._time_buf[0] = float(ret)
self._exec_ar(self._time_buf[:1], *self._default_ar_config(), sym=True)
self._exec_ar(self._time_buf[:1], *self._default_ar_config(), sym=self.symmetric_memory)
if self._time_buf[0].item() != 0:
continue
used.add(algo)
Expand Down Expand Up @@ -274,7 +272,7 @@ def _tune_size(self, collective: str, target_size: int):
# Cross-rank timing sync
self._time_buf.fill_(elapsed)
torch.cuda.current_stream().wait_stream(cs)
self._exec_ar(self._time_buf, *self._default_ar_config(), sym=True)
self._exec_ar(self._time_buf, *self._default_ar_config(), sym=self.symmetric_memory)
avg = self._time_buf[self.rank].item() / self.world_size

if avg < best_time:
Expand Down Expand Up @@ -314,6 +312,8 @@ def all_reduce(self, tensor, op=torch.distributed.ReduceOp.SUM, stream=None, acc
)

def all_gather(self, output_tensor, input_tensor, stream=None):
if self.multi_host_mnnvl:
raise RuntimeError("all_gather in this example currently supports only single-node runs")
sz = _round_pow2(input_tensor.nbytes)
if sz not in self._tune_cache["allgather"]:
self._tune_size("allgather", sz)
Expand Down Expand Up @@ -341,7 +341,7 @@ def _bench_sizes(low=5 * 1024, high=80 << 20):


def benchmark_allreduce(
comm: CustomizedComm, dtype=torch.float16, accum_dtype=None, n_warmup=10, n_graph_launches=10, n_iter=100
comm: CustomizedComm, dtype=torch.float16, accum_dtype=None, n_warmup=5, n_graph_launches=5, n_iter=50
):
sizes = _bench_sizes()
if comm.rank == 0:
Expand Down Expand Up @@ -382,7 +382,7 @@ def benchmark_allreduce(
print(f"{nelems:<18} {size:<18} {ms*1000:<18.2f} {size/(ms*1e-3)/1e9:<18.2f}")


def benchmark_allgather(comm: CustomizedComm, dtype=torch.float16, n_warmup=10, n_graph_launches=10, n_iter=100):
def benchmark_allgather(comm: CustomizedComm, dtype=torch.float16, n_warmup=5, n_graph_launches=5, n_iter=50):
sizes = _bench_sizes()
if comm.rank == 0:
print(f"\n{'='*60}\nAllgather Benchmark\n{'='*60}")
Expand Down Expand Up @@ -432,41 +432,38 @@ def benchmark_allgather(comm: CustomizedComm, dtype=torch.float16, n_warmup=10,


def init_dist() -> mscclpp.CommGroup:
addr = os.environ.get("MSCCLPP_MASTER_ADDR")
if addr:
rank, world = int(os.environ["RANK"]), int(os.environ["WORLD_SIZE"])
port = os.environ["MSCCLPP_MASTER_PORT"]
iface = _interfaces_for_ip(addr)
if not iface:
raise ValueError(f"No interface for {addr}")
return mscclpp.CommGroup(interfaceIpPortTrio=f"{iface}:{addr}:{port}", rank=rank, size=world)
import torch.distributed as dist

dist.init_process_group(backend="gloo")
return mscclpp.CommGroup(torch_group=dist.group.WORLD)
return mscclpp.CommGroup(mpi_comm=MPI.COMM_WORLD)


def main():
local = int(os.environ["LOCAL_RANK"])
local = MPI.COMM_WORLD.Split_type(MPI.COMM_TYPE_SHARED).Get_rank()
torch.cuda.set_device(local)

dtype_str = os.environ.get("DTYPE", "float16")
dtype = getattr(torch, dtype_str, torch.float16)
accum_map = {"float32": mscclpp.DataType.float32, "float16": mscclpp.DataType.float16}
accum_str = os.environ.get("ACCUM_DTYPE")
accum_dtype = accum_map.get(accum_str) if accum_str else None
symmetric_memory = os.environ.get("SYMMETRIC_MEMORY", "1") == "1"

comm_group = init_dist()
cc = CustomizedComm(comm_group)
cc = CustomizedComm(comm_group, symmetric_memory=symmetric_memory)

print(f"rank {local} starting benchmarks with dtype={dtype} accum_dtype={accum_dtype}...")
print(
f"rank {local} starting benchmarks with dtype={dtype} "
f"accum_dtype={accum_dtype} symmetric_memory={symmetric_memory}..."
)
benchmark_allreduce(cc, dtype=dtype, accum_dtype=accum_dtype)
cc.barrier()
torch.cuda.synchronize()

benchmark_allgather(cc, dtype=dtype)
cc.barrier()
torch.cuda.synchronize()
if cc.multi_host_mnnvl:
if cc.rank == 0:
print("Skipping allgather benchmark on multi-node: this example's allgather path is single-node only.")
else:
benchmark_allgather(cc, dtype=dtype)
cc.barrier()
torch.cuda.synchronize()

cc.destroy()
print(f"rank {local} completed successfully.")
Expand Down
7 changes: 7 additions & 0 deletions include/mscclpp/core.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ class Bootstrap {
/// @return The total number of ranks per node.
virtual int getNranksPerNode() const = 0;

/// Return the number of ranks in this rank's GPU IPC domain.
/// @return The number of ranks in the GPU IPC domain.
virtual int getNranksPerIpcDomain() const;

Comment thread
Binyang2014 marked this conversation as resolved.
/// Send arbitrary data to another process.
///
/// Data sent via `send(senderBuff, size, receiverRank, tag)` can be received via `recv(receiverBuff, size,
Expand Down Expand Up @@ -144,6 +148,9 @@ class TcpBootstrap : public Bootstrap {
/// Return the total number of ranks per node.
int getNranksPerNode() const override;

/// Return the number of ranks in this rank's GPU IPC domain.
int getNranksPerIpcDomain() const override;

/// Send arbitrary data to another process.
///
/// Data sent via `send(senderBuff, size, receiverRank, tag)` can be received via `recv(receiverBuff, size,
Expand Down
1 change: 1 addition & 0 deletions include/mscclpp/gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ using CUmemorytype = hipMemoryType;
constexpr auto cudaErrorPeerAccessAlreadyEnabled = hipErrorPeerAccessAlreadyEnabled;
constexpr auto cudaErrorContextIsDestroyed = hipErrorContextIsDestroyed;
constexpr auto cudaErrorInvalidDevice = hipErrorInvalidDevice;
constexpr auto cudaErrorInvalidValue = hipErrorInvalidValue;
constexpr auto cudaSuccess = hipSuccess;
constexpr auto cudaErrorNotSupported = hipErrorNotSupported;
constexpr auto cudaStreamNonBlocking = hipStreamNonBlocking;
Expand Down
Loading
Loading