diff --git a/services/motion/builtin/builtin.go b/services/motion/builtin/builtin.go
index 326702b6333..7e750aa9db2 100644
--- a/services/motion/builtin/builtin.go
+++ b/services/motion/builtin/builtin.go
@@ -97,6 +97,17 @@ type Config struct {
 	// example { "arm" : { "3" : { "min" : 0, "max" : 2 } } }
 	InputRangeOverride map[string]map[string]referenceframe.Limit `json:"input_range_override"`
+
+	// TeleopSmallMoveRad is the max joint displacement (radians) below which
+	// the arm's built-in interpolation is enabled for smooth motion. Default 0.005.
+	TeleopSmallMoveRad float64 `json:"teleop_small_move_rad"`
+	// TeleopInterpolateOverride when true forces interpolation ON for all teleop
+	// movements regardless of size. Useful for testing.
+	TeleopInterpolateOverride bool `json:"teleop_interpolate_override"`
+	// TeleopSmoothAlpha is the EMA smoothing factor for joint positions (0-1).
+	// Lower = smoother but more latency, higher = more responsive but less smooth.
+	// 0 = frozen, 1 = no smoothing. Default 0.5.
+	TeleopSmoothAlpha float64 `json:"teleop_smooth_alpha"`
 }
 
 func (c *Config) shouldWritePlan(start time.Time, err error) bool {
@@ -532,67 +543,43 @@ func (ms *builtIn) plan(ctx context.Context, req motion.MoveReq, logger logging.
 	return plan, err
 }
 
-// planTeleop is a low-latency variant of plan() for the teleop pipeline.
-// It uses a pre-built frame system and caller-provided fsInputs (merged from
-// live CurrentInputs + planning head) to avoid per-call overhead.
-func (ms *builtIn) planTeleop(
+// planTeleopMulti plans a trajectory for multiple components simultaneously.
+// It builds a multi-frame goal from the given poses map, allowing the planner
+// to find collision-free paths for all arms jointly.
+func (ms *builtIn) planTeleopMulti(
 	ctx context.Context,
-	req motion.MoveReq,
+	goals referenceframe.FrameSystemPoses,
+	extra map[string]interface{},
 	frameSys *referenceframe.FrameSystem,
 	fsInputs referenceframe.FrameSystemInputs,
 	logger logging.Logger,
 ) (motionplan.Plan, error) {
-	ctx, span := trace.StartSpan(ctx, "motion::builtin::planTeleop")
+	ctx, span := trace.StartSpan(ctx, "motion::builtin::planTeleopMulti")
 	defer span.End()
 
-	movingFrame := frameSys.Frame(req.ComponentName)
-	if movingFrame == nil {
-		return nil, fmt.Errorf("component named %s not found in robot frame system", req.ComponentName)
-	}
-
-	startState, waypoints, err := waypointsFromRequest(req, fsInputs)
-	if err != nil {
-		return nil, err
-	}
-	if len(waypoints) == 0 {
-		return nil, errors.New("could not find any waypoints to plan for in MoveRequest. Fill in Destination or goal_state")
-	}
-
-	if req.Extra != nil {
-		req.Extra["waypoints"] = nil
-	}
-
-	// Re-evaluate goal poses to be in the frame of World.
-	worldWaypoints := []*armplanning.PlanState{}
-	solvingFrame := referenceframe.World
-	for _, wp := range waypoints {
-		if wp.Poses() != nil {
-			step := referenceframe.FrameSystemPoses{}
-			for fName, destination := range wp.Poses() {
-				tf, err := frameSys.Transform(fsInputs.ToLinearInputs(), destination, solvingFrame)
-				if err != nil {
-					return nil, err
-				}
-				goalPose, _ := tf.(*referenceframe.PoseInFrame)
-				step[fName] = goalPose
-			}
-			worldWaypoints = append(worldWaypoints, armplanning.NewPlanState(step, wp.Configuration()))
-		} else {
-			worldWaypoints = append(worldWaypoints, wp)
+	// Transform all goal poses to world frame.
+	worldGoals := make(referenceframe.FrameSystemPoses, len(goals))
+	for fName, destination := range goals {
+		tf, err := frameSys.Transform(fsInputs.ToLinearInputs(), destination, referenceframe.World)
+		if err != nil {
+			return nil, err
 		}
+		goalPose, _ := tf.(*referenceframe.PoseInFrame)
+		worldGoals[fName] = goalPose
 	}
 
-	planOpts, err := armplanning.NewPlannerOptionsFromExtra(req.Extra)
+	startState := armplanning.NewPlanState(nil, fsInputs)
+	goalState := armplanning.NewPlanState(worldGoals, nil)
+
+	planOpts, err := armplanning.NewPlannerOptionsFromExtra(extra)
 	if err != nil {
 		return nil, err
 	}
 
 	planRequest := &armplanning.PlanRequest{
 		FrameSystem:    frameSys,
-		Goals:          worldWaypoints,
+		Goals:          []*armplanning.PlanState{goalState},
 		StartState:     startState,
-		WorldState:     req.WorldState,
-		Constraints:    req.Constraints,
 		PlannerOptions: planOpts,
 	}
@@ -688,6 +675,7 @@ func (ms *builtIn) execute(ctx context.Context, trajectory motionplan.Trajectory
 	if err != nil {
 		return err
 	}
+
 	if err := ie.GoToInputs(ctx, inputs...); err != nil {
 		// If there is an error on GoToInputs, stop the component if possible before returning the error
 		if actuator, ok := r.(inputEnabledActuator); ok {
diff --git a/services/motion/builtin/teleop.go b/services/motion/builtin/teleop.go
index 8b6ef4d6049..0d3aaac4865 100644
--- a/services/motion/builtin/teleop.go
+++ b/services/motion/builtin/teleop.go
@@ -3,7 +3,6 @@ package builtin
 import (
 	"context"
 	"fmt"
-	"math"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -14,27 +13,47 @@ import (
 	"google.golang.org/protobuf/encoding/protojson"
 	"google.golang.org/protobuf/types/known/structpb"
 
+	"go.viam.com/rdk/components/arm"
 	"go.viam.com/rdk/logging"
 	"go.viam.com/rdk/motionplan"
 	"go.viam.com/rdk/referenceframe"
+	"go.viam.com/rdk/resource"
+	"go.viam.com/rdk/robot/framesystem"
 	"go.viam.com/rdk/services/motion"
 	"go.viam.com/rdk/utils"
 )
 
+const defaultTeleopSmoothAlpha = 0.5
+
+// teleopComponent tracks a single component being teleop'd within the pipeline.
+type teleopComponent struct {
+	name        string
+	moveReqBase motion.MoveReq
+	latestPose  atomic.Pointer[referenceframe.PoseInFrame]
+}
+
 // teleopPipeline manages the continuous motion pipeline for low-latency teleop.
-// It runs two goroutines connected by channels:
+// It supports multiple components (arms) planned jointly in a single pipeline
+// to guarantee collision-free trajectories.
 //
-// poseCh → Planner goroutine → trajCh → Executor goroutine → arm.GoToInputs()
+// notify → Planner goroutine → trajCh → Executor goroutine → arm.GoToInputs()
 type teleopPipeline struct {
 	logger logging.Logger
 
 	// Immutable after creation.
-	moveReqBase    motion.MoveReq
-	cachedFrameSys *referenceframe.FrameSystem // built once at pipeline start
+	cachedFrameSys   *referenceframe.FrameSystem      // built once at pipeline start
+	cachedBaseInputs referenceframe.FrameSystemInputs // snapshot at pipeline start
+
+	// Components being teleop'd. Protected by componentsMu.
+	componentsMu sync.RWMutex
+	components   map[string]*teleopComponent
 
-	// Channels.
-	poseCh chan *referenceframe.PoseInFrame // buffer 1, latest-value semantics
-	trajCh chan motionplan.Trajectory       // buffer 1, one-ahead lookahead
+	// Notification channel — poked when any component gets a new pose.
+	// Buffer 1, latest-value semantics.
+	notify chan struct{}
+
+	// Trajectory output channel. Buffer 1, one-ahead lookahead.
+	trajCh chan motionplan.Trajectory
 
 	// Planning head: the last configuration the planner planned TO.
 	// This allows trajectories to chain seamlessly.
@@ -52,47 +71,45 @@ type teleopPipeline struct {
 	planCount atomic.Int64
 	execCount atomic.Int64
 
+	// EMA-smoothed joint positions per component. Only accessed by executor goroutine.
+	smoothedJoints map[string][]referenceframe.Input
+
 	// Lifecycle.
 	workers *goutils.StoppableWorkers
 }
 
-// trySendLatest sends pose on ch using latest-value semantics:
-// if a stale value is buffered, it is drained first so the new value replaces it.
+// trySendNotify pokes the notify channel using latest-value semantics.
 // Safe for concurrent callers: never blocks.
-func trySendLatest(ch chan *referenceframe.PoseInFrame, pose *referenceframe.PoseInFrame) {
-	// Fast path: channel is empty, send directly.
+func trySendNotify(ch chan struct{}) {
 	select {
-	case ch <- pose:
+	case ch <- struct{}{}:
 		return
 	default:
 	}
-	// Channel full — drain stale value and retry.
 	select {
 	case <-ch:
 	default:
 	}
 	select {
-	case ch <- pose:
+	case ch <- struct{}{}:
 	default:
-		// Another writer beat us; their pose is equally fresh.
 	}
 }
 
-// runPlanner is the planner goroutine. It reads poses from poseCh,
-// plans trajectories from the planning head, and sends them on trajCh.
+// runPlanner is the planner goroutine. It wakes on notify signals,
+// reads all components' latest poses, and plans a joint trajectory.
 func (tp *teleopPipeline) runPlanner(ctx context.Context, ms *builtIn) {
 	for {
 		select {
 		case <-ctx.Done():
 			return
-		case pose := <-tp.poseCh:
-			tp.planOnce(ctx, ms, pose)
+		case <-tp.notify:
+			tp.planOnce(ctx, ms)
 		}
 	}
 }
 
 // planningHeadEqual reports whether two FrameSystemInputs snapshots are identical.
-// Used to detect whether the planning head was reset while a plan was in flight.
 func planningHeadEqual(a, b referenceframe.FrameSystemInputs) bool {
 	if len(a) != len(b) {
 		return false
@@ -111,48 +128,56 @@ func planningHeadEqual(a, b referenceframe.FrameSystemInputs) bool {
 	return true
 }
 
-func (tp *teleopPipeline) planOnce(ctx context.Context, ms *builtIn, pose *referenceframe.PoseInFrame) {
-	// Read the current planning head for the teleop'd arm.
+func (tp *teleopPipeline) planOnce(ctx context.Context, ms *builtIn) {
+	// Snapshot the planning head under RLock. We need both a copy for safe iteration
+	// and for the stale-check after planning completes (line ~195).
 	tp.planningHeadMu.RLock()
-	planningHead := tp.planningHead
+	planningHead := make(referenceframe.FrameSystemInputs, len(tp.planningHead))
+	for k, v := range tp.planningHead {
+		planningHead[k] = v
+	}
 	tp.planningHeadMu.RUnlock()
 
-	// Merge live inputs with the planning head: use fresh CurrentInputs for
-	// all components (including the other arm in bimanual setups), then overlay
-	// the planning head for the teleop'd component so trajectory chaining works.
-	// Timing includes RLock acquisition — intentional: wall-clock latency is what
-	// matters for diagnosing stutter; lock contention is a real part of that latency.
+	// Build merged inputs from cached base + planning head snapshot.
 	inputsStart := time.Now()
-	ms.mu.RLock()
-	liveInputs, err := ms.fsService.CurrentInputs(ctx)
-	ms.mu.RUnlock()
-	inputsDur := time.Since(inputsStart)
-	tp.lastInputsNanos.Store(inputsDur.Nanoseconds())
-	if err != nil {
-		tp.lastErr.Store(&err)
-		tp.logger.CWarnf(ctx, "teleop planner: failed to get current inputs: %v", err)
-		return
-	}
-	mergedInputs := make(referenceframe.FrameSystemInputs, len(liveInputs))
-	for k, v := range liveInputs {
+	mergedInputs := make(referenceframe.FrameSystemInputs, len(tp.cachedBaseInputs))
+	for k, v := range tp.cachedBaseInputs {
 		mergedInputs[k] = v
 	}
-	// Overlay planning head entries for the teleop'd arm's frames.
 	for k, v := range planningHead {
 		mergedInputs[k] = v
 	}
+	inputsDur := time.Since(inputsStart)
+	tp.lastInputsNanos.Store(inputsDur.Nanoseconds())
 
-	// Build a MoveReq with start_state set to the merged config.
-	req := tp.buildMoveReq(pose, mergedInputs)
+	// Collect latest poses from all registered components into a multi-frame goal.
+	tp.componentsMu.RLock()
+	goals := make(referenceframe.FrameSystemPoses, len(tp.components))
+	var extra map[string]interface{}
+	for _, comp := range tp.components {
+		pose := comp.latestPose.Load()
+		if pose == nil {
+			continue
+		}
+		goals[comp.name] = pose
+		// Use the first component's extra for planner options (they should be the same).
+		if extra == nil {
+			extra = tp.buildExtra(comp.moveReqBase.Extra, mergedInputs)
+		}
+	}
+	tp.componentsMu.RUnlock()
 
-	// Call ms.planTeleop with cached frame system and merged inputs.
+	if len(goals) == 0 {
+		return
+	}
+
+	// Plan for all components jointly.
 	planStart := time.Now()
 	ms.mu.RLock()
-	plan, err := ms.planTeleop(ctx, req, tp.cachedFrameSys, mergedInputs, tp.logger)
+	plan, err := ms.planTeleopMulti(ctx, goals, extra, tp.cachedFrameSys, mergedInputs, tp.logger)
 	ms.mu.RUnlock()
 	planDur := time.Since(planStart)
 	tp.lastPlanNanos.Store(planDur.Nanoseconds())
-	// Includes failed plans; compare with exec_count for success rate.
 	tp.planCount.Add(1)
 
 	if err != nil {
@@ -163,18 +188,14 @@ func (tp *teleopPipeline) planOnce(ctx context.Context, ms *builtIn, pose *refer
 	tp.lastErr.Store(nil)
 
 	traj := plan.Trajectory()
-	tp.logger.CInfof(ctx, "teleop planner: inputs took: %s, plan took: %s, traj size: %d", inputsDur, planDur, len(traj))
+	tp.logger.CInfof(ctx, "teleop planner: inputs took: %s, plan took: %s, traj size: %d, components: %d",
+		inputsDur, planDur, len(traj), len(goals))
 	if len(traj) == 0 {
 		return
 	}
 
-	// Re-acquire the write lock to atomically validate that the planning head
-	// hasn't been reset (by an execution error) while we were planning, update
-	// it to the last step of this trajectory, and enqueue the trajectory.
-	// The send must be non-blocking: a blocking send while holding the lock
-	// would deadlock with resetPlanningHead, which also needs the write lock to
-	// drain trajCh. If the channel is full the head is left unchanged so the
-	// next planning iteration re-plans from the same base.
+	// Atomically validate the planning head hasn't been reset, update it,
+	// and enqueue the trajectory.
 	lastStep := traj[len(traj)-1]
 	tp.planningHeadMu.Lock()
 	if !planningHeadEqual(tp.planningHead, planningHead) {
@@ -186,28 +207,22 @@
 	case tp.trajCh <- traj:
 		tp.planningHead = lastStep
 	default:
-		// Executor is busy; leave head unchanged and let the next pose trigger a fresh plan.
+		// Executor is busy; leave head unchanged and let the next notify trigger a fresh plan.
 	}
 	tp.planningHeadMu.Unlock()
 }
 
-// buildMoveReq creates a MoveReq from the template with the given destination
-// and start_state set to the planning head configuration.
-func (tp *teleopPipeline) buildMoveReq(
-	pose *referenceframe.PoseInFrame,
+// buildExtra creates the extra map for planner options with teleop defaults and start_state.
+func (tp *teleopPipeline) buildExtra(
+	baseExtra map[string]interface{},
 	startConfig referenceframe.FrameSystemInputs,
-) motion.MoveReq {
-	req := tp.moveReqBase
-	req.Destination = pose
-
-	// Clone Extra to avoid mutating the template.
-	extra := make(map[string]interface{}, len(tp.moveReqBase.Extra)+1)
-	for k, v := range tp.moveReqBase.Extra {
+) map[string]interface{} {
+	extra := make(map[string]interface{}, len(baseExtra)+5)
+	for k, v := range baseExtra {
 		extra[k] = v
 	}
-	// Build start_state in the format DeserializePlanState expects ([]interface{}
-	// values, not native []float64) since this path doesn't go through a proto
-	// round-trip that would convert the types.
+
+	// Build start_state in the format DeserializePlanState expects.
 	confMap := make(map[string]interface{}, len(startConfig))
 	for fName, inputs := range startConfig {
 		iArr := make([]interface{}, len(inputs))
@@ -218,13 +233,14 @@
 	}
 	extra["start_state"] = map[string]interface{}{"configuration": confMap}
 
-	// Apply teleop-optimized planner defaults. These only set values not
-	// already present so callers can override via teleop_start extra.
+	// Apply teleop-optimized planner defaults. These are aggressive because
+	// teleop movements are tiny and frequent — we trade solution optimality
+	// for speed.
 	teleopDefaults := map[string]interface{}{
-		"timeout":          5.0,  // seconds; default is 300
-		"max_ik_solutions": 20,   // default is 100
-		"min_ik_score":     0.05, // default is 0.01
-		"frame_step":       0.05, // default is 0.01; reduces trajectory steps from ~14 to ~3-4
+		"timeout":          5.0,
+		"max_ik_solutions": 10,  // fewer solutions = faster (was 20)
+		"min_ik_score":     0.1, // accept worse solutions faster (was 0.05)
+		"frame_step":       0.1, // fewer trajectory steps (was 0.05)
 	}
 	for k, v := range teleopDefaults {
 		if _, ok := extra[k]; !ok {
@@ -232,13 +248,122 @@
 		}
 	}
 
-	req.Extra = extra
+	// Clear waypoints — not used in teleop.
+	extra["waypoints"] = nil
+
+	return extra
+}
-	return req
+// emaSmooth applies exponential moving average smoothing to joint positions.
+// alpha controls responsiveness: 0 = frozen, 1 = no smoothing.
+func emaSmooth(target, previous []referenceframe.Input, alpha float64) []referenceframe.Input {
+	if previous == nil || len(previous) != len(target) || alpha >= 1.0 {
+		result := make([]referenceframe.Input, len(target))
+		copy(result, target)
+		return result
+	}
+	b := 1.0 - alpha
+	result := make([]referenceframe.Input, len(target))
+	for j := range target {
+		result[j] = alpha*target[j] + b*previous[j]
+	}
+	return result
+}
+
+// executeTeleop executes a trajectory by calling GoToInputs on all components
+// in parallel. Unlike ms.execute, it skips the step-0 position check (which
+// blocks on CurrentInputs gRPC calls) and sends commands to all arms concurrently
+// rather than sequentially.
+func (tp *teleopPipeline) executeTeleop(ctx context.Context, ms *builtIn, traj motionplan.Trajectory) error {
+	if len(traj) < 2 {
+		return nil
+	}
+
+	// Group inputs per component across all trajectory steps (skip step 0 = start position).
+	perComponent := make(map[string][][]referenceframe.Input)
+	for i := 1; i < len(traj); i++ {
+		for name, inputs := range traj[i] {
+			if len(inputs) == 0 {
+				continue
+			}
+			perComponent[name] = append(perComponent[name], inputs)
+		}
+	}
+
+	// Read teleop config.
+	// NOTE: caller (runExecutor) already holds ms.mu.RLock, so safe to read ms.conf directly.
+	smoothAlpha := defaultTeleopSmoothAlpha
+	interpolateOverride := false
+	if ms.conf != nil {
+		if ms.conf.TeleopSmoothAlpha > 0 {
+			smoothAlpha = ms.conf.TeleopSmoothAlpha
+		}
+		interpolateOverride = ms.conf.TeleopInterpolateOverride
+	}
+
+	// Execute GoToInputs on all components in parallel.
+	var wg sync.WaitGroup
+	errs := make([]error, len(perComponent))
+	idx := 0
+	for name, inputs := range perComponent {
+		r, ok := ms.components[name]
+		if !ok {
+			continue
+		}
+		ie, err := utils.AssertType[framesystem.InputEnabled](r)
+		if err != nil {
+			continue
+		}
+
+		// Apply EMA smoothing to joint positions before sending (main goroutine only).
+		smoothed := make([][]referenceframe.Input, len(inputs))
+		prev := tp.smoothedJoints[name]
+		for k, step := range inputs {
+			smoothed[k] = emaSmooth(step, prev, smoothAlpha)
+			prev = smoothed[k]
+		}
+		tp.smoothedJoints[name] = prev
+
+		wg.Add(1)
+		go func(i int, ie framesystem.InputEnabled, smoothed [][]referenceframe.Input, r resource.Resource) {
+			defer wg.Done()
+			var err error
+			if armComp, ok := r.(arm.Arm); ok {
+				if interpolateOverride {
+					err = armComp.MoveThroughJointPositions(ctx, smoothed, nil, map[string]interface{}{
+						"waitAtEnd":   true,
+						"interpolate": true,
+					})
+				} else {
+					err = armComp.MoveThroughJointPositions(ctx, smoothed, nil, map[string]interface{}{
+						"waitAtEnd":   false,
+						"interpolate": false,
+					})
+				}
+			} else {
+				err = ie.GoToInputs(ctx, smoothed...)
+			}
+			if err != nil {
+				if actuator, ok := r.(inputEnabledActuator); ok {
+					_ = actuator.Stop(ctx, nil)
+				}
+				errs[i] = err
+			}
+		}(idx, ie, smoothed, r)
+		idx++
+	}
+	wg.Wait()
+
+	for _, err := range errs {
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
 
 // runExecutor is the executor goroutine. It reads trajectories from trajCh
-// and executes them on the arm via ms.execute.
+// and executes them in parallel across all components.
 func (tp *teleopPipeline) runExecutor(ctx context.Context, ms *builtIn) {
 	var lastExecEnd time.Time
 	var totalCycle time.Duration
@@ -253,14 +378,11 @@ func (tp *teleopPipeline) runExecutor(ctx context.Context, ms *builtIn) {
 			tp.lastExecWaitNanos.Store(waitDur.Nanoseconds())
 
 			execStart := time.Now()
-			// Skip start-position check (math.MaxFloat64) because the arm
-			// is in continuous motion and won't be exactly at the trajectory start.
 			ms.mu.RLock()
-			err := ms.execute(ctx, traj, math.MaxFloat64)
+			err := tp.executeTeleop(ctx, ms, traj)
 			ms.mu.RUnlock()
 			execDur := time.Since(execStart)
 			tp.lastExecNanos.Store(execDur.Nanoseconds())
-			// Includes failed executions; compare with plan_count for pipeline health.
 			tp.execCount.Add(1)
 
 			if !lastExecEnd.IsZero() {
@@ -281,16 +403,20 @@ func (tp *teleopPipeline) runExecutor(ctx context.Context, ms *builtIn) {
 				tp.resetPlanningHead(ctx, ms)
 			} else {
 				tp.lastErr.Store(nil)
+				// Update planning head to smoothed positions (what was actually
+				// sent to the arm) so the next plan starts from reality.
+				tp.planningHeadMu.Lock()
+				for name, joints := range tp.smoothedJoints {
+					tp.planningHead[name] = joints
+				}
+				tp.planningHeadMu.Unlock()
 			}
 		}
 	}
 }
 
 // resetPlanningHead sets the planning head to the arm's actual current position
-// after an execution error. Resetting the planning head invalidates all previously
-// planned trajectories: any trajectory in trajCh was chained from the old (now
-// incorrect) head. The drain of trajCh and the head reset are held under the same
-// write lock so that planOnce cannot enqueue a stale trajectory between them.
+// after an execution error.
 func (tp *teleopPipeline) resetPlanningHead(ctx context.Context, ms *builtIn) {
 	ms.mu.RLock()
 	defer ms.mu.RUnlock()
@@ -310,60 +436,107 @@ func (tp *teleopPipeline) resetPlanningHead(ctx context.Context, ms *builtIn) {
 	tp.planningHeadMu.Unlock()
 }
 
-// stop shuts down the pipeline goroutines and best-effort stops the arm.
+// stop shuts down the pipeline goroutines.
 func (tp *teleopPipeline) stop(ctx context.Context, ms *builtIn) {
 	tp.workers.Stop()
 }
 
-// startTeleopPipeline creates and starts a new teleop pipeline.
-func (ms *builtIn) startTeleopPipeline(cmdCtx context.Context, req motion.MoveReq) error {
-	// Stop any existing pipeline first.
+// addTeleopComponent adds a component to the teleop pipeline, creating the pipeline if needed.
+func (ms *builtIn) addTeleopComponent(cmdCtx context.Context, req motion.MoveReq) error {
 	ms.teleopMu.Lock()
-	if ms.teleopPipeline != nil {
-		ms.teleopPipeline.stop(cmdCtx, ms)
-	}
 	defer ms.teleopMu.Unlock()
 
 	ms.mu.RLock()
-	fsInputs, err := ms.fsService.CurrentInputs(cmdCtx)
-	if err != nil {
-		ms.mu.RUnlock()
-		return err
-	}
-
-	// Validate the command.
+	// Validate the component.
 	if _, ok := ms.components[req.ComponentName]; !ok || req.Destination == nil {
 		ms.mu.RUnlock()
-		return fmt.Errorf("Component must exist and destination must be set. Component: %v Destination: %v",
+		return fmt.Errorf("component must exist and destination must be set. Component: %v Destination: %v",
 			req.ComponentName, req.Destination)
 	}
 
-	// Build and cache the frame system once for the lifetime of this pipeline.
-	// The kinematic structure doesn't change during teleop; Reconfigure() stops
-	// the pipeline before any config changes.
-	frameSys, err := ms.getFrameSystem(cmdCtx, req.WorldState.Transforms())
-	if err != nil {
+	if ms.teleopPipeline == nil {
+		// Create a new pipeline.
+		fsInputs, err := ms.fsService.CurrentInputs(cmdCtx)
+		if err != nil {
+			ms.mu.RUnlock()
+			return err
+		}
+
+		frameSys, err := ms.getFrameSystem(cmdCtx, req.WorldState.Transforms())
+		if err != nil {
+			ms.mu.RUnlock()
+			return err
+		}
 		ms.mu.RUnlock()
-		return err
+
+		tp := &teleopPipeline{
+			logger:           ms.logger.Sublogger("teleop"),
+			cachedFrameSys:   frameSys,
+			cachedBaseInputs: fsInputs,
+			components:       make(map[string]*teleopComponent),
+			notify:           make(chan struct{}, 1),
+			trajCh:           make(chan motionplan.Trajectory, 1),
+			planningHead:     fsInputs,
+			smoothedJoints:   make(map[string][]referenceframe.Input),
+		}
+
+		comp := &teleopComponent{
+			name:        req.ComponentName,
+			moveReqBase: req,
+		}
+		comp.latestPose.Store(req.Destination)
+		tp.components[req.ComponentName] = comp
+
+		// Send initial notification to trigger first plan.
+		trySendNotify(tp.notify)
+
+		tp.workers = goutils.NewBackgroundStoppableWorkers(
+			func(pipelineCtx context.Context) { tp.runPlanner(pipelineCtx, ms) },
+			func(pipelineCtx context.Context) { tp.runExecutor(pipelineCtx, ms) },
+		)
+
+		ms.teleopPipeline = tp
+	} else {
+		ms.mu.RUnlock()
+
+		// Add component to existing pipeline.
+		tp := ms.teleopPipeline
+		tp.componentsMu.Lock()
+		comp := &teleopComponent{
+			name:        req.ComponentName,
+			moveReqBase: req,
+		}
+		comp.latestPose.Store(req.Destination)
+		tp.components[req.ComponentName] = comp
+		tp.componentsMu.Unlock()
+
+		// Trigger a replan with the new component included.
+		trySendNotify(tp.notify)
 	}
-	ms.mu.RUnlock()
 
-	ms.teleopPipeline = &teleopPipeline{
-		logger:         ms.logger.Sublogger("teleop"),
-		moveReqBase:    req,
-		cachedFrameSys: frameSys,
-		poseCh:         make(chan *referenceframe.PoseInFrame, 1),
-		trajCh:         make(chan motionplan.Trajectory, 1),
-		planningHead:   fsInputs,
+	return nil
+}
+
+// removeTeleopComponent removes a component from the pipeline.
+// If no components remain, the pipeline is stopped.
+func (ms *builtIn) removeTeleopComponent(ctx context.Context, componentName string) {
+	ms.teleopMu.Lock()
+	defer ms.teleopMu.Unlock()
+
+	tp := ms.teleopPipeline
+	if tp == nil {
+		return
 	}
-	ms.teleopPipeline.poseCh <- req.Destination
 
-	ms.teleopPipeline.workers = goutils.NewBackgroundStoppableWorkers(
-		func(pipelineCtx context.Context) { ms.teleopPipeline.runPlanner(pipelineCtx, ms) },
-		func(pipelineCtx context.Context) { ms.teleopPipeline.runExecutor(pipelineCtx, ms) },
-	)
+	tp.componentsMu.Lock()
+	delete(tp.components, componentName)
+	remaining := len(tp.components)
+	tp.componentsMu.Unlock()
 
-	return nil
+	if remaining == 0 {
+		tp.stop(ctx, ms)
+		ms.teleopPipeline = nil
+	}
 }
 
 // handleTeleopCommand handles teleop DoCommand requests.
@@ -400,7 +573,7 @@ func (ms *builtIn) handleTeleopCommand(
 		return nil, true, err
 	}
 
-	if err := ms.startTeleopPipeline(ctx, moveReq); err != nil {
+	if err := ms.addTeleopComponent(ctx, moveReq); err != nil {
 		return nil, true, err
 	}
 
@@ -409,6 +582,8 @@ func (ms *builtIn) handleTeleopCommand(
 	}
 
 	if req, ok := cmd[DoTeleopMove]; ok {
+		componentName, _ := cmd["component_name"].(string)
+
 		ms.teleopMu.RLock()
 		tp := ms.teleopPipeline
 		ms.teleopMu.RUnlock()
@@ -426,23 +601,49 @@ func (ms *builtIn) handleTeleopCommand(
 		}
 		pif := referenceframe.ProtobufToPoseInFrame(&pifProto)
+
+		// Update the component's latest pose.
+		tp.componentsMu.RLock()
+		comp := tp.components[componentName]
+		// Backward compat: if no component_name and exactly one component, use it.
+		if comp == nil && componentName == "" && len(tp.components) == 1 {
+			for _, c := range tp.components {
+				comp = c
+			}
+		}
+		tp.componentsMu.RUnlock()
+
+		if comp == nil {
+			return nil, true, fmt.Errorf("component %q not registered in teleop pipeline", componentName)
+		}
+
 		if seq, ok := cmd["seq"]; ok {
 			if seqF, ok := seq.(float64); ok {
-				tp.logger.CDebugf(ctx, "teleop received seq=%d", int64(seqF))
+				tp.logger.CDebugf(ctx, "teleop received component=%s seq=%d", comp.name, int64(seqF))
 			}
 		}
-		trySendLatest(tp.poseCh, pif)
+
+		comp.latestPose.Store(pif)
+		trySendNotify(tp.notify)
+
 		resp[DoTeleopMove] = true
 		return resp, true, nil
 	}
 
 	if _, ok := cmd[DoTeleopStop]; ok {
-		ms.teleopMu.Lock()
-		if ms.teleopPipeline != nil {
-			ms.teleopPipeline.stop(ctx, ms)
-			ms.teleopPipeline = nil
+		componentName, _ := cmd["component_name"].(string)
+
+		if componentName == "" {
+			// Backward compat: stop entire pipeline.
+			ms.teleopMu.Lock()
+			if ms.teleopPipeline != nil {
+				ms.teleopPipeline.stop(ctx, ms)
+				ms.teleopPipeline = nil
+			}
+			ms.teleopMu.Unlock()
+		} else {
+			ms.removeTeleopComponent(ctx, componentName)
 		}
-		ms.teleopMu.Unlock()
 
 		resp[DoTeleopStop] = true
 		return resp, true, nil
 	}
@@ -455,13 +656,12 @@ func (ms *builtIn) handleTeleopCommand(
 		if tp == nil {
 			return map[string]any{
-				"running": tp != nil,
+				DoTeleopStatus: map[string]any{"running": false},
 			}, true, nil
 		}
 
 		status := map[string]any{
-			"queued_poses":   len(tp.poseCh),
-			"queued_plans":   len(tp.trajCh),
+			"running":        true,
 			"last_inputs_ms": float64(tp.lastInputsNanos.Load()) / 1e6,
 			"last_plan_ms":   float64(tp.lastPlanNanos.Load()) / 1e6,
 			"last_exec_ms":   float64(tp.lastExecNanos.Load()) / 1e6,
@@ -473,8 +673,15 @@ func (ms *builtIn) handleTeleopCommand(
 			status["error"] = (*lastErr).Error()
 		}
 
-		resp[DoTeleopStatus] = status
+		tp.componentsMu.RLock()
+		compNames := make([]string, 0, len(tp.components))
+		for name := range tp.components {
+			compNames = append(compNames, name)
+		}
+		tp.componentsMu.RUnlock()
+		status["components"] = compNames
 
+		resp[DoTeleopStatus] = status
 		return resp, true, nil
 	}