diff --git a/python/simpler/task_interface.py b/python/simpler/task_interface.py index 0210be31a..7cbd2edc2 100644 --- a/python/simpler/task_interface.py +++ b/python/simpler/task_interface.py @@ -16,12 +16,20 @@ from task_interface import DataType, ContinuousTensor, ChipStorageTaskArgs, make_tensor_arg """ +import ctypes +from dataclasses import dataclass, field +from multiprocessing.shared_memory import SharedMemory +from typing import Optional + from _task_interface import ( # pyright: ignore[reportMissingImports] + CHIP_BOOTSTRAP_MAILBOX_SIZE, CONTINUOUS_TENSOR_MAX_DIMS, MAILBOX_ERROR_MSG_SIZE, MAILBOX_OFF_ERROR_MSG, MAILBOX_SIZE, ArgDirection, + ChipBootstrapChannel, + ChipBootstrapMailboxState, ChipCallable, ChipCallConfig, ChipStorageTaskArgs, @@ -70,6 +78,15 @@ "MAILBOX_OFF_ERROR_MSG", "MAILBOX_ERROR_MSG_SIZE", "read_args_from_blob", + # Chip bootstrap (L5) + "CHIP_BOOTSTRAP_MAILBOX_SIZE", + "ChipBootstrapChannel", + "ChipBootstrapMailboxState", + "ChipCommBootstrapConfig", + "ChipBufferSpec", + "HostBufferStaging", + "ChipBootstrapConfig", + "ChipBootstrapResult", ] @@ -143,6 +160,95 @@ def scalar_to_uint64(value) -> int: return int(value) & 0xFFFFFFFFFFFFFFFF +@dataclass +class ChipCommBootstrapConfig: + """Per-chip communicator bring-up knobs consumed by `ChipWorker.bootstrap_context`. + + A ``ChipBootstrapConfig`` with ``comm=None`` skips the communicator step + entirely; in that mode ``cfg.buffers`` must be empty because + ``placement="window"`` is the only supported placement in L5 and the + window only exists once a communicator has been brought up. Comm-less + configs are used by validation / error-path tests that need to trip + ``bootstrap_context`` before it reaches any communicator call. + """ + + rank: int + nranks: int + rootinfo_path: str + window_size: int + """Requested per-rank window size in bytes. 
HCCL may round this up — the + actual allocation is reported back via + ``ChipBootstrapResult.actual_window_size`` and must be what callers use + when slicing the window.""" + + +@dataclass +class ChipBufferSpec: + """A named slice of the per-rank communicator window. + + Buffers are placed sequentially inside the window in declaration order — + ``ChipBootstrapResult.buffer_ptrs`` is 1:1 aligned with the ``buffers`` + list so downstream code (L6's ``ChipContext``) can build a ``name → ptr`` + dict by zipping the two. + """ + + name: str + dtype: str + count: int + placement: str + nbytes: int + load_from_host: bool = False + store_to_host: bool = False + + +@dataclass +class HostBufferStaging: + """A POSIX shared-memory region staged by the parent for one named buffer. + + The parent creates the ``SharedMemory`` object and fills it with the input + bytes *before* forking; the child attaches read-only via + ``SharedMemory(name=shm_name)`` and does not unlink it. + """ + + name: str + shm_name: str + size: int + + +@dataclass +class ChipBootstrapConfig: + """Inputs to `ChipWorker.bootstrap_context` for one chip child.""" + + comm: Optional[ChipCommBootstrapConfig] = None + buffers: list[ChipBufferSpec] = field(default_factory=list) + host_inputs: list[HostBufferStaging] = field(default_factory=list) + host_outputs: list[HostBufferStaging] = field(default_factory=list) + + def input_staging(self, buffer_name: str) -> HostBufferStaging: + for s in self.host_inputs: + if s.name == buffer_name: + return s + raise KeyError(buffer_name) + + def output_staging(self, buffer_name: str) -> HostBufferStaging: + for s in self.host_outputs: + if s.name == buffer_name: + return s + raise KeyError(buffer_name) + + +@dataclass +class ChipBootstrapResult: + """Return value of `ChipWorker.bootstrap_context` — and the tuple the + `ChipBootstrapChannel` publishes to the parent on success. 
+ """ + + device_ctx: int + local_window_base: int + actual_window_size: int + buffer_ptrs: list[int] + + class ChipWorker: """Unified execution interface wrapping the host runtime C API. @@ -267,6 +373,110 @@ def comm_destroy(self, comm_handle: int) -> None: """Destroy the communicator and release its resources.""" self._impl.comm_destroy(int(comm_handle)) + def bootstrap_context( + self, + device_id: int, + cfg: ChipBootstrapConfig, + channel: Optional[ChipBootstrapChannel] = None, + ) -> ChipBootstrapResult: + """One-shot per-chip bootstrap: set device, build communicator, slice window, + stage inputs from host shared memory, and (optionally) publish the result. + + Runs inside a forked chip child. If ``channel`` is provided (the L6 + integration path), the result is written as SUCCESS or — on any + exception — as ERROR (code=1, ``": "``) before + the exception is re-raised. Standalone callers can pass + ``channel=None`` and consume the return value directly. + + The HCCL comm handle produced by ``comm_init`` is stashed on + ``self._comm_handle`` so ``shutdown_bootstrap()`` can release it later; + ``finalize()`` is intentionally *not* wired to this handle — teardown + ordering is the caller's (L6's) responsibility. 
+ """ + try: + self.set_device(device_id) + + device_ctx = 0 + local_base = 0 + actual_size = 0 + if cfg.comm is not None: + handle = self.comm_init(cfg.comm.rank, cfg.comm.nranks, cfg.comm.rootinfo_path) + if handle == 0: + raise RuntimeError(f"comm_init returned 0 handle (rank={cfg.comm.rank}, nranks={cfg.comm.nranks})") + self._comm_handle = handle + device_ctx = self.comm_alloc_windows(handle, cfg.comm.window_size) + if device_ctx == 0: + raise RuntimeError("comm_alloc_windows returned null device_ctx") + local_base = self.comm_get_local_window_base(handle) + actual_size = self.comm_get_window_size(handle) + + offset = 0 + buffer_ptrs: list[int] = [] + for spec in cfg.buffers: + if spec.placement != "window": + raise ValueError(f"ChipBufferSpec.placement={spec.placement!r}; only 'window' is supported") + if cfg.comm is None: + raise ValueError("ChipBufferSpec requires comm; cfg.comm is None") + if offset + spec.nbytes > actual_size: + raise ValueError( + f"buffer '{spec.name}' (nbytes={spec.nbytes}) at offset={offset} " + f"overflows window size {actual_size}" + ) + buffer_ptrs.append(local_base + offset) + offset += spec.nbytes + + for spec, ptr in zip(cfg.buffers, buffer_ptrs): + if not spec.load_from_host: + continue + staging = cfg.input_staging(spec.name) + if staging.size != spec.nbytes: + raise ValueError(f"host_inputs[{spec.name!r}].size={staging.size} != buffer.nbytes={spec.nbytes}") + if staging.size == 0: + continue + shm = SharedMemory(name=staging.shm_name) + try: + buf = shm.buf + assert buf is not None + host_ptr = ctypes.addressof(ctypes.c_char.from_buffer(buf)) + self.copy_to(ptr, host_ptr, staging.size) + finally: + shm.close() + + result = ChipBootstrapResult( + device_ctx=device_ctx, + local_window_base=local_base, + actual_window_size=actual_size, + buffer_ptrs=buffer_ptrs, + ) + if channel is not None: + channel.write_success( + result.device_ctx, + result.local_window_base, + result.actual_window_size, + result.buffer_ptrs, + ) + return 
result + except Exception as e: + if channel is not None: + channel.write_error(1, f"{type(e).__name__}: {e}") + raise + + def shutdown_bootstrap(self) -> None: + """Release the communicator handle stashed by ``bootstrap_context``. + + Idempotent — safe to call multiple times, and safe to call if + ``bootstrap_context`` was never invoked. ``finalize()`` does *not* + chain into this method, so L6 must call ``shutdown_bootstrap()`` + before ``finalize()`` (or after, if the comm handle was already + destroyed — the zero-handle guard makes a second call a no-op). + """ + handle = getattr(self, "_comm_handle", 0) + if handle != 0: + try: + self.comm_destroy(handle) + finally: + self._comm_handle = 0 + @property def device_id(self): return self._impl.device_id diff --git a/src/common/platform_comm/comm_sim.cpp b/src/common/platform_comm/comm_sim.cpp index 90ef8fc3c..be26259ab 100644 --- a/src/common/platform_comm/comm_sim.cpp +++ b/src/common/platform_comm/comm_sim.cpp @@ -38,6 +38,7 @@ #include #include +#include #include #include #include @@ -56,6 +57,16 @@ constexpr int FTRUNCATE_POLL_INTERVAL_US = 1000; constexpr int BARRIER_POLL_INTERVAL_US = 50; constexpr int DESTROY_POLL_INTERVAL_US = 1000; +// macOS's PSHMNAMLEN is 31 (name length excluding the null terminator). Linux +// accepts up to NAME_MAX (255), but we pick the tighter value so the same +// backend runs on both. The name layout below is fully constant-width so we +// can static_assert on it at compile time. 
+constexpr size_t SHM_NAME_MAX_LEN = 31;
+constexpr size_t SHM_NAME_PREFIX_LEN = 9; // "/simpler_"
+constexpr size_t SHM_NAME_HEX_FIELD = 8; // %08x: exactly 8 hex chars
+constexpr size_t SHM_NAME_LEN = SHM_NAME_PREFIX_LEN + SHM_NAME_HEX_FIELD + 1 /*underscore*/ + SHM_NAME_HEX_FIELD;
+static_assert(SHM_NAME_LEN <= SHM_NAME_MAX_LEN, "shm name exceeds macOS PSHMNAMLEN");
+
 struct SharedHeader {
     volatile int nranks;
     volatile int alloc_done;
@@ -80,10 +91,28 @@
 // parent PID and therefore a fresh name. Cross-node / cross-parent launches
 // on sim are out of scope; callers relying on those topologies must use the
 // HCCL backend.
+//
+// Name layout is fixed-width `"/simpler_%08x_%08x"` = 26 bytes (plus NUL), well
+// under macOS's PSHMNAMLEN=31. The width is constant-propagated into
+// SHM_NAME_LEN above so a future format-string change gets caught by the
+// static_assert at compile time rather than by an ENAMETOOLONG failure at
+// runtime on macOS. PID is truncated to its low 32 bits (pid_t is int32_t on
+// every target we support) and the 64-bit rootinfo-path hash is xor-folded to
+// 32 bits; both are still collision-resistant for the canonical
+// "one driver spawns N ranks" launch pattern.
 std::string make_shm_name(const char *rootinfo_path) {
     size_t h = std::hash<std::string>{}(rootinfo_path ? rootinfo_path : "default");
-    char buf[96];
-    std::snprintf(buf, sizeof(buf), "/simpler_comm_%d_%zx", static_cast<int>(getppid()), h);
+    uint32_t h32 = static_cast<uint32_t>(h ^ (h >> 32));
+    char buf[SHM_NAME_LEN + 1];
+    int written = std::snprintf(buf, sizeof(buf), "/simpler_%08x_%08x", static_cast<uint32_t>(getppid()), h32);
+    // Defensive runtime check: snprintf returns -1 only on I/O / encoding
+    // errors, and the static_assert above already pins the upper bound of a
+    // successful write, so this is really an "impossible path" guard for the
+    // libc-misbehaving edge case.
+    if (written < 0 || static_cast<size_t>(written) != SHM_NAME_LEN) {
+        std::fprintf(stderr, "[comm_sim] snprintf produced unexpected length %d for shm name\n", written);
+        return {};
+    }
     return {buf};
 }
diff --git a/tests/ut/py/test_worker/test_bootstrap_context_hw.py b/tests/ut/py/test_worker/test_bootstrap_context_hw.py
new file mode 100644
index 000000000..4c20a60c6
--- /dev/null
+++ b/tests/ut/py/test_worker/test_bootstrap_context_hw.py
@@ -0,0 +1,179 @@
+# Copyright (c) PyPTO Contributors.
+# This program is free software, you can redistribute it and/or modify it under the terms and conditions of
+# CANN Open Software License Agreement Version 2.0 (the "License").
+# Please refer to the License for details. You may not use this file except in compliance with the License.
+# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED,
+# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
+# See LICENSE in the root of the software repository for the full text of the License.
+# -----------------------------------------------------------------------------------------------------------
+# ruff: noqa: PLC0415
+"""Hardware smoke test for ``ChipWorker.bootstrap_context`` (L5).
+
+Drives the L5 one-shot bring-up against the real ``tensormap_and_ringbuffer``
+runtime on 2 Ascend devices. The critical assertions are:
+
+    1. ``bootstrap_context`` returns a non-null ``device_ctx`` and
+       ``local_window_base`` (HCCL actually allocated GVA-visible windows).
+    2. ``actual_window_size`` is at least the requested size.
+    3. A single ``ChipBufferSpec`` slices the window so
+       ``buffer_ptrs[0] == local_window_base``.
+
+Deliberately **no** ``comm_barrier``. The paired L1b UT
+(``test_platform_comm.py``) already shows the known HCCL 507018 path fails
+after ~52 s on some CANN builds; ``bootstrap_context`` does not issue a
+barrier, so this test completes on any build.
Cross-rank synchronization +between the two ranks is already enforced inside +``HcclCommInitRootInfo`` / the L1a root-info handshake that ``comm_init`` +performs, so the non-barrier invariants above are enough to prove the L5 +bring-up crossed both ranks. +""" + +from __future__ import annotations + +import multiprocessing as mp +import os +import traceback + +import pytest + + +def _bootstrap_rank_entry( # noqa: PLR0913 + rank: int, + nranks: int, + device_id: int, + host_lib: str, + aicpu_path: str, + aicore_path: str, + sim_context_path: str, + rootinfo_path: str, + window_size: int, + buffer_nbytes: int, + result_queue: mp.Queue, # type: ignore[type-arg] +) -> None: + """Per-rank worker: drives bootstrap_context against HCCL and reports fields.""" + result: dict[str, object] = {"rank": rank, "stage": "start", "ok": False} + try: + from simpler.task_interface import ( + ChipBootstrapConfig, + ChipBufferSpec, + ChipCommBootstrapConfig, + ChipWorker, + ) + + worker = ChipWorker() + worker.init(host_lib, aicpu_path, aicore_path, sim_context_path) + result["stage"] = "init" + + cfg = ChipBootstrapConfig( + comm=ChipCommBootstrapConfig( + rank=rank, + nranks=nranks, + rootinfo_path=rootinfo_path, + window_size=window_size, + ), + buffers=[ + ChipBufferSpec( + name="x", + dtype="float32", + count=buffer_nbytes // 4, + placement="window", + nbytes=buffer_nbytes, + ) + ], + ) + + res = worker.bootstrap_context(device_id=device_id, cfg=cfg) + result["stage"] = "bootstrap" + result["device_ctx"] = int(res.device_ctx) + result["local_window_base"] = int(res.local_window_base) + result["actual_window_size"] = int(res.actual_window_size) + result["buffer_ptrs"] = list(res.buffer_ptrs) + + # Teardown mirrors the L6 ordering: shutdown_bootstrap (releases the + # HCCL comm handle) then finalize (releases ACL / unloads runtime). 
+ worker.shutdown_bootstrap() + worker.finalize() + result["ok"] = True + except Exception: # noqa: BLE001 + result["error"] = traceback.format_exc() + finally: + result_queue.put(result) + + +@pytest.mark.requires_hardware +@pytest.mark.platforms(["a2a3"]) +@pytest.mark.device_count(2) +def test_two_rank_bootstrap_context(st_device_ids): + """End-to-end 2-rank hardware bootstrap_context smoke test. + + No barrier is issued — see the module docstring for why that dodges + HCCL 507018. The test still gates on every field ``bootstrap_context`` + is supposed to populate. + """ + from simpler_setup.runtime_builder import RuntimeBuilder + + build = bool(os.environ.get("PTO_UT_BUILD")) + bins = RuntimeBuilder(platform="a2a3").get_binaries("tensormap_and_ringbuffer", build=build) + host_lib = str(bins.host_path) + aicpu_path = str(bins.aicpu_path) + aicore_path = str(bins.aicore_path) + sim_context_path = str(bins.sim_context_path) if bins.sim_context_path else "" + + assert len(st_device_ids) >= 2, "device_count(2) fixture must yield >= 2 ids" + nranks = 2 + rootinfo_path = f"/tmp/pto_bootstrap_hw_rootinfo_{os.getpid()}.bin" + window_size = 4096 + buffer_nbytes = 64 + + ctx = mp.get_context("fork") + result_queue: mp.Queue = ctx.Queue() # type: ignore[type-arg] + procs = [] + for rank in range(nranks): + p = ctx.Process( + target=_bootstrap_rank_entry, + args=( + rank, + nranks, + int(st_device_ids[rank]), + host_lib, + aicpu_path, + aicore_path, + sim_context_path, + rootinfo_path, + window_size, + buffer_nbytes, + result_queue, + ), + daemon=False, + ) + p.start() + procs.append(p) + + results: dict[int, dict] = {} + for _ in range(nranks): + r = result_queue.get(timeout=180) + results[int(r["rank"])] = r + for p in procs: + p.join(timeout=60) + + try: + os.unlink(rootinfo_path) + except FileNotFoundError: + pass + + for rank in range(nranks): + r = results.get(rank) + if r is None: + pytest.fail(f"rank {rank} never reported a result") + if not r.get("ok"): + 
pytest.fail(f"rank {rank} failed at {r.get('stage')!r}:\n{r.get('error', '(no traceback)')}") + + assert r["device_ctx"] != 0, f"rank {rank}: device_ctx is 0" + assert r["local_window_base"] != 0, f"rank {rank}: local_window_base is 0" + assert r["actual_window_size"] >= window_size, ( + f"rank {rank}: actual_window_size={r['actual_window_size']} < requested {window_size}" + ) + # 1:1 buffer-to-spec invariant — the contract L6's ChipContext relies on. + assert r["buffer_ptrs"] == [r["local_window_base"]], ( + f"rank {rank}: buffer_ptrs={r['buffer_ptrs']} != [{r['local_window_base']}]" + ) diff --git a/tests/ut/py/test_worker/test_bootstrap_context_sim.py b/tests/ut/py/test_worker/test_bootstrap_context_sim.py new file mode 100644 index 000000000..c76377ffe --- /dev/null +++ b/tests/ut/py/test_worker/test_bootstrap_context_sim.py @@ -0,0 +1,487 @@ +# Copyright (c) PyPTO Contributors. +# This program is free software, you can redistribute it and/or modify it under the terms and conditions of +# CANN Open Software License Agreement Version 2.0 (the "License"). +# Please refer to the License for details. You may not use this file except in compliance with the License. +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, +# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. +# See LICENSE in the root of the software repository for the full text of the License. +# ----------------------------------------------------------------------------------------------------------- +# ruff: noqa: PLC0415 +"""Simulation-backend tests for ``ChipWorker.bootstrap_context`` (L5). + +These tests run without any Ascend NPU. They drive the sim backend of the +``tensormap_and_ringbuffer`` runtime, whose ``comm_*`` lifecycle is backed by +POSIX shared memory + atomic counters. 
The sim ``comm_alloc_windows`` has an +internal ready-count barrier: **all** ``nranks`` must call it before any +return. So anything that exercises the communicator path is written as a +2-process fork with a small mp.Queue used to report results back to the test +runner. + +The error-path case is deliberately single-process — it triggers a validation +error that raises *before* any communicator work, so no peer rank is needed. +""" + +from __future__ import annotations + +import ctypes +import multiprocessing as mp +import os +import struct +import traceback +from multiprocessing.shared_memory import SharedMemory + +import pytest + + +def _shm_addr(shm: SharedMemory) -> int: + """Return the raw address of a SharedMemory region (asserts buf is mapped).""" + buf = shm.buf + assert buf is not None + return ctypes.addressof(ctypes.c_char.from_buffer(buf)) + + +def _sim_binaries(): + """Resolve pre-built a2a3sim runtime binaries, or skip if unavailable. + + Respects ``PTO_UT_BUILD=1`` for local runs where the binaries have not + been compiled yet — matches the pattern in ``test_platform_comm.py``. + """ + from simpler_setup.runtime_builder import RuntimeBuilder + + build = bool(os.environ.get("PTO_UT_BUILD")) + try: + bins = RuntimeBuilder(platform="a2a3sim").get_binaries("tensormap_and_ringbuffer", build=build) + except FileNotFoundError as e: + pytest.skip(f"a2a3sim runtime binaries unavailable: {e}") + return bins + + +def _rank_entry( # noqa: PLR0913 + rank: int, + nranks: int, + rootinfo_path: str, + window_size: int, + host_lib: str, + aicpu_path: str, + aicore_path: str, + sim_context_path: str, + buffer_specs: list[dict], + host_input_specs: list[dict], + channel_shm_name: str | None, + result_queue: mp.Queue, # type: ignore[type-arg] + readback_nbytes: int, +) -> None: + """Forked-rank body: init ChipWorker, run bootstrap_context, report fields. 
+ + ``buffer_specs`` / ``host_input_specs`` are plain dicts (picklable) that + the child converts into the real dataclasses after import. The test + orchestrates everything through the result queue so a crashed child + surfaces as a missing result (timeout) rather than a silent hang. + """ + result: dict[str, object] = {"rank": rank, "stage": "start", "ok": False} + try: + from simpler.task_interface import ( + ChipBootstrapChannel, + ChipBootstrapConfig, + ChipBufferSpec, + ChipCommBootstrapConfig, + ChipWorker, + HostBufferStaging, + ) + + worker = ChipWorker() + worker.init(host_lib, aicpu_path, aicore_path, sim_context_path) + result["stage"] = "init" + + cfg = ChipBootstrapConfig( + comm=ChipCommBootstrapConfig( + rank=rank, + nranks=nranks, + rootinfo_path=rootinfo_path, + window_size=window_size, + ), + buffers=[ChipBufferSpec(**s) for s in buffer_specs], + host_inputs=[HostBufferStaging(**s) for s in host_input_specs], + ) + + channel: ChipBootstrapChannel | None = None + shm_attach: SharedMemory | None = None + if channel_shm_name is not None: + shm_attach = SharedMemory(name=channel_shm_name) + channel = ChipBootstrapChannel(_shm_addr(shm_attach), max_buffer_count=376) + + try: + res = worker.bootstrap_context(device_id=rank, cfg=cfg, channel=channel) + result["stage"] = "bootstrap" + result["device_ctx"] = int(res.device_ctx) + result["local_window_base"] = int(res.local_window_base) + result["actual_window_size"] = int(res.actual_window_size) + result["buffer_ptrs"] = list(res.buffer_ptrs) + + # Read back the first buffer if the test asked for it. Uses the + # worker's device-to-host DMA so the test can assert on what + # ``load_from_host`` actually wrote at ``buffer_ptrs[0]``. 
+ if readback_nbytes > 0 and res.buffer_ptrs: + host_buf = (ctypes.c_char * readback_nbytes)() + worker.copy_from(ctypes.addressof(host_buf), res.buffer_ptrs[0], readback_nbytes) + result["readback"] = bytes(host_buf) + + # shutdown_bootstrap + finalize — matches the L6 teardown order + # and leaves the sim shm segment clean for the next test. + worker.shutdown_bootstrap() + worker.finalize() + result["ok"] = True + finally: + if shm_attach is not None: + shm_attach.close() + except Exception: # noqa: BLE001 + result["error"] = traceback.format_exc() + finally: + result_queue.put(result) + + +def _run_two_rank( + *, + window_size: int, + buffer_specs: list[dict], + host_inputs_for_rank: dict[int, tuple[list[dict], int]], + rootinfo_suffix: str, + channel_shm_names: dict[int, str] | None = None, +) -> dict[int, dict]: + """Orchestrate a 2-rank fork test. + + ``host_inputs_for_rank[r]`` is a ``(staging_specs, readback_nbytes)`` pair + so each rank can advertise its own inputs + ask for a device-to-host + round-trip check. 
+ """ + bins = _sim_binaries() + host_lib = str(bins.host_path) + aicpu_path = str(bins.aicpu_path) + aicore_path = str(bins.aicore_path) + sim_context_path = str(bins.sim_context_path) if bins.sim_context_path else "" + + nranks = 2 + rootinfo_path = f"/tmp/pto_bootstrap_sim_{os.getpid()}_{rootinfo_suffix}.bin" + + ctx = mp.get_context("fork") + result_queue: mp.Queue = ctx.Queue() # type: ignore[type-arg] + procs = [] + for rank in range(nranks): + staging, readback = host_inputs_for_rank.get(rank, ([], 0)) + channel_name = None if channel_shm_names is None else channel_shm_names.get(rank) + p = ctx.Process( + target=_rank_entry, + args=( + rank, + nranks, + rootinfo_path, + window_size, + host_lib, + aicpu_path, + aicore_path, + sim_context_path, + buffer_specs, + staging, + channel_name, + result_queue, + readback, + ), + daemon=False, + ) + p.start() + procs.append(p) + + results: dict[int, dict] = {} + for _ in range(nranks): + r = result_queue.get(timeout=180) + results[int(r["rank"])] = r + for p in procs: + p.join(timeout=60) + + try: + os.unlink(rootinfo_path) + except FileNotFoundError: + pass + + return results + + +# --------------------------------------------------------------------------- +# 1. Happy path — bootstrap returns a populated result and window is carved. 
+# --------------------------------------------------------------------------- + + +class TestBootstrapContextHappyPath: + def test_two_rank_no_host_inputs(self): + buffer_specs = [ + {"name": "x", "dtype": "float32", "count": 16, "placement": "window", "nbytes": 64}, + ] + results = _run_two_rank( + window_size=4096, + buffer_specs=buffer_specs, + host_inputs_for_rank={}, + rootinfo_suffix="happy", + ) + for rank in (0, 1): + r = results.get(rank) + assert r is not None and r.get("ok"), f"rank {rank} failed: {r and r.get('error')}" + assert r["local_window_base"] != 0, f"rank {rank} local_window_base is 0" + assert r["actual_window_size"] >= 4096 + # Single buffer at window base — the 1:1 contract L6 relies on. + assert r["buffer_ptrs"] == [r["local_window_base"]] + + +# --------------------------------------------------------------------------- +# 2. load_from_host — staged bytes end up at buffer_ptrs[0]. +# --------------------------------------------------------------------------- + + +class TestBootstrapContextHostStaging: + def test_load_from_host_round_trip(self): + nbytes = 64 + payload = bytes(range(nbytes)) + + shm = SharedMemory(create=True, size=nbytes) + try: + buf = shm.buf + assert buf is not None + buf[:nbytes] = payload + + buffer_specs = [ + { + "name": "x", + "dtype": "float32", + "count": 16, + "placement": "window", + "nbytes": nbytes, + "load_from_host": True, + }, + ] + # Only rank 0 consumes a host input; rank 1 still needs a buffer of + # matching size so the two ranks carve identical windows. Rank 1 + # is not asked to read back, which keeps the test focused on the + # H2D staging path. 
+ host_inputs_by_rank = { + 0: ([{"name": "x", "shm_name": shm.name, "size": nbytes}], nbytes), + } + buffer_specs_r1 = [ + { + "name": "x", + "dtype": "float32", + "count": 16, + "placement": "window", + "nbytes": nbytes, + "load_from_host": False, + }, + ] + + bins = _sim_binaries() + host_lib = str(bins.host_path) + aicpu_path = str(bins.aicpu_path) + aicore_path = str(bins.aicore_path) + sim_context_path = str(bins.sim_context_path) if bins.sim_context_path else "" + + rootinfo_path = f"/tmp/pto_bootstrap_sim_{os.getpid()}_staging.bin" + ctx = mp.get_context("fork") + result_queue: mp.Queue = ctx.Queue() # type: ignore[type-arg] + procs = [] + for rank, specs in ((0, buffer_specs), (1, buffer_specs_r1)): + staging, readback = host_inputs_by_rank.get(rank, ([], 0)) + p = ctx.Process( + target=_rank_entry, + args=( + rank, + 2, + rootinfo_path, + 4096, + host_lib, + aicpu_path, + aicore_path, + sim_context_path, + specs, + staging, + None, + result_queue, + readback, + ), + daemon=False, + ) + p.start() + procs.append(p) + + results: dict[int, dict] = {} + for _ in range(2): + r = result_queue.get(timeout=180) + results[int(r["rank"])] = r + for p in procs: + p.join(timeout=60) + try: + os.unlink(rootinfo_path) + except FileNotFoundError: + pass + finally: + shm.close() + shm.unlink() + + assert results[0].get("ok"), f"rank 0 failed: {results[0].get('error')}" + assert results[1].get("ok"), f"rank 1 failed: {results[1].get('error')}" + assert results[0].get("readback") == payload, "round-trip payload mismatch" + + +# --------------------------------------------------------------------------- +# 3. Channel integration — parent reads SUCCESS fields from the mailbox. 
+# --------------------------------------------------------------------------- + + +class TestBootstrapContextChannel: + def test_channel_publishes_success_fields(self): + from _task_interface import ( # pyright: ignore[reportMissingImports] + CHIP_BOOTSTRAP_MAILBOX_SIZE, + ChipBootstrapChannel, + ChipBootstrapMailboxState, + ) + + # One mailbox per rank — the parent owns both, forwards the shm name + # to each child so the child can attach and publish its result. + channels_shm = {rank: SharedMemory(create=True, size=CHIP_BOOTSTRAP_MAILBOX_SIZE) for rank in range(2)} + try: + buffer_specs = [ + {"name": "x", "dtype": "float32", "count": 16, "placement": "window", "nbytes": 64}, + ] + channel_shm_names = {rank: shm.name for rank, shm in channels_shm.items()} + results = _run_two_rank( + window_size=4096, + buffer_specs=buffer_specs, + host_inputs_for_rank={}, + rootinfo_suffix="channel", + channel_shm_names=channel_shm_names, + ) + + for rank in (0, 1): + r = results[rank] + assert r.get("ok"), f"rank {rank} failed: {r.get('error')}" + + channel = ChipBootstrapChannel(_shm_addr(channels_shm[rank]), max_buffer_count=376) + assert channel.state == ChipBootstrapMailboxState.SUCCESS + assert channel.device_ctx == r["device_ctx"] + assert channel.local_window_base == r["local_window_base"] + assert channel.actual_window_size == r["actual_window_size"] + assert channel.buffer_ptrs == r["buffer_ptrs"] + finally: + for shm in channels_shm.values(): + shm.close() + shm.unlink() + + +# --------------------------------------------------------------------------- +# 4. Error path — invalid placement raises ValueError and writes ERROR. 
+# --------------------------------------------------------------------------- + + +def _error_rank_entry( + host_lib: str, + aicpu_path: str, + aicore_path: str, + sim_context_path: str, + channel_shm_name: str, + result_queue: mp.Queue, # type: ignore[type-arg] +) -> None: + result: dict[str, object] = {"raised": False, "state": None, "message": None} + try: + from simpler.task_interface import ( + ChipBootstrapChannel, + ChipBootstrapConfig, + ChipBufferSpec, + ChipWorker, + ) + + worker = ChipWorker() + worker.init(host_lib, aicpu_path, aicore_path, sim_context_path) + + shm = SharedMemory(name=channel_shm_name) + try: + channel = ChipBootstrapChannel(_shm_addr(shm), max_buffer_count=376) + + # placement="bogus" + comm=None → ValueError on the placement + # check, before any communicator work runs. Single-process is + # fine because we never reach comm_alloc_windows. + cfg = ChipBootstrapConfig( + comm=None, + buffers=[ + ChipBufferSpec( + name="x", + dtype="float32", + count=1, + placement="bogus", + nbytes=4, + ) + ], + ) + try: + worker.bootstrap_context(device_id=0, cfg=cfg, channel=channel) + except ValueError as e: + result["raised"] = True + result["exc_msg"] = str(e) + + # Read back the channel state from the child's side too — the + # parent will also read it, but this catches "did the except-block + # actually run" bugs before we cross the process boundary. 
+ result["state"] = int(channel.state) + result["message"] = channel.error_message + finally: + shm.close() + worker.shutdown_bootstrap() + worker.finalize() + except Exception: # noqa: BLE001 + result["error"] = traceback.format_exc() + finally: + result_queue.put(result) + + +class TestBootstrapContextError: + def test_invalid_placement_publishes_error(self): + from _task_interface import ( # pyright: ignore[reportMissingImports] + CHIP_BOOTSTRAP_MAILBOX_SIZE, + ChipBootstrapChannel, + ChipBootstrapMailboxState, + ) + + bins = _sim_binaries() + host_lib = str(bins.host_path) + aicpu_path = str(bins.aicpu_path) + aicore_path = str(bins.aicore_path) + sim_context_path = str(bins.sim_context_path) if bins.sim_context_path else "" + + shm = SharedMemory(create=True, size=CHIP_BOOTSTRAP_MAILBOX_SIZE) + # Zero-init the mailbox so state reads IDLE before the child writes. + # SharedMemory does not zero the region on attach in all libc variants + # — struct.pack_into is explicit and cheap. + buf = shm.buf + assert buf is not None + for off in range(0, CHIP_BOOTSTRAP_MAILBOX_SIZE, 8): + struct.pack_into("Q", buf, off, 0) + try: + ctx = mp.get_context("fork") + result_queue: mp.Queue = ctx.Queue() # type: ignore[type-arg] + p = ctx.Process( + target=_error_rank_entry, + args=(host_lib, aicpu_path, aicore_path, sim_context_path, shm.name, result_queue), + daemon=False, + ) + p.start() + r = result_queue.get(timeout=60) + p.join(timeout=30) + + assert r.get("raised"), f"expected ValueError; got {r}" + assert "bogus" in str(r.get("exc_msg", "")), f"exc_msg missing 'bogus': {r.get('exc_msg')}" + + # Parent-side channel read — verifies the mailbox ERROR state + # survived the fork and is visible in a fresh ChipBootstrapChannel. 
+ channel = ChipBootstrapChannel(_shm_addr(shm), max_buffer_count=376) + assert channel.state == ChipBootstrapMailboxState.ERROR + assert channel.error_code == 1 + assert "bogus" in channel.error_message + assert channel.error_message.startswith("ValueError: ") + finally: + shm.close() + shm.unlink()