112 changes: 109 additions & 3 deletions examples/customresources/demos/optionaldepsmodule/module.go
@@ -8,6 +8,7 @@ import (
"context"
"errors"
"fmt"
"sync/atomic"

"go.viam.com/rdk/components/generic"
"go.viam.com/rdk/components/motor"
@@ -18,8 +19,10 @@ import (
)

var (
fooModel = resource.NewModel("acme", "demo", "foo")
mocModel = resource.NewModel("acme", "demo", "moc" /* "mutual optional child" */)
fooModel = resource.NewModel("acme", "demo", "foo")
mocModel = resource.NewModel("acme", "demo", "moc" /* "mutual optional child" */)
pointerTargetModel = resource.NewModel("acme", "demo", "pointer-target")
pointerHolderModel = resource.NewModel("acme", "demo", "pointer-holder")
)

func main() {
@@ -29,9 +32,17 @@ func main() {
resource.RegisterComponent(generic.API, mocModel, resource.Registration[resource.Resource, *MutualOptionalChildConfig]{
Constructor: newMutualOptionalChild,
})
resource.RegisterComponent(generic.API, pointerTargetModel, resource.Registration[resource.Resource, *PointerTargetConfig]{
Constructor: newPointerTarget,
})
resource.RegisterComponent(generic.API, pointerHolderModel, resource.Registration[resource.Resource, *PointerHolderConfig]{
Constructor: newPointerHolder,
})

module.ModularMain(resource.APIModel{generic.API, fooModel},
resource.APIModel{generic.API, mocModel})
resource.APIModel{generic.API, mocModel},
resource.APIModel{generic.API, pointerTargetModel},
resource.APIModel{generic.API, pointerHolderModel})
}

// FooConfig contains a required and optional motor that the component will necessarily
@@ -243,3 +254,98 @@ func (moc *mutualOptionalChild) DoCommand(ctx context.Context, req map[string]an
// `moc` is notably missing a `Reconfigure` method. Modular resources with optional
// dependencies should be able to leverage optional dependencies even as "always rebuild"
// resources.

// PointerTargetConfig configures a pointer-target: a module-served resource that carries
// an optional dependency, making it eligible for rebuild by the RDK's
// updateWeakAndOptionalDependents flow. Another module-internal resource (a pointer-holder)
// can capture a direct Go pointer to it and exercise stale-pointer scenarios when the
// target is rebuilt via the weak/optional path.
type PointerTargetConfig struct {
OptionalDep string `json:"optional_dep"`
}

// Validate returns the optional dep so updateWeakAndOptionalDependents considers this
// resource for rebuilding.
func (c *PointerTargetConfig) Validate(path string) ([]string, []string, error) {
var optionalDeps []string
if c.OptionalDep != "" {
optionalDeps = append(optionalDeps, c.OptionalDep)
}
return nil, optionalDeps, nil
}

// pointerTargetInstanceCounter gives each pointerTarget instance a unique ID so tests can
// verify whether a pointer-holder's captured Go pointer references the latest instance or
// a stale one.
var pointerTargetInstanceCounter atomic.Uint64

// pointerTarget tracks whether Close was called and carries a unique instance ID.
// DoCommand returns an error if called after Close — this is what lets a dependent detect
// a stale pointer to a closed instance. When alive, it returns its instance ID so callers
// can verify they are talking to the latest instance.
type pointerTarget struct {
resource.Named
resource.AlwaysRebuild
instanceID uint64
closed atomic.Bool
}

func newPointerTarget(_ context.Context, _ resource.Dependencies, conf resource.Config, _ logging.Logger) (resource.Resource, error) {
return &pointerTarget{
Named: conf.ResourceName().AsNamed(),
instanceID: pointerTargetInstanceCounter.Add(1),
}, nil
}

func (p *pointerTarget) Close(_ context.Context) error {
p.closed.Store(true)
return nil
}

func (p *pointerTarget) DoCommand(_ context.Context, _ map[string]any) (map[string]any, error) {
if p.closed.Load() {
return nil, errors.New("pointerTarget is closed")
}
return map[string]any{"instance_id": p.instanceID}, nil
}

// PointerHolderConfig configures a pointer-holder: a resource that explicitly depends on
// a pointer-target and captures a direct Go pointer to it at construction time.
type PointerHolderConfig struct {
Target string `json:"target"`
}

// Validate declares target as a required dep so it gets resolved via `deps` at
// construction time and a Go pointer can be captured.
func (c *PointerHolderConfig) Validate(path string) ([]string, []string, error) {
if c.Target == "" {
return nil, nil, fmt.Errorf(`expected "target" attribute for pointer-holder %q`, path)
}
return []string{c.Target}, nil, nil
}

// pointerHolder holds a direct Go pointer to a pointerTarget captured at construction time.
// Its DoCommand proxies through the stored pointer — this is the pattern that leaves the
// holder with a stale reference if the target is rebuilt without notifying the holder.
type pointerHolder struct {
resource.Named
resource.TriviallyCloseable
resource.AlwaysRebuild
target resource.Resource
}

func newPointerHolder(_ context.Context, deps resource.Dependencies, conf resource.Config, _ logging.Logger) (resource.Resource, error) {
cfg, err := resource.NativeConfig[*PointerHolderConfig](conf)
if err != nil {
return nil, err
}
target, ok := deps[generic.Named(cfg.Target)]
if !ok {
return nil, fmt.Errorf("pointer-holder could not find target %q in dependencies", cfg.Target)
}
return &pointerHolder{Named: conf.ResourceName().AsNamed(), target: target}, nil
}

func (p *pointerHolder) DoCommand(ctx context.Context, cmd map[string]any) (map[string]any, error) {
return p.target.DoCommand(ctx, cmd)
}
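
The pointer-target / pointer-holder pair above exists to reproduce the stale-pointer failure described in the comments: a dependent that captures a direct Go pointer at construction time keeps that pointer even after the target is rebuilt. A minimal standalone sketch of that scenario, using hypothetical stand-in types rather than the RDK's resource API:

package main

import (
	"errors"
	"fmt"
)

// target stands in for pointerTarget: it refuses calls after it is closed.
type target struct {
	id     uint64
	closed bool
}

func (t *target) do() (uint64, error) {
	if t.closed {
		return 0, errors.New("target is closed")
	}
	return t.id, nil
}

// holder stands in for pointerHolder: it captures a direct Go pointer at construction.
type holder struct {
	target *target
}

func main() {
	first := &target{id: 1}
	h := &holder{target: first}

	// A rebuild closes the old object and creates a replacement, but nothing
	// updates the holder's captured pointer.
	first.closed = true
	_ = &target{id: 2}

	// Proxying through the stale pointer now surfaces an error, which is exactly
	// what the demo's DoCommand plumbing lets tests observe.
	if _, err := h.target.do(); err != nil {
		fmt.Println("stale pointer:", err)
	}
}
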
12 changes: 6 additions & 6 deletions module/module.go
@@ -104,11 +104,11 @@ type Module struct {
// registerMu protects the maps immediately below as resources/streams come in and out of existence
registerMu sync.Mutex
collections map[resource.API]resource.APIResourceCollection[resource.Resource]
// internalDeps is keyed by a "child" resource and its values are "internal" resources that
// depend on the child. We use a pointer for the value such that it's stable across map growth.
// Similarly, the slice of `resConfigureArgs` can grow, hence we must use pointers such that
// modifiying in place remains valid.
internalDeps map[resource.Resource][]resConfigureArgs
// internalDeps is keyed by a "child" resource name and its values are "internal" resources that
// depend on the child. Keying by name (rather than object pointer) ensures that the tracking
// survives remove+re-add cycles: when a resource is rebuilt, its name is stable even though
// the Go object pointer changes.
internalDeps map[resource.Name][]resConfigureArgs
resLoggers map[resource.Resource]logging.Logger
activeResourceStreams map[resource.Name]peerResourceState
streamSourceByName map[resource.Name]rtppassthrough.Source
@@ -186,7 +186,7 @@ func NewModule(ctx context.Context, address string, logger logging.Logger) (*Mod
handlers: HandlerMap{},
collections: map[resource.API]resource.APIResourceCollection[resource.Resource]{},
resLoggers: map[resource.Resource]logging.Logger{},
internalDeps: map[resource.Resource][]resConfigureArgs{},
internalDeps: map[resource.Name][]resConfigureArgs{},
}

if tracingEnabled {
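
The switch from a resource-pointer key to a resource.Name key matters because a rebuilt resource is a brand-new Go object that keeps the same name. A minimal sketch of the difference, with hypothetical stand-in types rather than the RDK's:

package main

import "fmt"

// name stands in for resource.Name: a comparable value that is stable across rebuilds.
type name struct{ api, model, short string }

type res struct{ name name }

func main() {
	deps := map[name][]string{}

	old := &res{name: name{"rdk:component:generic", "acme:demo:pointer-target", "target"}}
	deps[old.name] = append(deps[old.name], "holder")

	// Rebuild: a new object, same name.
	rebuilt := &res{name: name{"rdk:component:generic", "acme:demo:pointer-target", "target"}}

	fmt.Println(deps[rebuilt.name]) // [holder]: the name-keyed lookup survives the rebuild
	fmt.Println(old == rebuilt)     // false: a pointer-keyed map would have missed the entry
}
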
130 changes: 112 additions & 18 deletions module/resources.go
@@ -373,14 +373,83 @@ func (m *Module) addResource(
// Dan: We could call `m.getLocalResource(dep.Name())` but that's just a linear scan over
// resLoggers.
if _, exists := m.resLoggers[dep]; exists {
m.internalDeps[dep] = append(m.internalDeps[dep], resConfigureArgs{
depName := dep.Name()
m.internalDeps[depName] = append(m.internalDeps[depName], resConfigureArgs{
toReconfig: res,
conf: conf,
depStrings: depStrings,
})
}
}

// Cascade-rebuild any existing module-internal resources whose Go pointer to the newly-added
	// resource is now stale. This handles the case where the RDK removes and re-adds a dependency
	// whose dependent was built against the old Go pointer and must be rebuilt to hold a
	// reference to the new parent object.
//
// The RDK's updateWeakAndOptionalDependents flow rebuilds a modular resource via
// RemoveResource + AddResource on the module but omits the markChildrenForUpdate call
// so the RDK never notifies dependents. If a module-internal dependent (e.g. a gripper
// that depends_on the arm) holds a direct Go pointer to the rebuilt resource, that
// pointer is left stale. This cascade closes the gap on the module side.
//
// If cascading would transitively circle back to the resource we just added (a mutual optional-dep
// cycle of any length), the recursion would keep rebuilding each member of the cycle without ever
// returning. We prevent that by seeding a `visited` set with the just-added resource and threading
// it through the rebuild recursion; any dependent whose name is already on the cascade stack is
// skipped. One side of the cycle ends up with a stale pointer.
newResName := conf.ResourceName()
if staleDependents, ok := m.internalDeps[newResName]; ok {
visited := map[resource.Name]struct{}{newResName: {}}
for _, args := range staleDependents {
depColl, collOk := m.collections[args.conf.API]
if !collOk {
continue
}
if _, err := depColl.Resource(args.conf.ResourceName().Name); err != nil {
// Dependent no longer registered in the collection; nothing to rebuild.
continue
}
if _, cycled := visited[args.conf.ResourceName()]; cycled {
// Cycle: this dependent is the just-added resource itself or
// has been added to the cascade stack by an outer caller.
				// Skip — one side of the cycle will end up with a stale pointer.
m.logger.Warnw("skipping cascade rebuild due to mutual-dep cycle; dependent will hold a stale pointer until next reconfigure",
"changedResource", newResName, "dependent", args.conf.Name)
continue
}
freshDeps, err := m.getDependenciesForConstruction(ctx, args.depStrings)
if err != nil {
m.logger.Warnw("failed to get deps for cascade rebuild after resource re-add",
"changedResource", newResName, "dependent", args.conf.Name, "err", err)
continue
}
m.registerMu.Unlock()
newDependentRes, err := m.rebuildResourceWithVisited(ctx, freshDeps, args.conf, nil, visited)
m.registerMu.Lock()
if err != nil {
m.logger.Warnw("failed to cascade rebuild dependent after resource re-add",
"changedResource", newResName, "dependent", args.conf.Name, "err", err)
continue
}
// Record the new dependent pointer in our entry so internalDeps stays in sync
// with the collection. This keeps `toReconfig` accurate for any downstream code
// that reads it (e.g. removeResource's filter loop matches entries by pointer
// identity — if we left toReconfig at the pre-rebuild pointer, a subsequent
// remove of the newly-rebuilt dependent wouldn't clean up this entry).
if newDependentRes != nil {
dependentName := args.conf.ResourceName()
entries := m.internalDeps[newResName]
for j := range entries {
if entries[j].conf.ResourceName() == dependentName {
entries[j].toReconfig = newDependentRes
break
}
}
}
Comment on lines +432 to +446

@danielbotros (Member, Author) commented on Apr 22, 2026:

The state management of internalDeps here and below in removeResource is overly complicated IMO and can be simplified by turning it into a forward index, but I'm leaving that out to keep this PR scoped.
}
}

return nil
}
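
The visited-set guard above (and the rebuildResourceWithVisited plumbing later in this file) follows a standard depth-first-search pattern: seed the set with the resource whose cascade is starting, skip any dependent already on the stack, and pop on exit. A self-contained sketch of just that pattern, using a hypothetical two-node cycle rather than the module's real data structures:

package main

import "fmt"

// rebuild recurses through dependents but skips any name already on the cascade
// stack, so a mutual-dependency cycle terminates instead of recursing forever.
func rebuild(name string, dependents map[string][]string, visited map[string]struct{}) {
	visited[name] = struct{}{}
	defer delete(visited, name) // stack discipline: sibling branches may rebuild this name later

	fmt.Println("rebuilding", name)
	for _, dep := range dependents[name] {
		if _, onStack := visited[dep]; onStack {
			fmt.Println("skipping", dep, "(already on cascade stack)")
			continue
		}
		rebuild(dep, dependents, visited)
	}
}

func main() {
	// "a" and "b" depend on each other, a cycle of length two.
	dependents := map[string][]string{"a": {"b"}, "b": {"a"}}
	rebuild("a", dependents, map[string]struct{}{})
	// Prints: rebuilding a, rebuilding b, skipping a (already on cascade stack).
}
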

@@ -416,18 +485,21 @@ func (m *Module) removeResource(ctx context.Context, resName resource.Name) erro
delete(m.activeResourceStreams, res.Name())
delete(m.resLoggers, res)

// The viam-server forbids removing a resource until dependents are first closed/removed. Hence
// it's safe to assume the value in the map for `res` is empty and simply remove the map entry.q
delete(m.internalDeps, res)

for dep, chainReconfiguresPtr := range m.internalDeps {
chainReconfigures := chainReconfiguresPtr
for idx, chainRes := range chainReconfigures {
if res == chainRes.toReconfig {
// Clear the removed resource from any chain of reconfigures it appears in.
m.internalDeps[dep] = append(chainReconfigures[:idx], chainReconfigures[idx+1:]...)
// Clear the removed resource from any dependency chain it appears in as a dependent. We do NOT
// delete the name-keyed entry for `res` itself — if `res` is re-added later, the cascade step
// in addResource needs that entry to find stale dependents and rebuild them.
for depName, chainReconfigures := range m.internalDeps {
filtered := chainReconfigures[:0]
for _, chainRes := range chainReconfigures {
if chainRes.toReconfig != res {
filtered = append(filtered, chainRes)
}
}
if len(filtered) == 0 {
delete(m.internalDeps, depName)
} else {
m.internalDeps[depName] = filtered
}
}

return coll.Remove(resName)
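
The filtering loop above uses Go's filter-in-place idiom: reslicing to chainReconfigures[:0] reuses the backing array, so surviving entries are compacted without allocating a new slice. A minimal sketch of the idiom on its own (hypothetical data, not the module's types):

package main

import "fmt"

func main() {
	entries := []string{"keep", "drop", "keep", "drop"}
	filtered := entries[:0] // shares the backing array with entries
	for _, e := range entries {
		if e != "drop" {
			filtered = append(filtered, e)
		}
	}
	fmt.Println(filtered) // [keep keep]
}

The idiom is safe here because each iteration only writes to a position it has already read past.
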
@@ -438,6 +510,21 @@ func (m *Module) removeResource(ctx context.Context, resName resource.Name) erro
func (m *Module) rebuildResource(
ctx context.Context, deps resource.Dependencies, conf *resource.Config, logLevel *logging.Level,
) (resource.Resource, error) {
return m.rebuildResourceWithVisited(ctx, deps, conf, logLevel, map[resource.Name]struct{}{})
}

// rebuildResourceWithVisited is the recursive body of rebuildResource. It tracks which
// resources are currently on the cascade stack (in `visited`) to prevent infinite recursion
// through mutual-dep cycles. The current resource is added to `visited` on entry and
// removed on exit (stack discipline) so that sibling branches of an outer cascade can be
// rebuilt independently.
func (m *Module) rebuildResourceWithVisited(
ctx context.Context, deps resource.Dependencies, conf *resource.Config, logLevel *logging.Level,
visited map[resource.Name]struct{},
) (resource.Resource, error) {
visited[conf.ResourceName()] = struct{}{}
defer delete(visited, conf.ResourceName())

m.registerMu.Lock()
coll, ok := m.collections[conf.API]
if !ok {
@@ -493,10 +580,18 @@ func (m *Module) rebuildResource(
m.streamSourceByName[res.Name()] = p
}

depsToRebuild := m.internalDeps[res]
// Build up a new slice to map `m.internalDeps[newRes]` to.
// Key by resource name (stable across rebuilds) rather than object pointer.
resName := res.Name()
depsToRebuild := m.internalDeps[resName]
// Build up a new slice to map `m.internalDeps[resName]` to.
newDepsToRebuild := make([]resConfigureArgs, 0, len(depsToRebuild))
for _, depToReconfig := range depsToRebuild {
if _, cycled := visited[depToReconfig.conf.ResourceName()]; cycled {
// This dependent is already on the cascade stack. Rebuilding would close
// a resource that an outer caller still holds. Keep the entry unchanged.
newDepsToRebuild = append(newDepsToRebuild, depToReconfig)
continue
}
// We are going to modify `toReconfig` at the end. Make sure changes to `dependentResConfigureArgs`
// get reflected in the slice.
deps, err := m.getDependenciesForConstruction(ctx, depToReconfig.depStrings)
Expand All @@ -512,12 +607,12 @@ func (m *Module) rebuildResource(
// We release the `registerMu` to let other resource query/acquisition methods make
// progress. We do not assume `rebuildResource` is fast.
//
// We also release the mutex as the recursive call to `rebuildResource` will reacquire
// it. And the mutex is not reentrant.
// We also release the mutex as the recursive call to `rebuildResourceWithVisited`
// will reacquire it. And the mutex is not reentrant.
m.registerMu.Unlock()

var nilLogLevel *logging.Level // pass in nil to avoid changing the log level
rebuiltRes, err := m.rebuildResource(ctx, deps, depToReconfig.conf, nilLogLevel)
rebuiltRes, err := m.rebuildResourceWithVisited(ctx, deps, depToReconfig.conf, nilLogLevel, visited)
if err != nil {
m.logger.Warn("Failed to cascade dependent reconfigure",
"changedResource", conf.Name,
@@ -533,8 +628,7 @@
})
}

m.internalDeps[newRes] = newDepsToRebuild
delete(m.internalDeps, res)
m.internalDeps[resName] = newDepsToRebuild
m.registerMu.Unlock()

return newRes, nil