Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
333 changes: 326 additions & 7 deletions peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"log/slog"
"math/rand/v2"
"runtime/debug"
"sync"
"time"

Expand Down Expand Up @@ -59,14 +60,15 @@ type Status struct {
}

// Config plumbs in dependencies. Zero-valued fields fall back to production
// defaults; HeartbeatInterval, HeartbeatTimeout, and CredRotationInterval
// exist so tests can drive the loops without sleeping a full minute / hour.
type Config struct {
	// API is the client used to register, heartbeat and deregister routes.
	API *API
	// NewForwarder constructs the port forwarder for a session.
	NewForwarder func(ctx context.Context) (portForwarder, error)
	// BuildBoxService builds a box service from a context and an options
	// string; it is invoked again on every credential rotation.
	BuildBoxService boxFactory
	// HeartbeatInterval overrides the heartbeat loop period.
	HeartbeatInterval time.Duration
	// HeartbeatTimeout overrides the heartbeat timeout.
	HeartbeatTimeout time.Duration
	// CredRotationInterval overrides how often credentials are rotated.
	// Zero selects peerCredRotationInterval.
	CredRotationInterval time.Duration
}

// Client orchestrates one peer-proxy session: open UPnP port → register with
Expand Down Expand Up @@ -97,8 +99,38 @@ type Client struct {
forwarder portForwarder
box boxService
routeID string
// externalPort / internalPort persist the port mapping picked at
// Start so the cred-rotation loop can re-register against the same
// (address, port) tuple without re-probing UPnP / re-mapping. The
// router-side mapping itself stays put across rotations; only the
// samizdat creds and route_id rotate.
externalPort uint16
internalPort uint16
// boxOptions is the fresh options string passed to BuildBoxService,
// kept for diagnostics and so the rotation path doesn't need to
// re-derive it from the (also-stored) box reference.
boxOptions string
// runCtx is captured here for the cred-rotation goroutine to bind
// the new libbox lifetime to the same context as the original Start.
// Stop's cancelRun() teardown still applies to the rebuilt box.
runCtx context.Context
}

// peerCredRotationInterval is the default period between credential
// rotations, and therefore the upper bound on how long a leaked samizdat
// credential stays usable. On every tick the peer re-registers with
// lantern-cloud (obtaining a new route_id, keypair and shortID), rebuilds
// the libbox service from the new options, and deregisters the previous
// route. Whatever the leak vector (logs, telemetry, memory dumps, H2
// leakage paths), exposure is capped at roughly one hour no matter how
// long the peer process runs.
//
// Each rotation costs one API.Register + Deregister round trip and one
// libbox build + start + close cycle. While the boxes are swapped the
// port is briefly (~hundreds of ms) unbound; samizdat clients see a TCP
// RST and reconnect via the bandit. That is judged an acceptable price
// compared with keeping one credential for the whole process lifetime.
const peerCredRotationInterval = time.Hour

// peerCleanupTimeout caps how long Start's rollback path waits for
// Deregister / UnmapPort. Cleanup uses a fresh Background context (not the
// caller's ctx) so an already-canceled or expired Start ctx doesn't skip
Expand Down Expand Up @@ -253,6 +285,10 @@ func (c *Client) Start(ctx context.Context) error {
c.forwarder = fwd
c.box = box
c.routeID = regResp.RouteID
c.externalPort = mapping.ExternalPort
c.internalPort = mapping.InternalPort
c.boxOptions = options
c.runCtx = runCtx
c.cancelRun = cancelRun
c.runDone = runDone
c.status = Status{
Expand All @@ -265,8 +301,14 @@ func (c *Client) Start(ctx context.Context) error {
statusSnapshot := c.status
c.mu.Unlock()

rotation := c.cfg.CredRotationInterval
if rotation == 0 {
rotation = peerCredRotationInterval
}

fwd.StartRenewal(runCtx)
go c.heartbeatLoop(runCtx, heartbeat, runDone)
go c.credRotationLoop(runCtx, rotation)

slog.Info("peer client started",
"external_ip", externalIP,
Expand Down Expand Up @@ -317,6 +359,10 @@ func (c *Client) Stop(ctx context.Context) error {
c.forwarder = nil
c.box = nil
c.routeID = ""
c.externalPort = 0
c.internalPort = 0
c.boxOptions = ""
c.runCtx = nil
c.status = Status{}
c.mu.Unlock()

Expand Down Expand Up @@ -385,6 +431,20 @@ func (c *Client) heartbeatLoop(ctx context.Context, interval time.Duration, done
// later heartbeat as a 404.
slog.Warn("peer heartbeat failed", "err", err, "route_id", routeID)
if isNotRegistered(err) {
// Re-check current routeID under lock. If credRotationLoop
// swapped routeID + deregistered the prior route between
// our heartbeat-prepare and heartbeat-response, the 404
// applies to a stale route and is expected, not a reason
// to stop. Skip the auto-Stop and let the next tick
// heartbeat the new route.
c.mu.Lock()
currentRouteID := c.routeID
c.mu.Unlock()
if currentRouteID != routeID {
slog.Info("peer heartbeat 404 on stale route_id; rotation in flight, continuing",
"stale_route_id", routeID, "current_route_id", currentRouteID)
continue
}
slog.Info("peer route no longer registered server-side, stopping client")
// Stop runs in a separate goroutine to avoid the cyclic
// Stop → cancelRun → loop-exit deadlock.
Expand All @@ -408,6 +468,265 @@ func isNotRegistered(err error) bool {
return errors.As(err, &apiErr) && apiErr.Status == 404
}

// credRotationLoop periodically rotates the peer's samizdat credentials
// (X25519 keypair, shortID, masquerade) by re-registering with
// lantern-cloud, rebuilding the libbox inbound, and deregistering the
// prior route. Caps blast radius from credential leakage to ~interval
// regardless of peer process lifetime — see peerCredRotationInterval.
//
// Closes done is the responsibility of heartbeatLoop; this loop just
// exits when ctx is cancelled. We deliberately don't add another close
// channel: heartbeatLoop's done already gates Stop, and rotation
// failures are non-fatal (log + retry next tick), so there's nothing
// the Stop path needs to wait on from this goroutine.
func (c *Client) credRotationLoop(ctx context.Context, interval time.Duration) {
Comment thread
myleshorton marked this conversation as resolved.
// Non-positive interval would panic time.NewTicker. Treat it the same
// as the zero case Start handles when CredRotationInterval is unset:
// fall back to the default cap rather than disabling rotation
// silently (an unset value still wants rotation; a negative value is
// almost certainly a test/config bug).
if interval <= 0 {
interval = peerCredRotationInterval
}
t := time.NewTicker(interval)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
c.runRotation(ctx)
}
}
}

// runRotation performs a single rotation attempt and guarantees that
// neither an error nor a panic escapes to the caller. A panic raised
// anywhere under rotateCreds is recovered and logged together with its
// stack, so a background rotation cannot crash the host process. An
// ordinary error is logged as a warning. In both cases nothing is
// returned: the loop that calls this keeps running and simply tries
// again on its next tick.
func (c *Client) runRotation(ctx context.Context) {
	defer func() {
		// recover must be called directly by the deferred function.
		r := recover()
		if r == nil {
			return
		}
		slog.Error("peer cred rotation panicked; current creds remain in use",
			"panic", r, "stack", string(debug.Stack()))
	}()

	err := c.rotateCreds(ctx)
	if err == nil {
		return
	}
	// A single failed rotation is not fatal to the loop; report it and
	// let the next tick retry.
	slog.Warn("peer cred rotation failed; current creds remain in use", "err", err)
}

// rotateCreds atomically swaps the peer's samizdat credentials. On
// success: a fresh route_id and keypair are in use, the libbox inbound
// has been rebuilt against the new options, the prior route is
// deregistered server-side, and the FlutterEvent stream sees no gap.
// On failure: the prior creds and box continue serving — rotation is
// best-effort. The router-side port mapping is preserved across the
Comment thread
myleshorton marked this conversation as resolved.
Outdated
// rotation; only the in-process samizdat state changes.
//
// Sequence:
// 1. Re-register with the same (externalIP, externalPort) as Start.
// 2. Patch the new server-supplied options for VPN bypass.
// 3. Build a new libbox service against the new options.
// 4. Close the old box (releases the listening port).
// 5. Start the new box (re-binds the same port, now with new creds).
// 6. Atomic swap: c.box, c.routeID, c.boxOptions point at the new box.
// 7. Best-effort deregister of the prior route_id so the bandit
// catalog stops handing the old (now-invalid) creds to clients.
//
// Steps 4-5 leave a brief (~hundreds of ms) window where the port
// isn't bound; samizdat clients see TCP RST and reconnect. Acceptable
// trade-off vs. the security cost of holding the same cred for the
// peer process lifetime.
func (c *Client) rotateCreds(ctx context.Context) error {
c.mu.Lock()
if !c.active {
c.mu.Unlock()
return errors.New("not active")
}
fwd := c.forwarder
extPort := c.externalPort
intPort := c.internalPort
oldRouteID := c.routeID
oldBox := c.box
// Capture sessionRunCtx upfront so the final swap can detect a
// Stop→Start cycle that happened mid-rotation. Just checking
// c.active at the swap isn't enough: Stop clears active, then a
// new Start can set it back to true before this rotation reaches
// the swap, and the old rotation would clobber the new session's
// state. The runCtx is unique per session, so a pointer-identity
// check at the swap is sufficient.
sessionRunCtx := c.runCtx
c.mu.Unlock()

if fwd == nil || oldBox == nil {
return errors.New("rotateCreds: client state inconsistent")
}

externalIP, err := fwd.ExternalIP(ctx)
if err != nil {
return fmt.Errorf("get external ip: %w", err)
}
regResp, err := c.cfg.API.Register(ctx, RegisterRequest{
ExternalIP: externalIP,
ExternalPort: extPort,
InternalPort: intPort,
})
if err != nil {
return fmt.Errorf("re-register: %w", err)
}
// From here on, any error path must deregister regResp.RouteID —
// otherwise the newly-created server-side row leaks until TTL expiry
// and the bandit catalog may briefly hand out creds for a route
// whose box never came up.
cleanupNewRoute := func(reason error) {
// Use a fresh ctx so a cancelled rotation ctx doesn't skip the
// cleanup we just made necessary.
cleanupCtx, cancel := context.WithTimeout(context.Background(), peerCleanupTimeout)
defer cancel()
if dErr := c.cfg.API.Deregister(cleanupCtx, regResp.RouteID); dErr != nil {
slog.Warn("deregister orphan route after rotation failure",
"reason", reason, "err", dErr, "orphan_route_id", regResp.RouteID)
}
}

options, err := ensurePeerOutboundsBypassVPN(regResp.ServerConfig)
if err != nil {
cleanupNewRoute(err)
return fmt.Errorf("patch sing-box options: %w", err)
}

c.mu.Lock()
currentRunCtx := c.runCtx
c.mu.Unlock()
if currentRunCtx == nil || currentRunCtx != sessionRunCtx {
// Stop ran (runCtx==nil), or a Stop→Start cycle replaced the
// session (runCtx pointer differs from the one captured at the
// top). Either way the build below would tie a libbox to the
// wrong session; skip it and clean up the just-created route.
cleanupNewRoute(errors.New("client stopped during rotation"))
return errors.New("client stopped during rotation")
}
newBox, err := c.cfg.BuildBoxService(sessionRunCtx, options)
if err != nil {
cleanupNewRoute(err)
return fmt.Errorf("build new sing-box: %w", err)
}
Comment thread
myleshorton marked this conversation as resolved.

// Close old, start new. Order matters — both want the same port.
// If newBox.Start fails after oldBox.Close, retry briefly to absorb
// router-side TIME_WAIT / EADDRINUSE windows before giving up.
if closeErr := oldBox.Close(); closeErr != nil {
slog.Warn("close old box during rotation", "err", closeErr)
}
if err := startNewBoxWithRetry(ctx, newBox); err != nil {
// Catastrophic: port is now unbound. Leave c.box pointing at
// oldBox so a future Stop tries to close it (idempotent on
// already-closed); the next rotation tick will try again. Also
// deregister the now-orphan new route so the bandit doesn't
// hand its creds out for a non-listening port.
cleanupNewRoute(err)
return fmt.Errorf("start new sing-box: %w", err)
}
Comment thread
myleshorton marked this conversation as resolved.

// Final swap under lock. Re-check that the session is still the
// one we started against. Per-session runCtx identity is a stricter
// check than c.active alone: a Stop→Start cycle between the
// runCtx-check above and now would have cleared active AND set it
// back true, but the new session's runCtx differs from sessionRunCtx
// — so we'd otherwise resurrect old-session state into the new one.
c.mu.Lock()
if !c.active || c.runCtx != sessionRunCtx {
c.mu.Unlock()
// Either Stop cleared state, or Stop→Start replaced the session.
// Close the new box we just brought up (the current session has
// no reference to it) and deregister the new route. Don't touch
// the prior route: in the Stop-only case, Stop already
// deregistered it; in the Stop→Start case, deregistering it
// would defeat the rotation point of cutting off the old creds.
if err := newBox.Close(); err != nil {
slog.Warn("close new box after session changed during rotation", "err", err)
}
cleanupNewRoute(errors.New("session changed during rotation swap"))
return errors.New("session changed during rotation")
}
c.box = newBox
c.routeID = regResp.RouteID
c.boxOptions = options
c.status.RouteID = regResp.RouteID
Comment thread
myleshorton marked this conversation as resolved.
c.status.ExternalIP = externalIP
c.mu.Unlock()
Comment thread
myleshorton marked this conversation as resolved.
Comment thread
myleshorton marked this conversation as resolved.

// Deregister the prior route so the bandit stops handing the old
// (now-invalid) creds to clients. Use a fresh ctx so a Stop that
// races us between the swap above and the deregister doesn't cancel
// the cleanup — leaving the old (now-invalid-locally) route in the
// server catalog until TTL would defeat the rotation's stale-cred
// cap, which is the whole point of the feature.
deregCtx, cancelDereg := context.WithTimeout(context.Background(), peerCleanupTimeout)
if err := c.cfg.API.Deregister(deregCtx, oldRouteID); err != nil {
slog.Warn("deregister prior route after rotation",
"err", err, "old_route_id", oldRouteID)
}
Comment thread
myleshorton marked this conversation as resolved.
cancelDereg()

slog.Info("peer cred rotation succeeded",
"new_route_id", regResp.RouteID,
"old_route_id", oldRouteID,
)
return nil
}

// startNewBoxWithRetry calls newBox.Start up to five times with a
// doubling backoff to absorb TIME_WAIT / EADDRINUSE between the old box
// releasing the port and the new box re-binding it. Inter-attempt
// backoff totals 750ms (50+100+200+400 across 4 sleeps; no sleep after
// the final attempt) so a healthy rotation isn't delayed noticeably;
// the alternative is leaving the peer's listener down for the full
// rotation interval (default 1h) on a transient bind failure.
//
// Start is always attempted at least once, even if ctx is already
// cancelled. If ctx is cancelled while waiting between attempts, the
// returned error wraps both ctx.Err() and the last Start error, so
// callers can tell cancellation from a bind failure with errors.Is.
// If every attempt fails, the returned error wraps the last Start error.
//
// A panic from Start is converted into an error rather than propagated.
// That lets the caller run its normal error-path cleanup (deregistering
// the freshly-registered route) instead of having it skipped while the
// panic unwinds to an outer recover.
func startNewBoxWithRetry(ctx context.Context, newBox boxService) (retErr error) {
	defer func() {
		if r := recover(); r != nil {
			retErr = fmt.Errorf("start new sing-box panicked: %v", r)
		}
	}()

	const attempts = 5
	backoff := 50 * time.Millisecond
	var lastErr error
	for i := 0; i < attempts; i++ {
		lastErr = newBox.Start()
		if lastErr == nil {
			return nil
		}
		// Skip the sleep on the final attempt — we won't try again,
		// so the wait is pure latency that would push total backoff
		// above the documented sub-1s budget.
		if i == attempts-1 {
			break
		}
		timer := time.NewTimer(backoff)
		select {
		case <-ctx.Done():
			// Release the timer now instead of leaving it pending.
			timer.Stop()
			return fmt.Errorf("start new sing-box (ctx cancelled after %d attempts): %w: %w",
				i+1, ctx.Err(), lastErr)
		case <-timer.C:
		}
		backoff *= 2
	}
	return fmt.Errorf("start new sing-box (%d attempts): %w", attempts, lastErr)
}

// ensurePeerOutboundsBypassVPN guarantees the peer sing-box's outbound dials
// bind to the physical interface rather than whatever the OS routing table
// picks. Without this, when the user's own Lantern VPN is up its TUN holds
Expand Down
Loading
Loading