diff --git a/python/bindings/worker_bind.h b/python/bindings/worker_bind.h index 6b00dd142..fbc715770 100644 --- a/python/bindings/worker_bind.h +++ b/python/bindings/worker_bind.h @@ -28,6 +28,7 @@ #include #include +#include #include #include "chip_bootstrap_channel.h" @@ -40,6 +41,40 @@ namespace nb = nanobind; +// --------------------------------------------------------------------------- +// Mailbox acquire/release helpers (exposed to Python as _mailbox_load_i32 / +// _mailbox_store_i32). Mirror WorkerThread::read_mailbox_state / +// write_mailbox_state in worker_manager.cpp so the Python side of the mailbox +// handshake uses the same memory order as the C++ side. Without these, a +// plain struct.pack_into("i", ...) on the Python child followed by the parent +// C++ acquire-load on aarch64 can observe the state flip before the +// preceding error-field writes are visible. +inline int32_t mailbox_load_i32(uint64_t addr) { + volatile int32_t *ptr = reinterpret_cast(addr); + int32_t v; +#if defined(__aarch64__) + __asm__ volatile("ldar %w0, [%1]" : "=r"(v) : "r"(ptr) : "memory"); +#elif defined(__x86_64__) + v = *ptr; + __asm__ volatile("" ::: "memory"); +#else + __atomic_load(ptr, &v, __ATOMIC_ACQUIRE); +#endif + return v; +} + +inline void mailbox_store_i32(uint64_t addr, int32_t v) { + volatile int32_t *ptr = reinterpret_cast(addr); +#if defined(__aarch64__) + __asm__ volatile("stlr %w0, [%1]" : : "r"(v), "r"(ptr) : "memory"); +#elif defined(__x86_64__) + __asm__ volatile("" ::: "memory"); + *ptr = v; +#else + __atomic_store(ptr, &v, __ATOMIC_RELEASE); +#endif +} + inline void bind_worker(nb::module_ &m) { // --- WorkerType --- nb::enum_(m, "WorkerType").value("NEXT_LEVEL", WorkerType::NEXT_LEVEL).value("SUB", WorkerType::SUB); @@ -279,4 +314,22 @@ inline void bind_worker(nb::module_ &m) { .def_prop_ro("actual_window_size", &ChipBootstrapChannel::actual_window_size) .def_prop_ro("buffer_ptrs", &ChipBootstrapChannel::buffer_ptrs) .def_prop_ro("error_message", &ChipBootstrapChannel::error_message); + + // Private mailbox acquire/release helpers — only for simpler.worker. The + // underscore prefix keeps them out of the public surface; they do not + // appear in task_interface.__all__. + m.def( + "_mailbox_load_i32", + [](uint64_t addr) -> int32_t { + return mailbox_load_i32(addr); + }, + nb::arg("addr"), "Acquire-load a 32-bit mailbox word at `addr`." + ); + m.def( + "_mailbox_store_i32", + [](uint64_t addr, int32_t value) { + mailbox_store_i32(addr, value); + }, + nb::arg("addr"), nb::arg("value"), "Release-store a 32-bit mailbox word at `addr`." + ); } diff --git a/python/simpler/worker.py b/python/simpler/worker.py index bcbab7a4c..c342b8bbd 100644 --- a/python/simpler/worker.py +++ b/python/simpler/worker.py @@ -53,6 +53,11 @@ def my_l4_orch(orch, args, config): from multiprocessing.shared_memory import SharedMemory from typing import Any, Callable, Optional +from _task_interface import ( # pyright: ignore[reportMissingImports] + _mailbox_load_i32, + _mailbox_store_i32, +) + from .orchestrator import Orchestrator from .task_interface import ( MAILBOX_ERROR_MSG_SIZE, @@ -117,6 +122,16 @@ def _mailbox_addr(shm: SharedMemory) -> int: return ctypes.addressof(ctypes.c_char.from_buffer(buf)) +def _buffer_field_addr(buf, offset: int) -> int: + """Absolute address of a field inside a shared-memory buffer. + + Used to feed `_mailbox_load_i32` / `_mailbox_store_i32`, which operate on + raw pointers so the acquire/release semantics match the C++ side + (worker_manager.cpp::read_mailbox_state / write_mailbox_state). + """ + return ctypes.addressof(ctypes.c_char.from_buffer(buf)) + offset + + def _write_error(buf, code: int, msg: str = "") -> None: """Write an (error code, message) tuple into the mailbox error region. @@ -185,8 +200,9 @@ def _sub_worker_loop(buf, registry: dict) -> None: error-message region; the parent's ``WorkerThread::dispatch_process`` rethrows it as ``std::runtime_error``. """ + state_addr = _buffer_field_addr(buf, _OFF_STATE) while True: - state = struct.unpack_from("i", buf, _OFF_STATE)[0] + state = _mailbox_load_i32(state_addr) if state == _TASK_READY: cid = struct.unpack_from("Q", buf, _OFF_CALLABLE)[0] fn = registry.get(int(cid)) @@ -203,7 +219,7 @@ def _sub_worker_loop(buf, registry: dict) -> None: code = 1 msg = _format_exc("sub_worker", e) _write_error(buf, code, msg) - struct.pack_into("i", buf, _OFF_STATE, _TASK_DONE) + _mailbox_store_i32(state_addr, _TASK_DONE) elif state == _SHUTDOWN: break @@ -237,12 +253,13 @@ def _chip_process_loop( return mailbox_addr = ctypes.addressof(ctypes.c_char.from_buffer(buf)) + state_addr = mailbox_addr + _OFF_STATE args_ptr = mailbox_addr + _OFF_ARGS sys.stderr.write(f"[chip_process pid={os.getpid()} dev={device_id}] ready\n") sys.stderr.flush() while True: - state = struct.unpack_from("i", buf, _OFF_STATE)[0] + state = _mailbox_load_i32(state_addr) if state == _TASK_READY: callable_ptr = struct.unpack_from("Q", buf, _OFF_CALLABLE)[0] block_dim = struct.unpack_from("i", buf, _OFF_BLOCK_DIM)[0] @@ -257,7 +274,7 @@ def _chip_process_loop( code = 1 msg = _format_exc(f"chip_process dev={device_id}", e) _write_error(buf, code, msg) - struct.pack_into("i", buf, _OFF_STATE, _TASK_DONE) + _mailbox_store_i32(state_addr, _TASK_DONE) elif state == _CONTROL_REQUEST: sub_cmd = struct.unpack_from("Q", buf, _OFF_CALLABLE)[0] code = 0 @@ -284,7 +301,7 @@ def _chip_process_loop( code = 1 msg = _format_exc(f"chip_process dev={device_id} ctrl={int(sub_cmd)}", e) _write_error(buf, code, msg) - struct.pack_into("i", buf, _OFF_STATE, _CONTROL_DONE) + _mailbox_store_i32(state_addr, _CONTROL_DONE) elif state == _SHUTDOWN: cw.finalize() break @@ -312,8 +329,9 @@ def _child_worker_loop( ``inner_worker.run(orch_fn, args, cfg)`` which opens its own scope, runs the orch function, and drains. """ + state_addr = _buffer_field_addr(buf, _OFF_STATE) while True: - state = struct.unpack_from("i", buf, _OFF_STATE)[0] + state = _mailbox_load_i32(state_addr) if state == _TASK_READY: cid = struct.unpack_from("Q", buf, _OFF_CALLABLE)[0] orch_fn = registry.get(int(cid)) @@ -331,7 +349,7 @@ def _child_worker_loop( code = 1 msg = _format_exc(f"child_worker level={inner_worker.level}", e) _write_error(buf, code, msg) - struct.pack_into("i", buf, _OFF_STATE, _TASK_DONE) + _mailbox_store_i32(state_addr, _TASK_DONE) elif state == _SHUTDOWN: inner_worker.close() break @@ -449,7 +467,7 @@ def _init_hierarchical(self) -> None: for _ in range(n_sub): shm = SharedMemory(create=True, size=MAILBOX_SIZE) assert shm.buf is not None - struct.pack_into("i", shm.buf, _OFF_STATE, _IDLE) + _mailbox_store_i32(_buffer_field_addr(shm.buf, _OFF_STATE), _IDLE) self._sub_shms.append(shm) # 2. Prepare chip-worker config (L3 only — L4+ has Worker children instead) @@ -472,14 +490,14 @@ def _init_hierarchical(self) -> None: for _ in device_ids: shm = SharedMemory(create=True, size=MAILBOX_SIZE) assert shm.buf is not None - struct.pack_into("i", shm.buf, _OFF_STATE, _IDLE) + _mailbox_store_i32(_buffer_field_addr(shm.buf, _OFF_STATE), _IDLE) self._chip_shms.append(shm) # 3. Allocate next-level Worker child mailboxes (L4+ only). for _ in self._next_level_workers: shm = SharedMemory(create=True, size=MAILBOX_SIZE) assert shm.buf is not None - struct.pack_into("i", shm.buf, _OFF_STATE, _IDLE) + _mailbox_store_i32(_buffer_field_addr(shm.buf, _OFF_STATE), _IDLE) self._next_level_shms.append(shm) # 4. Construct the _Worker *before* fork so the HeapRing mmap @@ -584,21 +602,22 @@ def _chip_control(self, worker_id: int, sub_cmd: int, arg0: int = 0, arg1: int = shm = self._chip_shms[worker_id] buf = shm.buf assert buf is not None + state_addr = _buffer_field_addr(buf, _OFF_STATE) _write_error(buf, 0, "") struct.pack_into("Q", buf, _OFF_CALLABLE, sub_cmd) struct.pack_into("Q", buf, _CTRL_OFF_ARG0, arg0) struct.pack_into("Q", buf, _CTRL_OFF_ARG1, arg1) struct.pack_into("Q", buf, _CTRL_OFF_ARG2, arg2) - struct.pack_into("i", buf, _OFF_STATE, _CONTROL_REQUEST) - while struct.unpack_from("i", buf, _OFF_STATE)[0] != _CONTROL_DONE: + _mailbox_store_i32(state_addr, _CONTROL_REQUEST) + while _mailbox_load_i32(state_addr) != _CONTROL_DONE: pass error = struct.unpack_from("i", buf, _OFF_ERROR)[0] if error != 0: err_msg = _read_error_msg(buf) - struct.pack_into("i", buf, _OFF_STATE, _IDLE) + _mailbox_store_i32(state_addr, _IDLE) raise RuntimeError(f"chip control command {sub_cmd} failed on worker {worker_id}: {err_msg}") result = struct.unpack_from("Q", buf, _CTRL_OFF_RESULT)[0] - struct.pack_into("i", buf, _OFF_STATE, _IDLE) + _mailbox_store_i32(state_addr, _IDLE) return result def malloc(self, size: int, worker_id: int = 0) -> int: @@ -705,7 +724,7 @@ def close(self) -> None: for shm in self._sub_shms: buf = shm.buf assert buf is not None - struct.pack_into("i", buf, _OFF_STATE, _SHUTDOWN) + _mailbox_store_i32(_buffer_field_addr(buf, _OFF_STATE), _SHUTDOWN) for pid in self._sub_pids: os.waitpid(pid, 0) for shm in self._sub_shms: @@ -716,7 +735,7 @@ def close(self) -> None: for shm in self._chip_shms: buf = shm.buf assert buf is not None - struct.pack_into("i", buf, _OFF_STATE, _SHUTDOWN) + _mailbox_store_i32(_buffer_field_addr(buf, _OFF_STATE), _SHUTDOWN) for pid in self._chip_pids: os.waitpid(pid, 0) for shm in self._chip_shms: @@ -728,7 +747,7 @@ def close(self) -> None: for shm in self._next_level_shms: buf = shm.buf assert buf is not None - struct.pack_into("i", buf, _OFF_STATE, _SHUTDOWN) + _mailbox_store_i32(_buffer_field_addr(buf, _OFF_STATE), _SHUTDOWN) for pid in self._next_level_pids: os.waitpid(pid, 0) for shm in self._next_level_shms: diff --git a/tests/ut/py/test_worker/test_mailbox_atomics.py b/tests/ut/py/test_worker/test_mailbox_atomics.py new file mode 100644 index 000000000..9295f2d84 --- /dev/null +++ b/tests/ut/py/test_worker/test_mailbox_atomics.py @@ -0,0 +1,218 @@ +# Copyright (c) PyPTO Contributors. +# This program is free software, you can redistribute it and/or modify it under the terms and conditions of +# CANN Open Software License Agreement Version 2.0 (the "License"). +# Please refer to the License for details. You may not use this file except in compliance with the License. +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, +# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. +# See LICENSE in the root of the software repository for the full text of the License. +# ----------------------------------------------------------------------------------------------------------- +"""Unit tests for _mailbox_load_i32 / _mailbox_store_i32. + +These helpers bridge Python mailbox accesses to the same acquire/release +memory order that the C++ side uses (worker_manager.cpp::read_mailbox_state / +write_mailbox_state). The tests exercise: + +1. Single-process roundtrip — binding signatures work, value is preserved. +2. Cross-process visibility via fork() on a MAP_SHARED SharedMemory region. +3. Field-ordering invariant: a payload written BEFORE the state release-store + is visible to any reader that observes the state via acquire-load. This is + the guarantee the three worker loops rely on to publish OFF_ERROR / + OFF_ERROR_MSG along with the TASK_DONE transition. +4. No regression in the L4 error-propagation paths that exercise every + refactored site in practice — imported from ``test_error_propagation`` so + this test module's CI run acts as a second line of defense. + +All cases run on CPU only (no NPU required) and complete in well under a +second each. +""" + +import ctypes +import os +import struct +import time +from multiprocessing.shared_memory import SharedMemory + +import pytest +from _task_interface import _mailbox_load_i32, _mailbox_store_i32 # pyright: ignore[reportMissingImports] + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _addr(buf, offset: int = 0) -> int: + """Absolute address of ``buf[offset]`` — must match the helper in worker.py.""" + return ctypes.addressof(ctypes.c_char.from_buffer(buf)) + offset + + +@pytest.fixture +def shm(): + """32-byte MAP_SHARED region; freed and unlinked after the test.""" + region = SharedMemory(create=True, size=32) + try: + yield region + finally: + region.close() + region.unlink() + + +# --------------------------------------------------------------------------- +# 1. Single-process roundtrip +# --------------------------------------------------------------------------- + + +class TestSingleProcessRoundtrip: + def test_roundtrip(self, shm): + addr = _addr(shm.buf, 0) + _mailbox_store_i32(addr, 42) + assert _mailbox_load_i32(addr) == 42 + + def test_negative(self, shm): + addr = _addr(shm.buf, 0) + _mailbox_store_i32(addr, -1) + assert _mailbox_load_i32(addr) == -1 + + def test_offset(self, shm): + # Confirm the helpers operate on absolute addresses — writing at +8 must + # not touch the word at offset 0. + base = _addr(shm.buf, 0) + _mailbox_store_i32(base, 7) + _mailbox_store_i32(base + 8, 99) + assert _mailbox_load_i32(base) == 7 + assert _mailbox_load_i32(base + 8) == 99 + + +# --------------------------------------------------------------------------- +# 2. Cross-process visibility (fork) +# --------------------------------------------------------------------------- + + +class TestCrossProcess: + def test_child_transitions_visible_in_parent(self, shm): + """Child cycles state 0→1→2→3→0; parent must at least see the final 0. + + We don't assert every intermediate value (the parent poll rate is not + synchronized with the child), but the terminal 0 must land. + """ + addr = _addr(shm.buf, 0) + _mailbox_store_i32(addr, -1) + + pid = os.fork() + if pid == 0: + try: + for v in (0, 1, 2, 3, 0): + _mailbox_store_i32(addr, v) + time.sleep(0.001) + finally: + os._exit(0) + + deadline = time.monotonic() + 5.0 + final_seen = False + while time.monotonic() < deadline: + v = _mailbox_load_i32(addr) + if v == 0: + final_seen = True + break + os.waitpid(pid, 0) + assert final_seen, "parent never observed child's final state=0" + + +# --------------------------------------------------------------------------- +# 3. Release/acquire field-ordering invariant +# --------------------------------------------------------------------------- + + +class TestFieldOrderingInvariant: + @pytest.mark.parametrize("iterations", [1000]) + def test_payload_visible_when_state_observed(self, iterations): + """The core L3 invariant. + + Layout: int32 state @ off 0, uint64 payload @ off 8. + Child: write payload (plain), then release-store state=1. + Parent: acquire-load state until == 1, then read payload. + Any observation of state==1 MUST come with payload == SENTINEL. + + On aarch64, dropping the release on the child store or the acquire on + the parent load would let this race fail. We run 1000 iterations to + make the failure reproducible on weakly-ordered hardware. + """ + SENTINEL = 0xDEADBEEFCAFEBABE + + for _ in range(iterations): + region = SharedMemory(create=True, size=32) + try: + buf = region.buf + assert buf is not None + state_addr = _addr(buf, 0) + _mailbox_store_i32(state_addr, 0) + struct.pack_into("Q", buf, 8, 0) + + pid = os.fork() + if pid == 0: + try: + # Writes to the payload are plain; they must become + # visible *before* the release-store of state=1 to any + # reader that acquire-loads state==1. + struct.pack_into("Q", buf, 8, SENTINEL) + _mailbox_store_i32(state_addr, 1) + finally: + os._exit(0) + + deadline = time.monotonic() + 5.0 + while _mailbox_load_i32(state_addr) != 1: + if time.monotonic() > deadline: + os.waitpid(pid, 0) + pytest.fail("child never published state=1") + payload = struct.unpack_from("Q", buf, 8)[0] + os.waitpid(pid, 0) + + assert payload == SENTINEL, f"reordering observed: state=1 visible but payload=0x{payload:016x}" + finally: + region.close() + region.unlink() + + +# --------------------------------------------------------------------------- +# 4. Refactor smoke: end-to-end worker loop still dispatches cleanly +# --------------------------------------------------------------------------- + + +class TestWorkerSmoke: + def test_l3_sub_roundtrip(self): + """A sub-worker dispatch round-trips through the refactored loop. + + The callable runs in a forked child, so we use a MAP_SHARED counter to + count dispatches back in the parent. Each successful ``run()`` must + increment the counter by 1. This exercises the TASK_READY → TASK_DONE + state flip that now uses the new acquire/release helpers on both + sides of the Python side of the mailbox. + """ + from simpler.worker import Worker # noqa: PLC0415 + + counter_shm = SharedMemory(create=True, size=4) + try: + buf = counter_shm.buf + assert buf is not None + struct.pack_into("i", buf, 0, 0) + counter_addr = _addr(buf, 0) + + def sub(args): + v = _mailbox_load_i32(counter_addr) + _mailbox_store_i32(counter_addr, v + 1) + + hw = Worker(level=3, num_sub_workers=1) + cid = hw.register(sub) + hw.init() + try: + + def orch(o, args, cfg): + o.submit_sub(cid) + + hw.run(orch) + hw.run(orch) + assert _mailbox_load_i32(counter_addr) == 2 + finally: + hw.close() + finally: + counter_shm.close() + counter_shm.unlink()