diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 20fd988ba..7ab002325 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -289,7 +289,7 @@ jobs: source .venv/bin/activate source ${ASCEND_HOME_PATH}/bin/setenv.bash set -e - python -m pytest tests -m requires_hardware --platform a2a3 -v + python -m pytest tests -m requires_hardware --platform a2a3 --device ${DEVICE_RANGE} -v - name: Build and run C++ hardware unit tests run: | diff --git a/python/bindings/task_interface.cpp b/python/bindings/task_interface.cpp index a51c52f5b..2da0afc03 100644 --- a/python/bindings/task_interface.cpp +++ b/python/bindings/task_interface.cpp @@ -618,7 +618,30 @@ NB_MODULE(_task_interface, m) { .def("malloc", &ChipWorker::malloc, nb::arg("size")) .def("free", &ChipWorker::free, nb::arg("ptr")) .def("copy_to", &ChipWorker::copy_to, nb::arg("dst"), nb::arg("src"), nb::arg("size")) - .def("copy_from", &ChipWorker::copy_from, nb::arg("dst"), nb::arg("src"), nb::arg("size")); + .def("copy_from", &ChipWorker::copy_from, nb::arg("dst"), nb::arg("src"), nb::arg("size")) + .def( + "comm_init", &ChipWorker::comm_init, nb::arg("rank"), nb::arg("nranks"), nb::arg("rootinfo_path"), + "Initialize a communicator for this rank. ChipWorker owns ACL + stream " + "lifetime internally (onboard drives ensure_acl_ready + aclrtCreateStream; " + "sim ignores both). Pair with comm_destroy for cleanup." + ) + .def( + "comm_alloc_windows", &ChipWorker::comm_alloc_windows, nb::arg("comm_handle"), nb::arg("win_size"), + "Allocate per-rank windows and return the device CommContext pointer." + ) + .def( + "comm_get_local_window_base", &ChipWorker::comm_get_local_window_base, nb::arg("comm_handle"), + "Return this rank's local window base address." + ) + .def( + "comm_get_window_size", &ChipWorker::comm_get_window_size, nb::arg("comm_handle"), + "Return the actual per-rank window size (may differ from the hint)." + ) + .def("comm_barrier", &ChipWorker::comm_barrier, nb::arg("comm_handle"), "Synchronize all ranks.") + .def( + "comm_destroy", &ChipWorker::comm_destroy, nb::arg("comm_handle"), + "Destroy the communicator and release its resources." + ); // --- Standalone blob helpers --- m.def( diff --git a/python/simpler/task_interface.py b/python/simpler/task_interface.py index b3e1fb065..0c3503f6a 100644 --- a/python/simpler/task_interface.py +++ b/python/simpler/task_interface.py @@ -225,6 +225,44 @@ def copy_from(self, dst, src, size): """Copy *size* bytes from worker *src* to host *dst*.""" self._impl.copy_from(int(dst), int(src), int(size)) + def comm_init(self, rank: int, nranks: int, rootinfo_path: str) -> int: + """Initialize a distributed communicator for this rank. + + ChipWorker owns ACL bring-up and the aclrtStream internally, so + callers never touch ``aclInit`` / ``aclrtSetDevice`` / stream + lifetimes. On sim, ACL / stream are not used. Pair with + ``comm_destroy`` for teardown. + + Args: + rank: This process's rank (0-based). + nranks: Total number of ranks. + rootinfo_path: Filesystem path used for rank handshake. + + Returns: + Opaque communicator handle (uint64) for the other ``comm_*`` calls. + """ + return int(self._impl.comm_init(int(rank), int(nranks), str(rootinfo_path))) + + def comm_alloc_windows(self, comm_handle: int, win_size: int) -> int: + """Allocate per-rank windows. Returns a device CommContext pointer (uint64).""" + return int(self._impl.comm_alloc_windows(int(comm_handle), int(win_size))) + + def comm_get_local_window_base(self, comm_handle: int) -> int: + """Return this rank's local window base address (uint64).""" + return int(self._impl.comm_get_local_window_base(int(comm_handle))) + + def comm_get_window_size(self, comm_handle: int) -> int: + """Return the actual per-rank window size in bytes.""" + return int(self._impl.comm_get_window_size(int(comm_handle))) + + def comm_barrier(self, comm_handle: int) -> None: + """Synchronize all ranks.""" + self._impl.comm_barrier(int(comm_handle)) + + def comm_destroy(self, comm_handle: int) -> None: + """Destroy the communicator and release its resources.""" + self._impl.comm_destroy(int(comm_handle)) + @property def device_id(self): return self._impl.device_id diff --git a/src/a2a3/platform/include/host/memory_allocator.h b/src/a2a3/platform/include/host/memory_allocator.h index d3c181f63..217fc37f2 100644 --- a/src/a2a3/platform/include/host/memory_allocator.h +++ b/src/a2a3/platform/include/host/memory_allocator.h @@ -97,7 +97,7 @@ class MemoryAllocator { * @return Number of currently tracked pointers */ size_t get_allocation_count() const { - std::lock_guard lk(mu_); + std::scoped_lock lk(mu_); return ptr_set_.size(); } diff --git a/src/a2a3/platform/onboard/host/CMakeLists.txt b/src/a2a3/platform/onboard/host/CMakeLists.txt index f259cfa22..9cf0f3d35 100644 --- a/src/a2a3/platform/onboard/host/CMakeLists.txt +++ b/src/a2a3/platform/onboard/host/CMakeLists.txt @@ -77,6 +77,10 @@ target_include_directories(host_runtime PRIVATE ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_SOURCE_DIR}/../../include + # Shared platform_comm headers (comm.h / comm_context.h) live in + # src/common so a2a3 (HCCL) and a5/a2a3 sim (POSIX-shm) can use the + # same contract. + ${CMAKE_CURRENT_SOURCE_DIR}/../../../../common ${CMAKE_CUSTOM_INCLUDE_DIRS} PRIVATE ${ASCEND_HOME_PATH}/include diff --git a/src/a2a3/platform/onboard/host/comm_hccl.cpp b/src/a2a3/platform/onboard/host/comm_hccl.cpp index 56019b3ef..60382c5ff 100644 --- a/src/a2a3/platform/onboard/host/comm_hccl.cpp +++ b/src/a2a3/platform/onboard/host/comm_hccl.cpp @@ -16,8 +16,8 @@ * when extracting per-rank RDMA window addresses. */ -#include "host/comm.h" -#include "common/comm_context.h" +#include "platform_comm/comm.h" +#include "platform_comm/comm_context.h" #include #include diff --git a/src/a2a3/platform/onboard/host/device_runner.cpp b/src/a2a3/platform/onboard/host/device_runner.cpp index 8996d5e1f..ed348d818 100644 --- a/src/a2a3/platform/onboard/host/device_runner.cpp +++ b/src/a2a3/platform/onboard/host/device_runner.cpp @@ -299,6 +299,38 @@ int DeviceRunner::ensure_acl_ready(int device_id) { return 0; } +void *DeviceRunner::create_comm_stream() { + aclrtStream stream = nullptr; + aclError aRet = aclrtCreateStream(&stream); + if (aRet != ACL_SUCCESS) { + LOG_ERROR("aclrtCreateStream failed: %d", static_cast(aRet)); + return nullptr; + } + return stream; +} + +int DeviceRunner::destroy_comm_stream(void *stream) { + if (stream == nullptr) return 0; + + // Best-effort teardown. HcclBarrier submits async work on the stream; + // if the caller never blocked for completion (or hit the L1a 507018 + // barrier regression), aclrtDestroyStream will refuse with 507901 + // ("stream still has pending tasks"). We try to drain first, then + // destroy anyway, and log failures without propagating them — leaking + // a stream at teardown is strictly better than failing the teardown + // itself, which would block device finalization. This matches the + // cleanup behavior of the L1a C++ hardware UT. + aclError sync_rc = aclrtSynchronizeStream(static_cast(stream)); + if (sync_rc != ACL_SUCCESS) { + LOG_ERROR("aclrtSynchronizeStream during stream teardown failed: %d", static_cast(sync_rc)); + } + aclError destroy_rc = aclrtDestroyStream(static_cast(stream)); + if (destroy_rc != ACL_SUCCESS) { + LOG_ERROR("aclrtDestroyStream failed (leaking stream): %d", static_cast(destroy_rc)); + } + return 0; +} + int DeviceRunner::prepare_run_context(int device_id) { int rc = attach_current_thread(device_id); if (rc != 0) { diff --git a/src/a2a3/platform/onboard/host/device_runner.h b/src/a2a3/platform/onboard/host/device_runner.h index 1760964b5..f91cecdd2 100644 --- a/src/a2a3/platform/onboard/host/device_runner.h +++ b/src/a2a3/platform/onboard/host/device_runner.h @@ -379,6 +379,26 @@ class DeviceRunner { */ int ensure_acl_ready(int device_id); + /** + * Create a caller-owned aclrtStream for comm_* usage. + * + * Intended to back the ChipWorker Python wrapper's internal stream + * ownership for distributed comm — callers pair it with + * destroy_comm_stream() at teardown. The ACL context must already be + * ready on the calling thread (ensure_acl_ready()). + * + * @return aclrtStream pointer on success, NULL on failure. + */ + void *create_comm_stream(); + + /** + * Destroy a stream previously returned by create_comm_stream(). + * Tolerates a nullptr stream (returns 0). + * + * @return 0 on success, error code on failure. + */ + int destroy_comm_stream(void *stream); + /** * Ensure the current thread has fresh run-scoped streams. * diff --git a/src/a2a3/platform/onboard/host/pto_runtime_c_api.cpp b/src/a2a3/platform/onboard/host/pto_runtime_c_api.cpp index ff961fa65..f1b9d32c5 100644 --- a/src/a2a3/platform/onboard/host/pto_runtime_c_api.cpp +++ b/src/a2a3/platform/onboard/host/pto_runtime_c_api.cpp @@ -128,6 +128,30 @@ int ensure_acl_ready_ctx(DeviceContextHandle ctx, int device_id) { } } +/* + * Stream creation/destruction exposed so the ChipWorker Python wrapper can + * drive comm_init end-to-end without leaking aclrtStream lifetime (or ACL + * libs) into Python. Both entries go through the DeviceRunner so the ACL + * ready-flag and device bookkeeping stay consistent with the normal run path. + */ +void *create_comm_stream_ctx(DeviceContextHandle ctx) { + if (ctx == NULL) return NULL; + try { + return static_cast(ctx)->create_comm_stream(); + } catch (...) { + return NULL; + } +} + +int destroy_comm_stream_ctx(DeviceContextHandle ctx, void *stream) { + if (ctx == NULL) return -1; + try { + return static_cast(ctx)->destroy_comm_stream(stream); + } catch (...) { + return -1; + } +} + void *device_malloc_ctx(DeviceContextHandle ctx, size_t size) { if (ctx == NULL) return NULL; try { diff --git a/src/a2a3/platform/sim/host/CMakeLists.txt b/src/a2a3/platform/sim/host/CMakeLists.txt index b28b6b57a..417d68881 100644 --- a/src/a2a3/platform/sim/host/CMakeLists.txt +++ b/src/a2a3/platform/sim/host/CMakeLists.txt @@ -45,6 +45,7 @@ list(APPEND HOST_RUNTIME_SOURCES "${CMAKE_CURRENT_SOURCE_DIR}/../../src/host/performance_collector.cpp" "${CMAKE_CURRENT_SOURCE_DIR}/../../src/host/tensor_dump_collector.cpp" "${CMAKE_CURRENT_SOURCE_DIR}/../aicpu/platform_aicpu_affinity.cpp" + "${CMAKE_CURRENT_SOURCE_DIR}/../../../../common/platform_comm/comm_sim.cpp" ) if(DEFINED CUSTOM_SOURCE_DIRS) @@ -81,6 +82,9 @@ target_include_directories(host_runtime PRIVATE ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_SOURCE_DIR}/../../include + # Shared platform_comm headers so comm_sim.cpp (in src/common) resolves + # its #include "platform_comm/comm.h" / "platform_comm/comm_context.h". + ${CMAKE_CURRENT_SOURCE_DIR}/../../../../common ${CMAKE_CURRENT_SOURCE_DIR}/../../../../common/sim_context ${CMAKE_CUSTOM_INCLUDE_DIRS} ) @@ -92,6 +96,11 @@ target_link_libraries(host_runtime dl ) +# POSIX shm_open / shm_unlink live in libSystem on macOS but require -lrt on Linux. +if(UNIX AND NOT APPLE) + target_link_libraries(host_runtime PRIVATE rt) +endif() + # Allow undefined symbols from libcpu_sim_context.so (loaded with RTLD_GLOBAL at runtime). # On macOS, the linker requires -undefined dynamic_lookup; on Linux/gcc this is the default. if(APPLE) diff --git a/src/a2a3/platform/sim/host/pto_runtime_c_api.cpp b/src/a2a3/platform/sim/host/pto_runtime_c_api.cpp index bed390db2..3936fa0c2 100644 --- a/src/a2a3/platform/sim/host/pto_runtime_c_api.cpp +++ b/src/a2a3/platform/sim/host/pto_runtime_c_api.cpp @@ -234,6 +234,30 @@ int finalize_device(DeviceContextHandle ctx) { } } +/* =========================================================================== + * ACL lifecycle stubs. Sim has no ACL / aclrtStream concept, so these + * no-op to satisfy the uniform host_runtime.so ABI (ChipWorker dlsym's the + * full extension surface unconditionally). The paired comm_init / barrier / + * destroy entry points already live in comm_sim.cpp. + * =========================================================================== */ + +int ensure_acl_ready_ctx(DeviceContextHandle ctx, int device_id) { + (void)ctx; + (void)device_id; + return 0; +} + +void *create_comm_stream_ctx(DeviceContextHandle ctx) { + (void)ctx; + return NULL; +} + +int destroy_comm_stream_ctx(DeviceContextHandle ctx, void *stream) { + (void)ctx; + (void)stream; + return 0; +} + /* =========================================================================== * Internal helpers called from runtime_maker.cpp via Runtime.host_api * =========================================================================== */ diff --git a/src/a5/platform/include/host/memory_allocator.h b/src/a5/platform/include/host/memory_allocator.h index a8ae268f8..b9c4307a1 100644 --- a/src/a5/platform/include/host/memory_allocator.h +++ b/src/a5/platform/include/host/memory_allocator.h @@ -97,7 +97,7 @@ class MemoryAllocator { * @return Number of currently tracked pointers */ size_t get_allocation_count() const { - std::lock_guard lk(mu_); + std::scoped_lock lk(mu_); return ptr_set_.size(); } diff --git a/src/a5/platform/onboard/host/pto_runtime_c_api.cpp b/src/a5/platform/onboard/host/pto_runtime_c_api.cpp index e2949edd4..8034142da 100644 --- a/src/a5/platform/onboard/host/pto_runtime_c_api.cpp +++ b/src/a5/platform/onboard/host/pto_runtime_c_api.cpp @@ -226,6 +226,71 @@ int finalize_device(DeviceContextHandle ctx) { } } +/* =========================================================================== + * ACL + comm_* placeholders (distributed runtime not yet implemented on a5) + * + * These exist only to satisfy ChipWorker's unconditional dlsym of the extension + * surface — the contract is "every host_runtime.so exports the full set; a + * runtime without a real implementation returns a not-supported result at + * call time" rather than having ChipWorker probe each symbol individually. + * When a5 grows real HCCL / sim distributed support these stubs get replaced + * wholesale; no ChipWorker changes are needed. + * =========================================================================== */ + +int ensure_acl_ready_ctx(DeviceContextHandle ctx, int device_id) { + (void)ctx; + (void)device_id; + return 0; +} + +void *create_comm_stream_ctx(DeviceContextHandle ctx) { + (void)ctx; + return NULL; +} + +int destroy_comm_stream_ctx(DeviceContextHandle ctx, void *stream) { + (void)ctx; + (void)stream; + return 0; +} + +void *comm_init(int rank, int nranks, void *stream, const char *rootinfo_path) { + (void)rank; + (void)nranks; + (void)stream; + (void)rootinfo_path; + return NULL; // distributed runtime not yet supported on a5 +} + +int comm_alloc_windows(void *handle, size_t win_size, uint64_t *device_ctx_out) { + (void)handle; + (void)win_size; + (void)device_ctx_out; + return -1; +} + +int comm_get_local_window_base(void *handle, uint64_t *base_out) { + (void)handle; + (void)base_out; + return -1; +} + +int comm_get_window_size(void *handle, size_t *size_out) { + (void)handle; + (void)size_out; + return -1; +} + +int comm_barrier(void *handle) { + (void)handle; + return -1; +} + +int comm_destroy(void *handle) { + (void)handle; + return -1; +} + /* =========================================================================== * Internal helpers called from runtime_maker.cpp via Runtime.host_api * =========================================================================== */ diff --git a/src/a5/platform/sim/host/CMakeLists.txt b/src/a5/platform/sim/host/CMakeLists.txt index 73b0029bd..42584e8da 100644 --- a/src/a5/platform/sim/host/CMakeLists.txt +++ b/src/a5/platform/sim/host/CMakeLists.txt @@ -45,6 +45,8 @@ list(APPEND HOST_RUNTIME_SOURCES "${CMAKE_CURRENT_SOURCE_DIR}/../../src/host/performance_collector.cpp" "${CMAKE_CURRENT_SOURCE_DIR}/../../src/host/tensor_dump_collector.cpp" "${CMAKE_CURRENT_SOURCE_DIR}/../aicpu/platform_aicpu_affinity.cpp" + # Shared POSIX-shm sim comm backend (same source as a2a3 sim). + "${CMAKE_CURRENT_SOURCE_DIR}/../../../../common/platform_comm/comm_sim.cpp" ) if(DEFINED CUSTOM_SOURCE_DIRS) @@ -81,6 +83,9 @@ target_include_directories(host_runtime PRIVATE ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_SOURCE_DIR}/../../include + # Shared platform_comm headers so the common comm_sim.cpp resolves + # its #include "platform_comm/comm.h" / "platform_comm/comm_context.h". + ${CMAKE_CURRENT_SOURCE_DIR}/../../../../common ${CMAKE_CURRENT_SOURCE_DIR}/../../../../common/sim_context ${CMAKE_CUSTOM_INCLUDE_DIRS} ) @@ -92,6 +97,11 @@ target_link_libraries(host_runtime dl ) +# POSIX shm_open / shm_unlink live in libSystem on macOS but require -lrt on Linux. +if(UNIX AND NOT APPLE) + target_link_libraries(host_runtime PRIVATE rt) +endif() + # Allow undefined symbols from libcpu_sim_context.so (loaded with RTLD_GLOBAL at runtime). # On macOS, the linker requires -undefined dynamic_lookup; on Linux/gcc this is the default. if(APPLE) diff --git a/src/a5/platform/sim/host/pto_runtime_c_api.cpp b/src/a5/platform/sim/host/pto_runtime_c_api.cpp index bed390db2..b86c90dd3 100644 --- a/src/a5/platform/sim/host/pto_runtime_c_api.cpp +++ b/src/a5/platform/sim/host/pto_runtime_c_api.cpp @@ -234,6 +234,30 @@ int finalize_device(DeviceContextHandle ctx) { } } +/* =========================================================================== + * ACL lifecycle stubs. Sim has no ACL / aclrtStream concept, so these no-op + * to satisfy the uniform host_runtime.so ABI that ChipWorker dlsym's. The + * real comm_* entry points come from src/common/platform_comm/comm_sim.cpp, + * which is compiled into this runtime via CMakeLists. + * =========================================================================== */ + +int ensure_acl_ready_ctx(DeviceContextHandle ctx, int device_id) { + (void)ctx; + (void)device_id; + return 0; +} + +void *create_comm_stream_ctx(DeviceContextHandle ctx) { + (void)ctx; + return NULL; +} + +int destroy_comm_stream_ctx(DeviceContextHandle ctx, void *stream) { + (void)ctx; + (void)stream; + return 0; +} + /* =========================================================================== * Internal helpers called from runtime_maker.cpp via Runtime.host_api * =========================================================================== */ diff --git a/src/a2a3/platform/include/host/comm.h b/src/common/platform_comm/comm.h similarity index 100% rename from src/a2a3/platform/include/host/comm.h rename to src/common/platform_comm/comm.h diff --git a/src/a2a3/platform/include/common/comm_context.h b/src/common/platform_comm/comm_context.h similarity index 100% rename from src/a2a3/platform/include/common/comm_context.h rename to src/common/platform_comm/comm_context.h diff --git a/src/common/platform_comm/comm_sim.cpp b/src/common/platform_comm/comm_sim.cpp new file mode 100644 index 000000000..90ef8fc3c --- /dev/null +++ b/src/common/platform_comm/comm_sim.cpp @@ -0,0 +1,378 @@ +/* + * Copyright (c) PyPTO Contributors. + * This program is free software, you can redistribute it and/or modify it under the terms and conditions of + * CANN Open Software License Agreement Version 2.0 (the "License"). + * Please refer to the License for details. You may not use this file except in compliance with the License. + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, + * INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. + * See LICENSE in the root of the software repository for the full text of the License. + * ----------------------------------------------------------------------------------------------------------- + */ +/** + * Simulation backend for the comm_* distributed communication API. + * + * Uses POSIX shared memory (shm_open + mmap) so that multiple *processes* + * (one per rank) share the same window region. Synchronization primitives + * (barrier counters) live in the shared region itself, using GCC __atomic + * builtins which are lock-free-safe on mmap'd memory. + * + * Shared memory layout (page-aligned header + per-rank windows): + * [ SharedHeader (4096 bytes) ][ rank-0 window ][ rank-1 window ] ... + * + * L1a contract alignment notes: + * - comm_init takes (int rank, int nranks, void *stream, const char *rootinfo_path). + * The sim backend ignores `stream` (no ACL/device in simulation). + * - nranks is bounds-checked against COMM_MAX_RANK_NUM (64) because the + * CommContext windowsIn/windowsOut arrays are fixed-size. + * - windowsOut[i] is filled (mirrors windowsIn[i] in sim since there is no + * separate remote-write channel). Kernels that consume windowsOut on the + * HCCL backend must still compile-and-run on sim. + * - ftruncate wait + barrier + destroy all use an explicit timeout + * (SIM_COMM_TIMEOUT_SECONDS) so a dead peer cannot hang survivors forever. + * - extern "C" entry points allocate std::string so exceptions are wrapped + * in function-try-blocks to avoid escaping the C ABI. + */ + +#include "platform_comm/comm.h" +#include "platform_comm/comm_context.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace { + +constexpr size_t HEADER_SIZE = 4096; +constexpr int SIM_COMM_TIMEOUT_SECONDS = 120; +constexpr int FTRUNCATE_POLL_INTERVAL_US = 1000; +constexpr int BARRIER_POLL_INTERVAL_US = 50; +constexpr int DESTROY_POLL_INTERVAL_US = 1000; + +struct SharedHeader { + volatile int nranks; + volatile int alloc_done; + volatile int ready_count; + volatile int barrier_count; + volatile int barrier_phase; + volatile int destroy_count; + size_t per_rank_win_size; +}; + +// Build a session-scoped shm name. +// +// Hashing only the rootinfo_path produces a stable name across test re-runs +// with the same path, which means a crashed prior run that left its segment +// behind in /dev/shm would collide: the new rank 0 hits EEXIST, attaches to +// the dead segment, and reads a stale alloc_done=1 / ready_count that may +// desynchronize the barrier. +// +// Mixing in getppid() disambiguates by launching process tree: every fork of +// the same parent (the canonical sim launch pattern — one driver process +// spawns N ranks) agrees on the name, while a subsequent re-launch gets a new +// parent PID and therefore a fresh name. Cross-node / cross-parent launches +// on sim are out of scope; callers relying on those topologies must use the +// HCCL backend. +std::string make_shm_name(const char *rootinfo_path) { + size_t h = std::hash{}(rootinfo_path ? rootinfo_path : "default"); + char buf[96]; + std::snprintf(buf, sizeof(buf), "/simpler_comm_%d_%zx", static_cast(getppid()), h); + return {buf}; +} + +// Poll `check` until it returns true or the timeout elapses. Uses steady_clock +// so wall-clock NTP adjustments cannot desynchronize the wait. +bool wait_until(const std::function &check, int timeout_seconds, int poll_interval_us) { + auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(timeout_seconds); + while (!check()) { + if (std::chrono::steady_clock::now() >= deadline) { + return false; + } + usleep(poll_interval_us); + } + return true; +} + +} // namespace + +struct CommHandle_ { + int rank; + int nranks; + std::string shm_name; + + void *mmap_base = nullptr; + size_t mmap_size = 0; + bool is_creator = false; + + CommContext host_ctx{}; +}; + +extern "C" CommHandle comm_init(int rank, int nranks, void *stream, const char *rootinfo_path) try { + (void)stream; // sim has no ACL / stream concept + + if (rootinfo_path == nullptr) { + std::fprintf(stderr, "[comm_sim rank %d] comm_init: rootinfo_path is null\n", rank); + return nullptr; + } + if (rank < 0 || nranks <= 0 || rank >= nranks) { + std::fprintf(stderr, "[comm_sim] comm_init: invalid rank=%d nranks=%d\n", rank, nranks); + return nullptr; + } + if (static_cast(nranks) > COMM_MAX_RANK_NUM) { + std::fprintf( + stderr, "[comm_sim rank %d] comm_init: nranks=%d exceeds COMM_MAX_RANK_NUM=%u\n", rank, nranks, + COMM_MAX_RANK_NUM + ); + return nullptr; + } + + auto *h = new (std::nothrow) CommHandle_{}; + if (h == nullptr) { + std::fprintf(stderr, "[comm_sim rank %d] comm_init: allocation failed\n", rank); + return nullptr; + } + + h->rank = rank; + h->nranks = nranks; + h->shm_name = make_shm_name(rootinfo_path); + return h; +} catch (const std::exception &e) { + std::fprintf(stderr, "[comm_sim rank %d] comm_init: exception: %s\n", rank, e.what()); + return nullptr; +} catch (...) { + std::fprintf(stderr, "[comm_sim rank %d] comm_init: unknown exception\n", rank); + return nullptr; +} + +extern "C" int comm_alloc_windows(CommHandle h, size_t win_size, uint64_t *device_ctx_out) try { + if (h == nullptr || device_ctx_out == nullptr) return -1; + + size_t total = HEADER_SIZE + win_size * static_cast(h->nranks); + + int fd = shm_open(h->shm_name.c_str(), O_CREAT | O_EXCL | O_RDWR, 0600); + if (fd >= 0) { + h->is_creator = true; + if (ftruncate(fd, static_cast(total)) != 0) { + std::fprintf(stderr, "[comm_sim rank %d] ftruncate failed: %s\n", h->rank, std::strerror(errno)); + close(fd); + shm_unlink(h->shm_name.c_str()); + return -1; + } + } else if (errno == EEXIST) { + fd = shm_open(h->shm_name.c_str(), O_RDWR, 0600); + if (fd < 0) { + std::fprintf(stderr, "[comm_sim rank %d] shm_open: %s\n", h->rank, std::strerror(errno)); + return -1; + } + // Wait for creator to finish ftruncate by checking file size. + bool sized = wait_until( + [fd, total]() { + struct stat st; + return fstat(fd, &st) == 0 && static_cast(st.st_size) >= total; + }, + SIM_COMM_TIMEOUT_SECONDS, FTRUNCATE_POLL_INTERVAL_US + ); + if (!sized) { + std::fprintf( + stderr, "[comm_sim rank %d] ftruncate wait timed out after %ds\n", h->rank, SIM_COMM_TIMEOUT_SECONDS + ); + close(fd); + return -1; + } + } else { + std::fprintf(stderr, "[comm_sim rank %d] shm_open O_EXCL: %s\n", h->rank, std::strerror(errno)); + return -1; + } + + void *base = mmap(nullptr, total, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + close(fd); + if (base == MAP_FAILED) { + std::fprintf(stderr, "[comm_sim rank %d] mmap: %s\n", h->rank, std::strerror(errno)); + return -1; + } + + h->mmap_base = base; + h->mmap_size = total; + + auto *hdr = static_cast(base); + + if (h->is_creator) { + hdr->per_rank_win_size = win_size; + hdr->ready_count = 0; + hdr->barrier_count = 0; + hdr->barrier_phase = 0; + hdr->destroy_count = 0; + __atomic_store_n(&hdr->nranks, h->nranks, __ATOMIC_RELEASE); + __atomic_store_n(&hdr->alloc_done, 1, __ATOMIC_RELEASE); + } else { + bool ready = wait_until( + [hdr]() { + return __atomic_load_n(&hdr->alloc_done, __ATOMIC_ACQUIRE) != 0; + }, + SIM_COMM_TIMEOUT_SECONDS, FTRUNCATE_POLL_INTERVAL_US + ); + if (!ready) { + std::fprintf( + stderr, "[comm_sim rank %d] alloc_done wait timed out after %ds\n", h->rank, SIM_COMM_TIMEOUT_SECONDS + ); + return -1; + } + } + + auto *win_base = static_cast(base) + HEADER_SIZE; + + // Cross-process addressing contract (differs from HCCL's GVA model!): + // + // Each rank's CommContext.windowsIn/windowsOut[i] holds *this process's* + // pointer to rank i's slice of the shared mmap region. Because the + // underlying fd is MAP_SHARED, each rank's own writes to its own slice + // are visible to other ranks that read through *their own* windowsIn[i] + // entry — but the numerical addresses do NOT match across processes + // (ASLR + independent mmap placement). This is fine as long as kernels + // only dereference their own rank's CommContext; the hardware UT's + // cross-rank address-agreement assert is specifically an HCCL-GVA + // invariant and is not expected to hold (nor intended to run) under sim. + auto &ctx = h->host_ctx; + ctx.workSpace = 0; + ctx.workSpaceSize = 0; + ctx.rankId = static_cast(h->rank); + ctx.rankNum = static_cast(h->nranks); + ctx.winSize = win_size; + for (int i = 0; i < h->nranks; ++i) { + uint64_t addr = reinterpret_cast(win_base + static_cast(i) * win_size); + ctx.windowsIn[i] = addr; + // Sim has no separate remote-write channel; mirror windowsIn so kernels + // that read windowsOut still see a valid per-rank address. + ctx.windowsOut[i] = addr; + } + + *device_ctx_out = reinterpret_cast(&h->host_ctx); + + __atomic_add_fetch(&hdr->ready_count, 1, __ATOMIC_ACQ_REL); + bool all_ready = wait_until( + [hdr, h]() { + return __atomic_load_n(&hdr->ready_count, __ATOMIC_ACQUIRE) >= h->nranks; + }, + SIM_COMM_TIMEOUT_SECONDS, BARRIER_POLL_INTERVAL_US + ); + if (!all_ready) { + std::fprintf( + stderr, "[comm_sim rank %d] ready_count barrier timed out after %ds\n", h->rank, SIM_COMM_TIMEOUT_SECONDS + ); + return -1; + } + + return 0; +} catch (const std::exception &e) { + std::fprintf(stderr, "[comm_sim] comm_alloc_windows: exception: %s\n", e.what()); + return -1; +} catch (...) { + std::fprintf(stderr, "[comm_sim] comm_alloc_windows: unknown exception\n"); + return -1; +} + +extern "C" int comm_get_local_window_base(CommHandle h, uint64_t *base_out) { + if (h == nullptr || base_out == nullptr) return -1; + *base_out = h->host_ctx.windowsIn[h->rank]; + return 0; +} + +extern "C" int comm_get_window_size(CommHandle h, size_t *size_out) { + if (h == nullptr || size_out == nullptr) return -1; + *size_out = static_cast(h->host_ctx.winSize); + return 0; +} + +extern "C" int comm_barrier(CommHandle h) { + if (h == nullptr || h->mmap_base == nullptr) return -1; + + // Sense-reversing barrier. Each caller snapshots `phase` before + // incrementing `barrier_count`, then waits until `phase` advances. This + // ordering — snapshot → increment → wait-for-change — is what makes a + // back-to-back re-entry race-free: a fast rank that returns from this + // barrier and immediately re-enters for the NEXT one will read the + // already-advanced phase as its snapshot, so its own count increment + // is accounted for the new generation instead of the old. + // + // The last rank's (count=0 then phase+1) release-ordered pair ensures + // that any rank exiting the wait on the phase change also sees the + // reset count before it can contribute to the next barrier, so + // concurrent re-entry cannot corrupt the pending generation. + auto *hdr = static_cast(h->mmap_base); + int phase = __atomic_load_n(&hdr->barrier_phase, __ATOMIC_ACQUIRE); + int arrived = __atomic_add_fetch(&hdr->barrier_count, 1, __ATOMIC_ACQ_REL); + + if (arrived == h->nranks) { + __atomic_store_n(&hdr->barrier_count, 0, __ATOMIC_RELEASE); + __atomic_add_fetch(&hdr->barrier_phase, 1, __ATOMIC_ACQ_REL); + return 0; + } + + bool advanced = wait_until( + [hdr, phase]() { + return __atomic_load_n(&hdr->barrier_phase, __ATOMIC_ACQUIRE) != phase; + }, + SIM_COMM_TIMEOUT_SECONDS, BARRIER_POLL_INTERVAL_US + ); + if (!advanced) { + std::fprintf( + stderr, "[comm_sim rank %d] barrier timed out after %ds (phase=%d arrived=%d nranks=%d)\n", h->rank, + SIM_COMM_TIMEOUT_SECONDS, phase, arrived, h->nranks + ); + return -1; + } + return 0; +} + +extern "C" int comm_destroy(CommHandle h) try { + if (h == nullptr) return -1; + + int rc = 0; + if (h->mmap_base != nullptr) { + auto *hdr = static_cast(h->mmap_base); + int gone = __atomic_add_fetch(&hdr->destroy_count, 1, __ATOMIC_ACQ_REL); + + // Last rank out unlinks the shm segment. Earlier ranks wait a bounded + // time so that, on the common "all ranks destroy in lockstep" path, + // the unlink actually happens before the next test re-creates it. + // On a dead-peer path, the timeout elapses, we still munmap and exit, + // and the segment lingers until /dev/shm is cleared. + if (gone >= h->nranks) { + munmap(h->mmap_base, h->mmap_size); + h->mmap_base = nullptr; + shm_unlink(h->shm_name.c_str()); + } else { + bool drained = wait_until( + [hdr, h]() { + return __atomic_load_n(&hdr->destroy_count, __ATOMIC_ACQUIRE) >= h->nranks; + }, + SIM_COMM_TIMEOUT_SECONDS, DESTROY_POLL_INTERVAL_US + ); + munmap(h->mmap_base, h->mmap_size); + h->mmap_base = nullptr; + if (!drained) { + std::fprintf( + stderr, "[comm_sim rank %d] destroy barrier timed out after %ds; local teardown complete\n", + h->rank, SIM_COMM_TIMEOUT_SECONDS + ); + rc = -1; + } + } + } + + delete h; + return rc; +} catch (const std::exception &e) { + std::fprintf(stderr, "[comm_sim] comm_destroy: exception: %s\n", e.what()); + return -1; +} catch (...) { + std::fprintf(stderr, "[comm_sim] comm_destroy: unknown exception\n"); + return -1; +} diff --git a/src/common/worker/chip_worker.cpp b/src/common/worker/chip_worker.cpp index f6185feef..066e79a3f 100644 --- a/src/common/worker/chip_worker.cpp +++ b/src/common/worker/chip_worker.cpp @@ -121,6 +121,20 @@ void ChipWorker::init( get_runtime_size_fn_ = load_symbol(handle, "get_runtime_size"); run_runtime_fn_ = load_symbol(handle, "run_runtime"); finalize_device_fn_ = load_symbol(handle, "finalize_device"); + // ACL lifecycle + comm_* are part of the uniform host_runtime.so ABI. + // Every platform runtime exports all of them — runtimes that do not + // have a real backend (today: a5) ship not-supported stubs rather + // than omitting the symbols. This keeps ChipWorker.init platform- + // agnostic: no per-symbol probing, no half-loaded extension groups. + ensure_acl_ready_fn_ = load_symbol(handle, "ensure_acl_ready_ctx"); + create_comm_stream_fn_ = load_symbol(handle, "create_comm_stream_ctx"); + destroy_comm_stream_fn_ = load_symbol(handle, "destroy_comm_stream_ctx"); + comm_init_fn_ = load_symbol(handle, "comm_init"); + comm_alloc_windows_fn_ = load_symbol(handle, "comm_alloc_windows"); + comm_get_local_window_base_fn_ = load_symbol(handle, "comm_get_local_window_base"); + comm_get_window_size_fn_ = load_symbol(handle, "comm_get_window_size"); + comm_barrier_fn_ = load_symbol(handle, "comm_barrier"); + comm_destroy_fn_ = load_symbol(handle, "comm_destroy"); } catch (...) { dlclose(handle); throw; @@ -169,6 +183,14 @@ void ChipWorker::reset_device() { } void ChipWorker::finalize() { + // Defensive: if the user never called comm_destroy, reclaim the stream + // before we tear down the device context (otherwise the stream-backing + // ACL state outlives its owning context). + if (comm_stream_ != nullptr && device_ctx_ != nullptr && destroy_comm_stream_fn_ != nullptr) { + destroy_comm_stream_fn_(device_ctx_, comm_stream_); + } + comm_stream_ = nullptr; + reset_device(); if (device_ctx_ != nullptr && destroy_device_context_fn_ != nullptr) { destroy_device_context_fn_(device_ctx_); @@ -188,6 +210,15 @@ void ChipWorker::finalize() { get_runtime_size_fn_ = nullptr; run_runtime_fn_ = nullptr; finalize_device_fn_ = nullptr; + ensure_acl_ready_fn_ = nullptr; + create_comm_stream_fn_ = nullptr; + destroy_comm_stream_fn_ = nullptr; + comm_init_fn_ = nullptr; + comm_alloc_windows_fn_ = nullptr; + comm_get_local_window_base_fn_ = nullptr; + comm_get_window_size_fn_ = nullptr; + comm_barrier_fn_ = nullptr; + comm_destroy_fn_ = nullptr; runtime_buf_.clear(); aicpu_binary_.clear(); aicore_binary_.clear(); @@ -260,3 +291,94 @@ void ChipWorker::copy_from(uint64_t dst, uint64_t src, size_t size) { throw std::runtime_error("copy_from failed with code " + std::to_string(rc)); } } + +uint64_t ChipWorker::comm_init(int rank, int nranks, const std::string &rootinfo_path) { + if (!device_set_) { + throw std::runtime_error("ChipWorker device not set; call set_device() first"); + } + if (comm_stream_ != nullptr) { + throw std::runtime_error("comm_init: a comm session is already active on this ChipWorker"); + } + + // Bring ACL up on the calling thread before stream creation. Onboard + // runs aclInit (idempotent) + per-thread aclrtSetDevice; sim's stub is + // a no-op; platforms with no distributed backend (a5 today) also no-op. + int rc = ensure_acl_ready_fn_(device_ctx_, device_id_); + if (rc != 0) { + throw std::runtime_error("ensure_acl_ready failed with code " + std::to_string(rc)); + } + + // Create an aclrtStream owned by this ChipWorker. Sim / a5 stubs + // return NULL; their raw comm_init ignores the stream arg. A NULL + // from a runtime that has a real comm backend is a genuine failure — + // we can't distinguish "stub returned NULL on purpose" from "onboard + // create failed" here, so we defer the check to comm_init's own + // return value below (a stub returns NULL from comm_init anyway). + void *stream = create_comm_stream_fn_(device_ctx_); + + void *handle = comm_init_fn_(rank, nranks, stream, rootinfo_path.c_str()); + if (handle == nullptr) { + // Roll back the stream we just created — otherwise the ChipWorker + // leaks it and the next comm_init attempt trips the + // "session already active" guard above. + if (stream != nullptr) { + destroy_comm_stream_fn_(device_ctx_, stream); + } + throw std::runtime_error("comm_init failed"); + } + + comm_stream_ = stream; + return reinterpret_cast(handle); +} + +uint64_t ChipWorker::comm_alloc_windows(uint64_t comm_handle, size_t win_size) { + uint64_t device_ctx = 0; + int rc = comm_alloc_windows_fn_(reinterpret_cast(comm_handle), win_size, &device_ctx); + if (rc != 0) { + throw std::runtime_error("comm_alloc_windows failed with code " + std::to_string(rc)); + } + return device_ctx; +} + +uint64_t ChipWorker::comm_get_local_window_base(uint64_t comm_handle) { + uint64_t base = 0; + int rc = comm_get_local_window_base_fn_(reinterpret_cast(comm_handle), &base); + if (rc != 0) { + throw std::runtime_error("comm_get_local_window_base failed with code " + std::to_string(rc)); + } + return base; +} + +size_t ChipWorker::comm_get_window_size(uint64_t comm_handle) { + size_t win_size = 0; + int rc = comm_get_window_size_fn_(reinterpret_cast(comm_handle), &win_size); + if (rc != 0) { + throw std::runtime_error("comm_get_window_size failed with code " + std::to_string(rc)); + } + return win_size; +} + +void ChipWorker::comm_barrier(uint64_t comm_handle) { + int rc = comm_barrier_fn_(reinterpret_cast(comm_handle)); + if (rc != 0) { + throw std::runtime_error("comm_barrier failed with code " + std::to_string(rc)); + } +} + +void ChipWorker::comm_destroy(uint64_t comm_handle) { + int rc = comm_destroy_fn_(reinterpret_cast(comm_handle)); + + // Destroy our comm-owned stream regardless of the handle-destroy result — + // leaking the stream is the worse outcome (it keeps the device attached + // and blocks the next session). We still surface the underlying rc + // below so callers see the original failure. + if (comm_stream_ != nullptr) { + int srv = destroy_comm_stream_fn_(device_ctx_, comm_stream_); + if (srv != 0 && rc == 0) rc = srv; + } + comm_stream_ = nullptr; + + if (rc != 0) { + throw std::runtime_error("comm_destroy failed with code " + std::to_string(rc)); + } +} diff --git a/src/common/worker/chip_worker.h b/src/common/worker/chip_worker.h index affc73f40..74c4e4a91 100644 --- a/src/common/worker/chip_worker.h +++ b/src/common/worker/chip_worker.h @@ -61,6 +61,29 @@ class ChipWorker : public IWorker { void copy_to(uint64_t dst, uint64_t src, size_t size); void copy_from(uint64_t dst, uint64_t src, size_t size); + /// Distributed communication primitives (optional — only available when + /// the bound runtime exports comm_*). Wraps the backend-neutral C API + /// defined in src//platform/include/host/comm.h. + /// + /// Unlike the raw C API (which takes a caller-owned aclrtStream), + /// ChipWorker's comm_init owns ACL + stream lifetime internally: + /// - On onboard, comm_init drives ensure_acl_ready_ctx + creates an + /// aclrtStream via the DeviceRunner, stashes the stream, and pairs + /// it with comm_destroy which destroys it. This keeps ACL out of + /// the Python layer (matching the doc's L2-boundary contract: + /// device-side lifecycle stays in C++, not leaking up as + /// ensure_acl_ready / aclrtCreateStream surface area). + /// - On sim, ACL / stream are no-ops; the stashed stream is null. + /// + /// One active comm session per ChipWorker is supported. Users needing + /// multiple concurrent comms should instantiate multiple ChipWorkers. + uint64_t comm_init(int rank, int nranks, const std::string &rootinfo_path); + uint64_t comm_alloc_windows(uint64_t comm_handle, size_t win_size); + uint64_t comm_get_local_window_base(uint64_t comm_handle); + size_t comm_get_window_size(uint64_t comm_handle); + void comm_barrier(uint64_t comm_handle); + void comm_destroy(uint64_t comm_handle); + int device_id() const { return device_id_; } bool initialized() const { return initialized_; } bool device_set() const { return device_set_; } @@ -79,6 +102,15 @@ class ChipWorker : public IWorker { int, int ); using FinalizeDeviceFn = int (*)(void *); + using EnsureAclReadyFn = int (*)(void *, int); + using CreateCommStreamFn = void *(*)(void *); + using DestroyCommStreamFn = int (*)(void *, void *); + using CommInitFn = void *(*)(int, int, void *, const char *); + using CommAllocWindowsFn = int (*)(void *, size_t, uint64_t *); + using CommGetLocalWindowBaseFn = int (*)(void *, uint64_t *); + using CommGetWindowSizeFn = int (*)(void *, size_t *); + using CommBarrierFn = int (*)(void *); + using CommDestroyFn = int (*)(void *); void *lib_handle_ = nullptr; CreateDeviceContextFn create_device_context_fn_ = nullptr; @@ -91,7 +123,21 @@ class ChipWorker : public IWorker { GetRuntimeSizeFn get_runtime_size_fn_ = nullptr; RunRuntimeFn run_runtime_fn_ = nullptr; FinalizeDeviceFn finalize_device_fn_ = nullptr; + EnsureAclReadyFn ensure_acl_ready_fn_ = nullptr; + CreateCommStreamFn create_comm_stream_fn_ = nullptr; + DestroyCommStreamFn destroy_comm_stream_fn_ = nullptr; + CommInitFn comm_init_fn_ = nullptr; + CommAllocWindowsFn comm_alloc_windows_fn_ = nullptr; + CommGetLocalWindowBaseFn comm_get_local_window_base_fn_ = nullptr; + CommGetWindowSizeFn comm_get_window_size_fn_ = nullptr; + CommBarrierFn comm_barrier_fn_ = nullptr; + CommDestroyFn comm_destroy_fn_ = nullptr; void *device_ctx_ = nullptr; + // aclrtStream owned by the currently-active comm session (created inside + // comm_init on onboard via DeviceRunner::create_comm_stream, paired with + // destroy_comm_stream in comm_destroy). Null when no comm is active or + // when running on a backend without ACL (sim). + void *comm_stream_ = nullptr; std::vector runtime_buf_; std::vector aicpu_binary_; diff --git a/tests/ut/cpp/CMakeLists.txt b/tests/ut/cpp/CMakeLists.txt index d87099ed3..ced571d80 100644 --- a/tests/ut/cpp/CMakeLists.txt +++ b/tests/ut/cpp/CMakeLists.txt @@ -215,6 +215,9 @@ if(SIMPLER_ENABLE_HARDWARE_TESTS) target_include_directories(${name} PRIVATE ${GTEST_INCLUDE_DIRS} ${PROJECT_ROOT}/src/a2a3/platform/include + # Shared platform_comm headers (comm.h / comm_context.h) moved + # under src/common so both a2a3 HCCL and a5/a2a3 sim can use them. + ${PROJECT_ROOT}/src/common ${ASCEND_HOME_PATH}/include ) target_compile_options(${name} PRIVATE -D_GLIBCXX_USE_CXX11_ABI=0) diff --git a/tests/ut/cpp/test_hccl_comm.cpp b/tests/ut/cpp/test_hccl_comm.cpp index 1e597e3f9..858c488de 100644 --- a/tests/ut/cpp/test_hccl_comm.cpp +++ b/tests/ut/cpp/test_hccl_comm.cpp @@ -65,8 +65,8 @@ #include #include "acl/acl.h" -#include "common/comm_context.h" -#include "host/comm.h" +#include "platform_comm/comm.h" +#include "platform_comm/comm_context.h" namespace { diff --git a/tests/ut/py/test_worker/test_platform_comm.py b/tests/ut/py/test_worker/test_platform_comm.py new file mode 100644 index 000000000..0983ca576 --- /dev/null +++ b/tests/ut/py/test_worker/test_platform_comm.py @@ -0,0 +1,264 @@ +# Copyright (c) PyPTO Contributors. +# This program is free software, you can redistribute it and/or modify it under the terms and conditions of +# CANN Open Software License Agreement Version 2.0 (the "License"). +# Please refer to the License for details. You may not use this file except in compliance with the License. +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, +# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. +# See LICENSE in the root of the software repository for the full text of the License. +# ----------------------------------------------------------------------------------------------------------- +# ruff: noqa: PLC0415 +"""Hardware UT for ChipWorker.comm_* wrappers (Python surface of the L1a HCCL backend). + +This is the Python twin of tests/ut/cpp/test_hccl_comm.cpp. It drives the +full comm lifecycle entirely through ChipWorker's public Python API: + + ChipWorker.init → set_device → comm_init → comm_alloc_windows + → comm_get_local_window_base → comm_get_window_size + → copy_from (reads back CommContext) → comm_barrier (known-issue tolerant) + → comm_destroy → finalize + +ACL bring-up and aclrtStream lifetime are owned internally by +`ChipWorker.comm_init` / `comm_destroy` (matching the L2-boundary contract +in docs/hierarchical_level_runtime.md — device-side state stays in C++). +ctypes is used only to declare a `CommContext` layout mirror so the test +can inspect the struct returned by `comm_alloc_windows`; no CANN / libacl +symbols are loaded from Python. + +Each rank runs in a forked subprocess so HCCL sees a distinct device context +per rank. The parent only waits on exit codes plus a small result queue used +to surface CommContext field values. + +Known issue inherited from L1a (HCCL 507018): on certain CANN builds +`HcclBarrier` + `aclrtSynchronizeStream` report 507018 after ~52s of timeout. +That is a CANN-coupling bug tracked separately; this test treats a barrier +failure as a warning and still asserts the non-barrier invariants (init/alloc +/ctx-fields/destroy) succeeded. +""" + +from __future__ import annotations + +import ctypes +import multiprocessing as mp +import os +import sys +import traceback +import warnings +from pathlib import Path + +import pytest + +_ROOT = Path(__file__).resolve().parents[4] +for _p in (_ROOT, _ROOT / "python"): + _s = str(_p) + if _s not in sys.path: + sys.path.insert(0, _s) + + +# --------------------------------------------------------------------------- +# CommContext layout — must stay byte-compatible with +# src/a2a3/platform/include/common/comm_context.h (static_asserts there). +# If CANN / HCCL ever shifts these offsets, comm_hccl.cpp's build-time asserts +# will fail first; this struct mirrors them so the Python side can read back a +# CommContext without rebuilding nanobind just to expose the layout. +# --------------------------------------------------------------------------- +_COMM_MAX_RANK_NUM = 64 + + +class _CommContext(ctypes.Structure): + _fields_ = [ + ("workSpace", ctypes.c_uint64), + ("workSpaceSize", ctypes.c_uint64), + ("rankId", ctypes.c_uint32), + ("rankNum", ctypes.c_uint32), + ("winSize", ctypes.c_uint64), + ("windowsIn", ctypes.c_uint64 * _COMM_MAX_RANK_NUM), + ("windowsOut", ctypes.c_uint64 * _COMM_MAX_RANK_NUM), + ] + + +assert ctypes.sizeof(_CommContext) == 1056, "CommContext python mirror drifted from C++ header" + + +def _rank_entry( + rank: int, + nranks: int, + device_id: int, + host_lib: str, + aicpu_path: str, + aicore_path: str, + sim_context_path: str, + rootinfo_path: str, + result_queue: mp.Queue, # type: ignore[type-arg] +) -> None: + """Worker-process body: runs comm_* lifecycle for one rank and reports results.""" + result: dict[str, object] = {"rank": rank, "stage": "start", "ok": False} + try: + from simpler.task_interface import ChipWorker + + worker = ChipWorker() + worker.init(host_lib, aicpu_path, aicore_path, sim_context_path) + result["stage"] = "init" + + worker.set_device(device_id) + result["stage"] = "set_device" + + # ChipWorker.comm_init owns ACL bring-up and aclrtStream creation + # internally — Python never touches aclInit / aclrtSetDevice / + # aclrtCreateStream. This matches the L2-boundary contract in + # docs/hierarchical_level_runtime.md: device-side lifecycle stays + # in C++. + comm = worker.comm_init(rank, nranks, rootinfo_path) + result["stage"] = "comm_init" + + # 4 KiB is the same window hint the C++ UT uses. HCCL may round this + # up; we cross-check the returned winSize against comm_get_window_size. + device_ctx_ptr = worker.comm_alloc_windows(comm, 4096) + if device_ctx_ptr == 0: + raise RuntimeError("comm_alloc_windows returned null device_ctx") + result["stage"] = "alloc" + + local_base = worker.comm_get_local_window_base(comm) + win_size = worker.comm_get_window_size(comm) + if local_base == 0: + raise RuntimeError("comm_get_local_window_base returned 0") + if win_size < 4096: + raise RuntimeError(f"comm_get_window_size={win_size} < 4096") + result["stage"] = "query" + + # ABI guard: copy the CommContext that HCCL populates back to host + # and verify every field we consume in kernels matches what we asked + # for. This is the Python twin of EXIT_CTX_FIELDS from the C++ UT. + # worker.copy_from() is the ChipWorker's device-to-host DMA; we hand + # it the host address of a ctypes-backed _CommContext buffer. + host_ctx = _CommContext() + worker.copy_from(ctypes.addressof(host_ctx), device_ctx_ptr, ctypes.sizeof(host_ctx)) + result["stage"] = "memcpy" + + if host_ctx.rankId != rank: + raise AssertionError(f"rankId={host_ctx.rankId}, expected {rank}") + if host_ctx.rankNum != nranks: + raise AssertionError(f"rankNum={host_ctx.rankNum}, expected {nranks}") + if host_ctx.winSize != win_size: + raise AssertionError(f"winSize={host_ctx.winSize}, expected {win_size}") + if host_ctx.windowsIn[rank] != local_base: + raise AssertionError(f"windowsIn[{rank}]=0x{host_ctx.windowsIn[rank]:x} != local_base=0x{local_base:x}") + peer_windows = [int(host_ctx.windowsIn[i]) for i in range(nranks)] + if any(w == 0 for w in peer_windows): + raise AssertionError(f"peer windowsIn contains zero: {peer_windows}") + result["stage"] = "ctx_fields_ok" + result["peer_windows"] = peer_windows + result["win_size"] = int(win_size) + result["local_base"] = int(local_base) + result["rank_id"] = int(host_ctx.rankId) + result["rank_num"] = int(host_ctx.rankNum) + + # Barrier. L1a observed CANN error 507018 here on some builds; that + # bug is tracked independently. Surface the failure to the parent as + # a warning and continue with teardown so the non-barrier invariants + # above still gate this test. + try: + worker.comm_barrier(comm) + result["barrier_ok"] = True + except Exception as barrier_exc: # noqa: BLE001 + result["barrier_ok"] = False + result["barrier_error"] = str(barrier_exc) + + worker.comm_destroy(comm) + result["stage"] = "destroy" + + worker.finalize() + result["stage"] = "finalize" + result["ok"] = True + except Exception: # noqa: BLE001 + result["error"] = traceback.format_exc() + finally: + result_queue.put(result) + + +# --------------------------------------------------------------------------- +# Test +# --------------------------------------------------------------------------- + + +@pytest.mark.requires_hardware +@pytest.mark.platforms(["a2a3"]) +@pytest.mark.device_count(2) +def test_two_rank_comm_lifecycle(st_device_ids): + """End-to-end 2-rank hardware smoke test for ChipWorker.comm_* wrappers.""" + from simpler_setup.runtime_builder import RuntimeBuilder + + build = bool(os.environ.get("PTO_UT_BUILD")) + bins = RuntimeBuilder(platform="a2a3").get_binaries("tensormap_and_ringbuffer", build=build) + host_lib = str(bins.host_path) + aicpu_path = str(bins.aicpu_path) + aicore_path = str(bins.aicore_path) + sim_context_path = str(bins.sim_context_path) if bins.sim_context_path else "" + + assert len(st_device_ids) >= 2, "device_count(2) fixture must yield >= 2 ids" + nranks = 2 + rootinfo_path = f"/tmp/pto_comm_py_ut_rootinfo_{os.getpid()}.bin" + + ctx = mp.get_context("fork") + result_queue: mp.Queue = ctx.Queue() # type: ignore[type-arg] + procs = [] + for rank in range(nranks): + p = ctx.Process( + target=_rank_entry, + args=( + rank, + nranks, + int(st_device_ids[rank]), + host_lib, + aicpu_path, + aicore_path, + sim_context_path, + rootinfo_path, + result_queue, + ), + daemon=False, + ) + p.start() + procs.append(p) + + # Drain the queue before joining — workers block on queue.put if the pipe + # buffer fills, so pulling N results first avoids a fork+deadlock. + results_by_rank: dict[int, dict] = {} + for _ in range(nranks): + r = result_queue.get(timeout=180) + results_by_rank[int(r["rank"])] = r + + for p in procs: + p.join(timeout=60) + + try: + os.unlink(rootinfo_path) + except FileNotFoundError: + pass + + for rank in range(nranks): + if rank not in results_by_rank: + pytest.fail(f"rank {rank} never reported a result") + r = results_by_rank[rank] + if not r.get("ok"): + pytest.fail(f"rank {rank} failed at stage {r.get('stage')!r}:\n{r.get('error', '(no traceback)')}") + + # Each rank's own-slot invariant (windowsIn[rank] == local_base) is + # asserted inside _rank_entry; all peer slots are already checked to be + # non-zero there. We deliberately do NOT assert cross-rank address + # agreement at this layer: under HCCL, windowsIn[i] holds the *current + # rank's device-local view* of peer i's window (HBM pointer resolved via + # remoteRes / MESH remapping), which is not required to numerically + # equal peer i's own local_base. The C++ hardware UT makes the same + # weaker check for the same reason — anything stricter would fail on + # every well-formed HCCL deployment. + + # Barrier is allowed to fail under the known 507018 regression; emit a + # warning instead of failing the test. The non-barrier invariants above + # are the load-bearing assertions here. + barrier_failures = [r for r in range(nranks) if not results_by_rank[r].get("barrier_ok")] + if barrier_failures: + msgs = "; ".join(f"rank {r}: {results_by_rank[r].get('barrier_error', '?')}" for r in barrier_failures) + warnings.warn( + f"comm_barrier failed on {len(barrier_failures)}/{nranks} ranks (known issue 507018): {msgs}", + stacklevel=1, + )