-
Notifications
You must be signed in to change notification settings - Fork 455
Move modular design of newton-actuators to Newton repo #2449
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 5 commits
4f8f617
2d8eefd
e23b3ed
9738c81
ca82871
79cc869
36437b5
08541f0
48810f5
74112db
2b56eef
997900c
387ef13
875284f
cce0824
d2a885d
a067fe4
206f624
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| # SPDX-FileCopyrightText: Copyright (c) 2025 The Newton Developers | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| from .actuator import Actuator | ||
| from .clamping import Clamping, ClampingDCMotor, ClampingMaxForce, ClampingPositionBased | ||
| from .controllers import Controller, ControllerNetLSTM, ControllerNetMLP, ControllerPD, ControllerPID | ||
| from .delay import Delay | ||
| from .usd_parser import ParsedActuator, parse_actuator_prim | ||
|
|
||
| __all__ = [ | ||
| "Actuator", | ||
| "Clamping", | ||
| "ClampingDCMotor", | ||
| "ClampingMaxForce", | ||
| "ClampingPositionBased", | ||
| "Controller", | ||
| "ControllerNetLSTM", | ||
| "ControllerNetMLP", | ||
| "ControllerPD", | ||
| "ControllerPID", | ||
| "Delay", | ||
| "ParsedActuator", | ||
| "parse_actuator_prim", | ||
| ] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,311 @@ | ||
| # SPDX-FileCopyrightText: Copyright (c) 2025 The Newton Developers | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| from dataclasses import dataclass | ||
| from typing import Any | ||
|
|
||
| import numpy as np | ||
| import warp as wp | ||
|
|
||
| from .clamping.base import Clamping | ||
| from .controllers.base import Controller | ||
| from .delay import Delay | ||
|
|
||
|
|
||
| # TODO: replace with a Transmission class that does J multiplication before accumulating into the output array. | ||
| @wp.kernel | ||
| def _scatter_add_kernel( | ||
| forces: wp.array[float], | ||
| indices: wp.array[wp.uint32], | ||
| output: wp.array[float], | ||
| ): | ||
| """Scatter-add forces into output at specified indices.""" | ||
| i = wp.tid() | ||
| idx = indices[i] | ||
| output[idx] = output[idx] + forces[i] | ||
|
|
||
|
|
||
| @wp.kernel | ||
| def _scatter_add_dual_kernel( | ||
| applied_forces: wp.array[float], | ||
| computed_forces: wp.array[float], | ||
| indices: wp.array[wp.uint32], | ||
| applied_output: wp.array[float], | ||
| computed_output: wp.array[float], | ||
| ): | ||
| """Scatter-add both applied and computed forces in one pass.""" | ||
| i = wp.tid() | ||
| idx = indices[i] | ||
| applied_output[idx] = applied_output[idx] + applied_forces[i] | ||
| computed_output[idx] = computed_output[idx] + computed_forces[i] | ||
|
|
||
|
|
||
| class Actuator: | ||
| """Composed actuator: controller + optional delay + clamping. | ||
|
|
||
| An actuator reads from simulation state/control arrays, computes | ||
| forces via a controller, applies clamping (force limits, saturation, etc.), | ||
| and writes the result to the output array. | ||
|
|
||
| Delay is handled separately from clamping because it is the only | ||
| pre-controller modifier (it replaces targets with delayed versions). | ||
| All clamping is post-controller (it bounds forces). | ||
|
|
||
| Usage:: | ||
|
|
||
| actuator = Actuator( | ||
| indices=indices, | ||
| controller=ControllerPD(kp=kp, kd=kd), | ||
| delay=Delay(delay=5), | ||
| clamping=[ClampingMaxForce(max_force=max_f)], | ||
| ) | ||
|
|
||
| # Simulation loop | ||
| actuator.step(sim_state, sim_control, state_a, state_b, dt=0.01) | ||
|
|
||
| Args: | ||
| indices: DOF indices for reading state/targets and writing forces. Shape (N,). | ||
| controller: Controller that computes raw forces. | ||
| delay: Optional Delay instance for input delay. | ||
| clamping: List of Clamping objects (post-controller force bounds). | ||
| state_pos_attr: Attribute on sim_state for positions. | ||
| state_vel_attr: Attribute on sim_state for velocities. | ||
| control_target_pos_attr: Attribute on sim_control for target positions. | ||
| control_target_vel_attr: Attribute on sim_control for target velocities. | ||
| control_input_attr: Attribute on sim_control for control input. None to skip. | ||
| control_output_attr: Attribute on sim_control for clamped output forces. | ||
| control_computed_output_attr: Attribute on sim_control for raw (pre-clamp) | ||
| forces. None to skip writing computed forces. | ||
| """ | ||
|
|
||
| @dataclass | ||
| class State: | ||
| """Composed state for an :class:`Actuator`. | ||
|
|
||
| Holds the controller state and, if a delay is present, the delay | ||
| state. Clamping objects are stateless. | ||
| """ | ||
|
|
||
| controller_state: Controller.State | None = None | ||
| delay_state: Delay.State | None = None | ||
|
|
||
| def reset(self) -> None: | ||
| if self.controller_state is not None: | ||
| self.controller_state.reset() | ||
| if self.delay_state is not None: | ||
| self.delay_state.reset() | ||
|
|
||
| def __init__( | ||
| self, | ||
| indices: wp.array, | ||
| controller: Controller, | ||
| delay: Delay | None = None, | ||
| clamping: list[Clamping] | None = None, | ||
| state_pos_attr: str = "joint_q", | ||
| state_vel_attr: str = "joint_qd", | ||
| control_target_pos_attr: str = "joint_target_pos", | ||
| control_target_vel_attr: str = "joint_target_vel", | ||
| control_input_attr: str | None = "joint_act", | ||
| control_output_attr: str = "joint_f", | ||
| control_computed_output_attr: str | None = None, | ||
| ): | ||
| self.indices = indices | ||
| self.controller = controller | ||
| self.delay = delay | ||
| self.clamping = clamping or [] | ||
| self.num_actuators = len(indices) | ||
|
|
||
| self.state_pos_attr = state_pos_attr | ||
| self.state_vel_attr = state_vel_attr | ||
| self.control_target_pos_attr = control_target_pos_attr | ||
| self.control_target_vel_attr = control_target_vel_attr | ||
| self.control_input_attr = control_input_attr | ||
| self.control_output_attr = control_output_attr | ||
| self.control_computed_output_attr = control_computed_output_attr | ||
|
|
||
| device = indices.device | ||
| self._sequential_indices = wp.array(np.arange(self.num_actuators, dtype=np.uint32), device=device) | ||
| self._computed_forces = wp.zeros(self.num_actuators, dtype=wp.float32, device=device) | ||
| self._applied_forces = wp.zeros(self.num_actuators, dtype=wp.float32, device=device) | ||
|
|
||
| controller.finalize(device, self.num_actuators) | ||
|
|
||
| def get_param(self, name: str) -> wp.array | None: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we need this method? It seems like a potential source of problems if the controller and clamping object share the same parameter name. The user should know from where to get the parameter, right?
jvonmuralt marked this conversation as resolved.
Outdated
|
||
| """Search for a named warp array parameter across controller and clamping. | ||
|
|
||
| Searches controller first, then each clamping object in order. | ||
|
|
||
| Args: | ||
| name: Parameter name (e.g. ``"kp"``, ``"max_force"``). | ||
|
|
||
| Returns: | ||
| The first matching :class:`warp.array`, or ``None`` if not found. | ||
| """ | ||
| val = getattr(self.controller, name, None) | ||
| if val is not None and isinstance(val, wp.array): | ||
| return val | ||
| for clamp in self.clamping: | ||
| val = getattr(clamp, name, None) | ||
| if val is not None and isinstance(val, wp.array): | ||
| return val | ||
| return None | ||
|
|
||
| @property | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
grep shows nothing else in the repo actually calls |
||
| def SHARED_PARAMS(self) -> set[str]: | ||
| params: set[str] = set() | ||
| params |= self.controller.SHARED_PARAMS | ||
| if self.delay is not None: | ||
| params |= self.delay.SHARED_PARAMS | ||
| for c in self.clamping: | ||
| params |= c.SHARED_PARAMS | ||
| return params | ||
|
|
||
| def is_stateful(self) -> bool: | ||
| """Return True if controller or delay maintains internal state.""" | ||
| return self.controller.is_stateful() or self.delay is not None | ||
|
|
||
| def is_graphable(self) -> bool: | ||
| """Return True if all components can be captured in a CUDA graph.""" | ||
| return self.controller.is_graphable() and all(c.is_graphable() for c in self.clamping) | ||
|
|
||
| def state(self) -> Actuator.State | None: | ||
| """Return a new composed state, or None if fully stateless.""" | ||
| if not self.is_stateful(): | ||
| return None | ||
| device = self.indices.device | ||
| return Actuator.State( | ||
| controller_state=( | ||
| self.controller.state(self.num_actuators, device) if self.controller.is_stateful() else None | ||
| ), | ||
| delay_state=(self.delay.state(self.num_actuators, device) if self.delay is not None else None), | ||
| ) | ||
|
|
||
| def step( | ||
| self, | ||
| sim_state: Any, | ||
| sim_control: Any, | ||
| current_act_state: Actuator.State | None = None, | ||
| next_act_state: Actuator.State | None = None, | ||
| dt: float | None = None, | ||
| ) -> None: | ||
| """Execute one control step. | ||
|
|
||
| 1. **Delay** — read delayed targets from buffer. | ||
| 2. **Controller** — compute raw forces into ``_computed_forces``. | ||
| 3. **Clamping** — bound forces from computed → ``_applied_forces``. | ||
| 4. **Scatter** — add applied (and optionally computed) forces to output. | ||
| 5. **State updates** — update delay buffer and controller state. | ||
|
|
||
| If the delay buffer is still filling, steps 2-3 are skipped | ||
| (no forces produced) but the buffer keeps accumulating. | ||
|
|
||
| Args: | ||
| sim_state: Simulation state with position/velocity arrays. | ||
| sim_control: Control structure with target/output arrays. | ||
| current_act_state: Current composed state (None if stateless). | ||
| next_act_state: Next composed state (None if stateless). | ||
| dt: Timestep in seconds. | ||
| """ | ||
| has_states = current_act_state is not None and next_act_state is not None | ||
|
coderabbitai[bot] marked this conversation as resolved.
Outdated
|
||
|
|
||
| positions = getattr(sim_state, self.state_pos_attr) | ||
| velocities = getattr(sim_state, self.state_vel_attr) | ||
|
|
||
| orig_target_pos = getattr(sim_control, self.control_target_pos_attr) | ||
| orig_target_vel = getattr(sim_control, self.control_target_vel_attr) | ||
| orig_act_input = None | ||
| if self.control_input_attr is not None: | ||
| orig_act_input = getattr(sim_control, self.control_input_attr, None) | ||
|
|
||
| target_pos = orig_target_pos | ||
| target_vel = orig_target_vel | ||
| act_input = orig_act_input | ||
| target_indices = self.indices | ||
|
|
||
| # --- 1. Delay: read delayed targets --- | ||
| skip_compute = False | ||
| if self.delay is not None: | ||
| delay_state = current_act_state.delay_state if current_act_state else None | ||
|
|
||
| if self.delay.is_ready(delay_state): | ||
| target_pos, target_vel, act_input = self.delay.get_delayed_targets(act_input, delay_state) | ||
| target_indices = self._sequential_indices | ||
| else: | ||
| skip_compute = True | ||
|
|
||
| if not skip_compute: | ||
| # --- 2. Controller: compute raw forces --- | ||
| ctrl_state = current_act_state.controller_state if current_act_state else None | ||
| self.controller.compute( | ||
| positions, | ||
| velocities, | ||
| target_pos, | ||
| target_vel, | ||
| act_input, | ||
| self.indices, | ||
| target_indices, | ||
| self._computed_forces, | ||
| self.num_actuators, | ||
| ctrl_state, | ||
| dt, | ||
| ) | ||
|
|
||
| # --- 3. Clamping: computed → applied (fused copy+clamp) --- | ||
| if self.clamping: | ||
| src = self._computed_forces | ||
| for clamp in self.clamping: | ||
| clamp.modify_forces( | ||
| src, | ||
| self._applied_forces, | ||
| positions, | ||
| velocities, | ||
| self.indices, | ||
| self.num_actuators, | ||
| ) | ||
| src = self._applied_forces | ||
| else: | ||
| wp.copy(self._applied_forces, self._computed_forces) | ||
|
|
||
| # --- 4. Scatter-add applied (+ optionally computed) to output --- | ||
| applied_output = getattr(sim_control, self.control_output_attr) | ||
| if self.control_computed_output_attr is not None: | ||
| computed_output = getattr(sim_control, self.control_computed_output_attr) | ||
| wp.launch( | ||
| kernel=_scatter_add_dual_kernel, | ||
| dim=self.num_actuators, | ||
| inputs=[ | ||
| self._applied_forces, | ||
| self._computed_forces, | ||
| self.indices, | ||
| ], | ||
| outputs=[applied_output, computed_output], | ||
| ) | ||
| else: | ||
| wp.launch( | ||
| kernel=_scatter_add_kernel, | ||
| dim=self.num_actuators, | ||
| inputs=[self._applied_forces, self.indices], | ||
| outputs=[applied_output], | ||
| ) | ||
|
|
||
| # --- 5. State updates --- | ||
| if has_states: | ||
| if self.controller.is_stateful() and not skip_compute: | ||
| self.controller.update_state( | ||
| current_act_state.controller_state, | ||
| next_act_state.controller_state, | ||
| ) | ||
|
|
||
| if self.delay is not None: | ||
| self.delay.update_state( | ||
| orig_target_pos, | ||
| orig_target_vel, | ||
| orig_act_input, | ||
| self.indices, | ||
| self.num_actuators, | ||
| current_act_state.delay_state, | ||
| next_act_state.delay_state, | ||
| dt, | ||
| ) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,14 @@ | ||
| # SPDX-FileCopyrightText: Copyright (c) 2025 The Newton Developers | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| from .base import Clamping | ||
| from .clamping_dc_motor import ClampingDCMotor | ||
| from .clamping_max_force import ClampingMaxForce | ||
| from .clamping_position_based import ClampingPositionBased | ||
|
|
||
| __all__ = [ | ||
| "Clamping", | ||
| "ClampingDCMotor", | ||
| "ClampingMaxForce", | ||
| "ClampingPositionBased", | ||
| ] |
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the original version of the library, we had both "input_indices" and "output_indices". I think we got rid of this because in the up-coming integration with Lab/Sim, we are:
However, isn't collapsing those two lists into just "indices" a bit limiting on the library beyond Sim/Lab? That requires that every consumer of this library matches the structure of their input and output arrays, which may not always be the case.
For example, what if:
That is a perfectly reasonable way for a user to organize their data, but if the user only has
indicesthen they are being blocked from doing things that way.Thoughts @jvonmuralt @eric-heiden @preist-nvidia ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking to start it minimal for now and introduce output_indices as an optional parameter at a later iteration.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also have a concern if we have some D6 joint in the robot, then joint_q and joint_qd indexing differs. should it be a users responsibility to construct every step joint_q_per_dof so that library reads correct values. Or should we then accept pos_indices, vel_indices, output_indices in actuator and then newton state and control can be passed as they are (it would also imply changing control.joint_target_pos to be consistent with state.joint_q)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. So I guess we could say we have three options:
indices: simplest (and what we already have) but most restrictive on the user data.input_indicesandoutput_indices: I think this will work for all 1DOF acuators, including those with tranmission, but is slightly more complex.Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm confused - D6 joints have the same number of q and qd coordinates, maybe @jvonmuralt means free and ball joints which have differing coordinate counts?
I also think we need some separation of q and qd indices because for floating-base articulations (or articulations which have ball joints somewhere) the q indices will differ from qd/"output" indices.
This isn't really a question of input vs. output indices, right? It is more about
joint_qandjoint_qdindices because an actuator controller could read inputs from bothjoint_qandjoint_qd.state_pos_attrandstate_vel_attrare the rawjoint_qandjoint_qdarrays in Newton so there are going to be indexing problems unless we provide q/qd indexing information.So maybe we actually need 3 index arrays?
joint_qread indexjoint_qdread indexjoint_fwrite indexIf an actuator actuates exactly the joints that it reads from then joint_qd and joint_f indexing are the same.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, I meant free and ball joints:) and exactly the problem which you describe.
It looks like we would need 3 index arrays indeed.
I also think we would need to change control.joint_target_pos to be the same shape as joint_q, so that index array A can be used for reading both position state and target.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The analysis table is great! It's a very good summary of cases where having just one index array will fail.
@jvonmuralt and @preist-nvidia , is the preference still to keep maximally simple on the first release and then add the other index arrays later?
Nitpick: I don't think we use the word "joint" except for on the output, where we are in fact always output joint forces.
On inputs, some actuators will depend on positions/velocities which are "actuator space" values, no always "joint-space" (i.e. length or speed of a retracting tendon).