Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 30 additions & 5 deletions events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,35 @@ func (e *Subscription[T]) Unsubscribe() {
// asynchronously in separate goroutines.
func Emit[T Event](evt T) {
subscriptionsMu.RLock()
defer subscriptionsMu.RUnlock()
if subs, ok := subscriptions[reflect.TypeFor[T]()]; ok {
for _, cb := range subs {
go cb(evt)
}
key := reflect.TypeFor[T]()
subs, ok := subscriptions[key]
subscriptionsMu.RUnlock()
// Diagnostic hook; default no-op so high-frequency event types don't
// flood logs in prod. Tests / debugging swap in a real logger via
// SetEmitDebugLogger. Called after releasing subscriptionsMu so a
// blocking logger can't amplify lock contention on hot event types.
emitDebugLogger(key, len(subs))
if !ok {
return
}
for _, cb := range subs {
go cb(evt)
}
}
Comment thread
myleshorton marked this conversation as resolved.

// emitDebugLogger is invoked once per Emit with the event type and
// current subscriber count. Default is a no-op; callers (tests,
// diagnostic builds) swap in a real logger via SetEmitDebugLogger.
var emitDebugLogger = func(reflect.Type, int) {}

// SetEmitDebugLogger replaces the no-op diagnostic hook for the
// duration of an investigation (e.g., tracking "events vanish" paths).
// Pass nil to restore the no-op default. Safe to call from main /
// init; not safe to call concurrently with Emit on the hot path.
func SetEmitDebugLogger(fn func(eventType reflect.Type, subscriberCount int)) {
if fn == nil {
emitDebugLogger = func(reflect.Type, int) {}
return
}
emitDebugLogger = fn
}
Comment thread
myleshorton marked this conversation as resolved.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ require (
github.com/getlantern/domainfront v0.0.0-20260419161617-0bff0b2169f4
github.com/getlantern/keepcurrent v0.0.0-20260422161259-54a4d9a93694
github.com/getlantern/kindling v0.0.0-20260529141244-21f8b144afab
github.com/getlantern/lantern-box v0.0.86
github.com/getlantern/lantern-box v0.0.87-0.20260529195337-0b63c0f42962
github.com/getlantern/pluriconfig v0.0.0-20251126214241-8cc8bc561535
github.com/getlantern/publicip v0.0.0-20260328175246-2c460fe80c6b
github.com/getlantern/semconv v0.0.0-20260327040646-21845dda05cb
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,8 @@ github.com/getlantern/keepcurrent v0.0.0-20260422161259-54a4d9a93694 h1:iLWm6S/4
github.com/getlantern/keepcurrent v0.0.0-20260422161259-54a4d9a93694/go.mod h1:ag5g9aWUw2FJcX5RVRpJ9EBQBy5yJuy2WXDouIn/m4w=
github.com/getlantern/kindling v0.0.0-20260529141244-21f8b144afab h1:PitYhTvo3oHRKYl4pVAoOIN8bhM+Bw+JBWncMglvHSg=
github.com/getlantern/kindling v0.0.0-20260529141244-21f8b144afab/go.mod h1:TGTxpoNVwc8Be4qkBNtf5oj2psJaEIZEq47GOPS7zkA=
github.com/getlantern/lantern-box v0.0.86 h1:myJa+Crg/oMgqSFhX7DOox4XcVIx8VFiPnkel8x8YT4=
github.com/getlantern/lantern-box v0.0.86/go.mod h1:BVXPyEicSu7m4nQY1OHPkOZNj87M7sYrzmY9AgyiPkc=
github.com/getlantern/lantern-box v0.0.87-0.20260529195337-0b63c0f42962 h1:VSSC7BIn42+tQmhoYg7Wc+ilkXC4SdoJ0LQ6+4kvtC0=
github.com/getlantern/lantern-box v0.0.87-0.20260529195337-0b63c0f42962/go.mod h1:BVXPyEicSu7m4nQY1OHPkOZNj87M7sYrzmY9AgyiPkc=
github.com/getlantern/lantern-water v0.0.0-20260520145825-958775d51395 h1:grfGavAUp2E9w9ZoJuM3FyWyQ0sCJ64V4ZMKtZKRqTc=
github.com/getlantern/lantern-water v0.0.0-20260520145825-958775d51395/go.mod h1:3JpJgwi4KEI6rS9loOAvcBp+F2jP65d0tTg2GQcTPBU=
github.com/getlantern/ops v0.0.0-20231025133620-f368ab734534 h1:3BwvWj0JZzFEvNNiMhCu4bf60nqcIuQpTYb00Ezm1ag=
Expand Down
35 changes: 35 additions & 0 deletions ipc/client_events_mobile.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/getlantern/radiance/account"
"github.com/getlantern/radiance/config"
"github.com/getlantern/radiance/events"
"github.com/getlantern/radiance/peer"
"github.com/getlantern/radiance/vpn"
)

Expand Down Expand Up @@ -60,3 +61,37 @@ func (c *Client) DataCapStream(ctx context.Context, handler func(account.DataCap
}
return c.dataCapStream(ctx, handler)
}

// PeerStatusEvents — see client_events_nonmobile.go for the full
// docstring. Mobile builds may share a process with radiance (localOnly)
// in which case events.SubscribeContext delivers directly; otherwise the
// SSE retry loop matches the desktop path.
func (c *Client) PeerStatusEvents(ctx context.Context, handler func(peer.StatusEvent)) error {
events.SubscribeContext(ctx, handler)
if c.localOnly {
<-ctx.Done()
return ctx.Err()
}
return c.sseRetryLoop(ctx, peerStatusEventsEndpoint, func(data []byte) {
var evt peer.StatusEvent
if err := json.Unmarshal(data, &evt); err == nil {
handler(evt)
}
})
}

// PeerConnectionEvents — see client_events_nonmobile.go for the full
// docstring. Same mobile dual-path as PeerStatusEvents.
func (c *Client) PeerConnectionEvents(ctx context.Context, handler func(peer.ConnectionEvent)) error {
events.SubscribeContext(ctx, handler)
if c.localOnly {
<-ctx.Done()
return ctx.Err()
}
return c.sseRetryLoop(ctx, peerConnectionEventsEndpoint, func(data []byte) {
var evt peer.ConnectionEvent
if err := json.Unmarshal(data, &evt); err == nil {
handler(evt)
}
})
}
34 changes: 34 additions & 0 deletions ipc/client_events_nonmobile.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/json"

"github.com/getlantern/radiance/account"
"github.com/getlantern/radiance/peer"
"github.com/getlantern/radiance/vpn"
)

Expand Down Expand Up @@ -40,3 +41,36 @@ func (c *Client) VPNStatusEvents(ctx context.Context, handler func(vpn.StatusUpd
func (c *Client) DataCapStream(ctx context.Context, handler func(account.DataCapInfo)) error {
return c.dataCapStream(ctx, handler)
}

// PeerStatusEvents streams peer-share lifecycle phase changes (mapping_port
// → registering → verifying → serving on Start, stopping → idle on Stop,
// error on failure). Each frame is a peer.StatusEvent JSON whose .Status
// is the live snapshot at the moment the event fired — consumers SHOULD
// re-render on every frame rather than diffing, since events.Emit's
// per-callback goroutine can land Start phases out of order. Blocks until
// ctx is cancelled.
func (c *Client) PeerStatusEvents(ctx context.Context, handler func(peer.StatusEvent)) error {
return c.sseRetryLoop(ctx, peerStatusEventsEndpoint, func(data []byte) {
var evt peer.StatusEvent
if err := json.Unmarshal(data, &evt); err == nil {
handler(evt)
}
})
}

// PeerConnectionEvents streams accept/close events for the local
// samizdat-in inbound. State is +1 on accept and -1 on close; Source
// is the remote "ip:port" string for geo-lookup / abuse attribution.
// Blocks until ctx is cancelled.
//
// Why this exists alongside events.Subscribe[peer.ConnectionEvent]:
// the events package's globals are process-scoped, so a subscriber in
// Liblantern can't see emits in lanternd. The SSE path bridges them.
func (c *Client) PeerConnectionEvents(ctx context.Context, handler func(peer.ConnectionEvent)) error {
return c.sseRetryLoop(ctx, peerConnectionEventsEndpoint, func(data []byte) {
var evt peer.ConnectionEvent
if err := json.Unmarshal(data, &evt); err == nil {
handler(evt)
}
})
}
55 changes: 53 additions & 2 deletions ipc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ const (
settingsEndpoint = "/settings"

// Peer-share ("Share My Connection") endpoints
peerStatusEndpoint = "/peer/status"
peerStatusEventsEndpoint = "/peer/status/events"
peerStatusEndpoint = "/peer/status"
peerStatusEventsEndpoint = "/peer/status/events"
peerConnectionEventsEndpoint = "/peer/connection/events"

// Split tunnel endpoint
splitTunnelEndpoint = "/split-tunnel"
Expand Down Expand Up @@ -234,6 +235,7 @@ func newLocalAPI(b *backend.LocalBackend, withAuth bool) *localapi {
mux.HandleFunc("GET "+peerStatusEndpoint, traced(s.peerStatusHandler))
// SSE skips the tracer middleware since it buffers the entire response body.
mux.HandleFunc("GET "+peerStatusEventsEndpoint, s.peerStatusEventsHandler)
mux.HandleFunc("GET "+peerConnectionEventsEndpoint, s.peerConnectionEventsHandler)

// Split tunnel
mux.HandleFunc(splitTunnelEndpoint, traced(s.splitTunnelHandler))
Expand Down Expand Up @@ -516,6 +518,55 @@ func (s *localapi) peerStatusEventsHandler(w http.ResponseWriter, r *http.Reques
}
}

// peerConnectionEventsHandler streams peer.ConnectionEvent over SSE for
// each accept/close on the local samizdat-in. Unlike peerStatusEventsHandler
// (which always sends the live snapshot), each emit's captured value is
// what the consumer needs here — the Source IP and +1/-1 state ARE the
// payload, not a periodic poll. Out-of-order +1/-1 from events.Emit's
// per-callback goroutine is fine: the consumer (lantern-core's globe-arc
// renderer) keys arcs by source, so it handles re-orderings naturally.
//
// The events package lives in this process (lanternd); cross-process
// consumers in Liblantern can only receive these via this SSE stream,
// since events.Subscribe in the Liblantern process sees a different
// (empty) subscriptions map.
func (s *localapi) peerConnectionEventsHandler(w http.ResponseWriter, r *http.Request) {
flusher := sseWriter(w)
if flusher == nil {
return
}
// Buffered channel so a slow SSE consumer doesn't apply backpressure
// to events.Emit (which spawns a goroutine per subscriber but blocks
// nothing). 64 holds ~one second of accept/close pairs under heavy
// load; beyond that we drop to avoid unbounded memory growth.
queue := make(chan peer.ConnectionEvent, 64)
sub := events.Subscribe(func(evt peer.ConnectionEvent) {
select {
case queue <- evt:
default:
// queue full — drop. SSE consumer is too slow; better to
// lose this event than to back up the events.Emit goroutine.
}
})
defer sub.Unsubscribe()

for {
select {
case evt := <-queue:
data, err := json.Marshal(evt)
if err != nil {
continue
}
if _, err := fmt.Fprintf(w, "data: %s\n\n", data); err != nil {
return
}
flusher.Flush()
case <-r.Context().Done():
return
}
}
}

///////////////////////
// Server selection //
///////////////////////
Expand Down
Loading
Loading