diff --git a/conftest.py b/conftest.py index 93a74f1a3..8d63777e0 100644 --- a/conftest.py +++ b/conftest.py @@ -241,22 +241,22 @@ def pytest_configure(config): # at the end, so the combination is now safe. -def pytest_collection_modifyitems(session, config, items): +def pytest_collection_modifyitems(session, config, items): # noqa: PLR0912 """Skip ST tests based on --platform, --runtime, --level filters; order L3 before L2.""" platform = config.getoption("--platform") runtime_filter = config.getoption("--runtime") level_filter = config.getoption("--level") - # Orchestrator L3 children set PTO_TARGET_NODEID to the single case they - # were dispatched for. Pytest's --case filter runs inside test_run (too - # late — other classes' st_worker fixtures already fired at setup). Skip - # everything except the target nodeid so the child stays narrow even when - # the parent invocation had broad positional args like ``examples tests/st``. - target_nodeid = os.environ.get("PTO_TARGET_NODEID") - if target_nodeid: + # When --level is active, only SceneTestCase items with a matching + # _st_level should run. Skip every non-SceneTestCase item — resource + # tests run in their own Resource phase, and other standalone tests + # (e.g. test_hello_worker) must not leak into level-filtered runs. + if level_filter is not None: for item in items: - if item.nodeid != target_nodeid: - item.add_marker(pytest.mark.skip(reason=f"dispatcher target is {target_nodeid}")) + if any(m.name == "skip" for m in item.iter_markers()): + continue + if getattr(item, "cls", None) is None: + item.add_marker(pytest.mark.skip(reason=f"standalone test, not level {level_filter}")) # Sort: L3 tests first (they fork child processes that inherit main process CANN state, # so they must run before L2 tests pollute the CANN context). @@ -341,25 +341,19 @@ def _collect_st_runtimes(items, level=None): def _collect_l3_cases(items, platform): - """Collect one job per L3 class (not per case). + """Collect one job per L3 ``SceneTestCase`` class (not per case). Returns a list of tuples ``(nodeid, cls_name, runtime, max_device_count)`` where ``max_device_count`` is the maximum ``device_count`` across the - class's matching cases. Per-class dispatch matches the ``st_worker`` - fixture's contract (it allocates ``max(CASES.device_count)`` for the whole - class) — dispatching per-case with a smaller device budget would trip the - fixture whenever the class also has a case that needs more devices. - - Cases within a class still run in the child process via the existing - ``test_run`` case loop, reusing the Worker (layer-4 reuse). + class's matching cases. """ by_nodeid: dict[str, tuple[str, str, int]] = {} for item in items: + if any(m.name == "skip" for m in item.iter_markers()): + continue cls = getattr(item, "cls", None) if not cls or getattr(cls, "_st_level", None) != 3: continue - if any(m.name == "skip" for m in item.iter_markers()): - continue rt = getattr(cls, "_st_runtime", None) if not rt: continue @@ -369,7 +363,7 @@ def _collect_l3_cases(items, platform): if platform and platform not in case.get("platforms", []): continue if case.get("manual"): - continue # --manual exclude is the default; children honor the flag + continue saw_case = True max_dev = max(max_dev, int(case.get("config", {}).get("device_count", 1))) if saw_case: @@ -377,6 +371,35 @@ def _collect_l3_cases(items, platform): return [(nodeid, cls_name, rt, dev) for nodeid, (cls_name, rt, dev) in by_nodeid.items()] +def _collect_resource_cases(items, platform): + """Collect non-``SceneTestCase`` pytest functions that declare resource needs. + + Returns a list of tuples ``(nodeid, func_name, runtime, device_count)``. + These run in their own dispatch phase — they don't participate in + level-based dispatch. A function must carry both + ``@pytest.mark.device_count(n)`` and ``@pytest.mark.runtime("...")``. + """ + by_nodeid: dict[str, tuple[str, str, int]] = {} + for item in items: + if any(m.name == "skip" for m in item.iter_markers()): + continue + cls = getattr(item, "cls", None) + if cls is not None: + continue + dev_marker = item.get_closest_marker("device_count") + if dev_marker is None: + continue + rt_marker = item.get_closest_marker("runtime") + if rt_marker is None or not rt_marker.args: + continue + platforms_marker = item.get_closest_marker("platforms") + if platforms_marker and platform and platform not in platforms_marker.args[0]: + continue + dev_count = int(dev_marker.args[0]) if dev_marker.args else 1 + by_nodeid[item.nodeid] = (item.name, rt_marker.args[0], dev_count) + return [(nodeid, label, rt, dev) for nodeid, (label, rt, dev) in by_nodeid.items()] + + def _base_pytest_argv(session): """Inherit the user's original pytest invocation args.""" base = [sys.executable, "-m", "pytest"] @@ -401,8 +424,8 @@ def _resolve_max_parallel(cfg, platform: str, device_ids: list[int]) -> int: return val -def _dispatch_test_phases(session): - """Run L3 phase (device-parallel) then L2 phase (per-runtime subprocess).""" +def _dispatch_test_phases(session): # noqa: PLR0912 + """Run L3 → Standalone → L2 phases.""" from simpler_setup import parallel_scheduler as _ps # noqa: PLC0415 cfg = session.config @@ -426,7 +449,16 @@ def _dispatch_test_phases(session): label = f"L3 {cls_name} (rt={rt}, dev={dev_count})" def _build(ids, _nodeid=nodeid, _rt=rt): - return base_args + [ + # L3 subprocess: only the specific test, not the inherited + # directory args (examples tests/st). Passing the directories + # would collect every same-level SceneTestCase and run them + # inside this subprocess, which has only dev_count devices — + # TestL3Group (needs 2) would fail inside TestL3ChildMemory's + # subprocess (allocated 1). + cmd = [ + sys.executable, + "-m", + "pytest", _nodeid, "--runtime", _rt, @@ -435,21 +467,15 @@ def _build(ids, _nodeid=nodeid, _rt=rt): "--device", _ps.format_device_range(ids), ] + if platform: + cmd.extend(["--platform", platform]) + return cmd - # PTO_TARGET_NODEID makes the child skip every item except this - # nodeid — defends against inherited positional args (``examples``, - # ``tests/st``) collecting unrelated classes whose fixtures would - # then fire at setup and fail on the narrower child device pool. # SIMPLER_PERF_OUTPUT_DIR scopes this L3 case's perf files to its own - # subdir so concurrent L3 cases can't collide on filename (the - # runtime's timestamp is second-precision). Anchor to cfg.rootpath - # so the C++ runtime and Python post-processing agree regardless - # of the child's CWD. Use a nodeid-derived sanitized label so the - # dir name stays readable for post-mortem. + # subdir so concurrent L3 cases can't collide on filename. safe_nodeid = nodeid.replace("/", "_").replace(":", "_").replace(".", "_") child_env = { **os.environ, - "PTO_TARGET_NODEID": nodeid, "SIMPLER_PERF_OUTPUT_DIR": str(cfg.rootpath / "outputs" / f"perf_l3_{safe_nodeid}"), } jobs.append(_ps.Job(label=label, device_count=dev_count, build_cmd=_build, cwd=str(cwd), env=child_env)) @@ -527,6 +553,72 @@ def _on_done(res): else: print(f"\n--- L2 runtime {rt}: PASSED ---\n", flush=True) + # ----- Phase 3: Resource (non-SceneTestCase functions with device_count) ----- + resource_cases = _collect_resource_cases(session.items, platform) + resource_failed = False + if resource_cases: + jobs = [] + for nodeid, func_name, rt, dev_count in resource_cases: + label = f"resource {func_name} (rt={rt}, dev={dev_count})" + + def _build(ids, _nodeid=nodeid, _rt=rt): + # Resource subprocess: only the specific test, not the + # inherited directory args (examples tests/st). Passing the + # directories would collect every SceneTestCase as well and + # run them alongside the resource test inside the subprocess, + # causing isolation failures (e.g. test_explicit_fatal_reports + # wasn't designed to share a process with other tests). + cmd = [ + sys.executable, + "-m", + "pytest", + _nodeid, + "--runtime", + _rt, + "--device", + _ps.format_device_range(ids), + ] + if platform: + cmd.extend(["--platform", platform]) + return cmd + + safe_nodeid = nodeid.replace("/", "_").replace(":", "_").replace(".", "_") + child_env = { + **os.environ, + "SIMPLER_PERF_OUTPUT_DIR": str(cfg.rootpath / "outputs" / f"perf_rc_{safe_nodeid}"), + } + jobs.append(_ps.Job(label=label, device_count=dev_count, build_cmd=_build, cwd=str(cwd), env=child_env)) + + def _on_rc_done(res): + tag = "PASSED" if res.returncode == 0 else f"FAILED (rc={res.returncode})" + print(f"\n--- {res.label}: {tag} on devices {res.device_ids} ---\n", flush=True) + + print( + f"\n{'=' * 60}\n Resource phase: {len(jobs)} case(s), " + f"pool={device_ids}, max_parallel={max_parallel}\n{'=' * 60}\n", + flush=True, + ) + try: + results = _ps.run_jobs( + jobs, + device_ids, + max_parallel=max_parallel, + fail_fast=fail_fast, + on_job_done=_on_rc_done, + ) + except ValueError as e: + print(f"\n*** Resource phase ABORTED: {e} ***\n", flush=True) + session.testsfailed = 1 + return True + resource_failed = any(r.returncode != 0 for r in results) + if any(r.returncode == TIMEOUT_EXIT_CODE for r in results): + print("\n*** Resource phase: TIMED OUT ***\n", flush=True) + os._exit(TIMEOUT_EXIT_CODE) + + if resource_failed and fail_fast: + session.testsfailed = 1 + return True + # Flatten per-subprocess outputs/perf_*/ subdirs back to outputs/ so # downstream tools (swimlane_converter.py, CI artifact upload) find # everything in the historical location. Anchor to config.rootpath (not @@ -534,8 +626,8 @@ def _on_done(res): # still flushes files into the project's top-level outputs/. _ps.flatten_perf_subdirs(cfg.rootpath / "outputs") - session.testsfailed = 1 if (l3_failed or l2_failed) else 0 - if not (l3_failed or l2_failed): + session.testsfailed = 1 if (l3_failed or l2_failed or resource_failed) else 0 + if not (l3_failed or l2_failed or resource_failed): session.testscollected = sum(1 for _ in session.items) return True # returning True prevents default runtestloop @@ -549,8 +641,10 @@ def pytest_runtestloop(session): runtime_filter = session.config.getoption("--runtime") level_filter = session.config.getoption("--level") - # Child mode: the dispatcher's spawned subprocesses carry both flags. - if runtime_filter is not None and level_filter is not None: + # Child mode: if the caller filters by runtime or level, it wants direct + # control — don't re-enter the multi-phase dispatcher (which would cause + # nested dispatch, device pool exhaustion, and timeout). + if runtime_filter is not None or level_filter is not None: return # User explicitly asked for collect-only / scoped-run — don't orchestrate. diff --git a/examples/workers/l2/hello_worker/main.py b/examples/workers/l2/hello_worker/main.py index 5c486778b..fd5b56b9f 100644 --- a/examples/workers/l2/hello_worker/main.py +++ b/examples/workers/l2/hello_worker/main.py @@ -49,23 +49,23 @@ def parse_args() -> argparse.Namespace: return parser.parse_args() -def main() -> int: - args = parse_args() +def run(platform: str, device_id: int) -> int: + """Core logic — callable from both CLI and pytest.""" # Worker(level=2, ...) wraps a single C++ ChipWorker. Construction does NOT # load any binaries or touch the device — it just stashes config. The heavy # work happens in init(). worker = Worker( level=2, - platform=args.platform, + platform=platform, runtime="tensormap_and_ringbuffer", - device_id=args.device, + device_id=device_id, ) # init() resolves ``build/lib//tensormap_and_ringbuffer/*`` via # RuntimeBuilder, dlopens host_runtime.so, loads aicpu.so + aicore.o, and # calls aclrtSetDevice(device_id). If any of those fails this raises. - print(f"[hello_worker] init on {args.platform} device={args.device} ...") + print(f"[hello_worker] init on {platform} device={device_id} ...") worker.init() try: @@ -90,5 +90,10 @@ def main() -> int: return 0 +def main() -> int: + args = parse_args() + return run(args.platform, args.device) + + if __name__ == "__main__": sys.exit(main()) diff --git a/examples/workers/l2/hello_worker/test_hello_worker.py b/examples/workers/l2/hello_worker/test_hello_worker.py new file mode 100644 index 000000000..f93b7e67c --- /dev/null +++ b/examples/workers/l2/hello_worker/test_hello_worker.py @@ -0,0 +1,24 @@ +# Copyright (c) PyPTO Contributors. +# This program is free software, you can redistribute it and/or modify it under the terms and conditions of +# CANN Open Software License Agreement Version 2.0 (the "License"). +# Please refer to the License for details. You may not use this file except in compliance with the License. +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, +# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. +# See LICENSE in the root of the software repository for the full text of the License. +# ----------------------------------------------------------------------------------------------------------- +"""Hardware ST for examples/workers/l2/hello_worker.""" + +import os +from importlib.machinery import SourceFileLoader + +import pytest + +_main = SourceFileLoader("hello_worker_main", os.path.join(os.path.dirname(__file__), "main.py")).load_module() +run = _main.run + + +@pytest.mark.platforms(["a2a3sim", "a2a3", "a5sim", "a5"]) +@pytest.mark.runtime("tensormap_and_ringbuffer") +def test_hello_worker(st_platform, st_device_ids): + rc = run(st_platform, int(st_device_ids[0])) + assert rc == 0 diff --git a/examples/workers/l2/vector_add/main.py b/examples/workers/l2/vector_add/main.py index 008a76038..e5ba142b9 100644 --- a/examples/workers/l2/vector_add/main.py +++ b/examples/workers/l2/vector_add/main.py @@ -7,7 +7,7 @@ # INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. # See LICENSE in the root of the software repository for the full text of the License. # ----------------------------------------------------------------------------------------------------------- -"""L2 Worker API demo — compile one AIV kernel, run it, verify against numpy. +"""L2 Worker API demo — compile one AIV kernel, run it, verify against torch. Pipeline (what the @scene_test framework normally does for you): @@ -21,7 +21,7 @@ ▼ worker.run(chip_callable, task_args, cfg) │ - device result ──[worker.copy_from]──► host array ──[numpy compare] + device result ──[worker.copy_from]──► host array ──[torch compare] The code below walks through each stage explicitly so you can see what the ``@scene_test`` decorator hides. @@ -34,7 +34,9 @@ import os import sys -import numpy as np +os.environ.setdefault("KMP_DUPLICATE_LIB_OK", "TRUE") + +import torch # noqa: E402 from simpler.task_interface import ( ArgDirection, ChipCallable, @@ -56,7 +58,7 @@ N_ROWS = 128 N_COLS = 128 N_ELEMS = N_ROWS * N_COLS -NBYTES = N_ELEMS * np.dtype(np.float32).itemsize +NBYTES = N_ELEMS * 4 # float32 def parse_args() -> argparse.Namespace: @@ -124,22 +126,22 @@ def build_chip_callable(platform: str) -> ChipCallable: ) -def run(worker: Worker, chip_callable: ChipCallable) -> None: +def _run(worker: Worker, chip_callable: ChipCallable) -> None: """Allocate device memory, copy inputs, execute, copy outputs back, verify.""" # --- 1. Prepare host arrays --- - rng = np.random.default_rng(seed=42) - host_a = rng.standard_normal((N_ROWS, N_COLS), dtype=np.float32) - host_b = rng.standard_normal((N_ROWS, N_COLS), dtype=np.float32) + torch.manual_seed(42) + host_a = torch.randn(N_ROWS, N_COLS, dtype=torch.float32) + host_b = torch.randn(N_ROWS, N_COLS, dtype=torch.float32) expected = host_a + host_b - host_out = np.zeros((N_ROWS, N_COLS), dtype=np.float32) + host_out = torch.zeros(N_ROWS, N_COLS, dtype=torch.float32) # --- 2. Allocate device buffers + H2D copy --- # malloc returns a uint64 device pointer. copy_to takes (dst_dev, src_host, nbytes). dev_a = worker.malloc(NBYTES) dev_b = worker.malloc(NBYTES) dev_out = worker.malloc(NBYTES) - worker.copy_to(dev_a, host_a.ctypes.data, NBYTES) - worker.copy_to(dev_b, host_b.ctypes.data, NBYTES) + worker.copy_to(dev_a, host_a.data_ptr(), NBYTES) + worker.copy_to(dev_b, host_b.data_ptr(), NBYTES) # --- 3. Build TaskArgs describing the tensors visible to the orchestration --- # Each tensor is a ContinuousTensor(data_ptr, shape, dtype). Order must @@ -155,40 +157,45 @@ def run(worker: Worker, chip_callable: ChipCallable) -> None: worker.run(chip_callable, args, config) # --- 5. D2H copy back + verify --- - worker.copy_from(host_out.ctypes.data, dev_out, NBYTES) + worker.copy_from(host_out.data_ptr(), dev_out, NBYTES) # --- 6. Free device buffers. Order doesn't matter, but leaking is bad. --- worker.free(dev_a) worker.free(dev_b) worker.free(dev_out) - max_diff = float(np.max(np.abs(host_out - expected))) + max_diff = float(torch.max(torch.abs(host_out - expected))) print(f"[vector_add] max |host_out - expected| = {max_diff:.3e}") - np.testing.assert_allclose(host_out, expected, rtol=1e-5, atol=1e-5) - print("[vector_add] golden check PASSED ✅") + assert torch.allclose(host_out, expected, rtol=1e-5, atol=1e-5) + print("[vector_add] golden check PASSED") -def main() -> int: - args = parse_args() +def run(platform: str, device_id: int) -> int: + """Core logic — callable from both CLI and pytest.""" worker = Worker( level=2, - platform=args.platform, + platform=platform, runtime="tensormap_and_ringbuffer", - device_id=args.device, + device_id=device_id, ) - print(f"[vector_add] compiling kernels for {args.platform}...") - chip_callable = build_chip_callable(args.platform) + print(f"[vector_add] compiling kernels for {platform}...") + chip_callable = build_chip_callable(platform) print(f"[vector_add] compiled. binary_size={chip_callable.binary_size} bytes") - print(f"[vector_add] init worker (device={args.device})...") + print(f"[vector_add] init worker (device={device_id})...") worker.init() try: - run(worker, chip_callable) + _run(worker, chip_callable) finally: worker.close() return 0 +def main() -> int: + args = parse_args() + return run(args.platform, args.device) + + if __name__ == "__main__": sys.exit(main()) diff --git a/examples/workers/l2/vector_add/test_vector_add.py b/examples/workers/l2/vector_add/test_vector_add.py new file mode 100644 index 000000000..96de0e7c0 --- /dev/null +++ b/examples/workers/l2/vector_add/test_vector_add.py @@ -0,0 +1,24 @@ +# Copyright (c) PyPTO Contributors. +# This program is free software, you can redistribute it and/or modify it under the terms and conditions of +# CANN Open Software License Agreement Version 2.0 (the "License"). +# Please refer to the License for details. You may not use this file except in compliance with the License. +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, +# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. +# See LICENSE in the root of the software repository for the full text of the License. +# ----------------------------------------------------------------------------------------------------------- +"""Hardware ST for examples/workers/l2/vector_add.""" + +import os +from importlib.machinery import SourceFileLoader + +import pytest + +_main = SourceFileLoader("vector_add_main", os.path.join(os.path.dirname(__file__), "main.py")).load_module() +run = _main.run + + +@pytest.mark.platforms(["a2a3sim", "a2a3", "a5sim", "a5"]) +@pytest.mark.runtime("tensormap_and_ringbuffer") +def test_vector_add(st_platform, st_device_ids): + rc = run(st_platform, int(st_device_ids[0])) + assert rc == 0 diff --git a/examples/workers/l3/allreduce_distributed/kernels/aiv/allreduce_kernel.cpp b/examples/workers/l3/allreduce_distributed/kernels/aiv/allreduce_kernel.cpp new file mode 100644 index 000000000..75cfa6b24 --- /dev/null +++ b/examples/workers/l3/allreduce_distributed/kernels/aiv/allreduce_kernel.cpp @@ -0,0 +1,104 @@ +/* + * Copyright (c) PyPTO Contributors. + * This program is free software, you can redistribute it and/or modify it under the terms and conditions of + * CANN Open Software License Agreement Version 2.0 (the "License"). + * Please refer to the License for details. You may not use this file except in compliance with the License. + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, + * INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. + * See LICENSE in the root of the software repository for the full text of the License. + * ----------------------------------------------------------------------------------------------------------- + */ +/** + * AllReduce kernel for simpler's kernel_entry signature. + * + * Every rank independently reads all ranks' inputs from the RDMA window, + * computes the element-wise sum, and writes the result to its own output. + * This is a symmetric allreduce — no designated root, all ranks active. + * + * args layout (all uint64_t, cast as needed): + * args[0] = __gm__ float* input (device addr in RDMA window) + * args[1] = __gm__ float* output (device addr, local) + * args[2] = int nranks + * args[3] = (unused, kept for ABI compatibility) + * args[4] = __gm__ CommContext* ctx (device addr) + */ + +#include +#include +#include "pto/comm/comm_types.hpp" +#include "pto/comm/pto_comm_inst.hpp" +#include "platform_comm/comm_context.h" + +#ifndef __gm__ +#define __gm__ +#endif + +#ifndef __aicore__ +#define __aicore__ [aicore] +#endif + +static constexpr size_t ALLREDUCE_COUNT = 256; +static constexpr int kMaxSupportedRanks = 16; + +template +AICORE inline __gm__ T *CommRemotePtr(__gm__ CommContext *ctx, __gm__ T *localPtr, int pe) { + uint64_t localBase = ctx->windowsIn[ctx->rankId]; + uint64_t offset = (uint64_t)localPtr - localBase; + return (__gm__ T *)(ctx->windowsIn[pe] + offset); +} + +extern "C" __aicore__ __attribute__((always_inline)) void kernel_entry(__gm__ int64_t *args) { + __gm__ float *input = reinterpret_cast<__gm__ float *>(args[0]); + __gm__ float *output = reinterpret_cast<__gm__ float *>(args[1]); + int nranks = static_cast(args[2]); + int root = static_cast(args[3]); + __gm__ CommContext *commCtx = reinterpret_cast<__gm__ CommContext *>(args[4]); + + using ShapeDyn = pto::Shape; + using StrideDyn = pto::Stride; + using Global = pto::GlobalTensor; + using TileData = pto::Tile; + + int my_rank = static_cast(commCtx->rankId); + + ShapeDyn shape(1, 1, 1, 1, ALLREDUCE_COUNT); + StrideDyn stride(ALLREDUCE_COUNT, ALLREDUCE_COUNT, ALLREDUCE_COUNT, ALLREDUCE_COUNT, 1); + + TileData accTile(1, ALLREDUCE_COUNT); + TileData recvTile(1, ALLREDUCE_COUNT); + TASSIGN(accTile, 0x0); + TASSIGN(recvTile, 0x10000); + + if (nranks <= 0 || nranks > kMaxSupportedRanks) { + pipe_barrier(PIPE_ALL); + return; + } + + // Every rank reads all inputs and sums them into its own output. + Global outputG(output, shape, stride); + + __gm__ float *firstInput = CommRemotePtr(commCtx, input, 0); + Global firstG(firstInput, shape, stride); + TLOAD(accTile, firstG); + set_flag(PIPE_MTE2, PIPE_V, EVENT_ID0); + wait_flag(PIPE_MTE2, PIPE_V, EVENT_ID0); + + for (int r = 1; r < nranks; ++r) { + __gm__ float *remoteInput = CommRemotePtr(commCtx, input, r); + Global remoteG(remoteInput, shape, stride); + TLOAD(recvTile, remoteG); + set_flag(PIPE_MTE2, PIPE_V, EVENT_ID1); + wait_flag(PIPE_MTE2, PIPE_V, EVENT_ID1); + TADD(accTile, accTile, recvTile); + set_flag(PIPE_V, PIPE_MTE2, EVENT_ID0); + wait_flag(PIPE_V, PIPE_MTE2, EVENT_ID0); + } + + set_flag(PIPE_V, PIPE_MTE3, EVENT_ID0); + wait_flag(PIPE_V, PIPE_MTE3, EVENT_ID0); + TSTORE(outputG, accTile); + set_flag(PIPE_MTE3, PIPE_MTE2, EVENT_ID0); + wait_flag(PIPE_MTE3, PIPE_MTE2, EVENT_ID0); + + pipe_barrier(PIPE_ALL); +} diff --git a/examples/workers/l3/allreduce_distributed/kernels/orchestration/allreduce_orch.cpp b/examples/workers/l3/allreduce_distributed/kernels/orchestration/allreduce_orch.cpp new file mode 100644 index 000000000..1dd8e8f45 --- /dev/null +++ b/examples/workers/l3/allreduce_distributed/kernels/orchestration/allreduce_orch.cpp @@ -0,0 +1,52 @@ +/* + * Copyright (c) PyPTO Contributors. + * This program is free software, you can redistribute it and/or modify it under the terms and conditions of + * CANN Open Software License Agreement Version 2.0 (the "License"). + * Please refer to the License for details. You may not use this file except in compliance with the License. + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, + * INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. + * See LICENSE in the root of the software repository for the full text of the License. + * ----------------------------------------------------------------------------------------------------------- + */ +/** + * AllReduce orchestration — all-scalar args path. + * + * The kernel reads raw uint64 values from args[] (device pointers into the + * HCCL window + a few ints) and does its own CommRemotePtr math. Wrapping + * the pointers as tensors would force the framework to rewrite them as + * Tensor-struct pointers, breaking that math. So every arg goes through + * add_scalar, and the orchestration forwards them 1:1. + * + * scalar layout (from Python orch_fn via ChipStorageTaskArgs): + * [0] input device pointer (HCCL window, remote-addressable) + * [1] output device pointer (HCCL window, local write) + * [2] nranks + * [3] root rank (unused in symmetric allreduce, kept for ABI) + * [4] CommContext device pointer + */ + +#include + +#include "pto_orchestration_api.h" + +extern "C" { + +__attribute__((visibility("default"))) PTO2OrchestrationConfig +allreduce_orchestration_config(const ChipStorageTaskArgs &orch_args) { + (void)orch_args; + return PTO2OrchestrationConfig{ + .expected_arg_count = 5, + }; +} + +__attribute__((visibility("default"))) void allreduce_orchestration(const ChipStorageTaskArgs &orch_args) { + Arg params; + params.add_scalar(orch_args.scalar(0)); + params.add_scalar(orch_args.scalar(1)); + params.add_scalar(orch_args.scalar(2)); + params.add_scalar(orch_args.scalar(3)); + params.add_scalar(orch_args.scalar(4)); + pto2_rt_submit_aiv_task(0, params); +} + +} // extern "C" diff --git a/examples/workers/l3/allreduce_distributed/main.py b/examples/workers/l3/allreduce_distributed/main.py new file mode 100644 index 000000000..8cb17a81c --- /dev/null +++ b/examples/workers/l3/allreduce_distributed/main.py @@ -0,0 +1,284 @@ +#!/usr/bin/env python3 +# Copyright (c) PyPTO Contributors. +# This program is free software, you can redistribute it and/or modify it under the terms and conditions of +# CANN Open Software License Agreement Version 2.0 (the "License"). +# Please refer to the License for details. You may not use this file except in compliance with the License. +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, +# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. +# See LICENSE in the root of the software repository for the full text of the License. +# ----------------------------------------------------------------------------------------------------------- +"""End-to-end distributed allreduce over the Worker(chip_bootstrap_configs=...) path. + +The kernel (ported verbatim from #307) reads every rank's contribution out of +the HCCL window via CommRemotePtr and sums them into each rank's own window +slot. This example exercises the full L1a..L6 stack: + + L1a HCCL backend comm_init / comm_alloc_windows + L1b ChipWorker.comm_* wrappers host-side bootstrap of the communicator + L2 ChipBootstrapChannel chip child publishes SUCCESS to the parent + L3 mailbox atomics parent/child sync without torn reads + L4 error propagation bootstrap failures raise from Worker.init() + L5 ChipWorker.bootstrap_context one-shot per-chip bring-up + L6 Worker(chip_bootstrap_configs=[...]) Worker-level orchestration + +Hardware only. The sim backend's CommRemotePtr uses a different addressing +scheme; sim support is out of scope for this demo. + +Run: + python examples/workers/l3/allreduce_distributed/main.py -d 0-1 +""" + +from __future__ import annotations + +import argparse +import os +import struct +import sys +from multiprocessing.shared_memory import SharedMemory + +from simpler.task_interface import ( + ChipBootstrapConfig, + ChipBufferSpec, + ChipCallable, + ChipCallConfig, + ChipCommBootstrapConfig, + ChipContext, + CoreCallable, + HostBufferStaging, + TaskArgs, +) +from simpler.worker import Worker + +from simpler_setup.elf_parser import extract_text_section +from simpler_setup.kernel_compiler import KernelCompiler +from simpler_setup.pto_isa import ensure_pto_isa_root + +HERE = os.path.dirname(os.path.abspath(__file__)) + +# Must match ALLREDUCE_COUNT in kernels/aiv/allreduce_kernel.cpp. +ALLREDUCE_COUNT = 256 +DTYPE_NBYTES = 4 # float32 + + +def parse_device_range(spec: str) -> list[int]: + if "-" in spec: + lo, hi = (int(x) for x in spec.split("-")) + ids = list(range(lo, hi + 1)) + else: + ids = [int(spec)] + if len(ids) != 2: + raise ValueError(f"allreduce_distributed needs exactly 2 devices, got {ids}") + return ids + + +def build_chip_callable(platform: str) -> ChipCallable: + """Compile the AIV allreduce kernel + its C++ orchestration shim. + + The orchestration forwards 5 scalars (input_ptr, output_ptr, nranks, root, + device_ctx) as-is, so the signature slot list is empty and all args flow + through TaskArgs.add_scalar at submission time. + """ + kc = KernelCompiler(platform=platform) + runtime = "tensormap_and_ringbuffer" + pto_isa_root = ensure_pto_isa_root(clone_protocol="https") + include_dirs = kc.get_orchestration_include_dirs(runtime) + + # The kernel resolves CommContext from "platform_comm/comm_context.h", + # which lives under src/common/. Add that directory on top of the runtime + # include set so the kernel compile can see it. + kernel_include_dirs = list(include_dirs) + [str(kc.project_root / "src" / "common")] + kernel_bytes = kc.compile_incore( + source_path=os.path.join(HERE, "kernels/aiv/allreduce_kernel.cpp"), + core_type="aiv", + pto_isa_root=pto_isa_root, + extra_include_dirs=kernel_include_dirs, + ) + # Hardware path: strip the ELF down to the .text section the loader wants. + kernel_bytes = extract_text_section(kernel_bytes) + + orch_bytes = kc.compile_orchestration( + runtime_name=runtime, + source_path=os.path.join(HERE, "kernels/orchestration/allreduce_orch.cpp"), + ) + core_callable = CoreCallable.build(signature=[], binary=kernel_bytes) + return ChipCallable.build( + signature=[], + func_name="allreduce_orchestration", + binary=orch_bytes, + children=[(0, core_callable)], + ) + + +def make_rank_input(rank: int) -> list[float]: + """Rank r contributes input[i] = i + r*100; matches PR #307's golden.""" + return [float(i + rank * 100) for i in range(ALLREDUCE_COUNT)] + + +def expected_output(nranks: int) -> list[float]: + """output[i] = sum_r (i + r*100) = nranks*i + 100 * nranks*(nranks-1)/2.""" + return [float(nranks * i + 100 * nranks * (nranks - 1) // 2) for i in range(ALLREDUCE_COUNT)] + + +def pack_f32(values: list[float]) -> bytes: + return struct.pack(f"<{len(values)}f", *values) + + +def run(device_ids: list[int]) -> int: + """Core logic — callable from both CLI and pytest.""" + nranks = len(device_ids) + buffer_nbytes = ALLREDUCE_COUNT * DTYPE_NBYTES + window_size = 4 * 1024 * 1024 # HCCL may round up; actual size surfaces via ChipContext. + + rootinfo_path = f"/tmp/pto_allreduce_distributed_rootinfo_{os.getpid()}.bin" + try: + os.unlink(rootinfo_path) + except FileNotFoundError: + pass + + print(f"[allreduce] devices={device_ids} nranks={nranks}") + + # Per-rank input SharedMemory — parent writes the bytes, child reads via + # HostBufferStaging during bootstrap_context(). Parent unlinks right + # after worker.init() returns (child has already finished copy_to at that + # point). + input_shms: list[SharedMemory] = [] + output_shms: list[SharedMemory] = [] + for rank in range(nranks): + shm = SharedMemory(create=True, size=buffer_nbytes) + assert shm.buf is not None + shm.buf[:buffer_nbytes] = pack_f32(make_rank_input(rank)) + input_shms.append(shm) + + out_shm = SharedMemory(create=True, size=buffer_nbytes) + output_shms.append(out_shm) + + cfgs = [ + ChipBootstrapConfig( + comm=ChipCommBootstrapConfig( + rank=rank, + nranks=nranks, + rootinfo_path=rootinfo_path, + window_size=window_size, + ), + buffers=[ + ChipBufferSpec( + name="input", + dtype="float32", + count=ALLREDUCE_COUNT, + placement="window", + nbytes=buffer_nbytes, + load_from_host=True, + ), + ChipBufferSpec( + name="output", + dtype="float32", + count=ALLREDUCE_COUNT, + placement="window", + nbytes=buffer_nbytes, + store_to_host=True, + ), + ], + host_inputs=[HostBufferStaging(name="input", shm_name=input_shms[rank].name, size=buffer_nbytes)], + host_outputs=[HostBufferStaging(name="output", shm_name=output_shms[rank].name, size=buffer_nbytes)], + ) + for rank in range(nranks) + ] + + print("[allreduce] compiling kernels...") + chip_callable = build_chip_callable("a2a3") + + worker = Worker( + level=3, + platform="a2a3", + runtime="tensormap_and_ringbuffer", + device_ids=device_ids, + num_sub_workers=0, + chip_bootstrap_configs=cfgs, + ) + + try: + print("[allreduce] init worker (forks chip children + bootstraps HCCL)...") + worker.init() + + # Child has copied input from shm into the window by now. Drop our + # copies so the shm segments don't outlive the run. + for shm in input_shms: + shm.close() + shm.unlink() + input_shms.clear() + + contexts: list[ChipContext] = worker.chip_contexts + assert len(contexts) == nranks + for i, ctx in enumerate(contexts): + print( + f"[allreduce] chip {i}: device={ctx.device_id} rank={ctx.rank}/{ctx.nranks} " + f"window=[0x{ctx.local_window_base:x} +{ctx.actual_window_size}B] " + f"buffers={ {k: hex(v) for k, v in ctx.buffer_ptrs.items()} }" + ) + + def orch_fn(orch, _args, cfg): + # One chip task per rank. All args pass as scalars because the + # kernel reinterpret_casts args[i] as raw device pointers — an + # approach the Tensor path would corrupt (it rewrites pointers + # into Tensor-struct addresses). + for i, ctx in enumerate(contexts): + chip_args = TaskArgs() + chip_args.add_scalar(ctx.buffer_ptrs["input"]) + chip_args.add_scalar(ctx.buffer_ptrs["output"]) + chip_args.add_scalar(ctx.nranks) + chip_args.add_scalar(0) # root (symmetric allreduce ignores it) + chip_args.add_scalar(ctx.device_ctx) + orch.submit_next_level(chip_callable, chip_args, cfg, worker=i) + + print("[allreduce] running 2-chip allreduce DAG...") + worker.run(orch_fn, args=None, config=ChipCallConfig()) + + # Child has flushed store_to_host buffers to SharedMemory by now. + expected = expected_output(nranks) + ok = True + for i in range(nranks): + out_shm = output_shms[i] + assert out_shm.buf is not None + got = list(struct.unpack(f"<{ALLREDUCE_COUNT}f", bytes(out_shm.buf[:buffer_nbytes]))) + + max_diff = max(abs(a - b) for a, b in zip(got, expected)) + print(f"[allreduce] chip {i}: max |out - expected| = {max_diff:.3e}") + if max_diff > 1e-3: + ok = False + for j in range(min(4, ALLREDUCE_COUNT)): + print(f" output[{j}]={got[j]!r} expected={expected[j]!r}") + + if not ok: + print("[allreduce] golden check FAILED") + return 1 + print("[allreduce] all ranks matched golden ✅") + return 0 + finally: + worker.close() + for shm in input_shms: + try: + shm.close() + shm.unlink() + except FileNotFoundError: + pass + for shm in output_shms: + try: + shm.close() + shm.unlink() + except FileNotFoundError: + pass + try: + os.unlink(rootinfo_path) + except FileNotFoundError: + pass + + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument("-d", "--device", default="0-1", help="Device range, e.g. '0-1'. Two chips required.") + cli = parser.parse_args() + return run(parse_device_range(cli.device)) + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/examples/workers/l3/allreduce_distributed/test_allreduce.py b/examples/workers/l3/allreduce_distributed/test_allreduce.py new file mode 100644 index 000000000..02b3d06c2 --- /dev/null +++ b/examples/workers/l3/allreduce_distributed/test_allreduce.py @@ -0,0 +1,26 @@ +# Copyright (c) PyPTO Contributors. +# This program is free software, you can redistribute it and/or modify it under the terms and conditions of +# CANN Open Software License Agreement Version 2.0 (the "License"). +# Please refer to the License for details. You may not use this file except in compliance with the License. +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, +# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. +# See LICENSE in the root of the software repository for the full text of the License. +# ----------------------------------------------------------------------------------------------------------- +"""Hardware ST for examples/workers/l3/allreduce_distributed.""" + +import os +from importlib.machinery import SourceFileLoader + +import pytest + +_main = SourceFileLoader("allreduce_distributed_main", os.path.join(os.path.dirname(__file__), "main.py")).load_module() +run = _main.run + + +@pytest.mark.requires_hardware +@pytest.mark.platforms(["a2a3"]) +@pytest.mark.runtime("tensormap_and_ringbuffer") +@pytest.mark.device_count(2) +def test_allreduce_distributed(st_device_ids): + rc = run([int(d) for d in st_device_ids]) + assert rc == 0 diff --git a/examples/workers/l3/multi_chip_dispatch/main.py b/examples/workers/l3/multi_chip_dispatch/main.py index abcde9668..fc812da80 100644 --- a/examples/workers/l3/multi_chip_dispatch/main.py +++ b/examples/workers/l3/multi_chip_dispatch/main.py @@ -107,9 +107,8 @@ def build_chip_callable(platform: str) -> ChipCallable: ) -def main() -> int: - cli = parse_args() - device_ids = parse_device_range(cli.device) +def run(platform: str, device_ids: list[int]) -> int: + """Core logic — callable from both CLI and pytest.""" print(f"[multi_chip_dispatch] devices={device_ids}") # --- 1. Allocate shared-memory tensors (visible to forked chip processes). @@ -125,7 +124,7 @@ def main() -> int: # --- 2. Worker(level=3, ...) construction. No fork / no ACL yet. worker = Worker( level=3, - platform=cli.platform, + platform=platform, runtime="tensormap_and_ringbuffer", device_ids=device_ids, num_sub_workers=1, @@ -144,8 +143,8 @@ def subworker(sub_args: TaskArgs) -> None: sub_cid = worker.register(subworker) # --- 4. Compile the ChipCallable once, reused on both chips. - print(f"[multi_chip_dispatch] compiling kernels for {cli.platform}...") - chip_callable = build_chip_callable(cli.platform) + print(f"[multi_chip_dispatch] compiling kernels for {platform}...") + chip_callable = build_chip_callable(platform) # --- 5. init() forks chip + sub child processes, starts C++ scheduler. print("[multi_chip_dispatch] init worker...") @@ -195,5 +194,10 @@ def orch_fn(orch, _args, cfg): return 0 +def main() -> int: + cli = parse_args() + return run(cli.platform, parse_device_range(cli.device)) + + if __name__ == "__main__": sys.exit(main()) diff --git a/examples/workers/l3/multi_chip_dispatch/test_multi_chip_dispatch.py b/examples/workers/l3/multi_chip_dispatch/test_multi_chip_dispatch.py new file mode 100644 index 000000000..8c883ef25 --- /dev/null +++ b/examples/workers/l3/multi_chip_dispatch/test_multi_chip_dispatch.py @@ -0,0 +1,25 @@ +# Copyright (c) PyPTO Contributors. +# This program is free software, you can redistribute it and/or modify it under the terms and conditions of +# CANN Open Software License Agreement Version 2.0 (the "License"). +# Please refer to the License for details. You may not use this file except in compliance with the License. +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, +# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. +# See LICENSE in the root of the software repository for the full text of the License. +# ----------------------------------------------------------------------------------------------------------- +"""Hardware ST for examples/workers/l3/multi_chip_dispatch.""" + +import os +from importlib.machinery import SourceFileLoader + +import pytest + +_main = SourceFileLoader("multi_chip_dispatch_main", os.path.join(os.path.dirname(__file__), "main.py")).load_module() +run = _main.run + + +@pytest.mark.platforms(["a2a3sim", "a2a3", "a5sim", "a5"]) +@pytest.mark.runtime("tensormap_and_ringbuffer") +@pytest.mark.device_count(2) +def test_multi_chip_dispatch(st_platform, st_device_ids): + rc = run(st_platform, [int(d) for d in st_device_ids]) + assert rc == 0 diff --git a/python/simpler/worker.py b/python/simpler/worker.py index 2d8a53dd4..9f2fa9cfa 100644 --- a/python/simpler/worker.py +++ b/python/simpler/worker.py @@ -321,7 +321,7 @@ def _chip_process_loop( break -def _chip_process_loop_with_bootstrap( +def _chip_process_loop_with_bootstrap( # noqa: PLR0912 buf: memoryview, host_lib_path: str, device_id: int, @@ -354,7 +354,7 @@ def _chip_process_loop_with_bootstrap( return try: - cw.bootstrap_context(device_id, bootstrap_cfg, channel=channel) + result = cw.bootstrap_context(device_id, bootstrap_cfg, channel=channel) except Exception: # noqa: BLE001 # bootstrap_context already wrote the error payload. Release the # comm handle (if any) best-effort and return; finalize() is safe to @@ -366,6 +366,15 @@ def _chip_process_loop_with_bootstrap( pass return + # Build store_to_host mapping: (device_ptr, HostBufferStaging) for each + # buffer with store_to_host=True. Processed after every task completion + # so the parent can read results from SharedMemory without a cross-fork + # host-pointer copy_from (which is broken across processes). + _store_to_host: list[tuple[int, object]] = [] + for spec, ptr in zip(bootstrap_cfg.buffers, result.buffer_ptrs): + if spec.store_to_host: + _store_to_host.append((ptr, bootstrap_cfg.output_staging(spec.name))) + mailbox_addr = ctypes.addressof(ctypes.c_char.from_buffer(buf)) state_addr = mailbox_addr + _OFF_STATE args_ptr = mailbox_addr + _OFF_ARGS @@ -390,6 +399,17 @@ def _chip_process_loop_with_bootstrap( msg = _format_exc(f"chip_process dev={device_id}", e) _write_error(buf, code, msg) _mailbox_store_i32(state_addr, _TASK_DONE) + + # Post-task: flush store_to_host buffers to SharedMemory. + for dev_ptr, staging in _store_to_host: + shm = SharedMemory(name=staging.shm_name) + try: + shm_buf = shm.buf + assert shm_buf is not None + host_ptr = ctypes.addressof(ctypes.c_char.from_buffer(shm_buf)) + cw._impl.copy_from(host_ptr, dev_ptr, staging.size) + finally: + shm.close() elif state == _CONTROL_REQUEST: sub_cmd = struct.unpack_from("Q", buf, _OFF_CALLABLE)[0] code = 0