Skip to content
52 changes: 37 additions & 15 deletions agimus_controller/agimus_controller/factory/robot_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ class RobotModelParameters:
q0: npt.NDArray[np.float64] = field(
default_factory=lambda: np.array([], dtype=np.float64)
) # Initial full configuration of the robot
free_flyer: bool = False # True if the robot has a free flyer
free_flyer: bool = False # True if the robot has a free flyer (SE3, 6 DoF)
Comment thread
MaximilienNaveau marked this conversation as resolved.
planar_base: bool = False # True if the robot has a planar base (x, y, yaw — 3 DoF)
moving_joint_names: list[str] = field(default_factory=list)
robot_urdf: Union[Path, str] = (
"" # Path to the URDF file or string containing URDF as an XML
Expand Down Expand Up @@ -46,19 +47,25 @@ class RobotModelParameters:
) # Red color for the collision model

def __post_init__(self):
# Handle armature:
if self.free_flyer and self.planar_base:
raise ValueError("free_flyer and planar_base are mutually exclusive.")

# Number of base DoFs added to the model before arm joints:
# free_flyer → 6 (SE3 twist)
# planar_base → 3 (vx, vy, omega)
# none → 0
base_nv = 6 if self.free_flyer else (3 if self.planar_base else 0)
expected_nv = base_nv + len(self.moving_joint_names)

# Handle armature: default to zeros sized to the full nv.
if self.armature.size == 0:
# Use a default armature filled with 0s,
# based on the size of moving_joint_names
self.armature = np.zeros(len(self.moving_joint_names), dtype=np.float64)
self.armature = np.zeros(expected_nv, dtype=np.float64)

# Ensure armature has the same shape as moving_joint_names
if (
len(self.armature) != len(self.moving_joint_names) and not self.free_flyer
): # ! TODO: Do the same for free flyer
if len(self.armature) != expected_nv:
raise ValueError(
f"Armature must have the same shape as moving_joint_names. "
f"Got {self.armature.shape} and {len(self.moving_joint_names)}."
f"Armature size mismatch. Expected {expected_nv} "
f"({base_nv} base DoF + {len(self.moving_joint_names)} arm joints), "
f"got {len(self.armature)}."
)

# Ensure URDF and SRDF are valid
Expand Down Expand Up @@ -148,6 +155,7 @@ def _load_urdf(
self,
urdf: Path | str,
use_free_flyer: bool,
use_planar_base: bool,
geometry_types: list[pin.GeometryType],
) -> tuple[pin.Model, pin.CollisionGeometry]:
"""Build pinocchio's models from URDF."""
Expand All @@ -162,6 +170,8 @@ def _load_urdf(
print("Loaded URDF from XML.")
if use_free_flyer:
robot_model = pin.buildModelFromXML(urdf, pin.JointModelFreeFlyer())
elif use_planar_base:
robot_model = pin.buildModelFromXML(urdf, pin.JointModelPlanar())
else:
robot_model = pin.buildModelFromXML(urdf)
package_dirs = (
Expand Down Expand Up @@ -192,14 +202,18 @@ def _load_full_pinocchio_models(self) -> None:
]
self._full_robot_model, self._collision_model, self._visual_model = (
self._load_urdf(
self._params.robot_urdf, self._params.free_flyer, geometry_types
self._params.robot_urdf,
self._params.free_flyer,
self._params.planar_base,
geometry_types,
)
)
if self._params.env_urdf is not None:
env_model, env_collision_model, env_visual_model = self._load_urdf(
self._params.env_urdf,
use_free_flyer=False,
geometry_types=geometry_types,
False,
False,
geometry_types,
)

# make robot models append environment models
Expand Down Expand Up @@ -237,12 +251,20 @@ def _lock_joints(self) -> None:
jn + " not in the model. Model is:" + str(self._full_robot_model)
)
# Find the joints to lock.
# When a floating base (free_flyer) or planar base is used, the root
# joint (index 1, typically named "root_joint") must NOT be locked so
# that the base DoFs survive the reduction.
has_base_joint = self._params.free_flyer or self._params.planar_base
root_joint_id = 1 if has_base_joint else None
joints_to_lock = []
for jn in self._full_robot_model.names:
if jn == "universe":
continue
jid = self._full_robot_model.getJointId(jn)
if has_base_joint and jid == root_joint_id:
continue
if jn not in self._params.moving_joint_names:
joints_to_lock.append(self._full_robot_model.getJointId(jn))
joints_to_lock.append(jid)
if len(self._q0) == 0:
self._q0 = pin.neutral(self._full_robot_model)
(
Expand Down
12 changes: 11 additions & 1 deletion agimus_controller/agimus_controller/ocp_base_croco.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,17 @@ def __init__(
)
else:
self._state = crocoddyl.StateMultibody(self._robot_models.robot_model)
self._actuation = crocoddyl.ActuationModelFull(self._state)

# free_flyer: base is unactuated (6 DoF), only arm joints are controlled
# → ActuationModelFloatingBase, nu = nv - 6
# planar_base: base velocity commands are part of the control vector
# → ActuationModelFull, nu = nv (3 base + n_arm)
# none: fixed base, all joints actuated
# → ActuationModelFull, nu = nv
if robot_models.params.free_flyer:
self._actuation = crocoddyl.ActuationModelFloatingBase(self._state)
else:
self._actuation = crocoddyl.ActuationModelFull(self._state)

# Setting the OCP parameters
self._ocp_params = ocp_params
Expand Down
9 changes: 5 additions & 4 deletions agimus_controller/agimus_controller/trajectory.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,10 +216,11 @@ def compute_horizon_indexes(self):

@property
def horizon(self):
assert self.horizon_indexes[-1] < len(self._buffer), (
"Size of buffer must be at least horizon_indexes[-1]."
)
return [self._buffer[i] for i in self.horizon_indexes]
last = self._buffer[-1]
return [
self._buffer[i] if i < len(self._buffer) else last
for i in self.horizon_indexes
]

def __len__(self):
return len(self._buffer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,12 @@ def setup(
assert ocp_params.dt == self._timesteps[0]
assert all(dt >= self._dt for dt in self._timesteps)

# Build the integrator
# Build the integrator — actuation model must match the OCP
state = crocoddyl.StateMultibody(robot_models.robot_model)
actuation = crocoddyl.ActuationModelFull(state)
if robot_models.params.free_flyer:
actuation = crocoddyl.ActuationModelFloatingBase(state)
else:
actuation = crocoddyl.ActuationModelFull(state)
cost_model = crocoddyl.CostModelSum(state)
differential = crocoddyl.DifferentialActionModelFreeFwdDynamics(
state,
Expand Down
137 changes: 120 additions & 17 deletions agimus_controller_ros/agimus_controller_ros/agimus_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@

import rclpy.time
from std_msgs.msg import Int32, String
from geometry_msgs.msg import Pose
from geometry_msgs.msg import Pose, Twist
from agimus_msgs.msg import MpcInput, MpcDebug
from rcl_interfaces.srv import GetParameters
import builtin_interfaces

from tf2_ros import TransformException
Expand Down Expand Up @@ -43,8 +44,8 @@
mpc_msg_to_weighted_traj_point,
mpc_debug_data_to_msg,
transform_msg_to_se3,
get_param_from_node,
ros_pose_to_se3,
sensor_ff_to_planar_state,
)


Expand All @@ -59,10 +60,13 @@ def init_ros_robot_creation(self) -> None:
self.environment_msg = None
self.robot_srdf_description_msg = None

# Get moving joint names from LFC
self.moving_joint_names = get_param_from_node(
self, "linear_feedback_controller", "moving_joint_names"
).string_array_value
# moving_joint_names is fetched asynchronously in initialization_callback
# to avoid blocking __init__ when the LFC service is not yet available.
self.moving_joint_names = None
self._lfc_param_future = None
self._lfc_param_client = self.create_client(
GetParameters, "/linear_feedback_controller/get_parameters"
)

self.subscriber_robot_description = self.create_subscription(
String,
Expand Down Expand Up @@ -261,6 +265,14 @@ def initialize_ros_attributes(self) -> None:
reliability=ReliabilityPolicy.BEST_EFFORT,
),
)
self.base_cmd_vel_publisher = self.create_publisher(
Twist,
"base_cmd_vel",
qos_profile=QoSProfile(
depth=10,
reliability=ReliabilityPolicy.BEST_EFFORT,
),
)
if self.params.publish_buffer_size:
self.ocp_buffer_size_pub = self.create_publisher(
Int32,
Expand Down Expand Up @@ -362,11 +374,19 @@ def setup_mpc_initial_guess(self):
ws_ref.setup(self.robot_models._robot_model)

np_sensor_msg: lfc_py_types.Sensor = sensor_msg_to_numpy(self.sensor_msg)
q_arm = np_sensor_msg.joint_state.position
v_arm = np_sensor_msg.joint_state.velocity
if self.params.planar_base:
q_sensor, v_sensor = sensor_ff_to_planar_state(
self.sensor_msg, q_arm, v_arm
)
else:
q_sensor, v_sensor = q_arm, v_arm
initial_state = TrajectoryPoint(
time_ns=self.get_clock().now().nanoseconds,
robot_configuration=np_sensor_msg.joint_state.position,
robot_velocity=np_sensor_msg.joint_state.velocity,
robot_acceleration=np.zeros_like(np_sensor_msg.joint_state.velocity),
robot_configuration=q_sensor,
robot_velocity=v_sensor,
robot_acceleration=np.zeros_like(v_sensor),
)
reference_trajectory = self.traj_buffer.horizon
if len(self.ocp.input_transforms) > 0:
Expand All @@ -390,8 +410,9 @@ def mpc_input_callback(self, msg: MpcInput) -> None:
msg, self.get_clock().now().nanoseconds
)
self.traj_buffer.append(w_traj_point)
self.params.ocp.effector_frame_name = msg.ee_inputs[0].frame_id
self.effector_frame_name = msg.ee_inputs[0].frame_id
if msg.ee_inputs:
self.params.ocp.effector_frame_name = msg.ee_inputs[0].frame_id
self.effector_frame_name = msg.ee_inputs[0].frame_id

def update_geom_pose_callback(self, pose: Pose, geom_name: str):
"""Updates pose of a geometry with a given name."""
Expand All @@ -418,13 +439,57 @@ def buffer_has_enough_data(self, ratio: float) -> bool:
def send_control_msg(self, ocp_res: OCPResults) -> None:
"""Get OCP control output and publish it."""
assert self.np_sensor_msg is not None

# With a planar base (nu = 3 + n_arm), the feedforward and Ricatti gains
# include base DoFs that the LFC does not handle.
# Strip the base slice so the LFC receives arm-only commands (joint_nv).
# For free_flyer and fixed-base robots, base_nv = 0 or the actuation model
# already produces arm-only controls (ActuationModelFloatingBase), so no
# slicing is needed.
base_nv = 3 if self.params.planar_base else 0
feedforward = ocp_res.feed_forward_terms[0][base_nv:]
ricatti_gain = ocp_res.ricatti_gains[0][base_nv:, :]

if self.params.planar_base:

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry but this sounds fishy.
Can you extract the component of the FF for the free-flyer?
To see what component we miss?
I would rather map the planar ricatti gains to freeflyer ricatti gains by adding some zeros?

# MPC gain is (n_arm, ndx_planar), LFC expects (n_arm, ndx_ff).
# Map planar tangent cols to FF tangent cols, zero-padding vz/wx/wy.
# Planar ndx: [q_base_diff(nv_base), q_arm(n_arm), v_base(nv_base), v_arm(n_arm)]
# LFC FF ndx: [q_ff_diff(nv_ff), q_arm(n_arm), v_ff(nv_ff), v_arm(n_arm)]
nv_base = base_nv # 3 for planar [vx, vy, wz]
nv_joints = feedforward.shape[0]
nv_ff = 6 # free-flyer nv, hardcoded in LFC (free_flyer_nv_ = 6)
nv_lfc = nv_ff + nv_joints

K_lfc = np.zeros((nv_joints, 2 * nv_lfc))
# q_diff: planar [vx,vy,wz] → FF [0,1,5] (vz=2,wx=3,wy=4 stay zero)
K_lfc[:, [0, 1, 5]] = ricatti_gain[:, 0:nv_base]
K_lfc[:, nv_ff : nv_ff + nv_joints] = ricatti_gain[
:, nv_base : nv_base + nv_joints
]
# v_diff: same layout, offset by nv_lfc
K_lfc[:, [nv_lfc + 0, nv_lfc + 1, nv_lfc + 5]] = ricatti_gain[
:, nv_base + nv_joints : 2 * nv_base + nv_joints
]
K_lfc[:, nv_lfc + nv_ff : nv_lfc + nv_ff + nv_joints] = ricatti_gain[
:, 2 * nv_base + nv_joints : 2 * nv_base + 2 * nv_joints
]
ricatti_gain = K_lfc

ctrl_msg = lfc_py_types.Control(
feedback_gain=ocp_res.ricatti_gains[0],
feedforward=ocp_res.feed_forward_terms[0].reshape(self.rmodel.nv, 1),
feedback_gain=ricatti_gain,
feedforward=feedforward.reshape(len(feedforward), 1),
initial_state=self.np_sensor_msg,
)
self.control_publisher.publish(control_numpy_to_msg(ctrl_msg))

if self.params.planar_base:
base_u = ocp_res.feed_forward_terms[0][:3]
twist = Twist()
twist.linear.x = base_u[0]
twist.linear.y = base_u[1]
twist.angular.z = base_u[2]
self.base_cmd_vel_publisher.publish(twist)

def initialization_callback(self) -> None:
if self.sensor_msg is None:
self.get_logger().warn(
Expand All @@ -436,9 +501,36 @@ def initialization_callback(self) -> None:
if not self.ros_robot_ready():
return

# Fetch moving_joint_names from LFC asynchronously (non-blocking).
if self.moving_joint_names is None:
if self._lfc_param_future is None:
if not self._lfc_param_client.service_is_ready():
self.get_logger().warn(
"Waiting for LFC parameter service...",
throttle_duration_sec=5.0,
)
return
req = GetParameters.Request()
req.names = ["moving_joint_names"]
self._lfc_param_future = self._lfc_param_client.call_async(req)
if not self._lfc_param_future.done():
return
result = self._lfc_param_future.result()
self._lfc_param_future = None
if result is None:
self.get_logger().error(
"Failed to get moving_joint_names from LFC, retrying..."
)
return
self.moving_joint_names = result.values[0].string_array_value
self.get_logger().info(
f"Got moving_joint_names from LFC: {self.moving_joint_names}"
)

if self.mpc is None:
self.create_robot_models(
free_flyer=self.params.free_flyer,
planar_base=self.params.planar_base,
collision_as_capsule=self.params.collision_as_capsule,
self_collision=self.params.self_collision,
armature=self.params.ocp.armature,
Expand Down Expand Up @@ -500,7 +592,9 @@ def run_callback(self, *args) -> None:
f"MPC is running but the buffer does not have enough data. Current size {len(self.traj_buffer)}",
throttle_duration_sec=1.0,
)
self.traj_buffer.append(self.traj_buffer[-1])
required = self.traj_buffer.horizon_indexes[-1] + 1
while len(self.traj_buffer) < required:
self.traj_buffer.append(self.traj_buffer[-1])
if self.params.publish_debug_data:
# Do not use ROS time here because we want to measure the real computation time
start_compute_time = time.perf_counter()
Expand All @@ -509,11 +603,20 @@ def run_callback(self, *args) -> None:
# Update the input transforms required by the OCP, if any.
self.update_transforms()

q_arm = self.np_sensor_msg.joint_state.position
v_arm = self.np_sensor_msg.joint_state.velocity
if self.params.planar_base:
q_sensor, v_sensor = sensor_ff_to_planar_state(
self.sensor_msg, q_arm, v_arm
)
else:
q_sensor, v_sensor = q_arm, v_arm

x0_traj_point = TrajectoryPoint(
time_ns=self.get_clock().now().nanoseconds,
robot_configuration=self.np_sensor_msg.joint_state.position,
robot_velocity=self.np_sensor_msg.joint_state.velocity,
robot_acceleration=np.zeros_like(self.np_sensor_msg.joint_state.velocity),
robot_configuration=q_sensor,
robot_velocity=v_sensor,
robot_acceleration=np.zeros_like(v_sensor),
)
if self.params.constant_delay and control is not None:
# Compensate for delay by estimating the future state.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ agimus_controller_params:
type: bool
default_value: False
description: "Wether we add a free flyer to the robot base"
planar_base:
type: bool
default_value: False
description: "Whether to add a planar base joint (x, y, yaw — 3 DoF) to the robot"
collision_as_capsule:
type: bool
default_value: True
Expand Down
Loading
Loading