Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
295 changes: 288 additions & 7 deletions peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"log/slog"
"math/rand/v2"
"runtime/debug"
"sync"
"time"

Expand Down Expand Up @@ -59,14 +60,15 @@ type Status struct {
}

// Config plumbs in dependencies. Zero-valued fields fall back to production
// defaults; HeartbeatInterval, HeartbeatTimeout, and CredRotationInterval
// exist so tests can drive the loops without sleeping a full minute / hour.
type Config struct {
	// API is the lantern-cloud client used for Register / Deregister calls.
	API *API
	// NewForwarder opens the port forwarder for a session.
	// NOTE(review): presumably invoked once per Start — confirm against Start.
	NewForwarder func(ctx context.Context) (portForwarder, error)
	// BuildBoxService builds a box service from a context and an options
	// string; it is called again on every credential rotation.
	BuildBoxService boxFactory
	// HeartbeatInterval and HeartbeatTimeout tune the heartbeat loop. They
	// exist mainly so tests don't have to sleep through production-sized
	// intervals.
	HeartbeatInterval time.Duration
	HeartbeatTimeout  time.Duration
	// CredRotationInterval is the period between credential rotations.
	// Zero selects peerCredRotationInterval.
	CredRotationInterval time.Duration
}

// Client orchestrates one peer-proxy session: open UPnP port → register with
Expand Down Expand Up @@ -97,8 +99,38 @@ type Client struct {
forwarder portForwarder
box boxService
routeID string
// externalPort / internalPort persist the port mapping picked at
// Start so the cred-rotation loop can re-register against the same
// (address, port) tuple without re-probing UPnP / re-mapping. The
// router-side mapping itself stays put across rotations; only the
// samizdat creds and route_id rotate.
externalPort uint16
internalPort uint16
// boxOptions is the fresh options string passed to BuildBoxService,
// kept for diagnostics and so the rotation path doesn't need to
// re-derive it from the (also-stored) box reference.
boxOptions string
// runCtx is captured here for the cred-rotation goroutine to bind
// the new libbox lifetime to the same context as the original Start.
// Stop's cancelRun() teardown still applies to the rebuilt box.
runCtx context.Context
}

// peerCredRotationInterval bounds how long a leaked samizdat
// credential remains usable. At each tick the peer re-registers with
// lantern-cloud (new route_id, new keypair, new shortID), rebuilds the
// libbox service against the new options, and deregisters the prior
// route. Caps blast radius from credential leakage (logs, telemetry,
// memory dumps, the H2 leakage path in engineering#3440) to ~1h
// regardless of peer process lifetime.
//
// Cost per rotation: one API.Register + Deregister round trip, one
// libbox build + start + close cycle. Brief (~hundreds-of-ms) port-
// rebind window during the swap; samizdat clients see TCP RST and
// reconnect via the bandit. Acceptable trade-off vs. holding the same
// cred for the full peer process lifetime.
const peerCredRotationInterval = time.Hour

// peerCleanupTimeout caps how long Start's rollback path waits for
// Deregister / UnmapPort. Cleanup uses a fresh Background context (not the
// caller's ctx) so an already-canceled or expired Start ctx doesn't skip
Expand Down Expand Up @@ -253,6 +285,10 @@ func (c *Client) Start(ctx context.Context) error {
c.forwarder = fwd
c.box = box
c.routeID = regResp.RouteID
c.externalPort = mapping.ExternalPort
c.internalPort = mapping.InternalPort
c.boxOptions = options
c.runCtx = runCtx
c.cancelRun = cancelRun
c.runDone = runDone
c.status = Status{
Expand All @@ -265,8 +301,14 @@ func (c *Client) Start(ctx context.Context) error {
statusSnapshot := c.status
c.mu.Unlock()

rotation := c.cfg.CredRotationInterval
if rotation == 0 {
rotation = peerCredRotationInterval
}

fwd.StartRenewal(runCtx)
go c.heartbeatLoop(runCtx, heartbeat, runDone)
go c.credRotationLoop(runCtx, rotation)

slog.Info("peer client started",
"external_ip", externalIP,
Expand Down Expand Up @@ -317,6 +359,10 @@ func (c *Client) Stop(ctx context.Context) error {
c.forwarder = nil
c.box = nil
c.routeID = ""
c.externalPort = 0
c.internalPort = 0
c.boxOptions = ""
c.runCtx = nil
c.status = Status{}
c.mu.Unlock()

Expand Down Expand Up @@ -385,6 +431,20 @@ func (c *Client) heartbeatLoop(ctx context.Context, interval time.Duration, done
// later heartbeat as a 404.
slog.Warn("peer heartbeat failed", "err", err, "route_id", routeID)
if isNotRegistered(err) {
// Re-check current routeID under lock. If credRotationLoop
// swapped routeID + deregistered the prior route between
// our heartbeat-prepare and heartbeat-response, the 404
// applies to a stale route and is expected, not a reason
// to stop. Skip the auto-Stop and let the next tick
// heartbeat the new route.
c.mu.Lock()
currentRouteID := c.routeID
c.mu.Unlock()
if currentRouteID != routeID {
slog.Info("peer heartbeat 404 on stale route_id; rotation in flight, continuing",
"stale_route_id", routeID, "current_route_id", currentRouteID)
continue
}
slog.Info("peer route no longer registered server-side, stopping client")
// Stop runs in a separate goroutine to avoid the cyclic
// Stop → cancelRun → loop-exit deadlock.
Expand All @@ -408,6 +468,227 @@ func isNotRegistered(err error) bool {
return errors.As(err, &apiErr) && apiErr.Status == 404
}

// credRotationLoop periodically rotates the peer's samizdat credentials
// (X25519 keypair, shortID, masquerade) by re-registering with
// lantern-cloud, rebuilding the libbox inbound, and deregistering the
// prior route. Caps blast radius from credential leakage to ~interval
// regardless of peer process lifetime — see peerCredRotationInterval.
//
// Closes done is the responsibility of heartbeatLoop; this loop just
// exits when ctx is cancelled. We deliberately don't add another close
// channel: heartbeatLoop's done already gates Stop, and rotation
// failures are non-fatal (log + retry next tick), so there's nothing
// the Stop path needs to wait on from this goroutine.
func (c *Client) credRotationLoop(ctx context.Context, interval time.Duration) {
Comment thread
myleshorton marked this conversation as resolved.
// Non-positive interval would panic time.NewTicker. Treat it the same
// as the zero case Start handles when CredRotationInterval is unset:
// fall back to the default cap rather than disabling rotation
// silently (an unset value still wants rotation; a negative value is
// almost certainly a test/config bug).
if interval <= 0 {
interval = peerCredRotationInterval
}
t := time.NewTicker(interval)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
c.runRotation(ctx)
}
}
}

// runRotation wraps rotateCreds with panic recovery. libbox.Start can
// panic (see the recover in vpn/tunnel.go's tunnel-start path); an
// unrecovered panic here would crash the host process during a
// background rotation, taking the user's main VPN with it. Treat a
// panic the same as any other rotation failure: log, keep the existing
// box serving, try again next tick.
func (c *Client) runRotation(ctx context.Context) {
	// Installed first so it covers everything rotateCreds does. A panic is
	// logged with its stack and swallowed, so the caller's loop carries on
	// to the next tick.
	defer func() {
		recovered := recover()
		if recovered == nil {
			return
		}
		slog.Error("peer cred rotation panicked; current creds remain in use",
			"panic", recovered, "stack", string(debug.Stack()))
	}()

	err := c.rotateCreds(ctx)
	if err == nil {
		return
	}
	// A failed rotation is not fatal: the existing box / route is still
	// serving, so just record it and let the next tick retry.
	slog.Warn("peer cred rotation failed; current creds remain in use", "err", err)
}

// rotateCreds atomically swaps the peer's samizdat credentials. On
// success: a fresh route_id and keypair are in use, the libbox inbound
// has been rebuilt against the new options, the prior route is
// deregistered server-side, and the FlutterEvent stream sees no gap.
// On failure: the prior creds and box continue serving — rotation is
// best-effort. The router-side port mapping is preserved across the
Comment thread
myleshorton marked this conversation as resolved.
Outdated
// rotation; only the in-process samizdat state changes.
//
// Sequence:
// 1. Re-register with the same (externalIP, externalPort) as Start.
// 2. Patch the new server-supplied options for VPN bypass.
// 3. Build a new libbox service against the new options.
// 4. Close the old box (releases the listening port).
// 5. Start the new box (re-binds the same port, now with new creds).
// 6. Atomic swap: c.box, c.routeID, c.boxOptions point at the new box.
// 7. Best-effort deregister of the prior route_id so the bandit
// catalog stops handing the old (now-invalid) creds to clients.
//
// Steps 4-5 leave a brief (~hundreds of ms) window where the port
// isn't bound; samizdat clients see TCP RST and reconnect. Acceptable
// trade-off vs. the security cost of holding the same cred for the
// peer process lifetime.
func (c *Client) rotateCreds(ctx context.Context) error {
c.mu.Lock()
if !c.active {
c.mu.Unlock()
return errors.New("not active")
}
fwd := c.forwarder
extPort := c.externalPort
intPort := c.internalPort
oldRouteID := c.routeID
oldBox := c.box
c.mu.Unlock()

if fwd == nil || oldBox == nil {
return errors.New("rotateCreds: client state inconsistent")
}

externalIP, err := fwd.ExternalIP(ctx)
if err != nil {
return fmt.Errorf("get external ip: %w", err)
}
regResp, err := c.cfg.API.Register(ctx, RegisterRequest{
ExternalIP: externalIP,
ExternalPort: extPort,
InternalPort: intPort,
})
if err != nil {
return fmt.Errorf("re-register: %w", err)
}
// From here on, any error path must deregister regResp.RouteID —
// otherwise the newly-created server-side row leaks until TTL expiry
// and the bandit catalog may briefly hand out creds for a route
// whose box never came up.
cleanupNewRoute := func(reason error) {
// Use a fresh ctx so a cancelled rotation ctx doesn't skip the
// cleanup we just made necessary.
cleanupCtx, cancel := context.WithTimeout(context.Background(), peerCleanupTimeout)
defer cancel()
if dErr := c.cfg.API.Deregister(cleanupCtx, regResp.RouteID); dErr != nil {
slog.Warn("deregister orphan route after rotation failure",
"reason", reason, "err", dErr, "orphan_route_id", regResp.RouteID)
}
}

options, err := ensurePeerOutboundsBypassVPN(regResp.ServerConfig)
if err != nil {
cleanupNewRoute(err)
return fmt.Errorf("patch sing-box options: %w", err)
}

c.mu.Lock()
runCtx := c.runCtx
c.mu.Unlock()
if runCtx == nil {
// Stop happened between the unlock above and here. Skip the
// build to avoid spinning up a libbox tied to a torn-down ctx,
// and clean up the just-created route so it doesn't linger.
cleanupNewRoute(errors.New("client stopped during rotation"))
return errors.New("client stopped during rotation")
}
newBox, err := c.cfg.BuildBoxService(runCtx, options)
if err != nil {
cleanupNewRoute(err)
return fmt.Errorf("build new sing-box: %w", err)
}
Comment thread
myleshorton marked this conversation as resolved.

// Close old, start new. Order matters — both want the same port.
// If newBox.Start fails after oldBox.Close, retry briefly to absorb
// router-side TIME_WAIT / EADDRINUSE windows before giving up.
if closeErr := oldBox.Close(); closeErr != nil {
slog.Warn("close old box during rotation", "err", closeErr)
}
if err := startNewBoxWithRetry(ctx, newBox); err != nil {
// Catastrophic: port is now unbound. Leave c.box pointing at
// oldBox so a future Stop tries to close it (idempotent on
// already-closed); the next rotation tick will try again. Also
// deregister the now-orphan new route so the bandit doesn't
// hand its creds out for a non-listening port.
cleanupNewRoute(err)
return fmt.Errorf("start new sing-box: %w", err)
}
Comment thread
myleshorton marked this conversation as resolved.

// Final swap under lock. Re-check active so a Stop racing us between
// runCtx-check above and now doesn't get resurrected by overwriting
// the cleared state Stop just set up.
c.mu.Lock()
if !c.active {
c.mu.Unlock()
// Stop already cleared c.box / c.routeID. Close the new box we
// just brought up (Stop has no reference to it) and deregister
// the new route. The old route Stop already deregistered as
// part of its own teardown.
if err := newBox.Close(); err != nil {
slog.Warn("close new box after Stop raced rotation", "err", err)
}
cleanupNewRoute(errors.New("client stopped during rotation swap"))
return errors.New("client stopped during rotation")
}
c.box = newBox
c.routeID = regResp.RouteID
c.boxOptions = options
c.status.RouteID = regResp.RouteID
Comment thread
myleshorton marked this conversation as resolved.
c.status.ExternalIP = externalIP
c.mu.Unlock()
Comment thread
myleshorton marked this conversation as resolved.
Comment thread
myleshorton marked this conversation as resolved.

// Deregister the prior route so the bandit stops handing the old
// (now-invalid) creds to clients. Best-effort: the prior row will
// expire from its TTL anyway, but explicit deregister cuts the
// stale-creds window from up-to-TTL down to ~immediately.
if err := c.cfg.API.Deregister(ctx, oldRouteID); err != nil {
slog.Warn("deregister prior route after rotation",
"err", err, "old_route_id", oldRouteID)
}
Comment thread
myleshorton marked this conversation as resolved.
Comment thread
myleshorton marked this conversation as resolved.
Outdated

slog.Info("peer cred rotation succeeded",
"new_route_id", regResp.RouteID,
"old_route_id", oldRouteID,
)
return nil
}

// startNewBoxWithRetry retries newBox.Start a handful of times with a
// short backoff to absorb router-side TIME_WAIT / EADDRINUSE between
// oldBox.Close releasing the port and newBox.Start re-binding it. Total
// wait is bounded under 1s so a healthy rotation isn't delayed
// noticeably; the alternative is leaving the peer's listener down for
// the full rotation interval (default 1h) on a transient bind failure.
func startNewBoxWithRetry(ctx context.Context, newBox boxService) error {
	// Up to `attempts` calls to Start, sleeping 50ms, 100ms, 200ms, 400ms
	// BETWEEN attempts — 750ms of waiting in the worst case. There is no
	// sleep after the final attempt: nothing is left to retry, so waiting
	// would only delay the error (and push the total past 1s).
	//
	// ctx is only consulted while waiting; Start is always tried at least
	// once. Returns nil on the first successful Start, otherwise an error
	// wrapping the last Start failure.
	const attempts = 5
	backoff := 50 * time.Millisecond
	var lastErr error
	for attempt := 1; attempt <= attempts; attempt++ {
		err := newBox.Start()
		if err == nil {
			return nil
		}
		lastErr = err
		if attempt == attempts {
			break
		}

		// Explicit timer (rather than time.After) so it can be released
		// immediately when ctx wins the race.
		wait := time.NewTimer(backoff)
		select {
		case <-ctx.Done():
			wait.Stop()
			return fmt.Errorf("start new sing-box (ctx cancelled after %d attempts): %w", attempt, lastErr)
		case <-wait.C:
		}
		backoff *= 2
	}
	return fmt.Errorf("start new sing-box (%d attempts): %w", attempts, lastErr)
}
Comment thread
myleshorton marked this conversation as resolved.
Outdated

// ensurePeerOutboundsBypassVPN guarantees the peer sing-box's outbound dials
// bind to the physical interface rather than whatever the OS routing table
// picks. Without this, when the user's own Lantern VPN is up its TUN holds
Expand Down
Loading
Loading