Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/guide/release.rst
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ Pre-release planning
- Determine target version (``X.Y.Z``).
* - ☐
- Confirm dependency versions and availability: warp-lang, mujoco,
mujoco-warp, newton-usd-schemas, newton-actuators.
mujoco-warp, newton-usd-schemas.
* - ☐
- Set timeline: code freeze → RC1 → testing window → GA.
* - ☐
Expand Down
3 changes: 2 additions & 1 deletion newton/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,10 @@
# ==================================================================================
# submodule APIs
# ==================================================================================
from . import geometry, ik, math, selection, sensors, solvers, usd, utils, viewer # noqa: E402
from . import actuators, geometry, ik, math, selection, sensors, solvers, usd, utils, viewer # noqa: E402

__all__ += [
"actuators",
"geometry",
"ik",
"math",
Expand Down
24 changes: 24 additions & 0 deletions newton/_src/actuators/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# SPDX-FileCopyrightText: Copyright (c) 2025 The Newton Developers
# SPDX-License-Identifier: Apache-2.0

from .actuator import Actuator
from .clamping import Clamping, ClampingDCMotor, ClampingMaxForce, ClampingPositionBased
from .controllers import Controller, ControllerNetLSTM, ControllerNetMLP, ControllerPD, ControllerPID
from .delay import Delay
from .usd_parser import ParsedActuator, parse_actuator_prim

__all__ = [
"Actuator",
"Clamping",
"ClampingDCMotor",
"ClampingMaxForce",
"ClampingPositionBased",
"Controller",
"ControllerNetLSTM",
"ControllerNetMLP",
"ControllerPD",
"ControllerPID",
"Delay",
"ParsedActuator",
"parse_actuator_prim",
]
318 changes: 318 additions & 0 deletions newton/_src/actuators/actuator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,318 @@
# SPDX-FileCopyrightText: Copyright (c) 2025 The Newton Developers
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

from dataclasses import dataclass
from typing import Any

import numpy as np
import warp as wp

from .clamping.base import Clamping
from .controllers.base import Controller
from .delay import Delay


# TODO: replace with a Transmission class that does J multiplication before accumulating into the output array.
@wp.kernel
def _scatter_add_kernel(
forces: wp.array[float],
indices: wp.array[wp.uint32],
output: wp.array[float],
):
"""Scatter-add forces into output at specified indices."""
i = wp.tid()
idx = indices[i]
output[idx] = output[idx] + forces[i]


@wp.kernel
def _scatter_add_dual_kernel(
applied_forces: wp.array[float],
computed_forces: wp.array[float],
indices: wp.array[wp.uint32],
applied_output: wp.array[float],
computed_output: wp.array[float],
):
"""Scatter-add both applied and computed forces in one pass."""
i = wp.tid()
idx = indices[i]
applied_output[idx] = applied_output[idx] + applied_forces[i]
computed_output[idx] = computed_output[idx] + computed_forces[i]


class Actuator:
"""Composed actuator: controller + optional delay + clamping.

An actuator reads from simulation state/control arrays, computes
forces via a controller, applies clamping (force limits, saturation, etc.),
and writes the result to the output array.

Delay is handled separately from clamping because it is the only
pre-controller modifier (it replaces targets with delayed versions).
All clamping is post-controller (it bounds forces).

Usage::

actuator = Actuator(
indices=indices,
controller=ControllerPD(kp=kp, kd=kd),
delay=Delay(delay=5),
clamping=[ClampingMaxForce(max_force=max_f)],
)

# Simulation loop
actuator.step(sim_state, sim_control, state_a, state_b, dt=0.01)

Args:
indices: DOF indices for reading state/targets and writing forces. Shape (N,).
controller: Controller that computes raw forces.
delay: Optional Delay instance for input delay.
clamping: List of Clamping objects (post-controller force bounds).
state_pos_attr: Attribute on sim_state for positions.
state_vel_attr: Attribute on sim_state for velocities.
control_target_pos_attr: Attribute on sim_control for target positions.
control_target_vel_attr: Attribute on sim_control for target velocities.
control_input_attr: Attribute on sim_control for control input. None to skip.
control_output_attr: Attribute on sim_control for clamped output forces.
control_computed_output_attr: Attribute on sim_control for raw (pre-clamp)
forces. None to skip writing computed forces.
"""

@dataclass
class State:
"""Composed state for an :class:`Actuator`.

Holds the controller state and, if a delay is present, the delay
state. Clamping objects are stateless.
"""

controller_state: Controller.State | None = None
delay_state: Delay.State | None = None

def reset(self) -> None:
if self.controller_state is not None:
self.controller_state.reset()
if self.delay_state is not None:
self.delay_state.reset()

def __init__(
self,
indices: wp.array,
Copy link
Copy Markdown

@jeff-hough jeff-hough Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the original version of the library, we had both "input_indices" and "output_indices". I think we got rid of this because in the up-coming integration with Lab/Sim, we are:

  • limiting ourselves to the case where we have one actuator per DOF of the robot, and
  • we expect that the input array (list of joint measurements) will have the same structure as the output array (list of joint force outputs)

However, isn't collapsing those two lists into just "indices" a bit limiting on the library beyond Sim/Lab? That requires that every consumer of this library matches the structure of their input and output arrays, which may not always be the case.

For example, what if:

  • the input array contains passive and active joint measurements, and
  • the output array contains only actuated joint forces?

That is a perfectly reasonable way for a user to organize their data, but if the user only has indices then they are being blocked from doing things that way.

Thoughts @jvonmuralt @eric-heiden @preist-nvidia ?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking to start it minimal for now and introduce output_indices as an optional parameter at a later iteration.

Copy link
Copy Markdown
Member Author

@jvonmuralt jvonmuralt Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also have a concern if we have some D6 joint in the robot, then joint_q and joint_qd indexing differs. should it be a users responsibility to construct every step joint_q_per_dof so that library reads correct values. Or should we then accept pos_indices, vel_indices, output_indices in actuator and then newton state and control can be passed as they are (it would also imply changing control.joint_target_pos to be consistent with state.joint_q)?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. So I guess we could say we have three options:

  1. Keep just indices: simplest (and what we already have) but most restrictive on the user data.
  2. Put back input_indices and output_indices: I think this will work for all 1DOF acuators, including those with tranmission, but is slightly more complex.
  3. Every array gets its own index array: most flexible and seems needed for D6 joints, adds the most indexing complexity.

Copy link
Copy Markdown
Member

@eric-heiden eric-heiden Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused - D6 joints have the same number of q and qd coordinates, maybe @jvonmuralt means free and ball joints which have differing coordinate counts?
I also think we need some separation of q and qd indices because for floating-base articulations (or articulations which have ball joints somewhere) the q indices will differ from qd/"output" indices.
This isn't really a question of input vs. output indices, right? It is more about joint_q and joint_qd indices because an actuator controller could read inputs from both joint_q and joint_qd. state_pos_attr and state_vel_attr are the raw joint_q and joint_qd arrays in Newton so there are going to be indexing problems unless we provide q/qd indexing information.

So maybe we actually need 3 index arrays?

Axis What it indexes Example where collapsing fails
A. joint_q read index positions for controller feedback free/ball joint shifts coord vs. DOF numbering
B. joint_qd read index velocities for controller feedback same joints — q and qd diverge
C. joint_f write index where clamped force gets scattered passive vs. actuated DOFs, tendon/coupled transmissions

If an actuator actuates exactly the joints that it reads from then joint_qd and joint_f indexing are the same.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I meant free and ball joints:) and exactly the problem which you describe.
It looks like we would need 3 index arrays indeed.
I also think we would need to change control.joint_target_pos to be the same shape as joint_q, so that index array A can be used for reading both position state and target.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The analysis table is great! It's a very good summary of cases where having just one index array will fail.

@jvonmuralt and @preist-nvidia , is the preference still to keep maximally simple on the first release and then add the other index arrays later?

Nitpick: I don't think we use the word "joint" except for on the output, where we are in fact always output joint forces.

On inputs, some actuators will depend on positions/velocities which are "actuator space" values, no always "joint-space" (i.e. length or speed of a retracting tendon).

controller: Controller,
delay: Delay | None = None,
clamping: list[Clamping] | None = None,
state_pos_attr: str = "joint_q",
state_vel_attr: str = "joint_qd",
control_target_pos_attr: str = "joint_target_pos",
control_target_vel_attr: str = "joint_target_vel",
control_input_attr: str | None = "joint_act",
control_output_attr: str = "joint_f",
control_computed_output_attr: str | None = None,
):
self.indices = indices
self.controller = controller
self.delay = delay
self.clamping = clamping or []
self.num_actuators = len(indices)

self.state_pos_attr = state_pos_attr
self.state_vel_attr = state_vel_attr
self.control_target_pos_attr = control_target_pos_attr
self.control_target_vel_attr = control_target_vel_attr
self.control_input_attr = control_input_attr
self.control_output_attr = control_output_attr
self.control_computed_output_attr = control_computed_output_attr

device = indices.device
self._sequential_indices = wp.array(np.arange(self.num_actuators, dtype=np.uint32), device=device)
self._computed_forces = wp.zeros(self.num_actuators, dtype=wp.float32, device=device)
self._applied_forces = wp.zeros(self.num_actuators, dtype=wp.float32, device=device)

controller.set_device(device)
controller.set_indices(indices, self._sequential_indices)
for clamp in self.clamping:
clamp.set_device(device)
if self.delay is not None:
self.delay.set_indices(self.num_actuators, self._sequential_indices)

def get_param(self, name: str) -> wp.array | None:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this method? It seems like a potential source of problems if the controller and clamping object share the same parameter name. The user should know from where to get the parameter, right?

Comment thread
jvonmuralt marked this conversation as resolved.
Outdated
"""Search for a named warp array parameter across controller and clamping.

Searches controller first, then each clamping object in order.

Args:
name: Parameter name (e.g. ``"kp"``, ``"max_force"``).

Returns:
The first matching :class:`warp.array`, or ``None`` if not found.
"""
val = getattr(self.controller, name, None)
if val is not None and isinstance(val, wp.array):
return val
for clamp in self.clamping:
val = getattr(clamp, name, None)
if val is not None and isinstance(val, wp.array):
return val
return None

@property
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actuator.SHARED_PARAMS is an instance property, but the component-level Controller.SHARED_PARAMS / Clamping.SHARED_PARAMS are ClassVar[set[str]]. The builder relies on reading it as a classvar (getattr(controller_class, "SHARED_PARAMS", set())). Reading Actuator.SHARED_PARAMS on the class returns the property object, not a set — so this method would silently break the same grouping logic if someone ever reached for it as a group key.

grep shows nothing else in the repo actually calls Actuator.SHARED_PARAMS. Is this still needed, or can it be dropped? (Noting @eric-heiden raised a related question in the existing thread — this is asking the same thing from the "consistency with the rest of the system" angle.)

def SHARED_PARAMS(self) -> set[str]:
params: set[str] = set()
params |= self.controller.SHARED_PARAMS
if self.delay is not None:
params |= self.delay.SHARED_PARAMS
for c in self.clamping:
params |= c.SHARED_PARAMS
return params

def is_stateful(self) -> bool:
"""Return True if controller or delay maintains internal state."""
return self.controller.is_stateful() or self.delay is not None

def is_graphable(self) -> bool:
"""Return True if all components can be captured in a CUDA graph."""
return self.controller.is_graphable() and all(c.is_graphable() for c in self.clamping)

def state(self) -> Actuator.State | None:
"""Return a new composed state, or None if fully stateless."""
if not self.is_stateful():
return None
device = self.indices.device
return Actuator.State(
controller_state=(
self.controller.state(self.num_actuators, device) if self.controller.is_stateful() else None
),
delay_state=(self.delay.state(self.num_actuators, device) if self.delay is not None else None),
)

def step(
self,
sim_state: Any,
sim_control: Any,
current_act_state: Actuator.State | None = None,
next_act_state: Actuator.State | None = None,
dt: float | None = None,
) -> None:
"""Execute one control step.

1. **Delay** — read delayed targets from buffer.
2. **Controller** — compute raw forces into ``_computed_forces``.
3. **Clamping** — bound forces from computed → ``_applied_forces``.
4. **Scatter** — add applied (and optionally computed) forces to output.
5. **State updates** — update delay buffer and controller state.

If the delay buffer is still filling, steps 2-3 are skipped
(no forces produced) but the buffer keeps accumulating.

Args:
sim_state: Simulation state with position/velocity arrays.
sim_control: Control structure with target/output arrays.
current_act_state: Current composed state (None if stateless).
next_act_state: Next composed state (None if stateless).
dt: Timestep in seconds.
"""
has_states = current_act_state is not None and next_act_state is not None
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated

positions = getattr(sim_state, self.state_pos_attr)
velocities = getattr(sim_state, self.state_vel_attr)

orig_target_pos = getattr(sim_control, self.control_target_pos_attr)
orig_target_vel = getattr(sim_control, self.control_target_vel_attr)
orig_act_input = None
if self.control_input_attr is not None:
orig_act_input = getattr(sim_control, self.control_input_attr, None)

target_pos = orig_target_pos
target_vel = orig_target_vel
act_input = orig_act_input
target_indices = self.indices

# --- 1. Delay: read delayed targets ---
skip_compute = False
if self.delay is not None:
delay_state = current_act_state.delay_state if current_act_state else None

if self.delay.is_ready(delay_state):
target_pos, target_vel, act_input, target_indices = self.delay.get_delayed_targets(
act_input, delay_state
)
else:
skip_compute = True

if not skip_compute:
# --- 2. Controller: compute raw forces ---
ctrl_state = current_act_state.controller_state if current_act_state else None
self.controller.compute(
positions,
velocities,
target_pos,
target_vel,
act_input,
self.indices,
target_indices,
self._computed_forces,
self._sequential_indices,
self.num_actuators,
ctrl_state,
dt,
)

# --- 3. Clamping: computed → applied (fused copy+clamp) ---
if self.clamping:
src = self._computed_forces
for clamp in self.clamping:
clamp.modify_forces(
src,
self._applied_forces,
positions,
velocities,
self.indices,
self.num_actuators,
)
src = self._applied_forces
else:
wp.copy(self._applied_forces, self._computed_forces)

# --- 4. Scatter-add applied (+ optionally computed) to output ---
applied_output = getattr(sim_control, self.control_output_attr)
if self.control_computed_output_attr is not None:
computed_output = getattr(sim_control, self.control_computed_output_attr)
wp.launch(
kernel=_scatter_add_dual_kernel,
dim=self.num_actuators,
inputs=[
self._applied_forces,
self._computed_forces,
self.indices,
],
outputs=[applied_output, computed_output],
)
else:
wp.launch(
kernel=_scatter_add_kernel,
dim=self.num_actuators,
inputs=[self._applied_forces, self.indices],
outputs=[applied_output],
)

# --- 5. State updates ---
if has_states:
if self.controller.is_stateful() and not skip_compute:
self.controller.update_state(
current_act_state.controller_state,
next_act_state.controller_state,
)

if self.delay is not None:
self.delay.update_state(
orig_target_pos,
orig_target_vel,
orig_act_input,
self.indices,
self.num_actuators,
current_act_state.delay_state,
next_act_state.delay_state,
dt,
)
14 changes: 14 additions & 0 deletions newton/_src/actuators/clamping/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# SPDX-FileCopyrightText: Copyright (c) 2025 The Newton Developers
# SPDX-License-Identifier: Apache-2.0

from .base import Clamping
from .clamping_dc_motor import ClampingDCMotor
from .clamping_max_force import ClampingMaxForce
from .clamping_position_based import ClampingPositionBased

__all__ = [
"Clamping",
"ClampingDCMotor",
"ClampingMaxForce",
"ClampingPositionBased",
]
Loading
Loading