Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ cancelRequests()

```go
// BAD — restates name, generic preamble, narrates the code
// updateURLTestListener manages the lifecycle of the URL test result listener
// updateSelectionHistoryListener manages the lifecycle of the selection history listener
// across VPN status changes. Connected always re-attaches (canceling any
// existing listener) so a stale event still leaves the listener bound to
// the live storage.
Expand Down
31 changes: 31 additions & 0 deletions backend/exhaustion_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package backend

import (
"testing"
"time"
)

// TestExhaustionGate_AllowRateLimitsBelowGap checks that a zero-value
// exhaustionGate lets the first call through, rejects further calls made
// inside the refetch gap, and lets exactly one call through again once the
// gap has elapsed.
func TestExhaustionGate_AllowRateLimitsBelowGap(t *testing.T) {
	// Shrink the package-level gap so the test only has to sleep for tens of
	// milliseconds; the original value is restored when the test finishes.
	const gap = 50 * time.Millisecond
	saved := defaultExhaustionRefetchGap
	defaultExhaustionRefetchGap = gap
	t.Cleanup(func() { defaultExhaustionRefetchGap = saved })

	var gate exhaustionGate

	// Nothing has been recorded yet, so the gate must open.
	if ok := gate.allow(); !ok {
		t.Fatal("first allow must pass on a zero gate")
	}

	// Both follow-up calls land inside the gap and must be rejected.
	if ok := gate.allow(); ok {
		t.Error("second allow inside the gap must be rate-limited")
	}
	if ok := gate.allow(); ok {
		t.Error("third allow inside the gap must still be rate-limited")
	}

	// Wait out the gap (plus a small margin) and verify the gate reopens once.
	time.Sleep(gap + 10*time.Millisecond)
	if ok := gate.allow(); !ok {
		t.Error("allow after the gap elapses must pass again")
	}
	if ok := gate.allow(); ok {
		t.Error("post-recovery allow must re-arm the gate")
	}
}
156 changes: 99 additions & 57 deletions backend/radiance.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import (
"github.com/getlantern/radiance/traces"
"github.com/getlantern/radiance/vpn"

"github.com/sagernet/sing-box/adapter"
lbA "github.com/getlantern/lantern-box/adapter"
"github.com/sagernet/sing-box/option"
)

Expand Down Expand Up @@ -75,8 +75,10 @@ type LocalBackend struct {
stopDataCap context.CancelFunc
dataCapMu sync.Mutex

stopURLTestListener context.CancelFunc
urlTestMu sync.Mutex
stopSelectionHistoryListener context.CancelFunc
selectionHistoryMu sync.Mutex

exhaustionGate exhaustionGate
}

type Options struct {
Expand Down Expand Up @@ -287,10 +289,10 @@ func (r *LocalBackend) Start() {
// ErrTunnelAlreadyConnected is the expected, non-error case while
// the VPN is up: setServers above already pushed the new outbounds
// (and any bandit URL overrides) into the running tunnel, and
// addOutbounds triggers an immediate URL test cycle for them via
// MutableURLTest.CheckOutbounds. The "offline" pre-warm path here
// is for the not-yet-connected case only — running both would
// duplicate work and conflict with the live URLTest selector.
// addOutbounds triggers an immediate probe cycle for them via
// MutableAutoSelect.CheckOutbounds. The "offline" pre-warm path
// here is for the not-yet-connected case only — running both
// would duplicate work and conflict with the live auto-select group.
slog.Error("Failed to run offline URL tests after config update", "error", err)
}
})
Expand Down Expand Up @@ -322,7 +324,10 @@ func (r *LocalBackend) startVPNStatusListeners() {
r.updateDataCapStream(evt.Status)
})
events.SubscribeContext(r.ctx, func(evt vpn.StatusUpdateEvent) {
r.updateURLTestListener(evt.Status)
r.updateSelectionHistoryListener(evt.Status)
})
events.SubscribeContext(r.ctx, func(vpn.ExhaustionEvent) {
r.refetchOnExhaustion()
})
events.SubscribeContext(r.ctx, func(evt vpn.StatusUpdateEvent) {
switch evt.Status {
Expand Down Expand Up @@ -417,7 +422,7 @@ func baseIssueAttachments() []string {
// UpdateConfig forces an immediate fetch of the latest configuration. It returns
// [config.ErrConfigFetchDisabled] if config fetching is disabled in settings.
func (r *LocalBackend) UpdateConfig() error {
return r.confHandler.Update()
return r.confHandler.Fetch()
}

// Features returns the features available in the current configuration, returned from the server in the
Expand Down Expand Up @@ -640,78 +645,79 @@ func (r *LocalBackend) RevokePrivateServerInvite(ip string, port int, accessToke
return r.srvManager.RevokePrivateServerInvite(ip, port, accessToken, inviteName)
}

// urlTestFlushInterval bounds how often URL test results are written back to the servers manager
// (and to disk). URL test cycles run on the order of minutes and notify per-result, so coalescing
// into a periodic flush avoids re-marshalling and re-writing the servers file for each parallel result.
const urlTestFlushInterval = 5 * time.Second
const selectionHistoryFlushInterval = 5 * time.Second

func (r *LocalBackend) updateURLTestListener(status vpn.VPNStatus) {
r.urlTestMu.Lock()
defer r.urlTestMu.Unlock()
// Status events are dispatched in unordered goroutines, so reacting to
// intermediate statuses (Connecting, Disconnecting, Restarting) risks a
// stale handler tearing down a listener a concurrent Connected handler
// just attached to the new tunnel.
func (r *LocalBackend) updateSelectionHistoryListener(status vpn.VPNStatus) {
r.selectionHistoryMu.Lock()
defer r.selectionHistoryMu.Unlock()
switch status {
case vpn.Connected:
if r.stopURLTestListener != nil {
r.stopURLTestListener()
r.stopURLTestListener = nil
if r.stopSelectionHistoryListener != nil {
r.stopSelectionHistoryListener()
r.stopSelectionHistoryListener = nil
}
storage := r.vpnClient.HistoryStorage()
if storage == nil {
return
}
ctx, cancel := context.WithCancel(r.ctx)
r.stopURLTestListener = cancel
r.stopSelectionHistoryListener = cancel
hook := make(chan struct{}, 1)
storage.SetHook(hook)
go r.runURLTestListener(ctx, storage, hook)
slog.Debug("Started URL test result listener")
storage.SetHook(func(string) {
// Per-tag granularity isn't useful — flushSelectionHistory
// iterates every server. Non-blocking send so storage
// writes never block on a slow flush.
select {
case hook <- struct{}{}:
default:
}
})
go r.runSelectionHistoryListener(ctx, storage, hook)
slog.Debug("Started selection history listener")
case vpn.Disconnected, vpn.ErrorStatus:
if r.stopURLTestListener != nil {
r.stopURLTestListener()
r.stopURLTestListener = nil
slog.Debug("Stopped URL test result listener")
if r.stopSelectionHistoryListener != nil {
r.stopSelectionHistoryListener()
r.stopSelectionHistoryListener = nil
slog.Debug("Stopped selection history listener")
}
}
}

// runURLTestListener coalesces per-result hook notifications into a periodic flush so the servers
// file isn't rewritten for each parallel URL test completion. A final flush runs on shutdown so any
// results that arrived since the last tick are persisted.
func (r *LocalBackend) runURLTestListener(ctx context.Context, storage vpn.URLTestHistoryStorage, hook <-chan struct{}) {
ticker := time.NewTicker(urlTestFlushInterval)
// runSelectionHistoryListener coalesces per-result hook notifications into a periodic flush so the
// servers file isn't rewritten for each parallel probe completion. A final flush runs on shutdown so
// any results that arrived since the last tick are persisted.
func (r *LocalBackend) runSelectionHistoryListener(ctx context.Context, storage vpn.AutoSelectHistoryStorage, hook <-chan struct{}) {
ticker := time.NewTicker(selectionHistoryFlushInterval)
defer ticker.Stop()
dirty := true // start dirty so we persist any results that arrived before the listener started
for {
select {
case <-ctx.Done():
if dirty {
r.flushURLTestResults(storage)
r.flushSelectionHistory(storage)
}
return
case <-hook:
dirty = true
case <-ticker.C:
if dirty {
r.flushURLTestResults(storage)
r.flushSelectionHistory(storage)
dirty = false
}
}
}
}

func (r *LocalBackend) flushURLTestResults(storage vpn.URLTestHistoryStorage) {
results := make(map[string]servers.URLTestResult)
func (r *LocalBackend) flushSelectionHistory(storage vpn.AutoSelectHistoryStorage) {
results := make(map[string]servers.SelectionHistory)
for _, srv := range r.srvManager.AllServers() {
if h := storage.LoadURLTestHistory(srv.Tag); h != nil {
results[srv.Tag] = servers.URLTestResult{Delay: h.Delay, Time: h.Time}
if h := storage.Load(srv.Tag); h != nil {
results[srv.Tag] = *h
}
}
if len(results) > 0 {
if err := r.srvManager.UpdateURLTestResults(results); err != nil {
slog.Warn("Failed to persist URL test results", "error", err)
if err := r.srvManager.UpdateSelectionHistory(results); err != nil {
slog.Warn("Failed to persist selection history", "error", err)
}
}
}
Expand Down Expand Up @@ -751,15 +757,14 @@ func (r *LocalBackend) getBoxOptions() vpn.BoxOptions {
if cfg != nil {
bOptions.Options = cfg.Options
bOptions.BanditURLOverrides = cfg.BanditURLOverrides
bOptions.BanditThroughputURL = cfg.BanditThroughputURL
if settings.GetBool(settings.SmartRoutingKey) {
bOptions.SmartRouting = cfg.SmartRouting
}
if settings.GetBool(settings.AdBlockKey) {
bOptions.AdBlock = cfg.AdBlock
}
}
seed := make(map[string]adapter.URLTestHistory)
seed := make(map[string]lbA.TagHistory)
for _, srv := range r.srvManager.AllServers() {
if !srv.IsLantern {
switch opts := srv.Options.(type) {
Expand All @@ -769,15 +774,12 @@ func (r *LocalBackend) getBoxOptions() vpn.BoxOptions {
bOptions.Options.Endpoints = append(bOptions.Options.Endpoints, opts)
}
}
if srv.URLTestResult != nil {
seed[srv.Tag] = adapter.URLTestHistory{
Time: srv.URLTestResult.Time,
Delay: srv.URLTestResult.Delay,
}
if srv.SelectionHistory != nil {
seed[srv.Tag] = *srv.SelectionHistory
}
}
if len(seed) > 0 {
bOptions.URLTestSeed = seed
bOptions.SelectionHistorySeed = seed
}
return bOptions
}
Expand Down Expand Up @@ -951,13 +953,21 @@ func (r *LocalBackend) RunOfflineURLTests() error {
return err
}
now := time.Now()
urlResults := make(map[string]servers.URLTestResult, len(results))
histories := make(map[string]servers.SelectionHistory, len(results))
for tag, delay := range results {
urlResults[tag] = servers.URLTestResult{Delay: delay, Time: now}
// Offline pre-warm produces only a single success delay per tag;
// shape it as a probe-success snapshot so the production tunnel's
// AutoSelectHistoryStorage can seed cold-start ranking from it
// without further translation.
histories[tag] = lbA.TagHistory{
LastSuccessDelayMs: uint32(delay),
LastOutcomeAt: now,
UpdatedAt: now,
}
}
if len(urlResults) > 0 {
if err := r.srvManager.UpdateURLTestResults(urlResults); err != nil {
slog.Warn("Failed to persist offline URL test results", "error", err)
if len(histories) > 0 {
if err := r.srvManager.UpdateSelectionHistory(histories); err != nil {
slog.Warn("Failed to persist offline selection history", "error", err)
}
selected, err := r.vpnClient.CurrentAutoSelectedServer()
if err != nil {
Expand All @@ -969,6 +979,38 @@ func (r *LocalBackend) RunOfflineURLTests() error {
return nil
}

// defaultExhaustionRefetchGap is the minimum spacing between two
// exhaustion-driven config refetches, so a misbehaving config handler
// can't hammer /config-new. It is a variable rather than a constant so
// tests can shorten it.
var defaultExhaustionRefetchGap = time.Minute

// exhaustionGate throttles exhaustion-driven refetches. The zero value is
// ready to use and lets the first call through. lastAt is stamped inside
// allow, i.e. before the caller runs its fetch, so a failing fetcher
// doesn't tight-loop.
type exhaustionGate struct {
	mu     sync.Mutex
	lastAt time.Time // when allow last returned true; zero until then
}

// allow reports whether a refetch may start now. It returns true on the
// first call and whenever at least defaultExhaustionRefetchGap has passed
// since the last call that returned true; in that case it also records the
// current time as the new reference point. Rejected calls leave the gate
// unchanged.
func (g *exhaustionGate) allow() bool {
	g.mu.Lock()
	defer g.mu.Unlock()

	current := time.Now()
	neverFired := g.lastAt.IsZero()
	if neverFired || current.Sub(g.lastAt) >= defaultExhaustionRefetchGap {
		g.lastAt = current
		return true
	}
	return false
}

// refetchOnExhaustion asks the config handler for a fresh config, unless the
// backend's exhaustionGate rejects the attempt as too soon after the previous
// one. A fetch error is logged at warn level and otherwise dropped; nothing
// is returned to the caller.
func (r *LocalBackend) refetchOnExhaustion() {
	if r.exhaustionGate.allow() {
		err := r.confHandler.Fetch()
		if err != nil {
			slog.Warn("MutableAutoSelect exhaustion refetch failed", "error", err)
		}
	}
}

//////////////////
// Split Tunnel //
/////////////////
Expand Down
8 changes: 0 additions & 8 deletions backend/radiance_test.go

This file was deleted.

Loading
Loading