
feat(torch_codegen): cross-core split + scheduler for tpush/tpop ops#1107

Open
zhaozhaozz wants to merge 1 commit into hw-native-sys:main from zhaozhaozz:pr/torch-codegen-split-tpush-tpop

Conversation

Contributor

@zhaozhaozz zhaozhaozz commented Apr 21, 2026

Summary

Make torch_codegen produce numerically faithful PyTorch for the four
cross-core pipe ops in both unidirectional and bidirectional Group
bodies, and for split modes that fan a tile out to two AIV subblocks
running in parallel.

Closes #1032.

Why

The previous codegen for the cross-core ops had two shortcomings:

  1. tpop_from_aic(split>0) returned only subblock 0's half and dropped
    subblock 1's data. End-to-end comparison against the hardware was
    therefore impossible whenever a kernel used UpDown / LeftRight split.
  2. Group bodies that used bidirectional pipes (V↔C) or same-side
    feedback (push + pop on the same direction) deadlocked at codegen
    time because each AIC/AIV body ran to completion before the other
    side started, so the receiver popped from an empty deque.

Both made torch_codegen unusable as a precision-debugging reference
for any cross-core kernel beyond the simplest unidirectional, no-split
case.

What changed

Codegen / runtime helpers

  • Preamble adds four direction-specific helpers (_tpush_to_aiv,
    _tpush_to_aic, _tpop_from_aic, _tpop_from_aiv) plus
    generator-mode variants (_tpush_to_aiv_g, …) that yield
    _WaitPop / _WaitPush at every pipe sync point.
  • _tpop_from_aic(pipe, split) now reassembles the two queued halves
    via torch.cat, so the legacy single-AIV roundtrip path returns the
    full tile instead of dropping half the data.
  • Per-subblock pipes (to_aiv_sb0/sb1, to_aic_sb0/sb1) plus a
    _current_sb[0] mutable singleton model two AIV subblocks running
    the same kernel on different halves of a split tile. The scheduler
    injects the active subblock id before each next() / send() and
    the AIV body reads it via tile.get_subblock_idx, mapped to
    _current_sb[0].
  • _run_scheduler is a cooperative round-robin over generator-style
    AIC/AIV bodies. It raises a clear deadlock error when no task can
    make progress.
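The reassembly described above can be sketched without torch: a deque stands in for the pipe, nested Python lists stand in for tensors, and list concatenation stands in for torch.cat along the split dimension. The helper names `tpush_split`/`tpop_split` are illustrative stand-ins, not the real preamble helpers.

```python
from collections import deque

def tpush_split(pipe, tile, split):
    """Push a tile; for split > 0 enqueue the two halves separately."""
    if split == 0:
        pipe.append(tile)
    else:
        mid = len(tile) // 2
        pipe.append(tile[:mid])   # subblock 0's half
        pipe.append(tile[mid:])   # subblock 1's half

def tpop_split(pipe, split):
    """Pop a tile; for split > 0 reassemble the two queued halves."""
    if split == 0:
        return pipe.popleft()
    return pipe.popleft() + pipe.popleft()  # list "+" plays torch.cat

pipe = deque()
tile = [[1, 2], [3, 4], [5, 6], [7, 8]]   # a 4x2 "tile" as a list of rows
tpush_split(pipe, tile, split=1)
assert tpop_split(pipe, split=1) == tile  # full tile, nothing dropped
```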

Codegen visitor

  • _detect_scheduled_groups triggers the scheduler path on
    bidirectional pipe usage, same-side feedback, or any pipe op
    carrying split>0. AIV members that touch split>0 are emitted as
    two scheduler tasks (one per AIV subblock id).
  • _walk_pipe_calls / _function_uses_split / _scan_pipe_ops
    centralize pipe-op IR walking so detection stays in one place.
  • visit_program emits AIC/AIV → Opaque/Other → Group → Orchestration,
    with _visit_group_function adding a # Group: ... comment and
    _generate_entry_point wrapping multi-function programs in
    def run(...).
  • _stable_hints + _var_refs prevent nanobind GC id recycling from
    silently remapping function params; _visit_expr_str dispatches
    nested ir.Call nodes through visit_call so handlers are reached
    for all argument shapes.
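The _stable_hints / _var_refs bullet rests on a general CPython fact: id(obj) is unique only while obj is alive, so a table keyed by id() must pin its keys with strong references or a recycled id can silently remap a fresh wrapper to the wrong variable. A minimal sketch of the pattern, with illustrative names rather than the real torch_codegen internals:

```python
# Sketch: key a name table by id() while holding strong references so
# ids are never recycled. NameTable and Var are hypothetical stand-ins.

class NameTable:
    def __init__(self):
        self._names = {}   # id(obj) -> emitted variable name
        self._refs = []    # strong refs keep every keyed object alive

    def register(self, obj, name):
        self._names[id(obj)] = name
        self._refs.append(obj)

    def name_of(self, obj):
        return self._names.get(id(obj))

class Var:  # stand-in for an IR variable wrapper
    def __init__(self, hint):
        self.name_hint = hint

table = NameTable()
a, b = Var("x"), Var("x")          # two params sharing a name hint
table.register(a, "x_0")
table.register(b, "x_1")
assert table.name_of(a) == "x_0"   # each wrapper resolves to its own name
assert table.name_of(b) == "x_1"
```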

End-to-end shape (split = UpDown example)

AIC: matmul(a, b) = R(4×4)
   ├─ tpush_to_aiv split=1 ──► to_aiv_sb0 = R[0:2]   to_aiv_sb1 = R[2:4]
   │
AIV#0 (sb=0):                       AIV#1 (sb=1):
  half = pop sb0 (2×4)                 half = pop sb1 (2×4)
  res_half = slice(residual,           res_half = slice(residual,
              offset=0*2)                            offset=1*2)
  out_half = half + res_half           out_half = half + res_half
  tpush_to_aic split=1 ──►             tpush_to_aic split=1 ──►
    to_aic_sb0 = out_half                to_aic_sb1 = out_half
   │                                    │
AIC: tpop_from_aiv split=1 ──► cat([sb0, sb1], dim=-2) = full(4×4)
     store(out)

out == matmul(a, b) + residual for every split mode.
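The diagram's claim can be checked with a tiny torch-free sketch: adding the residual to each half independently and concatenating the halves equals adding the residual to the full tile. Plain nested lists stand in for the 4×4 tensors; this illustrates the identity, not the generated code.

```python
def add(x, y):
    return [[a + b for a, b in zip(rx, ry)] for rx, ry in zip(x, y)]

R = [[i * 4 + j for j in range(4)] for i in range(4)]   # stands in for matmul(a, b)
residual = [[1] * 4 for _ in range(4)]

out_half0 = add(R[0:2], residual[0:2])   # AIV subblock 0 (UpDown top half)
out_half1 = add(R[2:4], residual[2:4])   # AIV subblock 1 (UpDown bottom half)

# AIC reassembly: cat([sb0, sb1], dim=-2) == full-tile computation
assert out_half0 + out_half1 == add(R, residual)
```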

Tests

  • UT (tests/ut/debug/test_torch_codegen.py)
    • split semantics for the four cross-core ops
    • multi-Group isolation
    • qwen3-style cross-core matmul + residual (UpDown / LeftRight),
      full-tile correctness
    • bidirectional pipe smoke tests, error / unsupported-op cases
    • legacy roundtrip tests updated to expect the full reassembled tile
  • ST (tests/st/codegen/test_torch_codegen_cross_core.py)
    • V2C and C2V × {NoSplit, UpDown, LeftRight} torch_codegen ≈ torch
      golden, with split>0 paths driving the bidirectional reassembly
  • ST (tests/st/codegen/test_torch_codegen_scheduler.py)
    • cooperative scheduler over bidirectional, same-side-feedback,
      plain unidirectional, and deadlock cases

All 76 torch_codegen tests pass locally; remaining failures in
tests/st/codegen/ (paged_attention etc.) are pre-existing in this
worktree and unrelated to this change.

Reproduce

cd pypto_torch_codegen
source .venv/bin/activate
pytest tests/ut/debug/test_torch_codegen.py \
       tests/st/codegen/test_torch_codegen_cross_core.py \
       tests/st/codegen/test_torch_codegen_scheduler.py -q

Copilot AI review requested due to automatic review settings April 21, 2026 04:20

@coderabbitai

coderabbitai Bot commented Apr 21, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


Walkthrough

Adds split-aware cross-core pipe simulation, cooperative scheduler emission, stable identifier sanitization/scoping, dependency-ordered function emission, and Program-level run(...) entry-point generation to torch_codegen; updates op dispatch and runtime subblock handling and adds extensive unit/system tests for V2C/C2V and scheduler paths.

Changes

Cohort / File(s) Summary
Core Codegen
python/pypto/debug/torch_codegen.py
Added identifier sanitization/unique-name utilities, stable-name tracking, per-function var-scope reset, dependency-ordered function emission, Program run wrapper generation, split-aware pipe helpers (_tpush_to_*, _tpop_from_*), generator-based scheduler machinery, scheduler-aware op dispatch, and runtime subblock index usage.
System Tests — Cross-Core
tests/st/codegen/test_torch_codegen_cross_core.py
New system tests constructing IR programs to validate V2C and C2V cross-core push/pop across split modes 0/1/2; assert emitted helper/scheduler fragments and numeric correctness vs. PyTorch golden.
System Tests — Scheduler
tests/st/codegen/test_torch_codegen_scheduler.py
New scheduler-focused system tests validating generator-based scheduler emission, deadlock detection, and numeric correctness for bidirectional and same-side feedback scenarios.
Unit Tests — Torch Codegen
tests/ut/debug/test_torch_codegen.py
Expanded unit tests: updated pipe-op assertions to expect split-aware helper calls, added push/pop roundtrip numeric tests across split modes, nested-call coverage, bidirectional/edge-case pipe-state tests, and shape/empty-pipe/error checks.

Sequence Diagram(s)

sequenceDiagram
    participant Client as Test / Caller
    participant CG as TorchCodegen
    participant AIC as AIC Func
    participant AIV as AIV Func
    participant Pipes as Module Pipes<br/>(deques)

    Client->>CG: torch_codegen(Program)
    activate CG
    CG->>CG: _reset_var_scope(), build funcs in order
    CG->>AIC: emit AIC function (may use _tpush/_tpop)
    CG->>AIV: emit AIV function (may use _tpush/_tpop)
    CG->>CG: emit Group / Orchestration and entry `run(...)`
    deactivate CG
    Client->>run: call generated run(...)
    activate run
    run->>AIC: call AIC (pushes via helper)
    AIC->>Pipes: _tpush_to_aiv / _tpush_to_aic (split-aware)
    Pipes-->>AIV: data available
    run->>AIV: call AIV (pops via helper)
    AIV->>Pipes: _tpop_from_aic / _tpop_from_aiv (split-aware)
    deactivate run
sequenceDiagram
    participant Scheduler as _run_scheduler
    participant GenHelper as _tpush/_tpop_*_g
    participant Pipes as Module Pipes<br/>(per-subblock)
    participant Worker as Subblock Generator

    Scheduler->>GenHelper: yield from _tpush_to_aic_g(...)
    activate GenHelper
    GenHelper->>Pipes: append per-subblock slice
    GenHelper-->>Scheduler: yield control
    Scheduler->>GenHelper: resume for _tpop_from_aiv_g(...)
    GenHelper->>Pipes: pop and reassemble via torch.cat
    GenHelper-->>Scheduler: returned tensor

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Suggested reviewers

  • Hzfengsy
  • lyfne123

Poem

🐰 Through deque-lined hops the tensors glide,
Split wise and tidy, carried side by side.
Scheduler yields, generators play,
Names kept steady, scopes in sway.
Hop, hop, codegen — run the cross-core ride!

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage ⚠️ Warning: docstring coverage is 70.69%, below the required 80.00% threshold. Resolution: write docstrings for the functions missing them.
✅ Passed checks (4 passed)
  • Title check ✅ Passed: the title clearly describes the main feature: adding cross-core split and scheduler support for tpush/tpop ops in torch_codegen.
  • Description check ✅ Passed: the description is comprehensive and directly related to the changeset, explaining the why, what, and how of the cross-core split and scheduler implementation.
  • Linked Issues check ✅ Passed: the implementation addresses all objectives in issue #1032: enhanced tpush/tpop kwargs handling (split, pipe_id), cross-core program simulation via scheduler/multi-function ordering, and comprehensive end-to-end tests.
  • Out of Scope Changes check ✅ Passed: all changes are within scope: torch_codegen enhancements for cross-core ops, related unit and system tests, and supporting IR utilities. No extraneous refactorings or unrelated fixes detected.



@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (2)
tests/st/codegen/test_torch_codegen_cross_core.py (1)

250-325: LGTM — pins codegen contract + numerical equivalence across split modes.

Parametrization over (0, 1, 2) for both V2C and C2V, combined with literal-string assertions on the emitted _tpush_to_aiv / _tpop_from_aic / _tpush_to_aic / _tpop_from_aiv calls plus per-tile torch.allclose against a hand-written golden, correctly anchors the split semantics at the codegen level as intended by the PR objective. The ns["_pipes"] reset on line 256 is defensively idempotent (the preamble already initializes fresh deques in a new exec namespace) but harmless.

One small optional simplification: code is generated twice per test (once at line 282/311 for the contract assertions, once inside _run_codegen). Sharing via a helper like _codegen_and_run(program, specs) would halve codegen work, but it is negligible at these sizes.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/st/codegen/test_torch_codegen_cross_core.py` around lines 250 - 325,
Tests currently call torch_codegen(program, ...) twice (once in the test for
string assertions and again inside _run_codegen), wasting work; refactor by
generating code once and reusing it: either add a helper like
_codegen_and_run(program, code, specs) or change _run_codegen to accept an
optional precomputed code parameter (reference _run_codegen, torch_codegen, and
the test functions test_v2c_tpush_tpop_codegen_vs_golden /
test_c2v_tpush_tpop_codegen_vs_golden) and update the tests to call
torch_codegen(program, check_shapes=False) only once, perform the literal-string
assertions against that code, then pass the same code into the runner to execute
the generated run().
python/pypto/debug/torch_codegen.py (1)

560-588: Edge cases in _generate_entry_point.

Two small gaps worth handling:

  1. If the selected entry_func.name is exactly "run", the emitted wrapper becomes def run(...): return run(...) — unbounded recursion at exec time. A guard that either renames the wrapper or skips wrapping when the name collides would be safer.

  2. When a program contains multiple Group functions (covered by test_program_with_multiple_group_functions) and no Orchestration/Opaque, only the first Group is wrapped by run(...); the others are reachable only by their IR names. That's probably fine for debug but is not documented — either picking the last (outermost) group or emitting a comment listing the alternatives would reduce surprise.

Consider a small guard like:

🛠️ Suggested fix for (1)
     if entry_func is None:
         return ""
+    if entry_func.name == "run":
+        # Avoid emitting `def run(...): return run(...)` (infinite recursion).
+        return ""
     param_names = _make_unique_names([p.name_hint for p in entry_func.params])
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@python/pypto/debug/torch_codegen.py` around lines 560 - 588, The
_generate_entry_point function can produce a recursive wrapper when
entry_func.name == "run" and silently hides additional Group functions; fix it
by checking entry_func.name and if it equals "run" either skip emitting the
wrapper or emit it under a safe name like "run_wrapped" (update the returned def
name and call target accordingly), and for the Group-case when selecting the
entry function (where you inspect _ir.FunctionType.Group) gather all group
function names and, if more than one exists, include a short comment above the
emitted wrapper listing the other group names (or note that only the selected
group will be wrapped) so callers are aware; update references to
_generate_entry_point, entry_func.name, and the Group-selection loop and use
_make_unique_names as before.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@python/pypto/debug/torch_codegen.py`:
- Around line 655-670: The current population of _stable_hints in visit_function
and _visit_group_function can overwrite mappings when multiple params share the
same p.name_hint, causing _name_of to resolve a fresh nanobind wrapper to the
wrong param; change the registration logic in both visit_function and
_visit_group_function so you only add _stable_hints[p.name_hint] =
self._var_names[id(p)] when the hint is unambiguous (e.g., not already seen for
a different param or not present multiple times among params), otherwise do not
register that hint (or mark it as ambiguous) so _name_of continues to call
_unique_name for any Var whose id() is not tracked rather than silently
remapping via _stable_hints.


ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 085e57c6-e213-41c6-b38a-afbbd022734d

📥 Commits

Reviewing files that changed from the base of the PR and between e8f32c5 and 27b6f15.

📒 Files selected for processing (3)
  • python/pypto/debug/torch_codegen.py
  • tests/st/codegen/test_torch_codegen_cross_core.py
  • tests/ut/debug/test_torch_codegen.py

Comment thread python/pypto/debug/torch_codegen.py
Contributor

Copilot AI left a comment


Pull request overview

Adds split-aware torch-level simulation for cross-core tile.tpush_*/tile.tpop_* ops so torch_codegen can be used for numerical verification of split-mode cross-core programs (e.g., qwen3-style patterns).

Changes:

  • Extend python/pypto/debug/torch_codegen.py with split-aware pipe helpers and improved multi-function Program emission (AIC/AIV/Group/Orchestration ordering + run(...) entry point).
  • Expand UT coverage for split semantics, nested calls, bidirectional pipes, and program-level simulation.
  • Add new STs that compare torch_codegen output against a torch “golden” for both directions (V2C and C2V) across split modes.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.

File Description
python/pypto/debug/torch_codegen.py Implements split-aware tpush/tpop helpers, group/program emission changes, and entry-point generation.
tests/ut/debug/test_torch_codegen.py Adds extensive unit tests for split-aware pipe ops and cross-core simulation behaviors.
tests/st/codegen/test_torch_codegen_cross_core.py New system tests for cross-core split semantics vs a torch golden reference.

Comment thread python/pypto/debug/torch_codegen.py
Comment thread python/pypto/debug/torch_codegen.py Outdated
Comment thread python/pypto/debug/torch_codegen.py
Comment thread tests/ut/debug/test_torch_codegen.py
Comment thread tests/st/codegen/test_torch_codegen_cross_core.py Outdated
@zhaozhaozz zhaozhaozz force-pushed the pr/torch-codegen-split-tpush-tpop branch from 27b6f15 to 30963ad Compare April 21, 2026 04:51
@zhaozhaozz
Contributor Author

Pushed 30963ad addressing the review.

Applied

  • Copilot L20 (test docstring): Updated path from runtime/test_cross_core.py to tests/st/runtime/test_cross_core.py.
  • Copilot L751 (_visit_group_function unused param): Renamed program_program to mark as intentionally unused.
  • Copilot L668 + CodeRabbit L670 (_stable_hints ambiguity): Real bug surface. Added _register_param_hints helper that drops a hint when two params share the same name_hint, so fresh nanobind wrappers fall back to counter-based uniquing instead of silently resolving to the wrong param. Used in both visit_function and _visit_group_function.
  • CodeRabbit nit (run-name recursion in _generate_entry_point): Added a guard that returns empty string when entry_func.name == "run" to avoid emitting def run(...): return run(...).

Declined

  • Copilot L518 / L1000 (long lines exceed 110): False positive — ruff check passes locally on these files. The cited lines are 102 and 105 chars respectively, both under the configured E501 limit of 110.
  • CodeRabbit nit (codegen called twice per ST): The second call inside _run_codegen keeps the helper self-contained and reusable; the cost is microseconds at these IR sizes. Refactoring would couple the runner to caller-side codegen lifetime for negligible benefit.

UT + ST: 70/70 passing locally (60 UT + 6 new ST + 4 misc). pre-commit clean.

@zhaozhaozz zhaozhaozz force-pushed the pr/torch-codegen-split-tpush-tpop branch from 30963ad to 3d1fd1d Compare April 21, 2026 10:47
@zhaozhaozz zhaozhaozz changed the title feat(torch_codegen): support split mode in tpush/tpop ops feat(torch_codegen): cross-core split + scheduler for tpush/tpop ops Apr 21, 2026

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (1)
tests/st/codegen/test_torch_codegen_cross_core.py (1)

424-430: Pin the V2C return-path push helper too.

For split > 0, this program also executes tile.tpush_to_aic on the AIV return path, but the contract assertions only check the matching _tpop_from_aiv_g. Add the _tpush_to_aic_g assertion so this test catches regressions where the return push stops using scheduler-generator form.

Proposed test assertion
         assert "_run_scheduler([" in code, f"[{label}] expected scheduler emission for split>0"
         assert f"_tpush_to_aiv_g(_pipes['to_aiv'], result, {split})" in code
         assert f"_tpop_from_aic_g(_pipes['to_aiv'], {split})" in code
+        assert f"_tpush_to_aic_g(_pipes['to_aic'], add_half, {split})" in code
         assert f"_tpop_from_aiv_g(_pipes['to_aic'], {split})" in code
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/st/codegen/test_torch_codegen_cross_core.py` around lines 424 - 430,
The test for the split>0 path is missing an assertion that the AIV return-path
push uses the scheduler-generator helper; add an assertion checking that
"_tpush_to_aic_g(_pipes['to_aic'], result, {split})" appears in the emitted code
alongside the existing checks for "_run_scheduler([", "_tpush_to_aiv_g(...,
{split})", "_tpop_from_aic_g(..., {split})", and "_tpop_from_aiv_g(...,
{split})" so regressions where tile.tpush_to_aic switches away from the
scheduler-generator form are caught; update the block handling split>0 to
include this new assertion referencing the symbol _tpush_to_aic_g.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@tests/ut/debug/test_torch_codegen.py`:
- Around line 1108-1155: Group 2 has the pipe direction reversed: aic_func2
currently calls "tile.tpush_to_aic" and aiv_func2 calls "tile.tpop_from_aiv",
but to model AIV→AIC ownership you should have the AIV push and the AIC pop.
Move the "tile.tpush_to_aic" call into aiv_func2 (aiv_norm) and replace the call
in aic_func2 (aic_activation) with "tile.tpop_from_aiv" so that aiv_norm issues
the push and aic_activation performs the corresponding pop.


ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 7826b98d-c90b-47e1-a1d4-62b93ceaa9ce

📥 Commits

Reviewing files that changed from the base of the PR and between 27b6f15 and 3d1fd1d.

📒 Files selected for processing (4)
  • python/pypto/debug/torch_codegen.py
  • tests/st/codegen/test_torch_codegen_cross_core.py
  • tests/st/codegen/test_torch_codegen_scheduler.py
  • tests/ut/debug/test_torch_codegen.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • python/pypto/debug/torch_codegen.py

Comment thread tests/ut/debug/test_torch_codegen.py Outdated
Make the four cross-core pipe ops produce numerically faithful PyTorch
for both unidirectional and bidirectional Group bodies, and for split
modes that fan a tile out to two AIV subblocks running in parallel.

Codegen / runtime
- Preamble adds four direction-specific helpers (_tpush_to_aiv,
  _tpush_to_aic, _tpop_from_aic, _tpop_from_aiv) plus generator-mode
  variants (..._g) that yield _WaitPop / _WaitPush at every pipe sync
  point, used inside scheduled Group bodies.
- _tpop_from_aic(pipe, split) reassembles the two queued halves via
  torch.cat instead of discarding subblock 1's data, so legacy
  single-AIV roundtrips return the full tile.
- Per-subblock pipes (to_aiv_sb0/sb1, to_aic_sb0/sb1) plus a
  _current_sb[0] mutable singleton model two AIV subblocks running the
  same kernel on different halves of a split tile; the scheduler
  injects the active id before each next() / send() and the AIV body
  reads it via tile.get_subblock_idx (mapped to _current_sb[0]).
- _run_scheduler is a cooperative round-robin over generator-style
  AIC/AIV bodies; it raises a clear deadlock error when no task can
  make progress.
- _detect_scheduled_groups triggers on bidirectional pipe usage,
  same-side feedback, or any pipe op carrying split>0; AIV members
  that touch split>0 are emitted as two scheduler tasks (sb=0/sb=1).
- _walk_pipe_calls / _function_uses_split / _scan_pipe_ops centralize
  pipe-op IR walking.
- visit_program emits AIC/AIV → Opaque/Other → Group → Orchestration
  with a "# Group: ..." comment; _generate_entry_point wraps
  multi-function programs in def run(...).
- _stable_hints + _var_refs prevent nanobind GC id recycling from
  remapping function params; _visit_expr_str dispatches nested
  ir.Call nodes through visit_call.

Tests
- UT: split semantics, multi-Group isolation, qwen3-style
  matmul+residual (UpDown / LeftRight, full-tile correctness),
  bidirectional pipes, error cases, legacy roundtrips updated to
  expect full reassembled tiles.
- ST: tests/st/codegen/test_torch_codegen_cross_core.py covers V2C
  and C2V × {NoSplit, UpDown, LeftRight} codegen-vs-golden, with
  split>0 paths driving the bidirectional reassembly.
- ST: tests/st/codegen/test_torch_codegen_scheduler.py covers the
  cooperative scheduler over bidirectional, same-side-feedback, and
  deadlock cases.

Closes hw-native-sys#1032

Signed-off-by: zhaozhaozz
@zhaozhaozz zhaozhaozz force-pushed the pr/torch-codegen-split-tpush-tpop branch from 3d1fd1d to ef65156 Compare April 21, 2026 11:11
@zhaozhaozz
Contributor Author

Python yield and the Cooperative Scheduler, Explained

This document explains how the cooperative scheduler in torch_codegen is implemented, starting from basic Python yield syntax and working up to the scheduler's complete execution flow.


1. yield basics

1.1 Generator functions

In Python, a function that contains a yield statement automatically becomes a generator function:

def simple_gen():
    print("begin")
    yield 1
    print("resume")
    yield 2
    print("end")

# Calling a generator function does not run its body;
# it returns a generator object instead.
gen = simple_gen()
print(type(gen))  # <class 'generator'>

# No code has executed yet!

1.2 Advancing with next()

gen = simple_gen()

value = next(gen)  # runs to the first yield; returns the yielded value
# prints: begin
print(value)       # 1

value = next(gen)  # resumes from the pause point, runs to the next yield
# prints: resume
print(value)       # 2

value = next(gen)  # keeps going
# prints: end
# raises StopIteration, because the function has finished

Key points

  • yield pauses the function, saving its state (local variables, execution position)
  • next() resumes execution from where it last paused
  • StopIteration is raised when the function finishes normally

1.3 Two-way communication with send()

yield can receive values as well as produce them:

def receiver():
    print("ready to receive")
    x = yield "ready"   # yield returns "ready", then waits for a value to assign to x
    print(f"got: {x}")
    y = yield "processed"
    print(f"got again: {y}")
    return "done"

gen = receiver()
# Step 1: start with next() or send(None)
status = next(gen)           # runs to the first yield
# prints: ready to receive
print(status)                # "ready"

# Step 2: send(value) resumes execution; value becomes the yield expression's value
status = gen.send(100)       # x = 100, continues to the next yield
# prints: got: 100
print(status)                # "processed"

status = gen.send(200)       # y = 200, continues to the return
# prints: got again: 200
# StopIteration: "done" (the return value becomes the exception's value)

Execution flow

next(gen) or gen.send(None)
    │
    ▼
┌─────────────────────────────────────┐
│  x = yield "ready"                  │
│         │                           │
│         │ yield returns "ready"     │
│         │ to the caller             │
│         ▼                           │
│      [paused] ◄─────────────────────┤ gen.send(100)
│         │                           │
│         │ 100 becomes the yield     │
│         │ expression's value,       │
│         │ assigned to x             │
│         ▼                           │
│  print(f"got: {x}")                 │
│  y = yield "processed"              │
│         │                           │
│      [paused] ◄─────────────────────┤ gen.send(200)
│         │                           │
│         ▼                           │
│  return "done"                      │
│  → StopIteration("done")            │
└─────────────────────────────────────┘

2. Delegation with yield from

yield from delegates from one generator to another:

def sub_gen():
    yield 1
    yield 2
    return "sub-task done"

def main_gen():
    result = yield from sub_gen()  # delegate to sub_gen
    print(f"sub-task returned: {result}")
    yield 3

gen = main_gen()
print(next(gen))  # 1 (from sub_gen)
print(next(gen))  # 2 (from sub_gen)
print(next(gen))  # prints: sub-task returned: sub-task done
                  # 3 (from main_gen)

What yield from does

  1. Automatically forwards next() and send() calls
  2. Captures the sub-generator's return value
  3. Simplifies writing nested generators

3. How the scheduler works

3.1 Request objects

class _WaitPop:
    __slots__ = ("pipe",)
    def __init__(self, pipe):
        self.pipe = pipe

class _WaitPush:
    __slots__ = ("pipe", "item")
    def __init__(self, pipe, item):
        self.pipe = pipe
        self.item = item

These are two simple data classes that represent a generator's "request":

  • _WaitPop: "I need to pop an item from the pipe"
  • _WaitPush: "I need to push an item onto the pipe"

3.2 Generator versions of tpush/tpop

def _tpush_to_aiv_g(pipe, tensor, split_mode):
    if split_mode == 0:
        yield _WaitPush(pipe, tensor.clone())  # suspend, requesting a push
        return tensor
    # split > 0 case...

def _tpop_from_aic_g(pipe, split_mode):
    if split_mode == 0:
        return (yield _WaitPop(pipe))  # suspend, requesting a pop; returns the popped value

Key syntax

return (yield _WaitPop(pipe))

This line executes in the following order:

  1. Create the _WaitPop(pipe) object
  2. yield that object: the function suspends and hands the request to the scheduler
  3. The scheduler services the request and resumes the generator with send(value)
  4. The yield expression evaluates to value
  5. return that value
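A minimal runnable demo of this pattern, with a hypothetical Request class standing in for _WaitPop and the driver playing the scheduler:

```python
# Demo of the `return (yield request)` pattern: the generator yields a
# request object, and whatever the driver passes to send() becomes the
# generator's return value, delivered via StopIteration.value.

class Request:
    def __init__(self, tag):
        self.tag = tag

def pop_like():
    # Yield the request (suspending here); the value passed to send()
    # becomes the yield expression's value and is returned.
    return (yield Request("pop"))

gen = pop_like()
req = next(gen)                 # run to the yield; receive the request
assert req.tag == "pop"
try:
    gen.send(42)                # resume; 42 becomes the yield's value
    raise AssertionError("generator should have finished")
except StopIteration as stop:
    assert stop.value == 42     # the return value rides on StopIteration
```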

3.3 调度器核心循环

def _run_scheduler(tasks):
    # tasks = [("aiv_fn", generator1, 0), ("aic_fn", generator2, 0), ...]
    # each element is a (name, generator, subblock_id) triple

    # Initialization: prime each generator and collect its first request
    states = []
    for name, gen, sb in tasks:
        _current_sb[0] = sb  # set the current subblock id
        try:
            req = next(gen)  # prime the generator; grab the first yielded value
            states.append([name, gen, req, False, sb])  # [name, gen, current request, done flag, subblock_id]
        except StopIteration:
            states.append([name, gen, None, True, sb])

    # Main loop
    while True:
        progressed = False
        all_done = True

        for st in states:
            if st[3]:  # already finished
                continue
            all_done = False

            req = st[2]  # current request
            advance_value = None
            advance = False

            if isinstance(req, _WaitPush):
                # Push request: always executable
                req.pipe.append(req.item)
                advance = True

            elif isinstance(req, _WaitPop):
                # Pop request: executable only when the pipe is non-empty
                if len(req.pipe) > 0:
                    advance_value = req.pipe.popleft()
                    advance = True

            else:
                # Defensive: unknown yield value treated as cooperative yield.
                advance = True

            if advance:
                progressed = True
                _current_sb[0] = st[4]  # restore the correct subblock id on resume
                try:
                    # Resume the generator, passing a value back (for pops)
                    st[2] = st[1].send(advance_value)
                except StopIteration:
                    st[3] = True  # mark finished

        if all_done:
            return  # all tasks finished

        if not progressed:
            # Deadlock: no task could make progress
            blocked = [(s[0], type(s[2]).__name__) for s in states if not s[3]]
            raise RuntimeError("Cross-core simulation deadlock; tasks blocked: " + repr(blocked))
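The loop above can be condensed into a runnable toy (names mirror the preamble helpers, but this is a simplified sketch without subblock tracking): two generators ping-pong one value through a pair of pipes, exactly the blocking-and-resuming pattern the real scheduler implements.

```python
from collections import deque

class _WaitPop:
    def __init__(self, pipe): self.pipe = pipe

class _WaitPush:
    def __init__(self, pipe, item): self.pipe, self.item = pipe, item

def run(tasks):
    # Prime every generator and record its first request
    states = [[g, next(g), False] for g in tasks]
    while not all(done for _, _, done in states):
        progressed = False
        for st in states:
            if st[2]:
                continue
            req, value, advance = st[1], None, False
            if isinstance(req, _WaitPush):        # pushes always succeed
                req.pipe.append(req.item)
                advance = True
            elif isinstance(req, _WaitPop) and req.pipe:  # pops need data
                value, advance = req.pipe.popleft(), True
            if advance:
                progressed = True
                try:
                    st[1] = st[0].send(value)     # resume with the popped value
                except StopIteration:
                    st[2] = True
        if not progressed:
            raise RuntimeError("deadlock")

to_aic, to_aiv, out = deque(), deque(), []

def aiv():                        # push input, then wait for the reply
    yield _WaitPush(to_aic, 21)
    out.append((yield _WaitPop(to_aiv)))

def aic():                        # wait for input, compute, push back
    x = yield _WaitPop(to_aic)
    yield _WaitPush(to_aiv, x * 2)

run([aiv(), aic()])
# out == [42]
```

Each task blocks only at its yield points, so the interleaving is fully determined by pipe contents, which is what makes the simulation reproducible.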

4. A complete execution walkthrough

4.1 Bidirectional communication scenario

# IR scenario:
# AIV: push(A) → pop(result) → store
# AIC: pop(A) → matmul(A, B) → push(result)

def aiv_fn(a, residual, out):
    # Step 1: push A into the to_aic pipe
    yield from _tpush_to_aic_g(_pipes['to_aic'], a, 0)

    # Step 2: pop the result from the to_aiv pipe
    result = yield from _tpop_from_aic_g(_pipes['to_aiv'], 0)

    # Step 3: post-process and store
    final = result + residual
    _tile_store(final, (0, 0), out)

def aic_fn(b):
    # Step 1: pop A from the to_aic pipe
    a = yield from _tpop_from_aiv_g(_pipes['to_aic'], 0)

    # Step 2: compute
    result = torch.matmul(a, b)

    # Step 3: push the result into the to_aiv pipe
    yield from _tpush_to_aiv_g(_pipes['to_aiv'], result, 0)

4.2 After expanding the yield from

def aiv_fn(a, residual, out):
    # yield from _tpush_to_aic_g(...)
    _req1 = _WaitPush(_pipes['to_aic'], a.clone())
    yield _req1                    # suspension point 1

    # yield from _tpop_from_aic_g(...)
    _req2 = _WaitPop(_pipes['to_aiv'])
    _result = yield _req2          # suspension point 2
    result = _result

    final = result + residual
    _tile_store(final, (0, 0), out)

def aic_fn(b):
    # yield from _tpop_from_aiv_g(...)
    _req1 = _WaitPop(_pipes['to_aic'])
    _a = yield _req1               # suspension point 1
    a = _a

    result = torch.matmul(a, b)

    # yield from _tpush_to_aiv_g(...)
    _req2 = _WaitPush(_pipes['to_aiv'], result.clone())
    yield _req2                    # suspension point 2

4.3 Scheduler execution timeline

Timeline  │  aiv_fn generator        │  aic_fn generator        │  to_aic pipe │  to_aiv pipe
──────────┼──────────────────────────┼──────────────────────────┼──────────────┼─────────────
Init      │  next() → _WaitPush(A)   │  next() → _WaitPop()     │     []       │     []
          │  state: waiting to push  │  state: waiting to pop   │              │
──────────┼──────────────────────────┼──────────────────────────┼──────────────┼─────────────
Round 1   │  Push request: execute   │  Pop request: pipe       │              │
          │  to_aic.append(A)        │  empty, blocked (skip)   │    [A]       │     []
          │  send() → _WaitPop()     │                          │              │
          │  state: waiting to pop   │  state: waiting to pop   │              │
──────────┼──────────────────────────┼──────────────────────────┼──────────────┼─────────────
Round 2   │  Pop request: pipe       │  Pop request: has A!     │              │
          │  empty, blocked (skip)   │  a = to_aic.popleft()    │     []       │     []
          │                          │  send(a) → compute       │              │
          │                          │  → _WaitPush(result)     │              │
          │  state: waiting to pop   │  state: waiting to push  │              │
──────────┼──────────────────────────┼──────────────────────────┼──────────────┼─────────────
Round 3   │  Pop request: pipe       │  Push request: execute   │              │
          │  empty, blocked (skip)   │  to_aiv.append(result)   │     []       │  [result]
          │                          │  send() → StopIteration  │              │
          │  state: waiting to pop   │  state: finished ✓       │              │
──────────┼──────────────────────────┼──────────────────────────┼──────────────┼─────────────
Round 4   │  Pop request: has data!  │  (finished, skip)        │              │
          │ result = to_aiv.popleft()│                          │     []       │     []
          │  send(result) → store    │                          │              │
          │  → StopIteration         │                          │              │
          │  state: finished ✓       │                          │              │
──────────┼──────────────────────────┼──────────────────────────┼──────────────┼─────────────
End       │  all_done = True, return │                          │              │

5. Split-mode scheduling

5.1 Per-subblock pipes

# The new per-subblock pipes
_pipes_sb = {
    'to_aiv_sb0': deque(), 'to_aiv_sb1': deque(),  # pipes for AIV subblocks 0/1
    'to_aic_sb0': deque(), 'to_aic_sb1': deque(),  # pipes for AIC subblocks 0/1
}

# Current subblock id (set by the scheduler)
_current_sb = [0]

5.2 Execution under split mode

The split=1 (UpDown) scenario:

On the AIC side:
  tpush_to_aiv(full_tile, split=1)
    → split into [top_half, bottom_half]
    → push top_half into to_aiv_sb0
    → push bottom_half into to_aiv_sb1

On the AIV side:
  the scheduler creates two tasks:
    ("aiv_fn#0", aiv_fn(args), subblock_id=0)  # handles top_half
    ("aiv_fn#1", aiv_fn(args), subblock_id=1)  # handles bottom_half

  each AIV task:
    tpop_from_aic(split=1)
      → selects a pipe based on _current_sb[0]
      → sb0 pops from to_aiv_sb0
      → sb1 pops from to_aiv_sb1
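The pipe-selection step can be sketched as follows (simplified: plain strings stand in for tensor halves, and the real helper also has generator-mode yield points):

```python
from collections import deque

# Pre-loaded as if the AIC side already pushed both halves
_pipes_sb = {'to_aiv_sb0': deque(["top"]), 'to_aiv_sb1': deque(["bottom"])}
_current_sb = [0]  # mutable singleton the scheduler writes into

def pop_from_aic_split():
    # Route to this subblock's pipe using the current subblock id
    key = f"to_aiv_sb{_current_sb[0]}"
    return _pipes_sb[key].popleft()

_current_sb[0] = 0
a = pop_from_aic_split()  # "top"
_current_sb[0] = 1
b = pop_from_aic_split()  # "bottom"
```

Because the helper reads `_current_sb[0]` at call time, the same function body serves both subblocks; only the scheduler's bookkeeping decides which pipe it touches.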

5.3 Subblock handling in the scheduler

def _run_scheduler(tasks):
    states = []
    for name, gen, sb in tasks:
        _current_sb[0] = sb  # set the current subblock
        try:
            req = next(gen)
            states.append([name, gen, req, False, sb])  # keep sb in the state
        except StopIteration:
            states.append([name, gen, None, True, sb])

    while True:
        for st in states:
            if st[3]:  # already finished
                continue
            # ...
            if advance:
                _current_sb[0] = st[4]  # restore the correct sb on resume
                st[2] = st[1].send(advance_value)

This way the same AIV function body runs twice, each instance processing a different subblock's data, which correctly models the hardware's two-core parallel behavior.


6. Design trade-offs

6.1 Cooperative vs. preemptive

Feature            Cooperative                        Preemptive
Switch trigger     task yields voluntarily (yield)    OS forces a switch
State to save      only explicit state                the full context
Complexity         simple                             complex
Deadlock risk      yes (a task may never yield)       no
Typical use        user-space scheduling, simulators  operating systems

Python generators are a natural fit for cooperative scheduling because:

  1. yield is exactly the "give up control" semantics
  2. Local variables are saved automatically in the generator's frame
  3. No manual context save/restore is needed

6.2 Mapping to hardware behavior

Hardware view                       Python simulation
─────────────────────────────────────────────────────
AIC/AIV cores execute in parallel   multiple generator objects
Cores communicate through FIFOs     deques as pipes
Cores block on empty/full FIFOs     yield _WaitPop/_WaitPush
Hardware scheduler arbitrates       _run_scheduler polling loop

6.3 Key design decisions

  1. Unbounded pipes: simplifies the implementation; _WaitPush always succeeds

    • real hardware FIFOs have a depth limit, but the simulation ignores it
    • to model FIFO depth, a _WaitSpace request could be added
  2. Polling order: tasks are polled in list order

    • simple to implement
    • could be optimized to "whoever can make progress goes first"
  3. Deadlock detection: error out after a round with no progress

    • helps debug IR problems
    • real hardware can hit analogous deadlock scenarios

Development

Successfully merging this pull request may close these issues.

[Feature] Enhance torch_codegen tpush/tpop support to enable qwen3 precision verification in pypto_lib
