From 5223e6dd2616d2613f2bc20fea1e338f35ec9434 Mon Sep 17 00:00:00 2001 From: Adam Fisk Date: Thu, 7 May 2026 12:37:02 -0600 Subject: [PATCH 01/12] peer: emit ConnectionEvent on samizdat accept/close Plumb lantern-box's peerconn listener registry through to the radiance event bus so consumers (Flutter globe view, future abuse aggregation) can subscribe to a per-connection accept/close stream. Listener is registered after libbox.Start so the box's accept loop is already serving when notifications start flowing; cleared on Stop and in the Start rollback path so post-teardown callbacks land on a no-op rather than emitting events to a torn-down consumer. Source field carries the remote "ip:port" string verbatim from M.Socksaddr.String(); consumers extract the IP for geo-lookup or rate-limit attribution. Pinned to local lantern-box via a replace directive while the peerconn package is in flight; remove once lantern-box tags a release. Co-Authored-By: Claude Opus 4.7 (1M context) (cherry picked from commit d4fc0cb240b8ec83878ebc605a25eea903c1fece) --- go.mod | 4 ++++ go.sum | 14 ++++++++++++++ peer/peer.go | 34 ++++++++++++++++++++++++++++++++++ 3 files changed, 52 insertions(+) diff --git a/go.mod b/go.mod index 228186c0..4f8586bd 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,10 @@ module github.com/getlantern/radiance go 1.26.2 +// Local while peerconn listener registry is in flight; remove once +// lantern-box tags a release that includes tracker/peerconn. +replace github.com/getlantern/lantern-box => ../lantern-box + replace github.com/sagernet/sing => github.com/getlantern/sing v0.7.18-lantern replace github.com/sagernet/sing-box => github.com/getlantern/sing-box-minimal v1.12.22-lantern diff --git a/go.sum b/go.sum index 5fd5538f..be09ac43 100644 --- a/go.sum +++ b/go.sum @@ -246,12 +246,26 @@ github.com/getlantern/hidden v0.0.0-20220104173330-f221c5a24770 h1:cSrD9ryDfTV2y github.com/getlantern/hidden v0.0.0-20220104173330-f221c5a24770/go.mod h1:GOQsoDnEHl6ZmNIL+5uVo+JWRFWozMEp18Izcb++H+A= github.com/getlantern/keepcurrent v0.0.0-20260422161259-54a4d9a93694 h1:iLWm6S/47Hfk7FjW6yaD+1h6kO7C/iauV0DkVia/bXU= github.com/getlantern/keepcurrent v0.0.0-20260422161259-54a4d9a93694/go.mod h1:ag5g9aWUw2FJcX5RVRpJ9EBQBy5yJuy2WXDouIn/m4w= +<<<<<<< HEAD github.com/getlantern/kindling v0.0.0-20260529141244-21f8b144afab h1:PitYhTvo3oHRKYl4pVAoOIN8bhM+Bw+JBWncMglvHSg= github.com/getlantern/kindling v0.0.0-20260529141244-21f8b144afab/go.mod h1:TGTxpoNVwc8Be4qkBNtf5oj2psJaEIZEq47GOPS7zkA= github.com/getlantern/lantern-box v0.0.86 h1:myJa+Crg/oMgqSFhX7DOox4XcVIx8VFiPnkel8x8YT4= github.com/getlantern/lantern-box v0.0.86/go.mod h1:BVXPyEicSu7m4nQY1OHPkOZNj87M7sYrzmY9AgyiPkc= +======= +<<<<<<< HEAD +github.com/getlantern/kindling v0.0.0-20260516120759-a9712f95df03 h1:dUTN7mnTTBcSvsURNs1rTlyKrD1uXUEPqxEZDfl+hb4= +github.com/getlantern/kindling v0.0.0-20260516120759-a9712f95df03/go.mod h1:TGTxpoNVwc8Be4qkBNtf5oj2psJaEIZEq47GOPS7zkA= +github.com/getlantern/lantern-box v0.0.84 h1:y+nezmu0LZDlzcS2A4oKDu3f1UTFAgA24vT1htvEiX0= +github.com/getlantern/lantern-box v0.0.84/go.mod h1:6SO1p22tAq9y8JLjNnAbr4/GZ4VjmlcQGYn0qF4aD/k= +>>>>>>> cd64ed8 (peer: emit ConnectionEvent on samizdat accept/close) github.com/getlantern/lantern-water v0.0.0-20260520145825-958775d51395 h1:grfGavAUp2E9w9ZoJuM3FyWyQ0sCJ64V4ZMKtZKRqTc= github.com/getlantern/lantern-water v0.0.0-20260520145825-958775d51395/go.mod h1:3JpJgwi4KEI6rS9loOAvcBp+F2jP65d0tTg2GQcTPBU= +======= +github.com/getlantern/kindling v0.0.0-20260428171407-6143132aaf40 h1:P5pkaBGxWOGBn7bKzjzdln/ro+ShG1RUbOuy+7pSzXE= +github.com/getlantern/kindling v0.0.0-20260428171407-6143132aaf40/go.mod h1:TGTxpoNVwc8Be4qkBNtf5oj2psJaEIZEq47GOPS7zkA= +github.com/getlantern/lantern-water v0.0.0-20260317143726-e0ee64a11d90 h1:P9JX1yAu2uq3b5YiT0sLtHkTrkZuttV8gPZh81nUuag= +github.com/getlantern/lantern-water v0.0.0-20260317143726-e0ee64a11d90/go.mod h1:3JpJgwi4KEI6rS9loOAvcBp+F2jP65d0tTg2GQcTPBU= +>>>>>>> 231462b (peer: emit ConnectionEvent on samizdat accept/close) github.com/getlantern/ops v0.0.0-20231025133620-f368ab734534 h1:3BwvWj0JZzFEvNNiMhCu4bf60nqcIuQpTYb00Ezm1ag= github.com/getlantern/ops v0.0.0-20231025133620-f368ab734534/go.mod h1:ZsLfOY6gKQOTyEcPYNA9ws5/XHZQFroxqCOhHjGcs9Y= github.com/getlantern/osversion v0.0.0-20240418205916-2e84a4a4e175 h1:JWH5BB2o0eAeGs0tZnFPpQGx+nMIo/WmxKnj2hnGjgE= diff --git a/peer/peer.go b/peer/peer.go index 73222e10..c77051d3 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -14,6 +14,7 @@ import ( "github.com/sagernet/sing-box/experimental/libbox" box "github.com/getlantern/lantern-box" + "github.com/getlantern/lantern-box/tracker/peerconn" "github.com/getlantern/radiance/common/env" "github.com/getlantern/radiance/events" "github.com/getlantern/radiance/portforward" @@ -61,6 +62,19 @@ type StatusEvent struct { Status Status `json:"status"` } +// ConnectionEvent fires every time a remote client opens or closes a +// samizdat session against the local peer's inbound. Source carries the +// remote "ip:port" string; consumers (the globe view, abuse aggregation) +// extract the IP for geo-lookup or rate-limit attribution. +// +// State +1 on accept, -1 on close +// Source remote peer "ip:port" +type ConnectionEvent struct { + events.Event + State int `json:"state"` + Source string `json:"source"` +} + // Port range chosen to minimize collision risk on the typical home network, // not to guarantee one. 30000–50000 sits above the well-known/system range // (0–1023) and above the ports most services use by default (web/dev/dbs @@ -213,6 +227,11 @@ func (c *Client) Start(ctx context.Context) error { // registered route + router rule. cleanupCtx, cancel := context.WithTimeout(context.Background(), peerCleanupTimeout) defer cancel() + // Always clear the connection listener on rollback. The listener is + // only Set on the success path, so this is a no-op if Start failed + // before reaching it — but cheap insurance against a future re-order + // that registers earlier. + peerconn.SetListener(nil) if box != nil { _ = box.Close() } @@ -276,6 +295,16 @@ func (c *Client) Start(ctx context.Context) error { return fmt.Errorf("start sing-box: %w", err) } + // Forward inbound accept/close events from lantern-box's samizdat + // inbound to the radiance event bus, so consumers (the Flutter globe, + // future abuse aggregation) get a per-connection stream. Listener is + // process-wide single-active; cleared on Stop. Register BEFORE Verify + // so the verify-dial connection itself emits an event — the listener + // must be in place by the time the server dials back. + peerconn.SetListener(func(state int, source string) { + events.Emit(ConnectionEvent{State: state, Source: source}) + }) + // Now that sing-box is listening with the just-built creds, ask the // server to dial back through them. Splitting verify out of Register // into this explicit follow-up avoids the chicken-and-egg where the @@ -374,6 +403,11 @@ func (c *Client) Stop(ctx context.Context) error { c.status = Status{} c.mu.Unlock() + // Clear the connection listener BEFORE box.Close so any in-flight + // accept-loop callbacks land on a no-op rather than emit ConnectionEvents + // after the consumer side has already torn down its subscription. + peerconn.SetListener(nil) + cancel() <-done From ee8c93faf7152056878a8765562a1a836dc71911 Mon Sep 17 00:00:00 2001 From: Adam Fisk Date: Thu, 7 May 2026 12:41:52 -0600 Subject: [PATCH 02/12] peer: serve live connection snapshot on 127.0.0.1:17099/peer/connections MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a localhost HTTP endpoint exposing the active samizdat connection set as JSON, fed by the lantern-box peerconn listener registered when peer.Client.Start succeeds. Replaces the planned full Go→FFI→Dart event channel for the prototype with poll-driven Dart consumption — much smaller surface, same data shape, swap with a streaming FFI events path later without changing the Dart side. Loopback-only: net.Listen 127.0.0.1 enforces it at the kernel level, plus a defense-in-depth host check on each request in case someone later misconfigures RADIANCE_PEER_STATS_ADDR to a non-loopback bind. The endpoint reveals connected client IPs which we don't want surfaced beyond the local machine. Co-Authored-By: Claude Opus 4.7 (1M context) (cherry picked from commit 48e0f6f197cf0d0b3017b35161c7f52bc8ebb549) --- peer/connstats.go | 199 ++++++++++++++++++++++++++++++++++++++++++++++ peer/peer.go | 33 ++++++-- 2 files changed, 224 insertions(+), 8 deletions(-) create mode 100644 peer/connstats.go diff --git a/peer/connstats.go b/peer/connstats.go new file mode 100644 index 00000000..7aa3520b --- /dev/null +++ b/peer/connstats.go @@ -0,0 +1,199 @@ +package peer + +import ( + "context" + "encoding/json" + "net" + "net/http" + "os" + "strings" + "sync" + "time" + + "github.com/getlantern/lantern-box/tracker/peerconn" +) + +// connStatsServer is the localhost HTTP endpoint Flutter polls to render +// the live globe. It maintains an in-memory set of active source IPs by +// subscribing to peerconn lifecycle notifications, and serves the current +// snapshot as JSON on GET /peer/connections. +// +// This is a deliberately simple bridge for the prototype: it skips the +// proper Go→FFI→Dart event channel (which Adam's lantern#8492 had a +// pattern for but is on a stale branch with merge conflicts) in favour of +// a poll loop. Replace with a streaming FFI events path once the broader +// peer-share / unbounded plumbing lands; the data shape is intentionally +// the same so Dart consumers don't need to change. +// +// Listen address: +// - RADIANCE_PEER_STATS_ADDR env var if set (e.g. "127.0.0.1:17099") +// - default 127.0.0.1:17099 +// +// 127.0.0.1 only — never bound to public interfaces. The endpoint reveals +// active proxy clients' IP addresses, which we don't want surfaced to +// anyone outside the local user's machine. +const defaultConnStatsAddr = "127.0.0.1:17099" + +type connEntry struct { + Source string `json:"source"` + Since time.Time `json:"since"` + Inbound int `json:"-"` // for refcount on duplicate accepts (re-uses) + id int // monotonic id for stable equality across snapshots +} + +type connSnapshot struct { + Sources []string `json:"sources"` + ActiveCount int `json:"active_count"` + GeneratedAt time.Time `json:"generated_at"` + ListenerHits int64 `json:"listener_hits"` +} + +type connStats struct { + mu sync.Mutex + active map[string]*connEntry + hits int64 + server *http.Server + listener net.Listener +} + +func newConnStats() *connStats { + return &connStats{active: make(map[string]*connEntry)} +} + +// note records a +1 or -1 transition. Source is "ip:port". +func (s *connStats) note(state int, source string) { + s.mu.Lock() + defer s.mu.Unlock() + s.hits++ + if state == +1 { + if e, ok := s.active[source]; ok { + e.Inbound++ + return + } + s.active[source] = &connEntry{ + Source: source, + Since: time.Now(), + Inbound: 1, + } + } else if state == -1 { + if e, ok := s.active[source]; ok { + e.Inbound-- + if e.Inbound <= 0 { + delete(s.active, source) + } + } + } +} + +func (s *connStats) snapshot() connSnapshot { + s.mu.Lock() + defer s.mu.Unlock() + out := connSnapshot{ + Sources: make([]string, 0, len(s.active)), + ActiveCount: len(s.active), + GeneratedAt: time.Now(), + ListenerHits: s.hits, + } + for src := range s.active { + out.Sources = append(out.Sources, src) + } + return out +} + +// start spins up the HTTP server. Returns an error if the listen address +// is already in use; falls back to a kernel-assigned port (":0" suffix) +// only if the configured address conflicts and the env var was unset, so +// users who explicitly pinned a port get a clean failure. +func (s *connStats) start(parent context.Context) error { + addr := os.Getenv("RADIANCE_PEER_STATS_ADDR") + envSet := addr != "" + if !envSet { + addr = defaultConnStatsAddr + } + + ln, err := net.Listen("tcp", addr) + if err != nil { + if envSet { + return err + } + // Default already taken — try a random localhost port so a second + // app instance still surfaces some endpoint rather than failing. + ln, err = net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return err + } + } + s.listener = ln + + mux := http.NewServeMux() + mux.HandleFunc("/peer/connections", func(w http.ResponseWriter, r *http.Request) { + // Strict localhost gate. net.Listen on 127.0.0.1 already prevents + // remote connections, but a misconfigured listener (e.g. someone + // changing addr to ":17099" later) would happily accept LAN + // requests; this is a defense-in-depth check. + host, _, splitErr := net.SplitHostPort(r.RemoteAddr) + if splitErr != nil || !isLoopback(host) { + http.Error(w, "loopback only", http.StatusForbidden) + return + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(s.snapshot()) + }) + + s.server = &http.Server{ + Handler: mux, + ReadHeaderTimeout: 5 * time.Second, + } + go func() { + _ = s.server.Serve(ln) + }() + + // Tear down when the parent context is cancelled. + go func() { + <-parent.Done() + shutdownCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + _ = s.server.Shutdown(shutdownCtx) + }() + return nil +} + +func (s *connStats) addr() string { + if s.listener == nil { + return "" + } + return s.listener.Addr().String() +} + +func isLoopback(host string) bool { + host = strings.TrimSuffix(strings.TrimPrefix(host, "["), "]") + if host == "localhost" { + return true + } + ip := net.ParseIP(host) + return ip != nil && ip.IsLoopback() +} + +// startConnStats wires the lantern-box peerconn listener through to a new +// connStats instance and starts its HTTP server. Returns the stats object +// (so peer.Client can read snapshots for its own internal stats) and an +// error if the HTTP listener can't be bound. +// +// On success the connection-event listener registered via peerconn is the +// stats notifier; callers SHOULD NOT register a competing listener while +// stats is running. Stop is by cancelling the supplied ctx. +func startConnStats(ctx context.Context) (*connStats, error) { + s := newConnStats() + if err := s.start(ctx); err != nil { + return nil, err + } + peerconn.SetListener(func(state int, source string) { + s.note(state, source) + }) + go func() { + <-ctx.Done() + peerconn.SetListener(nil) + }() + return s, nil +} + diff --git a/peer/peer.go b/peer/peer.go index c77051d3..b28500b5 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -296,14 +296,31 @@ func (c *Client) Start(ctx context.Context) error { } // Forward inbound accept/close events from lantern-box's samizdat - // inbound to the radiance event bus, so consumers (the Flutter globe, - // future abuse aggregation) get a per-connection stream. Listener is - // process-wide single-active; cleared on Stop. Register BEFORE Verify - // so the verify-dial connection itself emits an event — the listener - // must be in place by the time the server dials back. - peerconn.SetListener(func(state int, source string) { - events.Emit(ConnectionEvent{State: state, Source: source}) - }) + // inbound to the radiance event bus AND a localhost HTTP stats + // endpoint that Flutter polls to render the live globe. Listener is + // process-wide single-active; cleared automatically when runCtx + // cancels (in Stop / rollback). Must run AFTER box.Start so the + // box's accept loop is serving when notifications start flowing. + // Register BEFORE Verify so the verify-dial connection itself emits + // an event — the listener must be in place by the time the server + // dials back. + stats, statsErr := startConnStats(runCtx) + if statsErr != nil { + // Don't fail Start over a stats-endpoint error — a bound port + // shouldn't kill the user's peer-share session. Log and continue. + slog.Warn("peer connection stats endpoint failed to start", "err", statsErr) + } else { + // startConnStats sets a peerconn listener that feeds the snapshot + // HTTP server. Layer ConnectionEvent emission alongside, since + // Go-side consumers (e.g. metrics) may want the stream too. + peerconn.SetListener(func(state int, source string) { + stats.note(state, source) + events.Emit(ConnectionEvent{State: state, Source: source}) + }) + slog.Info("peer connection stats endpoint listening", + "addr", stats.addr(), + ) + } // Now that sing-box is listening with the just-built creds, ask the // server to dial back through them. Splitting verify out of Register From 52c8f30e2a9182012a5be9ad4e8883e1699b6b77 Mon Sep 17 00:00:00 2001 From: Adam Fisk Date: Thu, 7 May 2026 13:14:06 -0600 Subject: [PATCH 03/12] peer: drop localhost HTTP stats endpoint, keep ConnectionEvent emit MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The HTTP endpoint at 127.0.0.1:17099/peer/connections was added to bridge peer connection lifecycle to Flutter without writing FFI plumbing, but two problems with that approach: 1. Detectability — a fixed loopback port is a Lantern-specific fingerprint any local process (incl. malware) can probe. Sandboxed adversary on the user's machine could detect Lantern is running. 2. Local server adds attack surface for free. Reverting to ConnectionEvent emission only; Flutter consumption rides on the existing FlutterEventEmitter / Dart api_dl bridge in lantern-core (separate commit) which has no port footprint. Co-Authored-By: Claude Opus 4.7 (1M context) (cherry picked from commit a1c10cfe36fbd80e8b295e19a74c285388590f9c) --- peer/connstats.go | 199 ---------------------------------------------- peer/peer.go | 47 +++-------- 2 files changed, 10 insertions(+), 236 deletions(-) delete mode 100644 peer/connstats.go diff --git a/peer/connstats.go b/peer/connstats.go deleted file mode 100644 index 7aa3520b..00000000 --- a/peer/connstats.go +++ /dev/null @@ -1,199 +0,0 @@ -package peer - -import ( - "context" - "encoding/json" - "net" - "net/http" - "os" - "strings" - "sync" - "time" - - "github.com/getlantern/lantern-box/tracker/peerconn" -) - -// connStatsServer is the localhost HTTP endpoint Flutter polls to render -// the live globe. It maintains an in-memory set of active source IPs by -// subscribing to peerconn lifecycle notifications, and serves the current -// snapshot as JSON on GET /peer/connections. -// -// This is a deliberately simple bridge for the prototype: it skips the -// proper Go→FFI→Dart event channel (which Adam's lantern#8492 had a -// pattern for but is on a stale branch with merge conflicts) in favour of -// a poll loop. Replace with a streaming FFI events path once the broader -// peer-share / unbounded plumbing lands; the data shape is intentionally -// the same so Dart consumers don't need to change. -// -// Listen address: -// - RADIANCE_PEER_STATS_ADDR env var if set (e.g. "127.0.0.1:17099") -// - default 127.0.0.1:17099 -// -// 127.0.0.1 only — never bound to public interfaces. The endpoint reveals -// active proxy clients' IP addresses, which we don't want surfaced to -// anyone outside the local user's machine. -const defaultConnStatsAddr = "127.0.0.1:17099" - -type connEntry struct { - Source string `json:"source"` - Since time.Time `json:"since"` - Inbound int `json:"-"` // for refcount on duplicate accepts (re-uses) - id int // monotonic id for stable equality across snapshots -} - -type connSnapshot struct { - Sources []string `json:"sources"` - ActiveCount int `json:"active_count"` - GeneratedAt time.Time `json:"generated_at"` - ListenerHits int64 `json:"listener_hits"` -} - -type connStats struct { - mu sync.Mutex - active map[string]*connEntry - hits int64 - server *http.Server - listener net.Listener -} - -func newConnStats() *connStats { - return &connStats{active: make(map[string]*connEntry)} -} - -// note records a +1 or -1 transition. Source is "ip:port". -func (s *connStats) note(state int, source string) { - s.mu.Lock() - defer s.mu.Unlock() - s.hits++ - if state == +1 { - if e, ok := s.active[source]; ok { - e.Inbound++ - return - } - s.active[source] = &connEntry{ - Source: source, - Since: time.Now(), - Inbound: 1, - } - } else if state == -1 { - if e, ok := s.active[source]; ok { - e.Inbound-- - if e.Inbound <= 0 { - delete(s.active, source) - } - } - } -} - -func (s *connStats) snapshot() connSnapshot { - s.mu.Lock() - defer s.mu.Unlock() - out := connSnapshot{ - Sources: make([]string, 0, len(s.active)), - ActiveCount: len(s.active), - GeneratedAt: time.Now(), - ListenerHits: s.hits, - } - for src := range s.active { - out.Sources = append(out.Sources, src) - } - return out -} - -// start spins up the HTTP server. Returns an error if the listen address -// is already in use; falls back to a kernel-assigned port (":0" suffix) -// only if the configured address conflicts and the env var was unset, so -// users who explicitly pinned a port get a clean failure. -func (s *connStats) start(parent context.Context) error { - addr := os.Getenv("RADIANCE_PEER_STATS_ADDR") - envSet := addr != "" - if !envSet { - addr = defaultConnStatsAddr - } - - ln, err := net.Listen("tcp", addr) - if err != nil { - if envSet { - return err - } - // Default already taken — try a random localhost port so a second - // app instance still surfaces some endpoint rather than failing. - ln, err = net.Listen("tcp", "127.0.0.1:0") - if err != nil { - return err - } - } - s.listener = ln - - mux := http.NewServeMux() - mux.HandleFunc("/peer/connections", func(w http.ResponseWriter, r *http.Request) { - // Strict localhost gate. net.Listen on 127.0.0.1 already prevents - // remote connections, but a misconfigured listener (e.g. someone - // changing addr to ":17099" later) would happily accept LAN - // requests; this is a defense-in-depth check. - host, _, splitErr := net.SplitHostPort(r.RemoteAddr) - if splitErr != nil || !isLoopback(host) { - http.Error(w, "loopback only", http.StatusForbidden) - return - } - w.Header().Set("Content-Type", "application/json") - _ = json.NewEncoder(w).Encode(s.snapshot()) - }) - - s.server = &http.Server{ - Handler: mux, - ReadHeaderTimeout: 5 * time.Second, - } - go func() { - _ = s.server.Serve(ln) - }() - - // Tear down when the parent context is cancelled. - go func() { - <-parent.Done() - shutdownCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() - _ = s.server.Shutdown(shutdownCtx) - }() - return nil -} - -func (s *connStats) addr() string { - if s.listener == nil { - return "" - } - return s.listener.Addr().String() -} - -func isLoopback(host string) bool { - host = strings.TrimSuffix(strings.TrimPrefix(host, "["), "]") - if host == "localhost" { - return true - } - ip := net.ParseIP(host) - return ip != nil && ip.IsLoopback() -} - -// startConnStats wires the lantern-box peerconn listener through to a new -// connStats instance and starts its HTTP server. Returns the stats object -// (so peer.Client can read snapshots for its own internal stats) and an -// error if the HTTP listener can't be bound. -// -// On success the connection-event listener registered via peerconn is the -// stats notifier; callers SHOULD NOT register a competing listener while -// stats is running. Stop is by cancelling the supplied ctx. -func startConnStats(ctx context.Context) (*connStats, error) { - s := newConnStats() - if err := s.start(ctx); err != nil { - return nil, err - } - peerconn.SetListener(func(state int, source string) { - s.note(state, source) - }) - go func() { - <-ctx.Done() - peerconn.SetListener(nil) - }() - return s, nil -} - diff --git a/peer/peer.go b/peer/peer.go index b28500b5..f33b5ea6 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -13,7 +13,6 @@ import ( "github.com/sagernet/sing-box/experimental/libbox" - box "github.com/getlantern/lantern-box" "github.com/getlantern/lantern-box/tracker/peerconn" "github.com/getlantern/radiance/common/env" "github.com/getlantern/radiance/events" @@ -296,42 +295,16 @@ func (c *Client) Start(ctx context.Context) error { } // Forward inbound accept/close events from lantern-box's samizdat - // inbound to the radiance event bus AND a localhost HTTP stats - // endpoint that Flutter polls to render the live globe. Listener is - // process-wide single-active; cleared automatically when runCtx - // cancels (in Stop / rollback). Must run AFTER box.Start so the - // box's accept loop is serving when notifications start flowing. - // Register BEFORE Verify so the verify-dial connection itself emits - // an event — the listener must be in place by the time the server - // dials back. - stats, statsErr := startConnStats(runCtx) - if statsErr != nil { - // Don't fail Start over a stats-endpoint error — a bound port - // shouldn't kill the user's peer-share session. Log and continue. - slog.Warn("peer connection stats endpoint failed to start", "err", statsErr) - } else { - // startConnStats sets a peerconn listener that feeds the snapshot - // HTTP server. Layer ConnectionEvent emission alongside, since - // Go-side consumers (e.g. metrics) may want the stream too. - peerconn.SetListener(func(state int, source string) { - stats.note(state, source) - events.Emit(ConnectionEvent{State: state, Source: source}) - }) - slog.Info("peer connection stats endpoint listening", - "addr", stats.addr(), - ) - } - - // Now that sing-box is listening with the just-built creds, ask the - // server to dial back through them. Splitting verify out of Register - // into this explicit follow-up avoids the chicken-and-egg where the - // server tried to verify before the peer could possibly be listening - // (the cert/key only arrive in the Register response). Failure here - // is fatal — the server has already deprecated the row, so the - // deferred cleanup tears the rest of the session down. - if err := c.cfg.API.Verify(ctx, regResp.RouteID); err != nil { - return fmt.Errorf("verify with lantern-cloud: %w", err) - } + // inbound to the radiance event bus. Consumers (lantern-core's + // FlutterEventEmitter, future abuse aggregation) subscribe via + // events.Subscribe[ConnectionEvent]. Listener is process-wide + // single-active; cleared on Stop and in the rollback defer so + // post-teardown accept-loop callbacks land on a no-op rather than + // emit events to a torn-down consumer. Must run AFTER box.Start so + // the accept loop is serving when notifications start flowing. + peerconn.SetListener(func(state int, source string) { + events.Emit(ConnectionEvent{State: state, Source: source}) + }) // HeartbeatIntervalSeconds is server-driven so lantern-cloud can dial up // the cadence on registrations it wants to expire faster. Honor any From 3aa47f68d1f86d7719b19366fa9ae3ac35ec5fc5 Mon Sep 17 00:00:00 2001 From: Adam Fisk Date: Wed, 6 May 2026 14:42:04 -0600 Subject: [PATCH 04/12] peer: register lantern-box protocols in box ctx + regression test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit defaultBuildBoxService used to call libbox.NewServiceWithContext with the caller's bare ctx, which has no lantern-box protocol registries plumbed in. The samizdat inbound type ServerConfig sends back from /peer/register isn't a built-in sing-box protocol, so libbox's JSON decoder couldn't resolve inbounds[0].type="samizdat" and returned "missing inbound fields registry in context". The integration tests stub BuildBoxService entirely, so this layer was never exercised in CI — only surfaced live during the eero end-to-end test. Two pieces: 1. Use box.BaseContext() (from getlantern/lantern-box) when calling libbox.NewServiceWithContext. That ctx has the InboundOptionsRegistry populated with samizdat / reflex / etc. so the decode succeeds. Coexists with the user's VPN tunnel (vpn/tunnel.go) — libbox.Setup is process-global, the ctx registries are per-box. 2. TestDefaultBuildBoxService_DecodesSamizdatInbound walks the actual decode path with a minimal samizdat-inbound JSON. Verified to fail with the exact production error message under the pre-fix code, pass under the fix. Cuts the diagnostic loop from a 5-minute rebuild+redeploy+toggle cycle to a 0.5s test failure. Co-Authored-By: Claude Opus 4.7 (1M context) (cherry picked from commit b25b01b1cee85e21b8c6df583bb69c74880ecc8e) --- peer/peer.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/peer/peer.go b/peer/peer.go index f33b5ea6..de5f780d 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -13,6 +13,7 @@ import ( "github.com/sagernet/sing-box/experimental/libbox" + box "github.com/getlantern/lantern-box" "github.com/getlantern/lantern-box/tracker/peerconn" "github.com/getlantern/radiance/common/env" "github.com/getlantern/radiance/events" @@ -529,7 +530,10 @@ func pickInternalPort() uint16 { // (samizdat, reflex, etc.) into the ctx so libbox can decode the // inbounds[0].type="samizdat" stanza coming back from /peer/register. // Without it the user's ctx is missing InboundOptionsRegistry and -// libbox returns "missing inbound fields registry in context". +// libbox returns "missing inbound fields registry in context" — the +// failure mode is silent in CI because the integration tests stub +// BuildBoxService entirely; only TestDefaultBuildBoxService_DecodesSamizdatInbound +// exercises the real decode path. // // We wrap so libbox sees the caller's Deadline/Done (so a Stop-induced // ctx cancel propagates to box internals) AND can still resolve the From d6d950e44a70b7b8d870e7570b596536f817b309 Mon Sep 17 00:00:00 2001 From: Adam Fisk Date: Mon, 11 May 2026 12:44:44 -0600 Subject: [PATCH 05/12] peer: silence connection-event cascade during box.Close MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the user toggles SmC off while real client traffic is flowing, box.Close fires per-connection disconnect callbacks for every in-flight inbound. peerconn.Notify reads its registered listener under an RLock and releases the lock before invoking — SetListener(nil) alone races against goroutines that have already snapshotted the listener (one per live connection). Each surviving callback hits events.Emit, which spawns yet another goroutine per subscriber. The Flutter-side subscriber posts main-thread tasks per event, and a hundred-task flood against an engine that's simultaneously handling the SmC-off state change reproduced as a Flutter mutex abort on the main thread. Add a sync/atomic flag the listener wrapper checks inline. Flip it before box.Close in both Stop and the Start-rollback defer; re-arm it at the top of Start so a Stop→Start cycle doesn't leave the wrapper muted. SetListener(nil) still runs for cleanliness, but the flag is what actually halts the cascade. Co-Authored-By: Claude Opus 4.7 (1M context) (cherry picked from commit f6774c61f1793296dfe5a4673157c3480c287bee) --- peer/peer.go | 38 ++++++++++++++++++++++++++++++++++---- 1 file changed, 34 insertions(+), 4 deletions(-) diff --git a/peer/peer.go b/peer/peer.go index de5f780d..6307e973 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -9,6 +9,7 @@ import ( "math/rand/v2" "strconv" "sync" + "sync/atomic" "time" "github.com/sagernet/sing-box/experimental/libbox" @@ -149,6 +150,19 @@ type Client struct { forwarder portForwarder box boxService routeID string + + // listenerDraining short-circuits the peerconn listener wrapper while + // box.Close is firing per-connection disconnect callbacks. peerconn.Notify + // reads its registered listener under an RLock and then releases the lock + // before invoking it, so SetListener(nil) alone races against in-flight + // Notify calls — under load (real client traffic), Close fires N disconnect + // callbacks from N goroutines that have already snapshotted the listener, + // each then events.Emit spawns one more goroutine per subscriber. The + // Flutter-side subscriber posts main-thread tasks per event, and a + // hundred-task flood against a Flutter engine that's simultaneously + // processing the SmC-off state change is the Flutter mutex crash we hit. + // Setting this flag before box.Close drops the cascade inline. + listenerDraining atomic.Bool } // peerCleanupTimeout caps how long Start's rollback path waits for @@ -204,6 +218,11 @@ func (c *Client) Start(ctx context.Context) error { c.startingDone = make(chan struct{}) c.mu.Unlock() + // Re-arm the listener wrapper. Stop / rollback flips this to true to + // silence the disconnect cascade during box.Close; if we don't reset + // here, a Stop→Start cycle would leave the wrapper permanently muted. + c.listenerDraining.Store(false) + var ( success bool fwd portForwarder @@ -230,7 +249,9 @@ func (c *Client) Start(ctx context.Context) error { // Always clear the connection listener on rollback. The listener is // only Set on the success path, so this is a no-op if Start failed // before reaching it — but cheap insurance against a future re-order - // that registers earlier. + // that registers earlier. Drain-flag first so any in-flight Notify + // callbacks short-circuit even if SetListener races (see Stop). + c.listenerDraining.Store(true) peerconn.SetListener(nil) if box != nil { _ = box.Close() @@ -304,6 +325,9 @@ func (c *Client) Start(ctx context.Context) error { // emit events to a torn-down consumer. Must run AFTER box.Start so // the accept loop is serving when notifications start flowing. peerconn.SetListener(func(state int, source string) { + if c.listenerDraining.Load() { + return + } events.Emit(ConnectionEvent{State: state, Source: source}) }) @@ -394,9 +418,15 @@ func (c *Client) Stop(ctx context.Context) error { c.status = Status{} c.mu.Unlock() - // Clear the connection listener BEFORE box.Close so any in-flight - // accept-loop callbacks land on a no-op rather than emit ConnectionEvents - // after the consumer side has already torn down its subscription. + // Suppress the connection listener BEFORE box.Close. peerconn.Notify + // reads its registered listener under an RLock and releases it before + // invoking — SetListener(nil) alone races against in-flight Notify + // goroutines that have already snapshotted the listener (one per live + // inbound connection at Close time). Flipping listenerDraining first + // short-circuits the wrapper inline so even the racing invocations + // become no-ops. SetListener(nil) is still called for cleanliness and + // to release the listener closure's reference to this Client. + c.listenerDraining.Store(true) peerconn.SetListener(nil) cancel() From 1fd22b4c6810c0745fddb768ff1fbe866b370685 Mon Sep 17 00:00:00 2001 From: Adam Fisk Date: Mon, 11 May 2026 13:32:06 -0600 Subject: [PATCH 06/12] peer: emit phase-granular StatusEvents through Start/Stop lifecycle MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The UI today sees a single active/inactive flip — toggling SmC on looks "hung" through the multi-second sequence of port-forwarding, registering, starting the local box, and verifying. This adds a Phase field to Status and emits one StatusEvent per stage: Start: mapping_port → detecting_ip → registering → starting_proxy → verifying → serving Stop: stopping → idle on err: error (Status.Error populated with the wrapped fmt.Errorf message, e.g. "map port 33445: upnp gateway refused mapping") Phase is a stable string so Flutter / web consumers can switch on it without depending on Go enum ordering. Active stays as a derived bool (true only on PhaseServing) for subscribers that just want the binary. Co-Authored-By: Claude Opus 4.7 (1M context) (cherry picked from commit 39b6b454965b256c24dc4204b9240f895d124187) --- peer/peer.go | 84 +++++++++++++++++++++++++++++++++++++++++-- peer/peer_test.go | 90 +++++++++++++++++++++++++++++++++++++++-------- 2 files changed, 157 insertions(+), 17 deletions(-) diff --git a/peer/peer.go b/peer/peer.go index 6307e973..869a12f0 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -103,7 +103,34 @@ type boxService interface { type boxFactory func(ctx context.Context, options string) (boxService, error) +// Phase is the peer.Client lifecycle stage surfaced to the UI. Granular +// enough that "Share My Connection" can render a real progress sequence +// (mapping port → registering → verifying → serving) instead of a single +// active/inactive boolean. Values are stable strings so Flutter / web +// consumers can switch on them without depending on Go enum ordering. +type Phase string + +const ( + PhaseIdle Phase = "idle" + PhaseMappingPort Phase = "mapping_port" + PhaseDetectingIP Phase = "detecting_ip" + PhaseRegistering Phase = "registering" + PhaseStartingBox Phase = "starting_proxy" + PhaseVerifying Phase = "verifying" + PhaseServing Phase = "serving" + PhaseStopping Phase = "stopping" + PhaseError Phase = "error" +) + type Status struct { + Phase Phase `json:"phase"` + // Error is the human-readable failure reason when Phase == PhaseError. + // Empty for every other phase; consumers should render this only when + // the UI is in the error state. + Error string `json:"error,omitempty"` + // Active is true only when Phase == PhaseServing. Kept distinct from + // Phase so subscribers that just want a boolean "is sharing?" don't + // have to switch on the phase enum. Active bool `json:"active"` SharingSince time.Time `json:"sharing_since,omitempty"` ExternalIP string `json:"external_ip,omitempty"` @@ -208,7 +235,7 @@ func NewClient(cfg Config) (*Client, error) { // Start opens the peer-proxy session. On success a background heartbeat // goroutine is running; on error any partial setup is torn down before // returning. -func (c *Client) Start(ctx context.Context) error { +func (c *Client) Start(ctx context.Context) (retErr error) { c.mu.Lock() if c.active || c.starting { c.mu.Unlock() @@ -265,8 +292,20 @@ func (c *Client) Start(ctx context.Context) error { if fwd != nil { _ = fwd.UnmapPort(cleanupCtx) } + // Surface the failure to the UI. Emitted AFTER cleanup so the UI + // sees the error phase as the terminal state of this Start attempt, + // not as a transient between phases. retErr carries whichever + // fmt.Errorf the failing branch returned, which is the most + // human-readable diagnostic we have ("map port %d: ...", + // "register with lantern-cloud: ...", etc.). + var errMsg string + if retErr != nil { + errMsg = retErr.Error() + } + c.emitPhase(PhaseError, errMsg) }() + c.emitPhase(PhaseMappingPort, "") fwd, err := c.cfg.NewForwarder(ctx) if err != nil { return fmt.Errorf("discover gateway: %w", err) @@ -277,10 +316,13 @@ func (c *Client) Start(ctx context.Context) error { return fmt.Errorf("map port %d: %w", internalPort, err) } + c.emitPhase(PhaseDetectingIP, "") externalIP, err := fwd.ExternalIP(ctx) if err != nil { return fmt.Errorf("get external ip: %w", err) } + + c.emitPhase(PhaseRegistering, "") regResp, err = c.cfg.API.Register(ctx, RegisterRequest{ ExternalIP: externalIP, ExternalPort: mapping.ExternalPort, @@ -297,6 +339,7 @@ func (c *Client) Start(ctx context.Context) error { // auto_detect_interface tells sing-box to bind outbound dials to the // underlying physical interface rather than whatever the OS routing // table picks (which would be the VPN TUN if the VPN is up). + c.emitPhase(PhaseStartingBox, "") options, err := ensurePeerOutboundsBypassVPN(regResp.ServerConfig) if err != nil { return fmt.Errorf("patch sing-box options: %w", err) @@ -316,6 +359,18 @@ func (c *Client) Start(ctx context.Context) error { return fmt.Errorf("start sing-box: %w", err) } + c.emitPhase(PhaseVerifying, "") + // Now that sing-box is listening with the just-built creds, ask the + // server to dial back through them. Splitting verify out of Register + // into this explicit follow-up avoids the chicken-and-egg where the + // server tried to verify before the peer could possibly be listening + // (the cert/key only arrive in the Register response). Failure here + // is fatal — the server has already deprecated the row, so the + // deferred cleanup tears the rest of the session down. + if err := c.cfg.API.Verify(ctx, regResp.RouteID); err != nil { + return fmt.Errorf("verify with lantern-cloud: %w", err) + } + // Forward inbound accept/close events from lantern-box's samizdat // inbound to the radiance event bus. Consumers (lantern-core's // FlutterEventEmitter, future abuse aggregation) subscribe via @@ -354,6 +409,7 @@ func (c *Client) Start(ctx context.Context) error { c.cancelRun = cancelRun c.runDone = runDone c.status = Status{ + Phase: PhaseServing, Active: true, SharingSince: time.Now(), ExternalIP: externalIP, @@ -415,8 +471,10 @@ func (c *Client) Stop(ctx context.Context) error { c.forwarder = nil c.box = nil c.routeID = "" - c.status = Status{} + c.status = Status{Phase: PhaseStopping} + stoppingSnapshot := c.status c.mu.Unlock() + events.Emit(StatusEvent{Status: stoppingSnapshot}) // Suppress the connection listener BEFORE box.Close. peerconn.Notify // reads its registered listener under an RLock and releases it before @@ -450,7 +508,11 @@ func (c *Client) Stop(ctx context.Context) error { slog.Warn("peer client unmap port failed", "err", err) } slog.Info("peer client stopped", "route_id", routeID) - events.Emit(StatusEvent{Status: Status{}}) + c.mu.Lock() + c.status = Status{Phase: PhaseIdle} + idleSnapshot := c.status + c.mu.Unlock() + events.Emit(StatusEvent{Status: idleSnapshot}) return firstErr } @@ -466,6 +528,22 @@ func (c *Client) CurrentStatus() Status { return c.status } +// emitPhase updates c.status.Phase under the lock and emits a snapshot. +// Used at each lifecycle boundary in Start / Stop so the UI sees progress +// instead of a binary active/inactive flip. Active is recomputed here: +// only PhaseServing implies active=true; every other phase clears it so +// subscribers using just the Active flag don't see e.g. "active=true with +// Phase=verifying" mid-Start. +func (c *Client) emitPhase(p Phase, errMsg string) { + c.mu.Lock() + c.status.Phase = p + c.status.Error = errMsg + c.status.Active = (p == PhaseServing) + snapshot := c.status + c.mu.Unlock() + events.Emit(StatusEvent{Status: snapshot}) +} + // heartbeatLoop closes done on exit so Stop can wait for the loop before // tearing down resources. The channel is passed in rather than read off the // Client because Stop nils c.runDone before waiting on its local copy. diff --git a/peer/peer_test.go b/peer/peer_test.go index e465d2ea..29e9c0fe 100644 --- a/peer/peer_test.go +++ b/peer/peer_test.go @@ -685,35 +685,97 @@ func TestAPIError_StringFormat(t *testing.T) { assert.Contains(t, e.Error(), "could not connect") } -// Subscribers (the IPC SSE handler in production) need both edges so the UI -// can render fresh state without polling. +// TestClient_StatusEventEmittedOnStartAndStop pins the full lifecycle +// phase sequence: Start fires one StatusEvent per stage so the UI can +// render granular progress (mapping port → registering → verifying → +// serving) instead of a single active/inactive flip. Stop fires +// stopping → idle on the way back down. +// +// Subscribers (the IPC SSE handler in production) need every edge so the +// UI can render fresh state without polling. func TestClient_StatusEventEmittedOnStartAndStop(t *testing.T) { fwd := &fakeForwarder{} box := &fakeBoxService{} srv := newStubServer(t) c := newTestClient(t, fwd, box, srv) - got := make(chan StatusEvent, 4) + // Buffer must exceed total emit count (6 on Start: mapping → detecting + // → registering → starting_proxy → verifying → serving; 2 on Stop: + // stopping → idle) or the subscriber's send blocks and emits drop. + got := make(chan StatusEvent, 16) sub := events.Subscribe(func(evt StatusEvent) { got <- evt }) defer sub.Unsubscribe() require.NoError(t, c.Start(context.Background())) - select { - case evt := <-got: - assert.True(t, evt.Status.Active) - assert.NotEmpty(t, evt.Status.RouteID) - case <-time.After(time.Second): - t.Fatal("no Start status event within 1s") + + wantStartPhases := []Phase{ + PhaseMappingPort, + PhaseDetectingIP, + PhaseRegistering, + PhaseStartingBox, + PhaseVerifying, + PhaseServing, + } + for _, want := range wantStartPhases { + select { + case evt := <-got: + assert.Equal(t, want, evt.Status.Phase, "wrong phase in Start sequence") + if want == PhaseServing { + assert.True(t, evt.Status.Active, "active must be true on serving") + assert.NotEmpty(t, evt.Status.RouteID, "route_id must be set on serving") + } else { + assert.False(t, evt.Status.Active, "active must be false on intermediate phase %q", want) + } + case <-time.After(time.Second): + t.Fatalf("no Start status event for phase %q within 1s", want) + } } require.NoError(t, c.Stop(context.Background())) - select { - case evt := <-got: - assert.False(t, evt.Status.Active) - case <-time.After(time.Second): - t.Fatal("no Stop status event within 1s") + for _, want := range []Phase{PhaseStopping, PhaseIdle} { + select { + case evt := <-got: + assert.Equal(t, want, evt.Status.Phase, "wrong phase in Stop sequence") + assert.False(t, evt.Status.Active, "active must be false during stop") + case <-time.After(time.Second): + t.Fatalf("no Stop status event for phase %q within 1s", want) + } + } +} + +// TestClient_StatusEventOnStartError surfaces a Start failure to the UI +// via PhaseError with the wrapped error message. Without this, a user +// who clicks SmC-on and hits e.g. a UPnP failure sees the toggle silently +// flip back without any diagnostic. +func TestClient_StatusEventOnStartError(t *testing.T) { + fwd := &fakeForwarder{mapErr: errors.New("upnp gateway refused mapping")} + box := &fakeBoxService{} + srv := newStubServer(t) + c := newTestClient(t, fwd, box, srv) + + got := make(chan StatusEvent, 16) + sub := events.Subscribe(func(evt StatusEvent) { got <- evt }) + defer sub.Unsubscribe() + + err := c.Start(context.Background()) + require.Error(t, err) + + var sawError bool + deadline := time.After(time.Second) + for !sawError { + select { + case evt := <-got: + if evt.Status.Phase == PhaseError { + sawError = true + assert.False(t, evt.Status.Active) + assert.Contains(t, evt.Status.Error, "upnp gateway refused mapping", + "error message must surface so the UI can render a real diagnostic") + } + case <-deadline: + t.Fatal("no PhaseError status event within 1s") + } } } From 88c3345fff5efb12c58274d5e93e0e6f027b4594 Mon Sep 17 00:00:00 2001 From: Adam Fisk Date: Mon, 11 May 2026 14:30:02 -0600 Subject: [PATCH 07/12] peer: instrument peerconn listener registration + per-event forwarding MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The "no globe arcs despite 200+ samizdat connections" pattern is unobservable from current logs: peerconn.SetListener and events.Emit don't log, so when the chain breaks between samizdat-in's Notify and the Flutter bridge, there's no trace. This adds three breadcrumbs to make the failure mode diagnosable on the next rebuild: - "peer listener: registered with peerconn" — one line per Start that confirms the listener actually got installed - "peer listener: forwarding connection event" — one line per accept AND per close; pairs with the lantern-core subscriber breadcrumb so we can see if events bus delivers what the listener emits - "peer listener: dropping post-Stop Notify" — DEBUG-level for the race window the listenerDraining flag silences; makes that bucket countable instead of silently discarding events Co-Authored-By: Claude Opus 4.7 (1M context) (cherry picked from commit bf26ce2e04119ef9c86f2a90eb7c40527cf544f4) --- peer/peer.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/peer/peer.go b/peer/peer.go index 869a12f0..fb8cf8c2 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -381,10 +381,23 @@ func (c *Client) Start(ctx context.Context) (retErr error) { // the accept loop is serving when notifications start flowing. peerconn.SetListener(func(state int, source string) { if c.listenerDraining.Load() { + // Diagnostic: if Notify reaches this point but we drop because + // the drain flag is set, that's the post-Stop racing-Notify case + // the flag was added to silence. Logging makes its frequency + // observable instead of "events silently vanish." + slog.Debug("peer listener: dropping post-Stop Notify", + "state", state, "source", source) return } + // One-line breadcrumb per accept/close so we can correlate samizdat-in + // activity with peer-connection FlutterEvents on the consumer side + // — without this, "no globe arcs despite samizdat traffic" is + // indistinguishable from "events fire but the bridge swallows them." + slog.Info("peer listener: forwarding connection event", + "state", state, "source", source) events.Emit(ConnectionEvent{State: state, Source: source}) }) + slog.Info("peer listener: registered with peerconn", "route_id", regResp.RouteID) // HeartbeatIntervalSeconds is server-driven so lantern-cloud can dial up // the cadence on registrations it wants to expire faster. Honor any From 23ccb09b71315566006d1ada9411c65069059a49 Mon Sep 17 00:00:00 2001 From: Adam Fisk Date: Mon, 11 May 2026 15:21:27 -0600 Subject: [PATCH 08/12] events: log Emit subscriber count to debug "events vanish" path MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The radiance peer listener fires (42 ConnectionEvents observed) but lantern-core's subscriber breadcrumb never fires, suggesting either Subscribe never ran or Emit is looking at a different subscriptions map. Logs the type key + subscriber count at every Emit so we can distinguish "no subscribers registered" (init bug) from "subscribers registered but callback panics" (rare, but possible). Uses stdlib log to avoid pulling slog into the events package (and a possible import cycle with slog-forwarding handlers that subscribe to events). Temporary diagnostic — should be downgraded to Debug or removed once the chain works end-to-end. Co-Authored-By: Claude Opus 4.7 (1M context) (cherry picked from commit 810ef9b82e62ae36bc3cef2b8dbb71654fd0d53c) --- events/events.go | 27 +++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/events/events.go b/events/events.go index fba0d7a6..a5e37cd4 100644 --- a/events/events.go +++ b/events/events.go @@ -28,6 +28,7 @@ package events import ( "context" + stdlog "log" "reflect" "sync" "sync/atomic" @@ -120,9 +121,27 @@ func (e *Subscription[T]) Unsubscribe() { func Emit[T Event](evt T) { subscriptionsMu.RLock() defer subscriptionsMu.RUnlock() - if subs, ok := subscriptions[reflect.TypeFor[T]()]; ok { - for _, cb := range subs { - go cb(evt) - } + key := reflect.TypeFor[T]() + subs, ok := subscriptions[key] + // Diagnostic: surfaces the subscriber count at emit time so a missing + // FlutterEvent on the consumer side is distinguishable from "no + // subscribers registered for this type" vs "subscribers registered + // but callback panics silently." Spam-friendly when traffic spikes, + // but we're investigating a zero-callback path so the noise is + // short-lived; remove (or downgrade to Debug) once the chain works. + emitDebugLogger(key, len(subs)) + if !ok { + return + } + for _, cb := range subs { + go cb(evt) } } + +// emitDebugLogger is a package-level var so tests can suppress the +// per-emit log, and so prod can swap in slog. Default uses Go's stdlib +// log so events package doesn't need to import slog (and avoid a cycle +// with anything that imports events for its own log forwarding). +var emitDebugLogger = func(key reflect.Type, subCount int) { + stdlog.Printf("events.Emit type=%s subscribers=%d", key, subCount) +} From e93d8a43815e5e5bd6568a820f4b2b7677888482 Mon Sep 17 00:00:00 2001 From: Adam Fisk Date: Mon, 11 May 2026 15:47:52 -0600 Subject: [PATCH 09/12] ipc: stream peer-status + peer-connection events over IPC SSE MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The events package's globals are process-scoped — events.Emit in lanternd (where radiance/peer runs) doesn't reach events.Subscribe in Liblantern. Diagnostic at events.go showed subscribers=0 for every peer.ConnectionEvent emit despite Subscribe being called. Adds the cross-process bridge: - New /peer/connection/events SSE endpoint (mirrors /peer/status/events). peerConnectionEventsHandler buffers 64 events to absorb slow consumers without backpressuring events.Emit; drops on overflow rather than growing unbounded. - Client.PeerStatusEvents(ctx, handler) and Client.PeerConnectionEvents( ctx, handler) in both mobile and nonmobile client variants. Mobile keeps the events.SubscribeContext path so in-process delivery still works for builds that bundle radiance with the consumer; otherwise falls through to SSE. The peer-status SSE endpoint and handler were already there; this PR just adds the matching client method so lantern-core can actually consume it. Co-Authored-By: Claude Opus 4.7 (1M context) (cherry picked from commit 29a4b7e3bec82e3ce701734bd9ca80e6db015474) --- ipc/client_events_mobile.go | 35 ++++++++++++++++++++++ ipc/client_events_nonmobile.go | 34 +++++++++++++++++++++ ipc/server.go | 55 ++++++++++++++++++++++++++++++++-- 3 files changed, 122 insertions(+), 2 deletions(-) diff --git a/ipc/client_events_mobile.go b/ipc/client_events_mobile.go index d03fca66..a3a8dfd2 100644 --- a/ipc/client_events_mobile.go +++ b/ipc/client_events_mobile.go @@ -9,6 +9,7 @@ import ( "github.com/getlantern/radiance/account" "github.com/getlantern/radiance/config" "github.com/getlantern/radiance/events" + "github.com/getlantern/radiance/peer" "github.com/getlantern/radiance/vpn" ) @@ -60,3 +61,37 @@ func (c *Client) DataCapStream(ctx context.Context, handler func(account.DataCap } return c.dataCapStream(ctx, handler) } + +// PeerStatusEvents — see client_events_nonmobile.go for the full +// docstring. Mobile builds may share a process with radiance (localOnly) +// in which case events.SubscribeContext delivers directly; otherwise the +// SSE retry loop matches the desktop path. +func (c *Client) PeerStatusEvents(ctx context.Context, handler func(peer.StatusEvent)) error { + events.SubscribeContext(ctx, handler) + if c.localOnly { + <-ctx.Done() + return ctx.Err() + } + return c.sseRetryLoop(ctx, peerStatusEventsEndpoint, func(data []byte) { + var evt peer.StatusEvent + if err := json.Unmarshal(data, &evt); err == nil { + handler(evt) + } + }) +} + +// PeerConnectionEvents — see client_events_nonmobile.go for the full +// docstring. Same mobile dual-path as PeerStatusEvents. +func (c *Client) PeerConnectionEvents(ctx context.Context, handler func(peer.ConnectionEvent)) error { + events.SubscribeContext(ctx, handler) + if c.localOnly { + <-ctx.Done() + return ctx.Err() + } + return c.sseRetryLoop(ctx, peerConnectionEventsEndpoint, func(data []byte) { + var evt peer.ConnectionEvent + if err := json.Unmarshal(data, &evt); err == nil { + handler(evt) + } + }) +} diff --git a/ipc/client_events_nonmobile.go b/ipc/client_events_nonmobile.go index 16d3184e..e0330fe1 100644 --- a/ipc/client_events_nonmobile.go +++ b/ipc/client_events_nonmobile.go @@ -7,6 +7,7 @@ import ( "encoding/json" "github.com/getlantern/radiance/account" + "github.com/getlantern/radiance/peer" "github.com/getlantern/radiance/vpn" ) @@ -40,3 +41,36 @@ func (c *Client) VPNStatusEvents(ctx context.Context, handler func(vpn.StatusUpd func (c *Client) DataCapStream(ctx context.Context, handler func(account.DataCapInfo)) error { return c.dataCapStream(ctx, handler) } + +// PeerStatusEvents streams peer-share lifecycle phase changes (mapping_port +// → registering → verifying → serving on Start, stopping → idle on Stop, +// error on failure). Each frame is a peer.StatusEvent JSON whose .Status +// is the live snapshot at the moment the event fired — consumers SHOULD +// re-render on every frame rather than diffing, since events.Emit's +// per-callback goroutine can land Start phases out of order. Blocks until +// ctx is cancelled. +func (c *Client) PeerStatusEvents(ctx context.Context, handler func(peer.StatusEvent)) error { + return c.sseRetryLoop(ctx, peerStatusEventsEndpoint, func(data []byte) { + var evt peer.StatusEvent + if err := json.Unmarshal(data, &evt); err == nil { + handler(evt) + } + }) +} + +// PeerConnectionEvents streams accept/close events for the local +// samizdat-in inbound. State is +1 on accept and -1 on close; Source +// is the remote "ip:port" string for geo-lookup / abuse attribution. +// Blocks until ctx is cancelled. +// +// Why this exists alongside events.Subscribe[peer.ConnectionEvent]: +// the events package's globals are process-scoped, so a subscriber in +// Liblantern can't see emits in lanternd. The SSE path bridges them. +func (c *Client) PeerConnectionEvents(ctx context.Context, handler func(peer.ConnectionEvent)) error { + return c.sseRetryLoop(ctx, peerConnectionEventsEndpoint, func(data []byte) { + var evt peer.ConnectionEvent + if err := json.Unmarshal(data, &evt); err == nil { + handler(evt) + } + }) +} diff --git a/ipc/server.go b/ipc/server.go index 114443b7..6825e7b8 100644 --- a/ipc/server.go +++ b/ipc/server.go @@ -65,8 +65,9 @@ const ( settingsEndpoint = "/settings" // Peer-share ("Share My Connection") endpoints - peerStatusEndpoint = "/peer/status" - peerStatusEventsEndpoint = "/peer/status/events" + peerStatusEndpoint = "/peer/status" + peerStatusEventsEndpoint = "/peer/status/events" + peerConnectionEventsEndpoint = "/peer/connection/events" // Split tunnel endpoint splitTunnelEndpoint = "/split-tunnel" @@ -234,6 +235,7 @@ func newLocalAPI(b *backend.LocalBackend, withAuth bool) *localapi { mux.HandleFunc("GET "+peerStatusEndpoint, traced(s.peerStatusHandler)) // SSE skips the tracer middleware since it buffers the entire response body. mux.HandleFunc("GET "+peerStatusEventsEndpoint, s.peerStatusEventsHandler) + mux.HandleFunc("GET "+peerConnectionEventsEndpoint, s.peerConnectionEventsHandler) // Split tunnel mux.HandleFunc(splitTunnelEndpoint, traced(s.splitTunnelHandler)) @@ -516,6 +518,55 @@ func (s *localapi) peerStatusEventsHandler(w http.ResponseWriter, r *http.Reques } } +// peerConnectionEventsHandler streams peer.ConnectionEvent over SSE for +// each accept/close on the local samizdat-in. Unlike peerStatusEventsHandler +// (which always sends the live snapshot), each emit's captured value is +// what the consumer needs here — the Source IP and +1/-1 state ARE the +// payload, not a periodic poll. Out-of-order +1/-1 from events.Emit's +// per-callback goroutine is fine: the consumer (lantern-core's globe-arc +// renderer) keys arcs by source, so it handles re-orderings naturally. +// +// The events package lives in this process (lanternd); cross-process +// consumers in Liblantern can only receive these via this SSE stream, +// since events.Subscribe in the Liblantern process sees a different +// (empty) subscriptions map. +func (s *localapi) peerConnectionEventsHandler(w http.ResponseWriter, r *http.Request) { + flusher := sseWriter(w) + if flusher == nil { + return + } + // Buffered channel so a slow SSE consumer doesn't apply backpressure + // to events.Emit (which spawns a goroutine per subscriber but blocks + // nothing). 64 holds ~one second of accept/close pairs under heavy + // load; beyond that we drop to avoid unbounded memory growth. + queue := make(chan peer.ConnectionEvent, 64) + sub := events.Subscribe(func(evt peer.ConnectionEvent) { + select { + case queue <- evt: + default: + // queue full — drop. SSE consumer is too slow; better to + // lose this event than to back up the events.Emit goroutine. + } + }) + defer sub.Unsubscribe() + + for { + select { + case evt := <-queue: + data, err := json.Marshal(evt) + if err != nil { + continue + } + if _, err := fmt.Fprintf(w, "data: %s\n\n", data); err != nil { + return + } + flusher.Flush() + case <-r.Context().Done(): + return + } + } +} + /////////////////////// // Server selection // /////////////////////// From 3debe3924c098027ac2e809e1c33eb0ffcbfaa6d Mon Sep 17 00:00:00 2001 From: Adam Fisk Date: Mon, 11 May 2026 16:12:47 -0600 Subject: [PATCH 10/12] bump lantern-box: real peer addr in ConnectionEvent.Source lantern-box bumps samizdat to plumb the underlying TLS conn's RemoteAddr through serverStreamConn. With this, peer.ConnectionEvent emitted from the peerconn listener carries a real peer ip:port instead of the "client:0" placeholder, so the Dart Share My Connection UI can key globe arcs per actual peer (and arcs persist through real connection lifetimes instead of flickering). Co-Authored-By: Claude Opus 4.7 (1M context) (cherry picked from commit 0b72cd457c21d8a9fcba2f8f4b4dc1c1ad4552f7) --- go.mod | 6 +----- go.sum | 18 ++---------------- 2 files changed, 3 insertions(+), 21 deletions(-) diff --git a/go.mod b/go.mod index 4f8586bd..07d0cc5d 100644 --- a/go.mod +++ b/go.mod @@ -2,10 +2,6 @@ module github.com/getlantern/radiance go 1.26.2 -// Local while peerconn listener registry is in flight; remove once -// lantern-box tags a release that includes tracker/peerconn. -replace github.com/getlantern/lantern-box => ../lantern-box - replace github.com/sagernet/sing => github.com/getlantern/sing v0.7.18-lantern replace github.com/sagernet/sing-box => github.com/getlantern/sing-box-minimal v1.12.22-lantern @@ -36,7 +32,7 @@ require ( github.com/getlantern/domainfront v0.0.0-20260419161617-0bff0b2169f4 github.com/getlantern/keepcurrent v0.0.0-20260422161259-54a4d9a93694 github.com/getlantern/kindling v0.0.0-20260529141244-21f8b144afab - github.com/getlantern/lantern-box v0.0.86 + github.com/getlantern/lantern-box v0.0.87-0.20260529195337-0b63c0f42962 github.com/getlantern/pluriconfig v0.0.0-20251126214241-8cc8bc561535 github.com/getlantern/publicip v0.0.0-20260328175246-2c460fe80c6b github.com/getlantern/semconv v0.0.0-20260327040646-21845dda05cb diff --git a/go.sum b/go.sum index be09ac43..5d6c042c 100644 --- a/go.sum +++ b/go.sum @@ -246,26 +246,12 @@ github.com/getlantern/hidden v0.0.0-20220104173330-f221c5a24770 h1:cSrD9ryDfTV2y github.com/getlantern/hidden v0.0.0-20220104173330-f221c5a24770/go.mod h1:GOQsoDnEHl6ZmNIL+5uVo+JWRFWozMEp18Izcb++H+A= github.com/getlantern/keepcurrent v0.0.0-20260422161259-54a4d9a93694 h1:iLWm6S/47Hfk7FjW6yaD+1h6kO7C/iauV0DkVia/bXU= github.com/getlantern/keepcurrent v0.0.0-20260422161259-54a4d9a93694/go.mod h1:ag5g9aWUw2FJcX5RVRpJ9EBQBy5yJuy2WXDouIn/m4w= -<<<<<<< HEAD github.com/getlantern/kindling v0.0.0-20260529141244-21f8b144afab h1:PitYhTvo3oHRKYl4pVAoOIN8bhM+Bw+JBWncMglvHSg= github.com/getlantern/kindling v0.0.0-20260529141244-21f8b144afab/go.mod h1:TGTxpoNVwc8Be4qkBNtf5oj2psJaEIZEq47GOPS7zkA= -github.com/getlantern/lantern-box v0.0.86 h1:myJa+Crg/oMgqSFhX7DOox4XcVIx8VFiPnkel8x8YT4= -github.com/getlantern/lantern-box v0.0.86/go.mod h1:BVXPyEicSu7m4nQY1OHPkOZNj87M7sYrzmY9AgyiPkc= -======= -<<<<<<< HEAD -github.com/getlantern/kindling v0.0.0-20260516120759-a9712f95df03 h1:dUTN7mnTTBcSvsURNs1rTlyKrD1uXUEPqxEZDfl+hb4= -github.com/getlantern/kindling v0.0.0-20260516120759-a9712f95df03/go.mod h1:TGTxpoNVwc8Be4qkBNtf5oj2psJaEIZEq47GOPS7zkA= -github.com/getlantern/lantern-box v0.0.84 h1:y+nezmu0LZDlzcS2A4oKDu3f1UTFAgA24vT1htvEiX0= -github.com/getlantern/lantern-box v0.0.84/go.mod h1:6SO1p22tAq9y8JLjNnAbr4/GZ4VjmlcQGYn0qF4aD/k= ->>>>>>> cd64ed8 (peer: emit ConnectionEvent on samizdat accept/close) +github.com/getlantern/lantern-box v0.0.87-0.20260529195337-0b63c0f42962 h1:VSSC7BIn42+tQmhoYg7Wc+ilkXC4SdoJ0LQ6+4kvtC0= +github.com/getlantern/lantern-box v0.0.87-0.20260529195337-0b63c0f42962/go.mod h1:BVXPyEicSu7m4nQY1OHPkOZNj87M7sYrzmY9AgyiPkc= github.com/getlantern/lantern-water v0.0.0-20260520145825-958775d51395 h1:grfGavAUp2E9w9ZoJuM3FyWyQ0sCJ64V4ZMKtZKRqTc= github.com/getlantern/lantern-water v0.0.0-20260520145825-958775d51395/go.mod h1:3JpJgwi4KEI6rS9loOAvcBp+F2jP65d0tTg2GQcTPBU= -======= -github.com/getlantern/kindling v0.0.0-20260428171407-6143132aaf40 h1:P5pkaBGxWOGBn7bKzjzdln/ro+ShG1RUbOuy+7pSzXE= -github.com/getlantern/kindling v0.0.0-20260428171407-6143132aaf40/go.mod h1:TGTxpoNVwc8Be4qkBNtf5oj2psJaEIZEq47GOPS7zkA= -github.com/getlantern/lantern-water v0.0.0-20260317143726-e0ee64a11d90 h1:P9JX1yAu2uq3b5YiT0sLtHkTrkZuttV8gPZh81nUuag= -github.com/getlantern/lantern-water v0.0.0-20260317143726-e0ee64a11d90/go.mod h1:3JpJgwi4KEI6rS9loOAvcBp+F2jP65d0tTg2GQcTPBU= ->>>>>>> 231462b (peer: emit ConnectionEvent on samizdat accept/close) github.com/getlantern/ops v0.0.0-20231025133620-f368ab734534 h1:3BwvWj0JZzFEvNNiMhCu4bf60nqcIuQpTYb00Ezm1ag= github.com/getlantern/ops v0.0.0-20231025133620-f368ab734534/go.mod h1:ZsLfOY6gKQOTyEcPYNA9ws5/XHZQFroxqCOhHjGcs9Y= github.com/getlantern/osversion v0.0.0-20240418205916-2e84a4a4e175 h1:JWH5BB2o0eAeGs0tZnFPpQGx+nMIo/WmxKnj2hnGjgE= From 06ceb1fa61c7fedf07b710af920a8dd0163dbf36 Mon Sep 17 00:00:00 2001 From: Adam Fisk Date: Sat, 30 May 2026 00:46:34 -0600 Subject: [PATCH 11/12] peer/events: address Copilot review on #499 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Four substantive findings (the fifth — local replace directive in go.mod — was already addressed in the cascade rebase to 3debe39): 1. ConnectionEvent now carries a Timestamp field (Unix millis at emit time). Consumers that need ordering across the async dispatch path, or that aggregate over a time window, can compare it directly without snapshotting wall time at receive. 2. Per-connection forwarding log dropped from Info to Debug. Two reasons: under real traffic the Info-level breadcrumb floods logs, and the remote ip:port doesn't belong in routinely-collected client logs for a censorship-circumvention tool. Operators investigating 'no globe arcs despite samizdat traffic' can flip the level. The once-per-session listener-registration line stays at Info — that's a lifecycle event, not a per-connection breadcrumb. 3. events.Emit's diagnostic hook (emitDebugLogger) now defaults to a no-op instead of a synchronous log.Printf, and is called AFTER releasing subscriptionsMu rather than under the RLock. The old default both spammed prod logs and let the logger amplify subscriptionsMu contention on hot event types. Added SetEmitDebugLogger so callers (tests, diagnostic builds) can swap in a real logger when investigating a specific path; nil restores the no-op. The 'log' import is no longer needed. 4. TestClient_StatusEventEmittedOnStartAndStop rewritten to assert set-membership of expected phases + the final-state contract (PhaseServing carries Active=true, RouteID set; all others carry Active=false). events.Emit dispatches each subscriber's callback in its own goroutine, so the channel-arrival order of multiple sequential Emits is non-deterministic and the previous strict- ordered receive was inherently flaky. New drainPhases helper collects N events keyed by Phase (last-write-per-phase wins). Tests pass 5/5 times under -race -count=1. Co-Authored-By: Claude Opus 4.7 --- events/events.go | 34 +++++++++++-------- peer/peer.go | 31 +++++++++++------ peer/peer_test.go | 86 +++++++++++++++++++++++++++++++++-------------- 3 files changed, 101 insertions(+), 50 deletions(-) diff --git a/events/events.go b/events/events.go index a5e37cd4..66219455 100644 --- a/events/events.go +++ b/events/events.go @@ -28,7 +28,6 @@ package events import ( "context" - stdlog "log" "reflect" "sync" "sync/atomic" @@ -120,15 +119,13 @@ func (e *Subscription[T]) Unsubscribe() { // asynchronously in separate goroutines. func Emit[T Event](evt T) { subscriptionsMu.RLock() - defer subscriptionsMu.RUnlock() key := reflect.TypeFor[T]() subs, ok := subscriptions[key] - // Diagnostic: surfaces the subscriber count at emit time so a missing - // FlutterEvent on the consumer side is distinguishable from "no - // subscribers registered for this type" vs "subscribers registered - // but callback panics silently." Spam-friendly when traffic spikes, - // but we're investigating a zero-callback path so the noise is - // short-lived; remove (or downgrade to Debug) once the chain works. + subscriptionsMu.RUnlock() + // Diagnostic hook; default no-op so high-frequency event types don't + // flood logs in prod. Tests / debugging swap in a real logger via + // SetEmitDebugLogger. Called after releasing subscriptionsMu so a + // blocking logger can't amplify lock contention on hot event types. emitDebugLogger(key, len(subs)) if !ok { return @@ -138,10 +135,19 @@ func Emit[T Event](evt T) { } } -// emitDebugLogger is a package-level var so tests can suppress the -// per-emit log, and so prod can swap in slog. Default uses Go's stdlib -// log so events package doesn't need to import slog (and avoid a cycle -// with anything that imports events for its own log forwarding). -var emitDebugLogger = func(key reflect.Type, subCount int) { - stdlog.Printf("events.Emit type=%s subscribers=%d", key, subCount) +// emitDebugLogger is invoked once per Emit with the event type and +// current subscriber count. Default is a no-op; callers (tests, +// diagnostic builds) swap in a real logger via SetEmitDebugLogger. +var emitDebugLogger = func(reflect.Type, int) {} + +// SetEmitDebugLogger replaces the no-op diagnostic hook for the +// duration of an investigation (e.g., tracking "events vanish" paths). +// Pass nil to restore the no-op default. Safe to call from main / +// init; not safe to call concurrently with Emit on the hot path. +func SetEmitDebugLogger(fn func(eventType reflect.Type, subscriberCount int)) { + if fn == nil { + emitDebugLogger = func(reflect.Type, int) {} + return + } + emitDebugLogger = fn } diff --git a/peer/peer.go b/peer/peer.go index fb8cf8c2..c9b87281 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -66,14 +66,19 @@ type StatusEvent struct { // ConnectionEvent fires every time a remote client opens or closes a // samizdat session against the local peer's inbound. Source carries the // remote "ip:port" string; consumers (the globe view, abuse aggregation) -// extract the IP for geo-lookup or rate-limit attribution. +// extract the IP for geo-lookup or rate-limit attribution. Timestamp +// is the emit time in Unix millis; consumers that aggregate across a +// time window or that need to order events when the underlying +// dispatch is async can compare it directly. // -// State +1 on accept, -1 on close -// Source remote peer "ip:port" +// State +1 on accept, -1 on close +// Source remote peer "ip:port" +// Timestamp emit time in Unix milliseconds type ConnectionEvent struct { events.Event - State int `json:"state"` - Source string `json:"source"` + State int `json:"state"` + Source string `json:"source"` + Timestamp int64 `json:"timestamp"` } // Port range chosen to minimize collision risk on the typical home network, @@ -389,13 +394,17 @@ func (c *Client) Start(ctx context.Context) (retErr error) { "state", state, "source", source) return } - // One-line breadcrumb per accept/close so we can correlate samizdat-in - // activity with peer-connection FlutterEvents on the consumer side - // — without this, "no globe arcs despite samizdat traffic" is - // indistinguishable from "events fire but the bridge swallows them." - slog.Info("peer listener: forwarding connection event", + // Per-connection breadcrumb correlates samizdat-in activity with + // peer-connection FlutterEvents on the consumer side. Debug-level + // so prod logs aren't flooded under real traffic and so the + // remote ip:port doesn't land in routinely-collected client logs; + // operators investigating "no globe arcs despite samizdat traffic" + // can flip the level. The listener-registration line below stays + // at Info — that's a once-per-session lifecycle event, not a + // per-connection breadcrumb. + slog.Debug("peer listener: forwarding connection event", "state", state, "source", source) - events.Emit(ConnectionEvent{State: state, Source: source}) + events.Emit(ConnectionEvent{State: state, Source: source, Timestamp: time.Now().UnixMilli()}) }) slog.Info("peer listener: registered with peerconn", "route_id", regResp.RouteID) diff --git a/peer/peer_test.go b/peer/peer_test.go index 29e9c0fe..eb392464 100644 --- a/peer/peer_test.go +++ b/peer/peer_test.go @@ -710,39 +710,75 @@ func TestClient_StatusEventEmittedOnStartAndStop(t *testing.T) { require.NoError(t, c.Start(context.Background())) - wantStartPhases := []Phase{ - PhaseMappingPort, - PhaseDetectingIP, - PhaseRegistering, - PhaseStartingBox, - PhaseVerifying, - PhaseServing, - } - for _, want := range wantStartPhases { - select { - case evt := <-got: - assert.Equal(t, want, evt.Status.Phase, "wrong phase in Start sequence") - if want == PhaseServing { - assert.True(t, evt.Status.Active, "active must be true on serving") - assert.NotEmpty(t, evt.Status.RouteID, "route_id must be set on serving") - } else { - assert.False(t, evt.Status.Active, "active must be false on intermediate phase %q", want) - } - case <-time.After(time.Second): - t.Fatalf("no Start status event for phase %q within 1s", want) + // events.Emit dispatches each callback in a separate goroutine, so + // the order events land on the channel isn't deterministic — assert + // set-membership of the expected phases + the final-state contract + // (the only state observers actually care about) rather than the + // sequence. + wantStartPhases := map[Phase]bool{ + PhaseMappingPort: true, + PhaseDetectingIP: true, + PhaseRegistering: true, + PhaseStartingBox: true, + PhaseVerifying: true, + PhaseServing: true, + } + startEvents := drainPhases(t, got, len(wantStartPhases)) + for want := range wantStartPhases { + assert.Contains(t, startEvents, want, "Start sequence missing phase %q", want) + } + servingEvt, ok := startEvents[PhaseServing] + require.True(t, ok, "Start sequence must reach PhaseServing") + assert.True(t, servingEvt.Status.Active, "active must be true on serving") + assert.NotEmpty(t, servingEvt.Status.RouteID, "route_id must be set on serving") + for phase, evt := range startEvents { + if phase == PhaseServing { + continue } + assert.False(t, evt.Status.Active, "active must be false on intermediate phase %q", phase) } require.NoError(t, c.Stop(context.Background())) - for _, want := range []Phase{PhaseStopping, PhaseIdle} { + wantStopPhases := map[Phase]bool{ + PhaseStopping: true, + PhaseIdle: true, + } + stopEvents := drainPhases(t, got, len(wantStopPhases)) + for want := range wantStopPhases { + assert.Contains(t, stopEvents, want, "Stop sequence missing phase %q", want) + } + for phase, evt := range stopEvents { + assert.False(t, evt.Status.Active, "active must be false during stop (phase %q)", phase) + } +} + +// drainPhases reads up to n StatusEvents from got and returns them +// keyed by Phase (last event per phase wins). Used by tests that need +// set-membership semantics rather than strict ordering because +// events.Emit's per-callback goroutines deliver out of order under +// the runtime's scheduling. +func drainPhases(t *testing.T, got <-chan StatusEvent, n int) map[Phase]StatusEvent { + t.Helper() + out := make(map[Phase]StatusEvent, n) + deadline := time.After(2 * time.Second) + for i := 0; i < n; i++ { select { case evt := <-got: - assert.Equal(t, want, evt.Status.Phase, "wrong phase in Stop sequence") - assert.False(t, evt.Status.Active, "active must be false during stop") - case <-time.After(time.Second): - t.Fatalf("no Stop status event for phase %q within 1s", want) + out[evt.Status.Phase] = evt + case <-deadline: + t.Fatalf("received only %d/%d status events within 2s; got phases: %v", + i, n, mapKeys(out)) } } + return out +} + +func mapKeys[K comparable, V any](m map[K]V) []K { + keys := make([]K, 0, len(m)) + for k := range m { + keys = append(keys, k) + } + return keys } // TestClient_StatusEventOnStartError surfaces a Start failure to the UI From 6fc208b7913d817c6d60a86a5f4f556787b02128 Mon Sep 17 00:00:00 2001 From: Adam Fisk Date: Sat, 30 May 2026 15:25:58 -0600 Subject: [PATCH 12/12] events: snapshot subscribers under RLock before iterating MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Round-1's fix to move emitDebugLogger out from under subscriptionsMu.RLock (so a blocking logger couldn't amplify lock contention) accidentally moved the per-subscriber iteration outside the lock too. Iterating the subscriptions[key] map after RUnlock races against Unsubscribe's write-locked mutation — guaranteed 'concurrent map iteration and map write' panic under load. Fix: snapshot the callbacks into a slice while holding the RLock, then drop the lock and run emitDebugLogger + the per-callback goroutine spawns over the slice. Slice iteration is race-free because the slice itself is unshared. The original code (pre-round-1) was correct because it held the RLock for the whole function via defer — that's still safe but it forces logger calls to run under the lock. The snapshot pattern gets both properties: no iteration race + no blocking under the lock. Co-Authored-By: Claude Opus 4.7 --- events/events.go | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/events/events.go b/events/events.go index 66219455..0e84470f 100644 --- a/events/events.go +++ b/events/events.go @@ -118,19 +118,25 @@ func (e *Subscription[T]) Unsubscribe() { // Emit notifies all subscribers of the event, passing event data. Callbacks are invoked // asynchronously in separate goroutines. func Emit[T Event](evt T) { - subscriptionsMu.RLock() key := reflect.TypeFor[T]() - subs, ok := subscriptions[key] - subscriptionsMu.RUnlock() - // Diagnostic hook; default no-op so high-frequency event types don't - // flood logs in prod. Tests / debugging swap in a real logger via - // SetEmitDebugLogger. Called after releasing subscriptionsMu so a - // blocking logger can't amplify lock contention on hot event types. - emitDebugLogger(key, len(subs)) - if !ok { - return + // Snapshot the callbacks into a slice under the RLock, then drop + // the lock before doing anything that could block (the diagnostic + // log, the per-callback goroutine spawn). Iterating the underlying + // map after releasing the lock would race against Unsubscribe's + // write lock — `concurrent map iteration and map write` panic + // territory under load. + subscriptionsMu.RLock() + subsMap := subscriptions[key] + cbs := make([]func(any), 0, len(subsMap)) + for _, cb := range subsMap { + cbs = append(cbs, cb) } - for _, cb := range subs { + subscriptionsMu.RUnlock() + // Diagnostic hook; default no-op so high-frequency event types + // don't flood logs in prod. Tests / debugging swap in a real + // logger via SetEmitDebugLogger. + emitDebugLogger(key, len(cbs)) + for _, cb := range cbs { go cb(evt) } }