diff --git a/backend/radiance.go b/backend/radiance.go index f90476e2..c0e1c473 100644 --- a/backend/radiance.go +++ b/backend/radiance.go @@ -38,6 +38,7 @@ import ( "github.com/getlantern/radiance/servers" "github.com/getlantern/radiance/telemetry" "github.com/getlantern/radiance/traces" + "github.com/getlantern/radiance/unbounded" "github.com/getlantern/radiance/vpn" "github.com/sagernet/sing-box/adapter" @@ -232,6 +233,18 @@ func (r *LocalBackend) Start() { r.resumePeerShareIfEnabled() + // Wire the broflake / Unbounded widget proxy lifecycle to config + // updates. This single subscription handles all three start/stop + // triggers (local toggle, server feature flag, server-supplied + // config); InitSubscription is sync.Once-guarded so a future Start + // retry after Close won't double-subscribe. + // + // Seed with the already-cached config (loaded from disk before + // Start runs) so an opted-in user auto-starts the widget on + // launch instead of waiting for the next config refresh. + cachedCfg, _ := r.confHandler.GetConfig() + unbounded.InitSubscription(cachedCfg) + // set country code in settings when new config is received so it can be included in issue reports events.SubscribeOnce(func(evt config.NewConfigEvent) { if env.GetString(env.Country) != "" { @@ -318,6 +331,17 @@ func (r *LocalBackend) Close() { r.closeOnce.Do(func() { slog.Debug("Closing Radiance") r.closePeerClient() + // unbounded.start spawns its worker on a context.Background- + // derived ctx (it has to outlive any single NewConfigEvent), + // so Close has to explicitly tell it to shut down — otherwise + // the broflake widget goroutine survives backend close and + // leaks until process exit. Use a fresh ctx so a cancelled + // shutdown path doesn't skip the Stop. + stopCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + if err := unbounded.Stop(stopCtx); err != nil { + slog.Warn("unbounded stop on backend close returned error", "error", err) + } + cancel() // vpnClient is always set in production via NewLocalBackend, but // peer-focused unit tests construct partial LocalBackends without // one. Guard the call so Close stays robust under those paths @@ -502,6 +526,19 @@ func (r *LocalBackend) PatchSettings(updates settings.Settings) error { } } + // Drive the Unbounded widget proxy off the toggle change immediately + // rather than waiting for the next NewConfigEvent to re-evaluate. + // settings.Patch above has already persisted the new value, so go + // straight to Apply() — SetEnabled would short-circuit on the + // already-matching persisted value and never re-evaluate the + // manager. Apply re-checks the three-condition predicate against + // the cached server-side state and starts or stops accordingly. + if _, ok := diff[settings.UnboundedKey]; ok { + if err := unbounded.Apply(); err != nil { + slog.Warn("unbounded apply failed", "error", err) + } + } + return nil } diff --git a/backend/radiance_test.go b/backend/radiance_test.go index 238d19b9..e885fc1b 100644 --- a/backend/radiance_test.go +++ b/backend/radiance_test.go @@ -13,6 +13,7 @@ import ( "github.com/getlantern/radiance/common/settings" "github.com/getlantern/radiance/peer" + "github.com/getlantern/radiance/unbounded" ) func TestBackend(t *testing.T) {} @@ -216,3 +217,28 @@ func TestPatchSettings_PeerShareDispatches(t *testing.T) { assert.Equal(t, int64(1), fake.stopCalls.Load()) assert.False(t, fake.IsActive()) } + +// Verify PatchSettings routes UnboundedKey to unbounded.Apply via the +// SetApplyHookForTest hook. A typo on the diff key or a removal of +// the Apply call would silently leave the Unbounded toggle persisted +// but inert. The hook fires regardless of the Enabled() gate inside +// Apply, so this catches the dispatch even though we don't prime the +// rest of the manager state. +func TestPatchSettings_UnboundedDispatches(t *testing.T) { + r := newPeerTestBackend(t, &fakePeerController{}) + + var applyCalls atomic.Int32 + unbounded.SetApplyHookForTest(func() { applyCalls.Add(1) }) + t.Cleanup(func() { unbounded.SetApplyHookForTest(nil) }) + + require.NoError(t, r.PatchSettings(settings.Settings{settings.UnboundedKey: true})) + assert.Equal(t, int32(1), applyCalls.Load(), "PatchSettings({UnboundedKey: true}) must dispatch to unbounded.Apply") + + require.NoError(t, r.PatchSettings(settings.Settings{settings.UnboundedKey: false})) + assert.Equal(t, int32(2), applyCalls.Load(), "PatchSettings({UnboundedKey: false}) must dispatch to unbounded.Apply") + + // A PATCH without UnboundedKey must NOT trigger Apply — confirms + // the diff check is in place rather than always firing. + require.NoError(t, r.PatchSettings(settings.Settings{settings.PeerShareEnabledKey: false})) + assert.Equal(t, int32(2), applyCalls.Load(), "PatchSettings without UnboundedKey must not dispatch to unbounded.Apply") +} diff --git a/common/settings/settings.go b/common/settings/settings.go index 29c68d37..55743b03 100644 --- a/common/settings/settings.go +++ b/common/settings/settings.go @@ -65,6 +65,14 @@ const ( // Advanced setting in the Share My Connection UI for users on // networks where UPnP is disabled or unavailable. PeerManualPortKey _key = "peer_manual_port" // int (0 = unset; 1..65535 = manual port) + // UnboundedKey is the local opt-in for the broflake / Unbounded + // widget proxy. When true AND the server-side Features[unbounded] + // flag is on AND the server provides UnboundedConfig (discovery + // + egress URLs), the widget proxy starts. Surfaced as a "Basic + // mode" option in the Share My Connection UI for networks where + // UPnP isn't workable but the user still wants to contribute via + // the WebRTC-based donor path. + UnboundedKey _key = "unbounded" // bool SelectedServerKey _key = "selected_server" // [servers.Server] Server.Options is not stored PreferredLocationKey _key = "preferred_location" // [common.PreferredLocation] diff --git a/go.mod b/go.mod index 07d0cc5d..45242f1d 100644 --- a/go.mod +++ b/go.mod @@ -27,6 +27,7 @@ require ( github.com/alexflint/go-arg v1.6.1 github.com/alitto/pond v1.9.2 github.com/getlantern/amp v0.0.0-20260305201851-782bc8045e58 + github.com/getlantern/broflake v0.0.0-20260504215251-ed3cf75062d1 github.com/getlantern/common v1.2.1-0.20260326210434-cb69537aaf46 github.com/getlantern/dnstt v0.0.0-20260112160750-05100563bd0d github.com/getlantern/domainfront v0.0.0-20260419161617-0bff0b2169f4 @@ -113,7 +114,6 @@ require ( github.com/gaissmai/bart v0.11.1 // indirect github.com/gaukas/wazerofs v0.1.0 // indirect github.com/getlantern/algeneva v0.0.0-20250307163401-1824e7b54f52 // indirect - github.com/getlantern/broflake v0.0.0-20260504215251-ed3cf75062d1 // indirect github.com/getlantern/lantern-water v0.0.0-20260520145825-958775d51395 // indirect github.com/getlantern/samizdat v0.0.3-0.20260529191731-5ea8ae61ddbf // indirect github.com/go-chi/chi/v5 v5.2.2 // indirect diff --git a/ipc/client_events_mobile.go b/ipc/client_events_mobile.go index a3a8dfd2..fb327ac8 100644 --- a/ipc/client_events_mobile.go +++ b/ipc/client_events_mobile.go @@ -10,6 +10,7 @@ import ( "github.com/getlantern/radiance/config" "github.com/getlantern/radiance/events" "github.com/getlantern/radiance/peer" + "github.com/getlantern/radiance/unbounded" "github.com/getlantern/radiance/vpn" ) @@ -62,10 +63,15 @@ func (c *Client) DataCapStream(ctx context.Context, handler func(account.DataCap return c.dataCapStream(ctx, handler) } -// PeerStatusEvents — see client_events_nonmobile.go for the full -// docstring. Mobile builds may share a process with radiance (localOnly) -// in which case events.SubscribeContext delivers directly; otherwise the -// SSE retry loop matches the desktop path. +// PeerStatusEvents streams peer-share lifecycle phase changes (mapping_port +// → registering → verifying → serving on Start, stopping → idle on Stop, +// error on failure). Each frame is a peer.StatusEvent JSON whose .Status +// is the live snapshot at the moment the event fired — consumers SHOULD +// re-render on every frame rather than diffing, since events.Emit's +// per-callback goroutine can land Start phases out of order. Mobile builds +// may share a process with radiance (localOnly), in which case +// events.SubscribeContext delivers directly; otherwise the SSE retry loop +// is used. Blocks until ctx is cancelled. func (c *Client) PeerStatusEvents(ctx context.Context, handler func(peer.StatusEvent)) error { events.SubscribeContext(ctx, handler) if c.localOnly { @@ -80,8 +86,12 @@ func (c *Client) PeerStatusEvents(ctx context.Context, handler func(peer.StatusE }) } -// PeerConnectionEvents — see client_events_nonmobile.go for the full -// docstring. Same mobile dual-path as PeerStatusEvents. +// PeerConnectionEvents streams accept/close events for the local +// samizdat-in inbound. State is +1 on accept and -1 on close; Source is +// the remote "ip:port" string for geo-lookup / abuse attribution. +// Same mobile dual-path as PeerStatusEvents (localOnly delivers via +// the in-process event bus; otherwise the SSE retry loop is used). +// Blocks until ctx is cancelled. func (c *Client) PeerConnectionEvents(ctx context.Context, handler func(peer.ConnectionEvent)) error { events.SubscribeContext(ctx, handler) if c.localOnly { @@ -95,3 +105,26 @@ func (c *Client) PeerConnectionEvents(ctx context.Context, handler func(peer.Con } }) } + +// UnboundedConnectionEvents streams accept/close events for the local +// broflake widget proxy ("Unbounded" / Basic mode). The JSON shape +// matches peer.ConnectionEvent but the Go type is distinct — in-process +// subscribers must subscribe to both event types separately to see all +// peer activity. State is +1 on consumer accept, -1 on close; Source +// is the consumer's IP if broflake exposes it, otherwise empty. Same +// mobile dual-path: localOnly subscribes directly to the in-process +// event bus; otherwise the SSE retry loop is used. Blocks until ctx +// is cancelled. +func (c *Client) UnboundedConnectionEvents(ctx context.Context, handler func(unbounded.ConnectionEvent)) error { + events.SubscribeContext(ctx, handler) + if c.localOnly { + <-ctx.Done() + return ctx.Err() + } + return c.sseRetryLoop(ctx, unboundedConnectionEventsEndpoint, func(data []byte) { + var evt unbounded.ConnectionEvent + if err := json.Unmarshal(data, &evt); err == nil { + handler(evt) + } + }) +} diff --git a/ipc/client_events_nonmobile.go b/ipc/client_events_nonmobile.go index e0330fe1..ee15f9ce 100644 --- a/ipc/client_events_nonmobile.go +++ b/ipc/client_events_nonmobile.go @@ -8,6 +8,7 @@ import ( "github.com/getlantern/radiance/account" "github.com/getlantern/radiance/peer" + "github.com/getlantern/radiance/unbounded" "github.com/getlantern/radiance/vpn" ) @@ -74,3 +75,19 @@ func (c *Client) PeerConnectionEvents(ctx context.Context, handler func(peer.Con } }) } + +// UnboundedConnectionEvents streams accept/close events for the +// local broflake widget proxy ("Unbounded" / Basic mode). The JSON +// shape matches peer.ConnectionEvent but the Go type is distinct — +// in-process subscribers must subscribe to both event types separately +// to see all peer activity. State is +1 on consumer accept, -1 on +// close; Source is the consumer's IP if broflake exposes it, +// otherwise empty. Blocks until ctx is cancelled. +func (c *Client) UnboundedConnectionEvents(ctx context.Context, handler func(unbounded.ConnectionEvent)) error { + return c.sseRetryLoop(ctx, unboundedConnectionEventsEndpoint, func(data []byte) { + var evt unbounded.ConnectionEvent + if err := json.Unmarshal(data, &evt); err == nil { + handler(evt) + } + }) +} diff --git a/ipc/server.go b/ipc/server.go index 6825e7b8..6d9cca19 100644 --- a/ipc/server.go +++ b/ipc/server.go @@ -23,6 +23,7 @@ import ( "github.com/getlantern/radiance/events" rlog "github.com/getlantern/radiance/log" "github.com/getlantern/radiance/peer" + "github.com/getlantern/radiance/unbounded" "github.com/getlantern/radiance/vpn" sjson "github.com/sagernet/sing/common/json" @@ -69,6 +70,11 @@ const ( peerStatusEventsEndpoint = "/peer/status/events" peerConnectionEventsEndpoint = "/peer/connection/events" + // Unbounded ("Basic mode") endpoints. Connection events have the same + // JSON shape as /peer/connection/events but are emitted from a + // different in-process event type, so they need their own SSE bridge. + unboundedConnectionEventsEndpoint = "/unbounded/connection/events" + // Split tunnel endpoint splitTunnelEndpoint = "/split-tunnel" @@ -236,6 +242,7 @@ func newLocalAPI(b *backend.LocalBackend, withAuth bool) *localapi { // SSE skips the tracer middleware since it buffers the entire response body. mux.HandleFunc("GET "+peerStatusEventsEndpoint, s.peerStatusEventsHandler) mux.HandleFunc("GET "+peerConnectionEventsEndpoint, s.peerConnectionEventsHandler) + mux.HandleFunc("GET "+unboundedConnectionEventsEndpoint, s.unboundedConnectionEventsHandler) // Split tunnel mux.HandleFunc(splitTunnelEndpoint, traced(s.splitTunnelHandler)) @@ -567,6 +574,45 @@ func (s *localapi) peerConnectionEventsHandler(w http.ResponseWriter, r *http.Re } } +// unboundedConnectionEventsHandler streams unbounded.ConnectionEvent over SSE +// for each consumer accept/disconnect on the broflake widget proxy. Mirrors +// peerConnectionEventsHandler — the JSON shape is identical, but events.Emit +// dispatches by concrete Go type so the two streams need separate +// subscriptions. Cross-process consumers (Flutter via Liblantern) merge the +// two SSE endpoints client-side into one peer-connection feed for the globe. +func (s *localapi) unboundedConnectionEventsHandler(w http.ResponseWriter, r *http.Request) { + flusher := sseWriter(w) + if flusher == nil { + return + } + queue := make(chan unbounded.ConnectionEvent, 64) + sub := events.Subscribe(func(evt unbounded.ConnectionEvent) { + select { + case queue <- evt: + default: + // queue full — drop. Same rationale as the peer handler: + // better to lose this event than back up events.Emit. + } + }) + defer sub.Unsubscribe() + + for { + select { + case evt := <-queue: + data, err := json.Marshal(evt) + if err != nil { + continue + } + if _, err := fmt.Fprintf(w, "data: %s\n\n", data); err != nil { + return + } + flusher.Flush() + case <-r.Context().Done(): + return + } + } +} + /////////////////////// // Server selection // /////////////////////// diff --git a/unbounded/test_helpers_test.go b/unbounded/test_helpers_test.go new file mode 100644 index 00000000..c124a5cc --- /dev/null +++ b/unbounded/test_helpers_test.go @@ -0,0 +1,16 @@ +package unbounded + +// countingWidget is a fakeWidget variant whose Stop runs a caller- +// supplied callback before returning. The callback lets the test +// observe shutdown ordering — typically by decrementing a live- +// widget counter so the test can assert that at most one widget is +// alive across a stop/start transition. +type countingWidget struct { + onStop func() +} + +func (w *countingWidget) Stop() { + if w.onStop != nil { + w.onStop() + } +} diff --git a/unbounded/unbounded.go b/unbounded/unbounded.go new file mode 100644 index 00000000..2e12ef12 --- /dev/null +++ b/unbounded/unbounded.go @@ -0,0 +1,568 @@ +// Package unbounded manages the broflake / Unbounded widget-proxy lifecycle. +// +// Unbounded is the WebRTC-based donor mode for Lantern's Share My Connection +// feature: the local user contributes bandwidth to censored users via short- +// lived WebRTC sessions brokered through a discovery server, without exposing +// a long-lived inbound port the way the samizdat-over-UPnP "Share My +// Connection" mode does. It's the lower-bandwidth, lower-risk, universally- +// applicable alternative to SmC — works on networks where UPnP is disabled +// or unavailable, and the peer's residential IP isn't tied to a single +// long-lived inbound listener. +// +// Three conditions must all hold for the widget proxy to actually run: +// +// 1. settings.UnboundedKey is true (local opt-in via the UI toggle) +// 2. server-side cfg.Features[UNBOUNDED] is enabled (server says go) +// 3. server-side cfg.Unbounded provides discovery + egress URLs +// +// The manager subscribes to config.NewConfigEvent and recomputes the +// running state on every config update; it also re-evaluates when +// SetEnabled flips the local toggle. Each consumer connection change +// (accept / disconnect) emits a ConnectionEvent on the radiance event +// bus so the same Flutter globe used for SmC can render arcs without +// caring which protocol produced them. +package unbounded + +import ( + "context" + "log/slog" + "net" + "sync" + "time" + + C "github.com/getlantern/common" + + "github.com/getlantern/broflake/clientcore" + + "github.com/getlantern/radiance/common/settings" + "github.com/getlantern/radiance/config" + "github.com/getlantern/radiance/events" +) + +// ConnectionEvent fires every time a consumer (i.e. a censored client +// being routed through this widget proxy) connects or disconnects via +// the broflake mesh. +// +// State +1 on accept, -1 on close +// Source consumer's IP if broflake exposes it, otherwise empty +// Timestamp emit time in Unix milliseconds +// +// JSON shape is identical to peer.ConnectionEvent so a consumer +// reading both SSE streams can deserialize each frame with the +// same struct. The in-process event bus, however, keys +// subscriptions by concrete Go type, so subscribing to +// peer.ConnectionEvent does NOT also deliver unbounded +// ConnectionEvents — in-process consumers that want a unified +// view of all peer activity must subscribe to both. Broflake's +// internal worker-slot identifier is not surfaced; a consumer +// that needs to pair accept/close events for the same arc keys +// off Source (or arrival sequence within a single connection +// lifetime). +type ConnectionEvent struct { + events.Event + State int `json:"state"` + Source string `json:"source"` + Timestamp int64 `json:"timestamp"` +} + +var manager = &unboundedManager{} + +// widget is the minimum interface the manager needs from a running +// broflake instance. Defined locally (vs using clientcore.UI) so +// tests can supply a tiny fake without implementing the full +// clientcore.UI surface area. +type widget interface { + Stop() +} + +// newWidget builds the live broflake widget. Package var so unit +// tests can swap it for a fake that records start/stop calls +// without spinning up real WebRTC. +var newWidget = func(bfOpt *clientcore.BroflakeOptions, rtcOpt *clientcore.WebRTCOptions, egOpt *clientcore.EgressOptions) (widget, error) { + // BroflakeConn is for clients routing traffic *through* the mesh. + // A widget proxy only donates bandwidth, so the conn is unused — + // discard it. + _, ui, err := clientcore.NewBroflake(bfOpt, rtcOpt, egOpt) + if err != nil { + return nil, err + } + return ui, nil +} + +type unboundedManager struct { + // transitionMu serializes start/stop. It's held for the full + // duration of a stop (including the wait for the worker goroutine + // to actually exit) and for the full duration of a start. Without + // it, stop's signal-then-return path could race a concurrent start + // — the worker is still running ui.Stop while cancel/done get + // re-armed under a fresh worker, leaving two broflake widgets + // alive simultaneously. + transitionMu sync.Mutex + + // mu protects the fields below. Held only for the brief window of + // reading or mutating manager state; never held across the wait on + // done or any broflake call. + mu sync.Mutex + // armed gates every start path. InitSubscription flips it true; + // public Stop flips it false. Without this gate, a config event + // (or any other applyConfig caller) racing the public Stop's + // transitionMu hold could observe cancel==nil after Stop's wait, + // pile up at transitionMu, and start a new widget *after* the + // shutdown caller has already returned — the LocalBackend.Close + // docstring is explicit that Stop is the final teardown, so a + // post-Stop revival breaks that contract. start() and applyConfig + // re-check armed under mu (inside transitionMu for start) so a + // concurrent flip is honored even when the caller has been queued + // at transitionMu the whole time. + armed bool + cancel context.CancelFunc + // done is closed by the worker goroutine when it actually exits + // (after NewBroflake returns and ui.Stop runs). stop and Stop wait + // on this under transitionMu so backend shutdown blocks until the + // broflake widget is actually torn down. Nil when nothing is + // running. + done chan struct{} + // lastCfg + lastFeatureOn cache the server-side half of the + // three-condition predicate so SetEnabled can re-evaluate + // immediately when the local toggle flips, without waiting for + // the next NewConfigEvent. Both are updated atomically when a + // new config arrives. + lastCfg *C.UnboundedConfig + lastFeatureOn bool + + // runningCfg is the snapshot of UnboundedConfig the live worker + // was started with. broflake consumes its discovery/egress + // options once in clientcore.NewBroflake, so a server-side config + // change while the worker is alive would otherwise leave it + // running on stale parameters. applyConfig compares this against + // the freshly-cached lastCfg and triggers stop+start when they + // differ, with the predicate still otherwise satisfied. Nil + // whenever cancel is nil. + runningCfg *C.UnboundedConfig +} + +// shouldStart reports whether all three start conditions hold. Caller +// must hold m.mu. +func (m *unboundedManager) shouldStart() bool { + return settings.GetBool(settings.UnboundedKey) && m.lastFeatureOn && cfgUsable(m.lastCfg) +} + +// cfgUsable reports whether the cached UnboundedConfig supplies the +// minimum fields broflake needs to route real consumer traffic: +// discovery (server + endpoint) and egress (address + endpoint). The +// server contract is that all four are required; broflake's +// clientcore defaults exist for unit-test convenience and point at +// the upstream maintainer's infra — running them in a Lantern build +// would bypass the server's per-environment endpoint selection and +// the "is this user opted in?" feature-flag gate, so a partially- +// populated config is treated as "not yet ready to start" rather +// than "fall back to defaults". +// +// CTableSize / PTableSize are not required; defaults are reasonable +// and the server sends them only when it wants to override. +func cfgUsable(cfg *C.UnboundedConfig) bool { + if cfg == nil { + return false + } + return cfg.DiscoverySrv != "" && cfg.DiscoveryEndpoint != "" && + cfg.EgressAddr != "" && cfg.EgressEndpoint != "" +} + +// cfgEqual reports whether two UnboundedConfig pointers refer to +// configurations broflake would consume identically. UnboundedConfig +// is a flat struct of strings and ints, so value equality is well- +// defined. Nil pointers compare equal to themselves and unequal to +// any non-nil pointer. +func cfgEqual(a, b *C.UnboundedConfig) bool { + if a == b { + return true + } + if a == nil || b == nil { + return false + } + return *a == *b +} + +// Enabled reports whether the local opt-in is set. Doesn't say whether +// the proxy is currently running (server flag and config can override). +func Enabled() bool { + return settings.GetBool(settings.UnboundedKey) +} + +// SetEnabled persists the local opt-in (if it differs from the current +// persisted value) and re-evaluates the manager. Use this from direct +// callers (FFI, programmatic use) where the new toggle value hasn't +// been written to settings yet. +// +// PatchSettings persists settings itself before calling into the +// unbounded package, so it should use Apply() directly instead of +// going through SetEnabled — otherwise SetEnabled's no-change short- +// circuit (Enabled() == enable) returns before Apply runs and the +// manager never re-evaluates. +func SetEnabled(enable bool) error { + if Enabled() != enable { + if err := settings.Set(settings.UnboundedKey, enable); err != nil { + return err + } + slog.Info("Unbounded widget proxy local opt-in changed", "enabled", enable) + } + return Apply() +} + +// Apply re-evaluates the three-condition predicate (local toggle + +// server feature flag + server config cached) against the currently +// persisted setting and starts or stops the manager accordingly. Used +// by PatchSettings (which already persisted UnboundedKey itself) and +// by SetEnabled (after its persist step). Safe to call when nothing +// has changed — start is a no-op if the worker is already running and +// stop is a no-op if it isn't. +// +// No-op once Stop has disarmed the manager (post-shutdown): the +// armed gate is also checked inside start, so even a queued +// transition stays a no-op after Stop. +func Apply() error { + if h := applyHook; h != nil { + h() + } + if !Enabled() { + manager.stop() + return nil + } + manager.mu.Lock() + armed := manager.armed + shouldStart := manager.shouldStart() + cfg := manager.lastCfg + feature := manager.lastFeatureOn + running := manager.cancel != nil + manager.mu.Unlock() + if !armed { + return nil + } + if shouldStart { + if !running { + manager.start() + } + return nil + } + switch { + case cfg == nil: + slog.Info("Unbounded: enabled locally, waiting for server config") + case !feature: + slog.Info("Unbounded: enabled locally, but server feature flag is off") + } + return nil +} + +// InitSubscription wires the manager into radiance's config event bus +// and applies any already-cached config. Called once at LocalBackend +// startup; the underlying subscription lives for the process lifetime +// (sync.Once-guarded), but the armed flag is set on every call so a +// Start-after-Close re-enables the manager that public Stop had +// disarmed. +// +// initial is the config that ConfigHandler has already loaded by the +// time Start reaches this line — typically the previously-persisted +// config from disk. Without seeding the manager state from it, the +// three-condition predicate stays stuck at lastCfg=nil/lastFeatureOn= +// false until the next config refresh arrives, and an already-opted-in +// user wouldn't auto-start the widget proxy until then. Pass nil if +// no config is available yet. +func InitSubscription(initial *config.Config) { + initOnce.Do(func() { + events.Subscribe(func(evt config.NewConfigEvent) { + if evt.New == nil { + return + } + applyConfig(*evt.New) + }) + }) + manager.mu.Lock() + manager.armed = true + manager.mu.Unlock() + if initial != nil { + applyConfig(*initial) + } +} + +// applyConfig caches the server-side half of the start predicate and +// transitions the manager start/stop accordingly. Shared by +// InitSubscription's NewConfigEvent handler and the initial-config +// seeding path so cached and live configs follow identical logic. +// No-op when the manager is disarmed (post-Stop) so a late event +// arriving after backend shutdown doesn't revive the widget. +func applyConfig(cfg config.Config) { + manager.mu.Lock() + if !manager.armed { + manager.mu.Unlock() + return + } + // config.Config is a type alias for C.ConfigResponse on the + // current radiance branch — no nested .ConfigResponse field, + // just dereference and use directly. + manager.lastCfg = cfg.Unbounded + manager.lastFeatureOn = cfg.Features[C.UNBOUNDED] + shouldRun := manager.shouldStart() + running := manager.cancel != nil + ucfg := manager.lastCfg + cfgChanged := running && !cfgEqual(manager.runningCfg, ucfg) + manager.mu.Unlock() + + switch { + case shouldRun && !running: + manager.start() + case shouldRun && cfgChanged: + // Broflake consumed its options at construction time and has + // no live-reconfigure API; the only way to pick up new + // discovery/egress endpoints or table sizes is to tear the + // worker down and bring it back up with the new config. + // stop blocks until the prior worker fully exits, so start + // always sees a clean slate. + manager.stop() + manager.start() + case !shouldRun && running: + manager.stop() + } +} + +var initOnce sync.Once + +// Stop tears down a running widget proxy and waits for the worker +// goroutine to actually exit (or the supplied ctx to expire). Used +// as a LocalBackend shutdown hook — without the wait, Close would +// return as soon as the cancel signal was queued and the broflake +// goroutine could still be inside NewBroflake or ui.Stop when the +// rest of the process tears down. +// +// Stop also disarms the manager: any subsequent start path (Apply, +// applyConfig from a config event, manager.start directly) becomes +// a no-op until InitSubscription re-arms. The config subscription +// callback stays installed but short-circuits via the armed gate, +// so a late config event arriving during or after Stop can't +// revive the widget. Future Start (after Close) re-arms via +// InitSubscription. +// +// Idempotent: no-op if no worker is running. Returns ctx.Err() if +// the wait deadline expires before the worker exits — in that case +// the worker has been signalled to cancel and will exit on its own +// schedule, but the caller has given up waiting. m.cancel and +// m.done stay set until the worker eventually clears them, so a +// subsequent start observes "already running" and is a no-op. +func Stop(ctx context.Context) error { + return manager.stopCtx(ctx, true) +} + +// internalStopTimeout bounds how long Apply / applyConfig wait for +// the worker to exit after signalling cancel. broflake's ui.Stop +// should drain in well under this; a longer-than-expected ui.Stop +// must not block a settings PATCH or a config-event handler +// indefinitely. Variable (not const) so tests can install a short +// timeout to exercise the timeout path without holding up the +// suite. Production code reads this on every call, so the override +// applies for the duration of the test. +var internalStopTimeout = 5 * time.Second + +// applyHook is invoked at the top of Apply, before any state check +// or transition. nil in production; backend tests install a counter +// or assertion to verify that PatchSettings actually dispatches the +// UnboundedKey diff to this package. Keep this minimal — exposing +// the manager's internals across packages just for test wiring +// would be a much bigger surface. +var applyHook func() + +// SetApplyHookForTest installs h to be invoked at the start of +// every Apply call. Pass nil to remove. Test-only; production code +// must not call this. The hook fires regardless of the Enabled() +// gate so callers can verify dispatch happened even when no +// transition results. +func SetApplyHookForTest(h func()) { + applyHook = h +} + +// stopCtx is the shared implementation for both the public Stop +// (disarm=true) and internal manager.stop (disarm=false). Holds +// transitionMu for the entire signal+wait so a concurrent start +// cannot interleave. On ctx expiration the cancel signal has been +// delivered and the worker will exit on its own schedule; manager +// state is left intact so future transitions see "already running" +// until the worker clears it. +func (m *unboundedManager) stopCtx(ctx context.Context, disarm bool) error { + m.transitionMu.Lock() + defer m.transitionMu.Unlock() + m.mu.Lock() + if disarm { + m.armed = false + } + cancel := m.cancel + done := m.done + m.mu.Unlock() + if cancel == nil { + return nil + } + cancel() + select { + case <-done: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +// start brings up the broflake worker if all preconditions hold at +// the moment it acquires the locks: manager armed, no worker already +// running, and the three-condition predicate (toggle + feature flag + +// cached config) still satisfied. Every check is done INSIDE +// transitionMu so a caller queued behind a stop or another start +// observes the freshest state rather than a snapshot from when it +// decided to start — a concurrent SetEnabled(false) or config update +// between the caller's predicate read and start's lock acquisition +// is honored. +// +// The config used by the worker is the LIVE m.lastCfg, not a snapshot +// captured by the caller. applyConfig updates m.lastCfg before +// calling start, so this gives identical behavior for the normal +// path; for the race case (config updated after caller decided to +// start), the worker comes up with the latest parameters. +func (m *unboundedManager) start() { + m.transitionMu.Lock() + defer m.transitionMu.Unlock() + + m.mu.Lock() + if !m.armed { + // Disarmed by public Stop. Re-check inside transitionMu so a + // start that got queued at transitionMu while Stop was waiting + // for the worker still bails out instead of reviving the widget + // after Stop's caller has returned. + m.mu.Unlock() + return + } + if m.cancel != nil { + m.mu.Unlock() + return // already running; transitionMu prevents overlap with stop + } + if !m.shouldStart() { + // Predicate flipped while we were queued at transitionMu — a + // SetEnabled(false), a config event that cleared the feature + // flag or unset the cfg, or any other concurrent change. Bail + // rather than start a worker that's already been decided + // against. + m.mu.Unlock() + return + } + ucfg := m.lastCfg + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + m.cancel = cancel + m.done = done + // Snapshot the config the worker is being started with so a + // later applyConfig can detect parameter changes and restart. + // Pointer-stored (not value-stored) because the upstream + // lastCfg is also a pointer and equality is value-based via + // cfgEqual. + m.runningCfg = ucfg + m.mu.Unlock() + + go func() { + defer close(done) + slog.Info("Unbounded: starting broflake widget proxy") + + bfOpt := clientcore.NewDefaultBroflakeOptions() + bfOpt.ClientType = "widget" + if ucfg != nil { + if ucfg.CTableSize > 0 { + bfOpt.CTableSize = ucfg.CTableSize + } + if ucfg.PTableSize > 0 { + bfOpt.PTableSize = ucfg.PTableSize + } + } + + // Wire the broflake connection callback into the radiance event + // bus so the Flutter globe (and any future abuse aggregation) + // sees consumer connect/disconnect. + // + // Cancellation drain: broflake's per-worker connection-change + // goroutines can fire callbacks concurrently with ui.Stop, and + // the broflake API doesn't promise no-callbacks-after-Stop. + // Check ctx.Err() at the top so callbacks delivered after stop + // signals cancel — but before broflake's internal teardown + // drained — short-circuit instead of pushing a stale connection + // event onto the bus after the consumer thinks Unbounded is + // off. broflake exposes no registration point we could + // disarm directly (the callback IS the registration), so the + // inline ctx check is the next-best place. + bfOpt.OnConnectionChangeFunc = func(state int, workerIdx int, addr net.IP) { + if ctx.Err() != nil { + return + } + addrStr := "" + if addr != nil { + addrStr = addr.String() + } + slog.Debug("Unbounded: consumer connection change", + "state", state, "workerIdx", workerIdx, "source", addrStr) + events.Emit(ConnectionEvent{ + State: state, + Source: addrStr, + Timestamp: time.Now().UnixMilli(), + }) + } + + rtcOpt := clientcore.NewDefaultWebRTCOptions() + if ucfg != nil { + if ucfg.DiscoverySrv != "" { + rtcOpt.DiscoverySrv = ucfg.DiscoverySrv + } + if ucfg.DiscoveryEndpoint != "" { + rtcOpt.Endpoint = ucfg.DiscoveryEndpoint + } + } + + egOpt := clientcore.NewDefaultEgressOptions() + if ucfg != nil { + if ucfg.EgressAddr != "" { + egOpt.Addr = ucfg.EgressAddr + } + if ucfg.EgressEndpoint != "" { + egOpt.Endpoint = ucfg.EgressEndpoint + } + } + + ui, err := newWidget(bfOpt, rtcOpt, egOpt) + if err != nil { + slog.Error("Unbounded: failed to create broflake widget", "error", err) + cancel() + m.mu.Lock() + m.cancel = nil + m.done = nil + m.runningCfg = nil + m.mu.Unlock() + return + } + + slog.Info("Unbounded: broflake widget proxy started") + <-ctx.Done() + slog.Info("Unbounded: stopping broflake widget proxy") + ui.Stop() + m.mu.Lock() + m.cancel = nil + m.done = nil + m.runningCfg = nil + m.mu.Unlock() + slog.Info("Unbounded: broflake widget proxy stopped") + }() +} + +// stop is the internal (no-arg) variant called by Apply and +// applyConfig. It wraps stopCtx with a default timeout so a hung +// ui.Stop doesn't block settings PATCHes or config-event handling +// indefinitely. The timeout error is logged because the call site +// has no useful action to take; subsequent transitions observe +// "already running" until the worker eventually exits. +func (m *unboundedManager) stop() { + ctx, cancel := context.WithTimeout(context.Background(), internalStopTimeout) + defer cancel() + if err := m.stopCtx(ctx, false); err != nil { + slog.Warn("Unbounded: internal stop timed out before worker exited", + "error", err, "timeout", internalStopTimeout) + } +} diff --git a/unbounded/unbounded_test.go b/unbounded/unbounded_test.go new file mode 100644 index 00000000..9b0ff737 --- /dev/null +++ b/unbounded/unbounded_test.go @@ -0,0 +1,742 @@ +package unbounded + +import ( + "context" + "net" + "os" + "sync" + "sync/atomic" + "testing" + "time" + + C "github.com/getlantern/common" + "github.com/getlantern/broflake/clientcore" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/getlantern/radiance/common/settings" + "github.com/getlantern/radiance/config" + "github.com/getlantern/radiance/events" +) + +// TestMain initializes the settings package once for the whole test +// binary. settings.InitSettings is itself a sync.Once-guarded +// installer (it persists the path to k.filePath), so calling it per +// t.TempDir() leaves the settings layer pointing at a directory the +// testing infra has already cleaned up by the time the second test +// runs — every subsequent settings.Set then fails with ENOENT. +// +// os.Exit bypasses deferred calls, so the tmp-dir cleanup is done +// explicitly: capture m.Run's exit code, RemoveAll, then exit with +// that code. A naked defer + os.Exit would silently leak a directory +// per test invocation. +func TestMain(m *testing.M) { + dir, err := os.MkdirTemp("", "radiance-unbounded-settings-*") + if err != nil { + panic(err) + } + if err := settings.InitSettings(dir); err != nil { + os.RemoveAll(dir) + panic(err) + } + code := m.Run() + os.RemoveAll(dir) + os.Exit(code) +} + +// fakeWidget is a stand-in for the live broflake UI. stopBlock, if +// non-nil, lets a test pin Stop() until the test releases it — used +// to drive the "stop must wait for the worker" assertions. +type fakeWidget struct { + stopCalled atomic.Int32 + stopBlock chan struct{} +} + +func (w *fakeWidget) Stop() { + w.stopCalled.Add(1) + if w.stopBlock != nil { + <-w.stopBlock + } +} + +// resetManager swaps the package-level manager + widget factory and +// resets the UnboundedKey setting. Cleanup restores everything so +// tests don't bleed into each other. +func resetManager(t *testing.T, build func(*clientcore.BroflakeOptions, *clientcore.WebRTCOptions, *clientcore.EgressOptions) (widget, error)) { + t.Helper() + require.NoError(t, settings.Set(settings.UnboundedKey, false)) + + prevManager := manager + prevWidget := newWidget + prevInit := initOnce + // armed: true so direct manager.start calls in tests don't bail on + // the disarmed gate. Tests that exercise Stop's disarm behavior + // (TestStopDisarmsManager) flip it explicitly. + manager = &unboundedManager{armed: true} + newWidget = build + initOnce = sync.Once{} + t.Cleanup(func() { + // Wait for any still-live worker on the test's manager to + // exit before swapping the package-level newWidget back. + // Without this, the worker's read of newWidget (when the + // fake created it) races with the cleanup's write at test + // teardown — the race detector flags it even though the + // worker has already finished its call. Tests that leave a + // pinned worker (e.g. TestStopCtx_TimesOut) must release + // it before returning so this wait completes. + manager.mu.Lock() + done := manager.done + manager.mu.Unlock() + if done != nil { + <-done + } + manager = prevManager + newWidget = prevWidget + initOnce = prevInit + _ = settings.Set(settings.UnboundedKey, false) + }) +} + +// waitForRunning polls m.cancel under m.mu until it matches expected, +// primeManager seeds the predicate fields so a direct manager.start() +// in a test will satisfy the shouldStart() check inside the lock: the +// UnboundedKey setting goes true, manager.lastFeatureOn = true, and +// manager.lastCfg = the supplied cfg (or testCfg if nil). Tests that +// exercise the start path directly call this once after resetManager. +// +// The default test cfg populates all four required URL fields so +// cfgUsable passes — a zero-value UnboundedConfig would fail the +// "all fields supplied" gate and start() would bail. +func primeManager(t *testing.T, cfg *C.UnboundedConfig) { + t.Helper() + require.NoError(t, settings.Set(settings.UnboundedKey, true)) + if cfg == nil { + cfg = testCfg() + } + manager.mu.Lock() + manager.lastFeatureOn = true + manager.lastCfg = cfg + manager.mu.Unlock() +} + +// testCfg returns a UnboundedConfig that passes cfgUsable. Use this +// instead of a bare `&C.UnboundedConfig{}` literal in tests; the +// zero-value literal would now fail the runnable predicate. +func testCfg() *C.UnboundedConfig { + return &C.UnboundedConfig{ + DiscoverySrv: "https://discovery.test.example", + DiscoveryEndpoint: "/v1/disco", + EgressAddr: "https://egress.test.example", + EgressEndpoint: "/v1/egress", + } +} + +// or the deadline expires. The start goroutine sets m.cancel under +// m.mu before kicking off the worker, so this is a sufficient signal +// that a transition has been requested — note that for "true" the +// worker's newWidget call may still be pending. +func waitForRunning(t *testing.T, m *unboundedManager, expected bool, dur time.Duration) { + t.Helper() + deadline := time.Now().Add(dur) + for time.Now().Before(deadline) { + m.mu.Lock() + running := m.cancel != nil + m.mu.Unlock() + if running == expected { + return + } + time.Sleep(5 * time.Millisecond) + } + t.Fatalf("waitForRunning: timed out waiting for running=%v", expected) +} + +// waitForCount polls an int32 atomic until it equals want, or the +// deadline expires. Used in place of a flat-sleep when waiting for +// the start goroutine to call newWidget. +func waitForCount(t *testing.T, v *atomic.Int32, want int32, dur time.Duration) { + t.Helper() + deadline := time.Now().Add(dur) + for time.Now().Before(deadline) { + if v.Load() == want { + return + } + time.Sleep(5 * time.Millisecond) + } + t.Fatalf("waitForCount: timed out waiting for count=%d, got %d", want, v.Load()) +} + +func TestShouldStart(t *testing.T) { + resetManager(t, func(*clientcore.BroflakeOptions, *clientcore.WebRTCOptions, *clientcore.EgressOptions) (widget, error) { + return &fakeWidget{}, nil + }) + + tests := []struct { + name string + toggle bool + feature bool + cfg *C.UnboundedConfig + want bool + }{ + {"all off", false, false, nil, false}, + {"toggle only", true, false, nil, false}, + {"feature+cfg, no toggle", false, true, testCfg(), false}, + {"toggle+feature, no cfg", true, true, nil, false}, + {"toggle+cfg, no feature", true, false, testCfg(), false}, + {"all three", true, true, testCfg(), true}, + // Partial cfg (missing required URLs) treated as not-yet-ready + // — broflake's client defaults would otherwise route real + // traffic through upstream-maintainer infra, bypassing the + // server's per-environment endpoint selection. + {"partial cfg, no egress", true, true, + &C.UnboundedConfig{ + DiscoverySrv: "https://d.example", + DiscoveryEndpoint: "/v1/disco", + }, false}, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + require.NoError(t, settings.Set(settings.UnboundedKey, tc.toggle)) + manager.mu.Lock() + manager.lastFeatureOn = tc.feature + manager.lastCfg = tc.cfg + got := manager.shouldStart() + manager.mu.Unlock() + assert.Equal(t, tc.want, got) + }) + } +} + +// TestApply_DisabledIsNoop: Apply() returns immediately when the +// local toggle is off, regardless of cached server state. +func TestApply_DisabledIsNoop(t *testing.T) { + starts := atomic.Int32{} + resetManager(t, func(*clientcore.BroflakeOptions, *clientcore.WebRTCOptions, *clientcore.EgressOptions) (widget, error) { + starts.Add(1) + return &fakeWidget{}, nil + }) + + manager.mu.Lock() + manager.lastFeatureOn = true + manager.lastCfg = testCfg() + manager.mu.Unlock() + require.NoError(t, Apply()) + + // Allow any spurious goroutine a beat to land. + time.Sleep(20 * time.Millisecond) + assert.Equal(t, int32(0), starts.Load(), "Apply() must not start widget when toggle is off") +} + +// TestApply_StartsWhenAllConditionsHold: with toggle on + cached +// feature flag + cached config, Apply() spins up exactly one widget. +func TestApply_StartsWhenAllConditionsHold(t *testing.T) { + fw := &fakeWidget{} + starts := atomic.Int32{} + resetManager(t, func(*clientcore.BroflakeOptions, *clientcore.WebRTCOptions, *clientcore.EgressOptions) (widget, error) { + starts.Add(1) + return fw, nil + }) + + require.NoError(t, settings.Set(settings.UnboundedKey, true)) + manager.mu.Lock() + manager.lastFeatureOn = true + manager.lastCfg = testCfg() + manager.mu.Unlock() + require.NoError(t, Apply()) + + waitForRunning(t, manager, true, 1*time.Second) + waitForCount(t, &starts, 1, 1*time.Second) + + // Double-Apply should not start a second widget. + require.NoError(t, Apply()) + time.Sleep(20 * time.Millisecond) + assert.Equal(t, int32(1), starts.Load()) + + // Tear down. + require.NoError(t, settings.Set(settings.UnboundedKey, false)) + require.NoError(t, Apply()) + waitForRunning(t, manager, false, 1*time.Second) + assert.Equal(t, int32(1), fw.stopCalled.Load()) +} + +// TestStop_WaitsForWorker: stop() blocks until the worker's ui.Stop +// returns. Pin ui.Stop with stopBlock and observe stop()'s wait. +func TestStop_WaitsForWorker(t *testing.T) { + block := make(chan struct{}) + fw := &fakeWidget{stopBlock: block} + resetManager(t, func(*clientcore.BroflakeOptions, *clientcore.WebRTCOptions, *clientcore.EgressOptions) (widget, error) { + return fw, nil + }) + + require.NoError(t, settings.Set(settings.UnboundedKey, true)) + manager.mu.Lock() + manager.lastFeatureOn = true + manager.lastCfg = testCfg() + manager.mu.Unlock() + require.NoError(t, Apply()) + waitForRunning(t, manager, true, 1*time.Second) + + stopReturned := make(chan struct{}) + go func() { + manager.stop() + close(stopReturned) + }() + select { + case <-stopReturned: + t.Fatal("stop() returned before fake widget's Stop unblocked") + case <-time.After(100 * time.Millisecond): + } + close(block) + select { + case <-stopReturned: + case <-time.After(1 * time.Second): + t.Fatal("stop() did not return after fake widget's Stop unblocked") + } + assert.Equal(t, int32(1), fw.stopCalled.Load()) +} + +// TestStartDuringStop_NoOverlap pins the transitionMu invariant: at +// any instant, at most one broflake widget is between newWidget and +// Stop's return. If stop() merely signalled cancel and returned +// (without holding transitionMu through the wait on done), a +// concurrent start() could observe m.cancel == nil and bring up a +// second widget while the first is still inside ui.Stop. The test +// exercises the manager directly (skipping Apply's predicate check) +// because the property being verified is local to start/stop +// serialization — two widgets alive simultaneously means +// transitionMu failed to serialize. +func TestStartDuringStop_NoOverlap(t *testing.T) { + var ( + liveCount atomic.Int32 + maxLive atomic.Int32 + stopGate = make(chan struct{}) + ) + resetManager(t, func(*clientcore.BroflakeOptions, *clientcore.WebRTCOptions, *clientcore.EgressOptions) (widget, error) { + v := liveCount.Add(1) + if v > maxLive.Load() { + maxLive.Store(v) + } + return &countingWidget{onStop: func() { + <-stopGate + liveCount.Add(-1) + }}, nil + }) + + // Prime predicate so direct manager.start calls satisfy + // shouldStart() inside the lock, then start the first widget. + primeManager(t, nil) + manager.start() + waitForCount(t, &liveCount, 1, 1*time.Second) + + // Kick off a stop — it'll block on stopGate inside the fake's + // onStop until we release it. + stopDone := make(chan struct{}) + go func() { + manager.stop() + close(stopDone) + }() + + // Once the stop is in flight (worker has received cancel and + // entered onStop), launch a concurrent start. transitionMu must + // hold this until the prior stop returns. + time.Sleep(50 * time.Millisecond) + startDone := make(chan struct{}) + go func() { + manager.start() + close(startDone) + }() + + // Neither should have completed yet. + time.Sleep(50 * time.Millisecond) + select { + case <-stopDone: + t.Fatal("stop returned before stopGate released") + case <-startDone: + t.Fatal("start returned before prior stop completed") + default: + } + + // Release the first widget's Stop. stop() returns, transitionMu + // frees, start() acquires it and creates widget #2. + close(stopGate) + select { + case <-stopDone: + case <-time.After(1 * time.Second): + t.Fatal("stop did not return after gate release") + } + select { + case <-startDone: + case <-time.After(1 * time.Second): + t.Fatal("start did not return after stop completed") + } + + // Widget #2 should now be live. + waitForCount(t, &liveCount, 1, 1*time.Second) + require.Equal(t, int32(1), maxLive.Load(), "two widgets ran concurrently — transitionMu failed") + + // Cleanup: tear down widget #2. stopGate is already closed, so + // the worker's Stop returns immediately. + manager.stop() + waitForCount(t, &liveCount, 0, 1*time.Second) +} + +// TestInitSubscription_SeedsCachedConfig: passing a non-nil initial +// config to InitSubscription kicks off the same applyConfig path +// the live event subscriber takes. With all three conditions met, +// the widget should auto-start without a fresh NewConfigEvent. +func TestInitSubscription_SeedsCachedConfig(t *testing.T) { + starts := atomic.Int32{} + fw := &fakeWidget{} + resetManager(t, func(*clientcore.BroflakeOptions, *clientcore.WebRTCOptions, *clientcore.EgressOptions) (widget, error) { + starts.Add(1) + return fw, nil + }) + + require.NoError(t, settings.Set(settings.UnboundedKey, true)) + cfg := &config.Config{ + Features: map[string]bool{C.UNBOUNDED: true}, + Unbounded: testCfg(), + } + InitSubscription(cfg) + + waitForRunning(t, manager, true, 1*time.Second) + waitForCount(t, &starts, 1, 1*time.Second) + + // Cleanup. + require.NoError(t, settings.Set(settings.UnboundedKey, false)) + require.NoError(t, Apply()) + waitForRunning(t, manager, false, 1*time.Second) +} + +// TestInitSubscription_FutureEventStillFires: even with a nil +// initial, the subscription still reacts to a subsequent +// NewConfigEvent — confirms the seed didn't replace the live path. +func TestInitSubscription_FutureEventStillFires(t *testing.T) { + starts := atomic.Int32{} + resetManager(t, func(*clientcore.BroflakeOptions, *clientcore.WebRTCOptions, *clientcore.EgressOptions) (widget, error) { + starts.Add(1) + return &fakeWidget{}, nil + }) + + require.NoError(t, settings.Set(settings.UnboundedKey, true)) + InitSubscription(nil) + time.Sleep(20 * time.Millisecond) + assert.Equal(t, int32(0), starts.Load(), "should not start with no cached config") + + // Fire a NewConfigEvent with all three conditions satisfied. + cfg := config.Config{ + Features: map[string]bool{C.UNBOUNDED: true}, + Unbounded: testCfg(), + } + events.Emit(config.NewConfigEvent{New: &cfg}) + waitForRunning(t, manager, true, 1*time.Second) + waitForCount(t, &starts, 1, 1*time.Second) + + // Cleanup. + require.NoError(t, settings.Set(settings.UnboundedKey, false)) + require.NoError(t, Apply()) + waitForRunning(t, manager, false, 1*time.Second) +} + +// TestApplyConfig_RestartsOnParamChange: broflake consumes its +// options once in clientcore.NewBroflake. A server-side config +// change while the widget is alive must therefore tear down the +// current worker and bring up a new one — otherwise the running +// proxy stays on stale discovery/egress endpoints. applyConfig +// compares the new cfg against runningCfg (the snapshot the worker +// was started with) and triggers stop+start when they differ, +// provided the three-condition predicate still holds. +func TestApplyConfig_RestartsOnParamChange(t *testing.T) { + starts := atomic.Int32{} + resetManager(t, func(*clientcore.BroflakeOptions, *clientcore.WebRTCOptions, *clientcore.EgressOptions) (widget, error) { + starts.Add(1) + return &fakeWidget{}, nil + }) + + require.NoError(t, settings.Set(settings.UnboundedKey, true)) + + cfgA := testCfg() + cfgB := testCfg() + cfgB.DiscoverySrv = "https://b.example/" + + // First config — bring up widget #1. + applyConfig(config.Config{ + Features: map[string]bool{C.UNBOUNDED: true}, + Unbounded: cfgA, + }) + waitForRunning(t, manager, true, 1*time.Second) + waitForCount(t, &starts, 1, 1*time.Second) + + // Same config — no restart. + applyConfig(config.Config{ + Features: map[string]bool{C.UNBOUNDED: true}, + Unbounded: testCfg(), // value-equal to cfgA + }) + time.Sleep(50 * time.Millisecond) + assert.Equal(t, int32(1), starts.Load(), "identical config must not restart") + + // Changed config — restart with new params. + applyConfig(config.Config{ + Features: map[string]bool{C.UNBOUNDED: true}, + Unbounded: cfgB, + }) + waitForCount(t, &starts, 2, 2*time.Second) + + // Cleanup. + require.NoError(t, settings.Set(settings.UnboundedKey, false)) + require.NoError(t, Apply()) + waitForRunning(t, manager, false, 1*time.Second) +} + +// TestStopDisarmsManager verifies the public-Stop shutdown +// contract: after Stop returns, NO subsequent start path may revive +// the widget — not Apply, not applyConfig from a late config event, +// not manager.start directly. The armed gate enforces this; +// transitionMu alone is not enough because it just serializes +// transitions, it doesn't decide whether a queued one should still +// proceed after a shutdown. Without this gate, a config event +// firing during Stop's wait-for-worker window would block at +// transitionMu and then start a fresh widget the moment Stop's +// caller returned from LocalBackend.Close, silently keeping +// broflake alive past the documented shutdown contract. +func TestStopDisarmsManager(t *testing.T) { + starts := atomic.Int32{} + resetManager(t, func(*clientcore.BroflakeOptions, *clientcore.WebRTCOptions, *clientcore.EgressOptions) (widget, error) { + starts.Add(1) + return &fakeWidget{}, nil + }) + + // Bring up a widget so Stop has something to tear down. + primeManager(t, nil) + manager.start() + waitForCount(t, &starts, 1, 1*time.Second) + + require.NoError(t, Stop(context.Background())) + manager.mu.Lock() + armed := manager.armed + manager.mu.Unlock() + require.False(t, armed, "Stop must disarm the manager") + + // All three start paths must be no-ops now. Predicate is still + // satisfied (toggle, feature, cfg) — only armed gates the start. + primeManager(t, nil) + + require.NoError(t, Apply()) + applyConfig(config.Config{ + Features: map[string]bool{C.UNBOUNDED: true}, + Unbounded: testCfg(), + }) + manager.start() + time.Sleep(50 * time.Millisecond) + assert.Equal(t, int32(1), starts.Load(), "no start path may revive the widget post-Stop") + + // A fresh InitSubscription re-arms — Start-after-Close path. + initOnce = sync.Once{} // simulate re-arm path; InitSubscription's once guards re-subscription + InitSubscription(&config.Config{ + Features: map[string]bool{C.UNBOUNDED: true}, + Unbounded: testCfg(), + }) + waitForCount(t, &starts, 2, 1*time.Second) + + // Cleanup. + require.NoError(t, Stop(context.Background())) +} + +// TestStartAfterStopWaiting_NoRevival: pin Stop in its <-done wait +// (worker can't exit because its Stop is blocked), then race a +// config-event-driven start against the disarm. Confirms that even a +// start queued at transitionMu while Stop is waiting bails out after +// transitionMu releases — the armed re-check inside start catches it. +func TestStartAfterStopWaiting_NoRevival(t *testing.T) { + block := make(chan struct{}) + starts := atomic.Int32{} + resetManager(t, func(*clientcore.BroflakeOptions, *clientcore.WebRTCOptions, *clientcore.EgressOptions) (widget, error) { + starts.Add(1) + return &fakeWidget{stopBlock: block}, nil + }) + + primeManager(t, nil) + manager.start() + waitForCount(t, &starts, 1, 1*time.Second) + + // Stop in a goroutine — it'll set armed=false, signal cancel, + // then block on <-done waiting for the worker, which is in turn + // pinned by stopBlock. + stopDone := make(chan struct{}) + go func() { + _ = Stop(context.Background()) + close(stopDone) + }() + + // Give Stop a beat to acquire transitionMu and disarm. + time.Sleep(50 * time.Millisecond) + manager.mu.Lock() + armed := manager.armed + manager.mu.Unlock() + require.False(t, armed, "Stop must disarm before waiting on done") + + // Now race a start. It blocks at transitionMu until Stop returns. + startDone := make(chan struct{}) + go func() { + manager.start() + close(startDone) + }() + time.Sleep(20 * time.Millisecond) + select { + case <-startDone: + t.Fatal("start completed before Stop unblocked") + default: + } + + // Release the worker so Stop returns. + close(block) + select { + case <-stopDone: + case <-time.After(1 * time.Second): + t.Fatal("Stop did not return after worker released") + } + select { + case <-startDone: + case <-time.After(1 * time.Second): + t.Fatal("start did not return after Stop returned") + } + + // Critically: starts must still be 1 — the queued start saw + // armed=false and bailed. + assert.Equal(t, int32(1), starts.Load(), "start queued during Stop must not revive the widget") +} + +// TestStopCtx_TimesOut: Stop(ctx) returns ctx.Err() when the worker +// is still mid-shutdown past the deadline. The worker is left to +// exit on its own schedule afterwards. +func TestStopCtx_TimesOut(t *testing.T) { + block := make(chan struct{}) + fw := &fakeWidget{stopBlock: block} + resetManager(t, func(*clientcore.BroflakeOptions, *clientcore.WebRTCOptions, *clientcore.EgressOptions) (widget, error) { + return fw, nil + }) + + require.NoError(t, settings.Set(settings.UnboundedKey, true)) + manager.mu.Lock() + manager.lastFeatureOn = true + manager.lastCfg = testCfg() + manager.mu.Unlock() + require.NoError(t, Apply()) + waitForRunning(t, manager, true, 1*time.Second) + + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() + err := Stop(ctx) + assert.ErrorIs(t, err, context.DeadlineExceeded) + + // Release the worker so the test exits cleanly. + close(block) +} + +// TestConnectionEventBridge pins the observable API this package +// adds: broflake's OnConnectionChangeFunc callback must translate +// (state, workerIdx, addr) into a ConnectionEvent with the matching +// State, the addr.String() Source (empty when addr is nil), and a +// freshly-stamped Timestamp. Capture the callback that start +// installs on bfOpt via a fake newWidget, invoke it with both nil +// and non-nil addrs, then assert the events arriving via +// events.Subscribe. +func TestConnectionEventBridge(t *testing.T) { + var captured atomic.Pointer[clientcore.ConnectionChangeFunc] + resetManager(t, func(bfOpt *clientcore.BroflakeOptions, _ *clientcore.WebRTCOptions, _ *clientcore.EgressOptions) (widget, error) { + cb := bfOpt.OnConnectionChangeFunc + captured.Store(&cb) + return &fakeWidget{}, nil + }) + + primeManager(t, nil) + manager.start() + // Worker may still be inside the goroutine setup when start + // returns; wait until newWidget has been called and the callback + // captured. + deadline := time.Now().Add(1 * time.Second) + for captured.Load() == nil && time.Now().Before(deadline) { + time.Sleep(5 * time.Millisecond) + } + require.NotNil(t, captured.Load(), "newWidget was never invoked") + cb := *captured.Load() + require.NotNil(t, cb, "OnConnectionChangeFunc must be installed on bfOpt") + + // Buffered enough that events.Emit's per-callback goroutines + // can deposit before the test reads. + ch := make(chan ConnectionEvent, 4) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + events.SubscribeContext(ctx, func(evt ConnectionEvent) { ch <- evt }) + + before := time.Now().UnixMilli() + cb(1, 7, net.ParseIP("198.51.100.42")) + cb(-1, 7, nil) + after := time.Now().UnixMilli() + + // events.Emit dispatches each subscriber on a per-callback + // goroutine, so the two events can arrive in either order. + // Assert set-membership keyed by State (unique in this test) + // rather than positional equality. + byState := make(map[int]ConnectionEvent, 2) + for i := 0; i < 2; i++ { + select { + case evt := <-ch: + byState[evt.State] = evt + case <-time.After(1 * time.Second): + t.Fatalf("timed out waiting for ConnectionEvent #%d", i+1) + } + } + + require.Contains(t, byState, 1, "expected an accept event (State=+1)") + require.Contains(t, byState, -1, "expected a close event (State=-1)") + assert.Equal(t, "198.51.100.42", byState[1].Source, "accept Source") + assert.Equal(t, "", byState[-1].Source, "close Source (nil addr -> empty string)") + for state, evt := range byState { + assert.GreaterOrEqual(t, evt.Timestamp, before, "State=%d Timestamp not before emit", state) + assert.LessOrEqual(t, evt.Timestamp, after, "State=%d Timestamp not after emit", state) + } + + // Cleanup. + require.NoError(t, Stop(context.Background())) +} + +// TestInternalStop_TimesOut: internal stop (called by Apply and +// applyConfig) must not block forever when ui.Stop hangs. If it +// did, toggling Unbounded off via PatchSettings or a disabling +// config event would block the caller indefinitely while holding +// transitionMu. Pin ui.Stop with stopBlock past a short +// internalStopTimeout and confirm Apply returns within a sane +// budget. +func TestInternalStop_TimesOut(t *testing.T) { + prevTimeout := internalStopTimeout + internalStopTimeout = 50 * time.Millisecond + t.Cleanup(func() { internalStopTimeout = prevTimeout }) + + block := make(chan struct{}) + resetManager(t, func(*clientcore.BroflakeOptions, *clientcore.WebRTCOptions, *clientcore.EgressOptions) (widget, error) { + return &fakeWidget{stopBlock: block}, nil + }) + + primeManager(t, nil) + require.NoError(t, Apply()) + waitForRunning(t, manager, true, 1*time.Second) + + // Flip toggle off so Apply's !Enabled branch calls manager.stop. + // With the worker pinned by stopBlock, internal stop must time + // out rather than hang. Apply itself returns nil — the timeout + // is logged but not propagated. + require.NoError(t, settings.Set(settings.UnboundedKey, false)) + applyReturned := make(chan struct{}) + go func() { + _ = Apply() + close(applyReturned) + }() + select { + case <-applyReturned: + case <-time.After(2 * time.Second): + t.Fatal("Apply blocked past timeout — internal stop is not context-bounded") + } + + // Release the worker so the test's cleanup wait completes. + close(block) +}