diff --git a/agimus_controller/agimus_controller/factory/robot_model.py b/agimus_controller/agimus_controller/factory/robot_model.py index 71e9b7aa..88a5a6cd 100644 --- a/agimus_controller/agimus_controller/factory/robot_model.py +++ b/agimus_controller/agimus_controller/factory/robot_model.py @@ -14,7 +14,8 @@ class RobotModelParameters: q0: npt.NDArray[np.float64] = field( default_factory=lambda: np.array([], dtype=np.float64) ) # Initial full configuration of the robot - free_flyer: bool = False # True if the robot has a free flyer + free_flyer: bool = False # True if the robot has a free flyer (SE3, 6 DoF) + planar_base: bool = False # True if the robot has a planar base (x, y, yaw — 3 DoF) moving_joint_names: list[str] = field(default_factory=list) robot_urdf: Union[Path, str] = ( "" # Path to the URDF file or string containing URDF as an XML @@ -46,19 +47,25 @@ class RobotModelParameters: ) # Red color for the collision model def __post_init__(self): - # Handle armature: + if self.free_flyer and self.planar_base: + raise ValueError("free_flyer and planar_base are mutually exclusive.") + + # Number of base DoFs added to the model before arm joints: + # free_flyer → 6 (SE3 twist) + # planar_base → 3 (vx, vy, omega) + # none → 0 + base_nv = 6 if self.free_flyer else (3 if self.planar_base else 0) + expected_nv = base_nv + len(self.moving_joint_names) + + # Handle armature: default to zeros sized to the full nv. if self.armature.size == 0: - # Use a default armature filled with 0s, - # based on the size of moving_joint_names - self.armature = np.zeros(len(self.moving_joint_names), dtype=np.float64) + self.armature = np.zeros(expected_nv, dtype=np.float64) - # Ensure armature has the same shape as moving_joint_names - if ( - len(self.armature) != len(self.moving_joint_names) and not self.free_flyer - ): # ! TODO: Do the same for free flyer + if len(self.armature) != expected_nv: raise ValueError( - f"Armature must have the same shape as moving_joint_names. " - f"Got {self.armature.shape} and {len(self.moving_joint_names)}." + f"Armature size mismatch. Expected {expected_nv} " + f"({base_nv} base DoF + {len(self.moving_joint_names)} arm joints), " + f"got {len(self.armature)}." ) # Ensure URDF and SRDF are valid @@ -148,6 +155,7 @@ def _load_urdf( self, urdf: Path | str, use_free_flyer: bool, + use_planar_base: bool, geometry_types: list[pin.GeometryType], ) -> tuple[pin.Model, pin.CollisionGeometry]: """Build pinocchio's models from URDF.""" @@ -162,6 +170,8 @@ def _load_urdf( print("Loaded URDF from XML.") if use_free_flyer: robot_model = pin.buildModelFromXML(urdf, pin.JointModelFreeFlyer()) + elif use_planar_base: + robot_model = pin.buildModelFromXML(urdf, pin.JointModelPlanar()) else: robot_model = pin.buildModelFromXML(urdf) package_dirs = ( @@ -192,14 +202,18 @@ def _load_full_pinocchio_models(self) -> None: ] self._full_robot_model, self._collision_model, self._visual_model = ( self._load_urdf( - self._params.robot_urdf, self._params.free_flyer, geometry_types + self._params.robot_urdf, + self._params.free_flyer, + self._params.planar_base, + geometry_types, ) ) if self._params.env_urdf is not None: env_model, env_collision_model, env_visual_model = self._load_urdf( self._params.env_urdf, - use_free_flyer=False, - geometry_types=geometry_types, + False, + False, + geometry_types, ) # make robot models append environment models @@ -237,12 +251,20 @@ def _lock_joints(self) -> None: jn + " not in the model. Model is:" + str(self._full_robot_model) ) # Find the joints to lock. + # When a floating base (free_flyer) or planar base is used, the root + # joint (index 1, typically named "root_joint") must NOT be locked so + # that the base DoFs survive the reduction. + has_base_joint = self._params.free_flyer or self._params.planar_base + root_joint_id = 1 if has_base_joint else None joints_to_lock = [] for jn in self._full_robot_model.names: if jn == "universe": continue + jid = self._full_robot_model.getJointId(jn) + if has_base_joint and jid == root_joint_id: + continue if jn not in self._params.moving_joint_names: - joints_to_lock.append(self._full_robot_model.getJointId(jn)) + joints_to_lock.append(jid) if len(self._q0) == 0: self._q0 = pin.neutral(self._full_robot_model) ( diff --git a/agimus_controller/agimus_controller/ocp_base_croco.py b/agimus_controller/agimus_controller/ocp_base_croco.py index 1405c05b..fb2ccda2 100644 --- a/agimus_controller/agimus_controller/ocp_base_croco.py +++ b/agimus_controller/agimus_controller/ocp_base_croco.py @@ -39,7 +39,17 @@ def __init__( ) else: self._state = crocoddyl.StateMultibody(self._robot_models.robot_model) - self._actuation = crocoddyl.ActuationModelFull(self._state) + + # free_flyer: base is unactuated (6 DoF), only arm joints are controlled + # → ActuationModelFloatingBase, nu = nv - 6 + # planar_base: base velocity commands are part of the control vector + # → ActuationModelFull, nu = nv (3 base + n_arm) + # none: fixed base, all joints actuated + # → ActuationModelFull, nu = nv + if robot_models.params.free_flyer: + self._actuation = crocoddyl.ActuationModelFloatingBase(self._state) + else: + self._actuation = crocoddyl.ActuationModelFull(self._state) # Setting the OCP parameters self._ocp_params = ocp_params diff --git a/agimus_controller/agimus_controller/trajectory.py b/agimus_controller/agimus_controller/trajectory.py index 1b86b641..4ef407c4 100644 --- a/agimus_controller/agimus_controller/trajectory.py +++ b/agimus_controller/agimus_controller/trajectory.py @@ -216,10 +216,11 @@ def compute_horizon_indexes(self): @property def horizon(self): - assert self.horizon_indexes[-1] < len(self._buffer), ( - "Size of buffer must be at least horizon_indexes[-1]." - ) - return [self._buffer[i] for i in self.horizon_indexes] + last = self._buffer[-1] + return [ + self._buffer[i] if i < len(self._buffer) else last + for i in self.horizon_indexes + ] def __len__(self): return len(self._buffer) diff --git a/agimus_controller/agimus_controller/warm_start_shift_previous_solution.py b/agimus_controller/agimus_controller/warm_start_shift_previous_solution.py index 2946f7b6..5d3834f8 100644 --- a/agimus_controller/agimus_controller/warm_start_shift_previous_solution.py +++ b/agimus_controller/agimus_controller/warm_start_shift_previous_solution.py @@ -46,9 +46,12 @@ def setup( assert ocp_params.dt == self._timesteps[0] assert all(dt >= self._dt for dt in self._timesteps) - # Build the integrator + # Build the integrator — actuation model must match the OCP state = crocoddyl.StateMultibody(robot_models.robot_model) - actuation = crocoddyl.ActuationModelFull(state) + if robot_models.params.free_flyer: + actuation = crocoddyl.ActuationModelFloatingBase(state) + else: + actuation = crocoddyl.ActuationModelFull(state) cost_model = crocoddyl.CostModelSum(state) differential = crocoddyl.DifferentialActionModelFreeFwdDynamics( state, diff --git a/agimus_controller_ros/agimus_controller_ros/agimus_controller.py b/agimus_controller_ros/agimus_controller_ros/agimus_controller.py index aeed7896..8087b53b 100644 --- a/agimus_controller_ros/agimus_controller_ros/agimus_controller.py +++ b/agimus_controller_ros/agimus_controller_ros/agimus_controller.py @@ -12,8 +12,9 @@ import rclpy.time from std_msgs.msg import Int32, String -from geometry_msgs.msg import Pose +from geometry_msgs.msg import Pose, Twist from agimus_msgs.msg import MpcInput, MpcDebug +from rcl_interfaces.srv import GetParameters import builtin_interfaces from tf2_ros import TransformException @@ -43,8 +44,8 @@ mpc_msg_to_weighted_traj_point, mpc_debug_data_to_msg, transform_msg_to_se3, - get_param_from_node, ros_pose_to_se3, + sensor_ff_to_planar_state, ) @@ -59,10 +60,13 @@ def init_ros_robot_creation(self) -> None: self.environment_msg = None self.robot_srdf_description_msg = None - # Get moving joint names from LFC - self.moving_joint_names = get_param_from_node( - self, "linear_feedback_controller", "moving_joint_names" - ).string_array_value + # moving_joint_names is fetched asynchronously in initialization_callback + # to avoid blocking __init__ when the LFC service is not yet available. + self.moving_joint_names = None + self._lfc_param_future = None + self._lfc_param_client = self.create_client( + GetParameters, "/linear_feedback_controller/get_parameters" + ) self.subscriber_robot_description = self.create_subscription( String, @@ -261,6 +265,14 @@ def initialize_ros_attributes(self) -> None: reliability=ReliabilityPolicy.BEST_EFFORT, ), ) + self.base_cmd_vel_publisher = self.create_publisher( + Twist, + "base_cmd_vel", + qos_profile=QoSProfile( + depth=10, + reliability=ReliabilityPolicy.BEST_EFFORT, + ), + ) if self.params.publish_buffer_size: self.ocp_buffer_size_pub = self.create_publisher( Int32, @@ -362,11 +374,19 @@ def setup_mpc_initial_guess(self): ws_ref.setup(self.robot_models._robot_model) np_sensor_msg: lfc_py_types.Sensor = sensor_msg_to_numpy(self.sensor_msg) + q_arm = np_sensor_msg.joint_state.position + v_arm = np_sensor_msg.joint_state.velocity + if self.params.planar_base: + q_sensor, v_sensor = sensor_ff_to_planar_state( + self.sensor_msg, q_arm, v_arm + ) + else: + q_sensor, v_sensor = q_arm, v_arm initial_state = TrajectoryPoint( time_ns=self.get_clock().now().nanoseconds, - robot_configuration=np_sensor_msg.joint_state.position, - robot_velocity=np_sensor_msg.joint_state.velocity, - robot_acceleration=np.zeros_like(np_sensor_msg.joint_state.velocity), + robot_configuration=q_sensor, + robot_velocity=v_sensor, + robot_acceleration=np.zeros_like(v_sensor), ) reference_trajectory = self.traj_buffer.horizon if len(self.ocp.input_transforms) > 0: @@ -390,8 +410,9 @@ def mpc_input_callback(self, msg: MpcInput) -> None: msg, self.get_clock().now().nanoseconds ) self.traj_buffer.append(w_traj_point) - self.params.ocp.effector_frame_name = msg.ee_inputs[0].frame_id - self.effector_frame_name = msg.ee_inputs[0].frame_id + if msg.ee_inputs: + self.params.ocp.effector_frame_name = msg.ee_inputs[0].frame_id + self.effector_frame_name = msg.ee_inputs[0].frame_id def update_geom_pose_callback(self, pose: Pose, geom_name: str): """Updates pose of a geometry with a given name.""" @@ -418,13 +439,57 @@ def buffer_has_enough_data(self, ratio: float) -> bool: def send_control_msg(self, ocp_res: OCPResults) -> None: """Get OCP control output and publish it.""" assert self.np_sensor_msg is not None + + # With a planar base (nu = 3 + n_arm), the feedforward and Ricatti gains + # include base DoFs that the LFC does not handle. + # Strip the base slice so the LFC receives arm-only commands (joint_nv). + # For free_flyer and fixed-base robots, base_nv = 0 or the actuation model + # already produces arm-only controls (ActuationModelFloatingBase), so no + # slicing is needed. + base_nv = 3 if self.params.planar_base else 0 + feedforward = ocp_res.feed_forward_terms[0][base_nv:] + ricatti_gain = ocp_res.ricatti_gains[0][base_nv:, :] + + if self.params.planar_base: + # MPC gain is (n_arm, ndx_planar), LFC expects (n_arm, ndx_ff). + # Map planar tangent cols to FF tangent cols, zero-padding vz/wx/wy. + # Planar ndx: [q_base_diff(nv_base), q_arm(n_arm), v_base(nv_base), v_arm(n_arm)] + # LFC FF ndx: [q_ff_diff(nv_ff), q_arm(n_arm), v_ff(nv_ff), v_arm(n_arm)] + nv_base = base_nv # 3 for planar [vx, vy, wz] + nv_joints = feedforward.shape[0] + nv_ff = 6 # free-flyer nv, hardcoded in LFC (free_flyer_nv_ = 6) + nv_lfc = nv_ff + nv_joints + + K_lfc = np.zeros((nv_joints, 2 * nv_lfc)) + # q_diff: planar [vx,vy,wz] → FF [0,1,5] (vz=2,wx=3,wy=4 stay zero) + K_lfc[:, [0, 1, 5]] = ricatti_gain[:, 0:nv_base] + K_lfc[:, nv_ff : nv_ff + nv_joints] = ricatti_gain[ + :, nv_base : nv_base + nv_joints + ] + # v_diff: same layout, offset by nv_lfc + K_lfc[:, [nv_lfc + 0, nv_lfc + 1, nv_lfc + 5]] = ricatti_gain[ + :, nv_base + nv_joints : 2 * nv_base + nv_joints + ] + K_lfc[:, nv_lfc + nv_ff : nv_lfc + nv_ff + nv_joints] = ricatti_gain[ + :, 2 * nv_base + nv_joints : 2 * nv_base + 2 * nv_joints + ] + ricatti_gain = K_lfc + ctrl_msg = lfc_py_types.Control( - feedback_gain=ocp_res.ricatti_gains[0], - feedforward=ocp_res.feed_forward_terms[0].reshape(self.rmodel.nv, 1), + feedback_gain=ricatti_gain, + feedforward=feedforward.reshape(len(feedforward), 1), initial_state=self.np_sensor_msg, ) self.control_publisher.publish(control_numpy_to_msg(ctrl_msg)) + if self.params.planar_base: + base_u = ocp_res.feed_forward_terms[0][:3] + twist = Twist() + twist.linear.x = base_u[0] + twist.linear.y = base_u[1] + twist.angular.z = base_u[2] + self.base_cmd_vel_publisher.publish(twist) + def initialization_callback(self) -> None: if self.sensor_msg is None: self.get_logger().warn( @@ -436,9 +501,36 @@ def initialization_callback(self) -> None: if not self.ros_robot_ready(): return + # Fetch moving_joint_names from LFC asynchronously (non-blocking). + if self.moving_joint_names is None: + if self._lfc_param_future is None: + if not self._lfc_param_client.service_is_ready(): + self.get_logger().warn( + "Waiting for LFC parameter service...", + throttle_duration_sec=5.0, + ) + return + req = GetParameters.Request() + req.names = ["moving_joint_names"] + self._lfc_param_future = self._lfc_param_client.call_async(req) + if not self._lfc_param_future.done(): + return + result = self._lfc_param_future.result() + self._lfc_param_future = None + if result is None: + self.get_logger().error( + "Failed to get moving_joint_names from LFC, retrying..." + ) + return + self.moving_joint_names = result.values[0].string_array_value + self.get_logger().info( + f"Got moving_joint_names from LFC: {self.moving_joint_names}" + ) + if self.mpc is None: self.create_robot_models( free_flyer=self.params.free_flyer, + planar_base=self.params.planar_base, collision_as_capsule=self.params.collision_as_capsule, self_collision=self.params.self_collision, armature=self.params.ocp.armature, @@ -500,7 +592,9 @@ def run_callback(self, *args) -> None: f"MPC is running but the buffer does not have enough data. Current size {len(self.traj_buffer)}", throttle_duration_sec=1.0, ) - self.traj_buffer.append(self.traj_buffer[-1]) + required = self.traj_buffer.horizon_indexes[-1] + 1 + while len(self.traj_buffer) < required: + self.traj_buffer.append(self.traj_buffer[-1]) if self.params.publish_debug_data: # Do not use ROS time here because we want to measure the real computation time start_compute_time = time.perf_counter() @@ -509,11 +603,20 @@ def run_callback(self, *args) -> None: # Update the input transforms required by the OCP, if any. self.update_transforms() + q_arm = self.np_sensor_msg.joint_state.position + v_arm = self.np_sensor_msg.joint_state.velocity + if self.params.planar_base: + q_sensor, v_sensor = sensor_ff_to_planar_state( + self.sensor_msg, q_arm, v_arm + ) + else: + q_sensor, v_sensor = q_arm, v_arm + x0_traj_point = TrajectoryPoint( time_ns=self.get_clock().now().nanoseconds, - robot_configuration=self.np_sensor_msg.joint_state.position, - robot_velocity=self.np_sensor_msg.joint_state.velocity, - robot_acceleration=np.zeros_like(self.np_sensor_msg.joint_state.velocity), + robot_configuration=q_sensor, + robot_velocity=v_sensor, + robot_acceleration=np.zeros_like(v_sensor), ) if self.params.constant_delay and control is not None: # Compensate for delay by estimating the future state. diff --git a/agimus_controller_ros/agimus_controller_ros/agimus_controller_parameters.yaml b/agimus_controller_ros/agimus_controller_ros/agimus_controller_parameters.yaml index c90a5391..e51ff980 100644 --- a/agimus_controller_ros/agimus_controller_ros/agimus_controller_parameters.yaml +++ b/agimus_controller_ros/agimus_controller_ros/agimus_controller_parameters.yaml @@ -82,6 +82,10 @@ agimus_controller_params: type: bool default_value: False description: "Wether we add a free flyer to the robot base" + planar_base: + type: bool + default_value: False + description: "Whether to add a planar base joint (x, y, yaw — 3 DoF) to the robot" collision_as_capsule: type: bool default_value: True diff --git a/agimus_controller_ros/agimus_controller_ros/mpc_debugger_node.py b/agimus_controller_ros/agimus_controller_ros/mpc_debugger_node.py index 5ed66366..b77cbb68 100644 --- a/agimus_controller_ros/agimus_controller_ros/mpc_debugger_node.py +++ b/agimus_controller_ros/agimus_controller_ros/mpc_debugger_node.py @@ -132,6 +132,7 @@ def __init__( def _get_agimus_controller_node_params(self): names = [ "free_flyer", + "planar_base", "ocp.dt_factor_n_seq.factors", "ocp.dt_factor_n_seq.n_steps", "collision_as_capsule", @@ -149,13 +150,14 @@ def _get_agimus_controller_node_params(self): self._agimus_controller_node_params = dict(zip(names, params)) self._robot_has_free_flyer = params[0].bool_value - dt_factors = params[1].integer_array_value - dt_n_steps = params[2].integer_array_value + self._robot_has_planar_base = params[1].bool_value + dt_factors = params[2].integer_array_value + dt_n_steps = params[3].integer_array_value self._ocp_dt_factor_n_seq = DTFactorsNSeq(dt_factors, dt_n_steps) - self._collision_as_capsule = params[3].bool_value - self._self_collision = params[4].bool_value - self._ocp_armature = np.array(params[5].double_array_value) + self._collision_as_capsule = params[4].bool_value + self._self_collision = params[5].bool_value + self._ocp_armature = np.array(params[6].double_array_value) self._horizon_indices = np.cumsum( sum( @@ -272,7 +274,7 @@ def _evaluate_ocp( if self._ocp_update_requested.is_set(): return - # Step 1: Update references + # Update references # Read transforms from TF. Note that doing do introduces a delay. now = self.get_clock().now() transforms = self._ocp.input_transforms @@ -293,14 +295,14 @@ def _evaluate_ocp( references = [self._references[i] for i in self._horizon_indices] self._ocp.set_reference_weighted_trajectory(references) - # Step 2: Build state and control and evaluate the OCP + # Build state and control and evaluate the OCP x = [np.asarray(state) for state in states] u = [np.asarray(control) for control in controls] problem = self._ocp._problem problem.calc(x, u) problem.calcDiff(x, u) - # Step 3: retrieve the individual cost items. + # Retrieve the individual cost items. idof = 0 with self._mpl_lock: for c, (cost_idx, model, data) in enumerate( @@ -435,6 +437,7 @@ def initialization_callback(self): self.get_logger().info("create robot...") self.create_robot_models( free_flyer=self._robot_has_free_flyer, + planar_base=self._robot_has_planar_base, collision_as_capsule=self._collision_as_capsule, self_collision=self._self_collision, armature=self._ocp_armature, diff --git a/agimus_controller_ros/agimus_controller_ros/ros_utils.py b/agimus_controller_ros/agimus_controller_ros/ros_utils.py index 03c73186..93f45116 100644 --- a/agimus_controller_ros/agimus_controller_ros/ros_utils.py +++ b/agimus_controller_ros/agimus_controller_ros/ros_utils.py @@ -317,6 +317,37 @@ def mpc_debug_data_to_msg(mpc_debug_data: MPCDebugData) -> MpcDebug: return mpc_debug_msg +def sensor_ff_to_planar_state( + sensor_msg, + arm_q: npt.NDArray[np.float64], + arm_v: npt.NDArray[np.float64], +) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.float64]]: + """Build planar+arm state from an LFC Sensor message with robot_has_free_flyer=true. + + The LFC sends base data in dedicated fields: + sensor_msg.base_pose (geometry_msgs/Pose) — position + orientation + sensor_msg.base_twist (geometry_msgs/Twist) — linear + angular velocity + + The pinocchio planar model expects: + q_planar = [x, y, cos(yaw), sin(yaw), arm_1..n] length 4 + n_arm + v_planar = [vx, vy, wz, arm_1..n] length 3 + n_arm + + Assumes flat floor (z ≈ 0, roll ≈ 0, pitch ≈ 0). + """ + pos = sensor_msg.base_pose.position + ori = sensor_msg.base_pose.orientation + lin = sensor_msg.base_twist.linear + ang = sensor_msg.base_twist.angular + + qx, qy, qz, qw = ori.x, ori.y, ori.z, ori.w + yaw = 2.0 * np.arctan2(qw * qz + qx * qy, 1.0 - 2.0 * (qy**2 + qz**2)) + + q_planar = np.concatenate([[pos.x, pos.y, np.cos(yaw), np.sin(yaw)], arm_q]) + v_planar = np.concatenate([[lin.x, lin.y, ang.z], arm_v]) + + return q_planar, v_planar + + def get_params_from_node( requester_node: Node, target_node_name: str, target_params_name: list[str] ) -> list[ParameterValue]: diff --git a/agimus_controller_ros/agimus_controller_ros/simple_trajectory_publisher.py b/agimus_controller_ros/agimus_controller_ros/simple_trajectory_publisher.py index 8127d7b5..eeb6059c 100644 --- a/agimus_controller_ros/agimus_controller_ros/simple_trajectory_publisher.py +++ b/agimus_controller_ros/agimus_controller_ros/simple_trajectory_publisher.py @@ -1,4 +1,5 @@ from typing import List +import time import numpy as np from agimus_msgs.msg import MpcInput @@ -9,6 +10,7 @@ from rclpy.qos import QoSProfile, DurabilityPolicy, ReliabilityPolicy from sensor_msgs.msg import JointState from linear_feedback_controller_msgs.msg import Sensor +from rcl_interfaces.srv import GetParameters from agimus_controller.trajectories.sine_wave_params import SinWaveParams from agimus_controller.factory.robot_model import RobotModelParameters, RobotModels @@ -27,6 +29,7 @@ from agimus_controller_ros.ros_utils import ( weighted_traj_point_to_mpc_msg, get_param_from_node, + sensor_ff_to_planar_state, ) from agimus_controller.trajectories.generic_visual_servoing_trajectory import ( GenericVisualServoingTrajectory, @@ -69,14 +72,28 @@ def __init__(self, name="trajectory_publisher"): self.q0 = None self.current_q = None + self.current_dq = None self.robot_description_msg = None - # Obtained by checking "QoS profile" values in out of: - # ros2 topic info -v /robot_description - # ros2 topic info -v /sensor - self.moving_joint_names = get_param_from_node( - self, "linear_feedback_controller", "moving_joint_names" - ).string_array_value + # Fetched asynchronously in _initialize_q0 to avoid blocking __init__. + self.moving_joint_names = None + self._free_flyer = None + self._planar_base = None + self._agimus_params_fetched = ( + False # True only after _on_agimus_params succeeds + ) + + self._lfc_param_client = self.create_client( + GetParameters, "/linear_feedback_controller/get_parameters" + ) + self._agimus_param_client = self.create_client( + GetParameters, "/agimus_controller_node/get_parameters" + ) + self._lfc_param_future = None + self._lfc_param_future_deadline = None # wall-clock timeout for the future + self._agimus_param_future = None + self._agimus_param_future_deadline = None + self.subscriber_robot_description_ = self.create_subscription( String, "/robot_description", @@ -109,16 +126,27 @@ def __init__(self, name="trajectory_publisher"): def joint_states_callback(self, msg: Sensor) -> None: """Set joint state reference.""" + if self.moving_joint_names is None or self._planar_base is None: + return jpos = np.array(msg.joint_state.position) - # TODO fix this, temp hac to work from sim + jvel = np.array(msg.joint_state.velocity) + # TODO fix this, temp hack to work from sim joint_idxs = get_joint_idxs(self.moving_joint_names, msg.joint_state) - if self.q0 is None and np.linalg.norm(jpos) > 1e-2: - self.q0 = get_reduced_configuration(jpos, joint_idxs) + arm_q = get_reduced_configuration(jpos, joint_idxs) + arm_v = get_reduced_configuration(jvel, joint_idxs) + if self.q0 is None and np.linalg.norm(arm_q) > 1e-2: + self.q0 = arm_q.copy() self.get_logger().info(f"Received q0 = {[round(el, 2) for el in self.q0]}.") - self.current_q = get_reduced_configuration(jpos, joint_idxs) - self.current_dq = get_reduced_configuration( - np.array(msg.joint_state.velocity), joint_idxs - ) + # With a planar base, the LFC sends base data in base_pose/base_twist fields. + # Convert to planar representation; before _load_models, fall back to neutral. + if self._planar_base: + q_planar, v_planar = sensor_ff_to_planar_state(msg, arm_q, arm_v) + self.current_q = q_planar + self.current_dq = v_planar + else: + base_q = getattr(self, "_base_q0", np.array([])) + self.current_q = np.concatenate([base_q, arm_q]) + self.current_dq = np.concatenate([np.zeros(len(base_q)), arm_v]) def robot_description_callback(self, msg: String) -> None: """Create the models of the robot from the urdf string.""" @@ -131,22 +159,141 @@ def _load_models(self): self.robot_models = RobotModels( param=RobotModelParameters( robot_urdf=self.robot_description_msg.data, - free_flyer=False, + free_flyer=self._free_flyer, + planar_base=self._planar_base, moving_joint_names=self.moving_joint_names, ) ) - self.get_logger().info( - f"Model loaded, pin_model.nq = {self.robot_models.robot_model.nq}" - ) + + # q0 and current_q from joint_states_callback contain arm joints only. + # With a floating base, prepend the neutral base configuration so that + # the full configuration matches robot_model.nq. + rmodel = self.robot_models.robot_model + nq_base = rmodel.nq - len(self.moving_joint_names) + if nq_base > 0: + import pinocchio as pin + + base_q0 = pin.neutral(rmodel)[:nq_base] + self.q0 = np.concatenate([base_q0, self.q0]) + self.current_q = np.concatenate([base_q0, self.current_q]) + self._base_q0 = base_q0 + else: + self._base_q0 = np.array([]) + + self.get_logger().info(f"Model loaded, pin_model.nq = {rmodel.nq}") self.get_logger().info(f"Model loaded, reduced self.q0 = {self.q0}") + def _agimus_params_to_fetch(self) -> list: + """Names of parameters to fetch from agimus_controller_node. + + Subclasses can override to request additional parameters. + The first two entries must always be 'free_flyer' and 'planar_base'. + """ + return ["free_flyer", "planar_base"] + + def _on_agimus_params(self, values) -> None: + """Called with the fetched agimus_controller_node parameter values. + + Subclasses that override _agimus_params_to_fetch must also override + this method to handle the extra values (indices 2+). + """ + self._free_flyer = values[0].bool_value + self._planar_base = values[1].bool_value + def _initialize_q0(self): + self.get_logger().debug("_initialize_q0 tick") + + # Step 1: fetch moving_joint_names from LFC (non-blocking). + if self.moving_joint_names is None: + if self._lfc_param_future is None: + if not self._lfc_param_client.service_is_ready(): + self.get_logger().info( + "Waiting for LFC parameter service...", + throttle_duration_sec=5.0, + ) + return + req = GetParameters.Request() + req.names = ["moving_joint_names"] + self._lfc_param_future = self._lfc_param_client.call_async(req) + self._lfc_param_future_deadline = time.monotonic() + 5.0 + self.get_logger().info("Sent GetParameters request to LFC.") + if not self._lfc_param_future.done(): + if time.monotonic() > self._lfc_param_future_deadline: + self.get_logger().warn( + "GetParameters request to LFC timed out (service busy during init?), retrying..." + ) + self._lfc_param_future = None + self._lfc_param_future_deadline = None + return + result = self._lfc_param_future.result() + self._lfc_param_future = None + if result is None: + self.get_logger().error( + "Failed to get moving_joint_names from LFC, retrying..." + ) + return + self.moving_joint_names = result.values[0].string_array_value + self.get_logger().info( + f"[Step 1 done] moving_joint_names from LFC: {self.moving_joint_names}" + ) + + # Step 2: fetch free_flyer / planar_base (+ subclass extras) from + # agimus_controller_node (non-blocking). + if not self._agimus_params_fetched: + if self._agimus_param_future is None: + if not self._agimus_param_client.service_is_ready(): + self.get_logger().info( + "Waiting for agimus_controller_node parameter service...", + throttle_duration_sec=5.0, + ) + return + req = GetParameters.Request() + names = self._agimus_params_to_fetch() + req.names = names + self._agimus_param_future = self._agimus_param_client.call_async(req) + self._agimus_param_future_deadline = time.monotonic() + 5.0 + self.get_logger().info( + f"Sent GetParameters request to agimus_controller_node: {names}" + ) + if not self._agimus_param_future.done(): + if time.monotonic() > self._agimus_param_future_deadline: + self.get_logger().warn( + "GetParameters request to agimus_controller_node timed out, retrying..." + ) + self._agimus_param_future = None + self._agimus_param_future_deadline = None + return + result = self._agimus_param_future.result() + self._agimus_param_future = None + if result is None: + self.get_logger().error( + "Failed to get params from agimus_controller_node, retrying..." + ) + return + try: + self._on_agimus_params(result.values) + except Exception as e: + self.get_logger().error( + f"_on_agimus_params raised {type(e).__name__}: {e}. " + f"Parameter values: {[(v.type, v.bool_value, v.double_value) for v in result.values]}. " + "Retrying..." + ) + return + self._agimus_params_fetched = True + self.get_logger().info( + "[Step 2 done] agimus_controller_node params fetched." + ) + + # Step 3: wait for robot description and first joint state. if self.robot_description_msg is None or self.q0 is None: self.get_logger().info( "Wait for robot model and q0", throttle_duration_sec=1.0 ) return + self.get_logger().info( + "[Step 3 done] robot model and q0 ready. Loading models..." + ) self._load_models() self.destroy_timer(self.timer) self.ready_callback()