Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions backend/radiance.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/getlantern/radiance/servers"
"github.com/getlantern/radiance/telemetry"
"github.com/getlantern/radiance/traces"
"github.com/getlantern/radiance/unbounded"
"github.com/getlantern/radiance/vpn"

"github.com/sagernet/sing-box/adapter"
Expand Down Expand Up @@ -232,6 +233,18 @@ func (r *LocalBackend) Start() {

r.resumePeerShareIfEnabled()

// Wire the broflake / Unbounded widget proxy lifecycle to config
// updates. This single subscription handles all three start/stop
// triggers (local toggle, server feature flag, server-supplied
// config); InitSubscription is sync.Once-guarded so a future Start
// retry after Close won't double-subscribe.
//
// Seed with the already-cached config (loaded from disk before
// Start runs) so an opted-in user auto-starts the widget on
// launch instead of waiting for the next config refresh.
cachedCfg, _ := r.confHandler.GetConfig()
unbounded.InitSubscription(cachedCfg)

// set country code in settings when new config is received so it can be included in issue reports
events.SubscribeOnce(func(evt config.NewConfigEvent) {
if env.GetString(env.Country) != "" {
Expand Down Expand Up @@ -318,6 +331,17 @@ func (r *LocalBackend) Close() {
r.closeOnce.Do(func() {
slog.Debug("Closing Radiance")
r.closePeerClient()
// unbounded.start spawns its worker on a context.Background-
// derived ctx (it has to outlive any single NewConfigEvent),
// so Close has to explicitly tell it to shut down — otherwise
// the broflake widget goroutine survives backend close and
// leaks until process exit. Use a fresh ctx so a cancelled
// shutdown path doesn't skip the Stop.
stopCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
if err := unbounded.Stop(stopCtx); err != nil {
Comment thread
myleshorton marked this conversation as resolved.
slog.Warn("unbounded stop on backend close returned error", "error", err)
}
cancel()
// vpnClient is always set in production via NewLocalBackend, but
// peer-focused unit tests construct partial LocalBackends without
// one. Guard the call so Close stays robust under those paths
Expand Down Expand Up @@ -502,6 +526,19 @@ func (r *LocalBackend) PatchSettings(updates settings.Settings) error {
}
}

// Drive the Unbounded widget proxy off the toggle change immediately
// rather than waiting for the next NewConfigEvent to re-evaluate.
// settings.Patch above has already persisted the new value, so go
// straight to Apply() — SetEnabled would short-circuit on the
// already-matching persisted value and never re-evaluate the
// manager. Apply re-checks the three-condition predicate against
// the cached server-side state and starts or stops accordingly.
if _, ok := diff[settings.UnboundedKey]; ok {
if err := unbounded.Apply(); err != nil {
slog.Warn("unbounded apply failed", "error", err)
}
Comment thread
myleshorton marked this conversation as resolved.
}

return nil
}

Expand Down
26 changes: 26 additions & 0 deletions backend/radiance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/getlantern/radiance/common/settings"
"github.com/getlantern/radiance/peer"
"github.com/getlantern/radiance/unbounded"
)

func TestBackend(t *testing.T) {}
Expand Down Expand Up @@ -216,3 +217,28 @@ func TestPatchSettings_PeerShareDispatches(t *testing.T) {
assert.Equal(t, int64(1), fake.stopCalls.Load())
assert.False(t, fake.IsActive())
}

// Verify PatchSettings routes UnboundedKey to unbounded.Apply via the
// SetApplyHookForTest hook. A typo on the diff key or a removal of
// the Apply call would silently leave the Unbounded toggle persisted
// but inert. The hook fires regardless of the Enabled() gate inside
// Apply, so this catches the dispatch even though we don't prime the
// rest of the manager state.
func TestPatchSettings_UnboundedDispatches(t *testing.T) {
r := newPeerTestBackend(t, &fakePeerController{})

var applyCalls atomic.Int32
unbounded.SetApplyHookForTest(func() { applyCalls.Add(1) })
t.Cleanup(func() { unbounded.SetApplyHookForTest(nil) })

require.NoError(t, r.PatchSettings(settings.Settings{settings.UnboundedKey: true}))
assert.Equal(t, int32(1), applyCalls.Load(), "PatchSettings({UnboundedKey: true}) must dispatch to unbounded.Apply")

require.NoError(t, r.PatchSettings(settings.Settings{settings.UnboundedKey: false}))
assert.Equal(t, int32(2), applyCalls.Load(), "PatchSettings({UnboundedKey: false}) must dispatch to unbounded.Apply")

// A PATCH without UnboundedKey must NOT trigger Apply — confirms
// the diff check is in place rather than always firing.
require.NoError(t, r.PatchSettings(settings.Settings{settings.PeerShareEnabledKey: false}))
assert.Equal(t, int32(2), applyCalls.Load(), "PatchSettings without UnboundedKey must not dispatch to unbounded.Apply")
}
8 changes: 8 additions & 0 deletions common/settings/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ const (
// Advanced setting in the Share My Connection UI for users on
// networks where UPnP is disabled or unavailable.
PeerManualPortKey _key = "peer_manual_port" // int (0 = unset; 1..65535 = manual port)
// UnboundedKey is the local opt-in for the broflake / Unbounded
// widget proxy. When true AND the server-side Features[unbounded]
// flag is on AND the server provides UnboundedConfig (discovery
// + egress URLs), the widget proxy starts. Surfaced as a "Basic
// mode" option in the Share My Connection UI for networks where
// UPnP isn't workable but the user still wants to contribute via
// the WebRTC-based donor path.
UnboundedKey _key = "unbounded" // bool
Comment thread
myleshorton marked this conversation as resolved.
SelectedServerKey _key = "selected_server" // [servers.Server] Server.Options is not stored

PreferredLocationKey _key = "preferred_location" // [common.PreferredLocation]
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ require (
github.com/alexflint/go-arg v1.6.1
github.com/alitto/pond v1.9.2
github.com/getlantern/amp v0.0.0-20260305201851-782bc8045e58
github.com/getlantern/broflake v0.0.0-20260504215251-ed3cf75062d1
github.com/getlantern/common v1.2.1-0.20260326210434-cb69537aaf46
github.com/getlantern/dnstt v0.0.0-20260112160750-05100563bd0d
github.com/getlantern/domainfront v0.0.0-20260419161617-0bff0b2169f4
Expand Down Expand Up @@ -113,7 +114,6 @@ require (
github.com/gaissmai/bart v0.11.1 // indirect
github.com/gaukas/wazerofs v0.1.0 // indirect
github.com/getlantern/algeneva v0.0.0-20250307163401-1824e7b54f52 // indirect
github.com/getlantern/broflake v0.0.0-20260504215251-ed3cf75062d1 // indirect
github.com/getlantern/lantern-water v0.0.0-20260520145825-958775d51395 // indirect
github.com/getlantern/samizdat v0.0.3-0.20260529191731-5ea8ae61ddbf // indirect
github.com/go-chi/chi/v5 v5.2.2 // indirect
Expand Down
45 changes: 39 additions & 6 deletions ipc/client_events_mobile.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/getlantern/radiance/config"
"github.com/getlantern/radiance/events"
"github.com/getlantern/radiance/peer"
"github.com/getlantern/radiance/unbounded"
"github.com/getlantern/radiance/vpn"
)

Expand Down Expand Up @@ -62,10 +63,15 @@ func (c *Client) DataCapStream(ctx context.Context, handler func(account.DataCap
return c.dataCapStream(ctx, handler)
}

// PeerStatusEvents — see client_events_nonmobile.go for the full
// docstring. Mobile builds may share a process with radiance (localOnly)
// in which case events.SubscribeContext delivers directly; otherwise the
// SSE retry loop matches the desktop path.
// PeerStatusEvents streams peer-share lifecycle phase changes (mapping_port
// → registering → verifying → serving on Start, stopping → idle on Stop,
// error on failure). Each frame is a peer.StatusEvent JSON whose .Status
// is the live snapshot at the moment the event fired — consumers SHOULD
// re-render on every frame rather than diffing, since events.Emit's
// per-callback goroutine can land Start phases out of order. Mobile builds
// may share a process with radiance (localOnly), in which case
// events.SubscribeContext delivers directly; otherwise the SSE retry loop
// is used. Blocks until ctx is cancelled.
func (c *Client) PeerStatusEvents(ctx context.Context, handler func(peer.StatusEvent)) error {
events.SubscribeContext(ctx, handler)
if c.localOnly {
Expand All @@ -80,8 +86,12 @@ func (c *Client) PeerStatusEvents(ctx context.Context, handler func(peer.StatusE
})
}

// PeerConnectionEvents — see client_events_nonmobile.go for the full
// docstring. Same mobile dual-path as PeerStatusEvents.
// PeerConnectionEvents streams accept/close events for the local
// samizdat-in inbound. State is +1 on accept and -1 on close; Source is
// the remote "ip:port" string for geo-lookup / abuse attribution.
// Same mobile dual-path as PeerStatusEvents (localOnly delivers via
// the in-process event bus; otherwise the SSE retry loop is used).
// Blocks until ctx is cancelled.
func (c *Client) PeerConnectionEvents(ctx context.Context, handler func(peer.ConnectionEvent)) error {
events.SubscribeContext(ctx, handler)
if c.localOnly {
Expand All @@ -95,3 +105,26 @@ func (c *Client) PeerConnectionEvents(ctx context.Context, handler func(peer.Con
}
})
}

// UnboundedConnectionEvents streams accept/close events for the local
// broflake widget proxy ("Unbounded" / Basic mode). The JSON shape
// matches peer.ConnectionEvent but the Go type is distinct — in-process
// subscribers must subscribe to both event types separately to see all
// peer activity. State is +1 on consumer accept, -1 on close; Source
// is the consumer's IP if broflake exposes it, otherwise empty. Same
// mobile dual-path: localOnly subscribes directly to the in-process
// event bus; otherwise the SSE retry loop is used. Blocks until ctx
// is cancelled.
func (c *Client) UnboundedConnectionEvents(ctx context.Context, handler func(unbounded.ConnectionEvent)) error {
events.SubscribeContext(ctx, handler)
if c.localOnly {
<-ctx.Done()
return ctx.Err()
}
return c.sseRetryLoop(ctx, unboundedConnectionEventsEndpoint, func(data []byte) {
var evt unbounded.ConnectionEvent
if err := json.Unmarshal(data, &evt); err == nil {
handler(evt)
}
})
}
17 changes: 17 additions & 0 deletions ipc/client_events_nonmobile.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/getlantern/radiance/account"
"github.com/getlantern/radiance/peer"
"github.com/getlantern/radiance/unbounded"
"github.com/getlantern/radiance/vpn"
)

Expand Down Expand Up @@ -74,3 +75,19 @@ func (c *Client) PeerConnectionEvents(ctx context.Context, handler func(peer.Con
}
})
}

// UnboundedConnectionEvents streams accept/close events for the
// local broflake widget proxy ("Unbounded" / Basic mode). The JSON
// shape matches peer.ConnectionEvent but the Go type is distinct —
// in-process subscribers must subscribe to both event types separately
// to see all peer activity. State is +1 on consumer accept, -1 on
// close; Source is the consumer's IP if broflake exposes it,
// otherwise empty. Blocks until ctx is cancelled.
func (c *Client) UnboundedConnectionEvents(ctx context.Context, handler func(unbounded.ConnectionEvent)) error {
return c.sseRetryLoop(ctx, unboundedConnectionEventsEndpoint, func(data []byte) {
var evt unbounded.ConnectionEvent
if err := json.Unmarshal(data, &evt); err == nil {
handler(evt)
}
})
}
46 changes: 46 additions & 0 deletions ipc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/getlantern/radiance/events"
rlog "github.com/getlantern/radiance/log"
"github.com/getlantern/radiance/peer"
"github.com/getlantern/radiance/unbounded"
"github.com/getlantern/radiance/vpn"

sjson "github.com/sagernet/sing/common/json"
Expand Down Expand Up @@ -69,6 +70,11 @@ const (
peerStatusEventsEndpoint = "/peer/status/events"
peerConnectionEventsEndpoint = "/peer/connection/events"

// Unbounded ("Basic mode") endpoints. Connection events have the same
// JSON shape as /peer/connection/events but are emitted from a
// different in-process event type, so they need their own SSE bridge.
unboundedConnectionEventsEndpoint = "/unbounded/connection/events"

// Split tunnel endpoint
splitTunnelEndpoint = "/split-tunnel"

Expand Down Expand Up @@ -236,6 +242,7 @@ func newLocalAPI(b *backend.LocalBackend, withAuth bool) *localapi {
// SSE skips the tracer middleware since it buffers the entire response body.
mux.HandleFunc("GET "+peerStatusEventsEndpoint, s.peerStatusEventsHandler)
mux.HandleFunc("GET "+peerConnectionEventsEndpoint, s.peerConnectionEventsHandler)
mux.HandleFunc("GET "+unboundedConnectionEventsEndpoint, s.unboundedConnectionEventsHandler)
Comment thread
myleshorton marked this conversation as resolved.

// Split tunnel
mux.HandleFunc(splitTunnelEndpoint, traced(s.splitTunnelHandler))
Expand Down Expand Up @@ -567,6 +574,45 @@ func (s *localapi) peerConnectionEventsHandler(w http.ResponseWriter, r *http.Re
}
}

// unboundedConnectionEventsHandler streams unbounded.ConnectionEvent over SSE
// for each consumer accept/disconnect on the broflake widget proxy. Mirrors
// peerConnectionEventsHandler — the JSON shape is identical, but events.Emit
// dispatches by concrete Go type so the two streams need separate
// subscriptions. Cross-process consumers (Flutter via Liblantern) merge the
// two SSE endpoints client-side into one peer-connection feed for the globe.
func (s *localapi) unboundedConnectionEventsHandler(w http.ResponseWriter, r *http.Request) {
flusher := sseWriter(w)
if flusher == nil {
return
}
queue := make(chan unbounded.ConnectionEvent, 64)
sub := events.Subscribe(func(evt unbounded.ConnectionEvent) {
select {
case queue <- evt:
default:
// queue full — drop. Same rationale as the peer handler:
// better to lose this event than back up events.Emit.
}
})
defer sub.Unsubscribe()

for {
select {
case evt := <-queue:
data, err := json.Marshal(evt)
if err != nil {
continue
}
if _, err := fmt.Fprintf(w, "data: %s\n\n", data); err != nil {
return
}
flusher.Flush()
case <-r.Context().Done():
return
}
}
}

///////////////////////
// Server selection //
///////////////////////
Expand Down
16 changes: 16 additions & 0 deletions unbounded/test_helpers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package unbounded

// countingWidget is a fakeWidget variant whose Stop runs a caller-
// supplied callback before returning. The callback lets the test
// observe shutdown ordering — typically by decrementing a live-
// widget counter so the test can assert that at most one widget is
// alive across a stop/start transition.
type countingWidget struct {
onStop func()
}

func (w *countingWidget) Stop() {
if w.onStop != nil {
w.onStop()
}
}
Loading
Loading