Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ cancelRequests()

```go
// BAD — restates name, generic preamble, narrates the code
// updateURLTestListener manages the lifecycle of the URL test result listener
// updateSelectionHistoryListener manages the lifecycle of the selection history listener
// across VPN status changes. Connected always re-attaches (canceling any
// existing listener) so a stale event still leaves the listener bound to
// the live storage.
Expand Down
31 changes: 31 additions & 0 deletions backend/exhaustion_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package backend

import (
"testing"
"time"
)

func TestExhaustionGate_AllowRateLimitsBelowGap(t *testing.T) {
prev := defaultExhaustionRefetchGap
defaultExhaustionRefetchGap = 50 * time.Millisecond
t.Cleanup(func() { defaultExhaustionRefetchGap = prev })

var g exhaustionGate
if !g.allow() {
t.Fatal("first allow must pass on a zero gate")
}
if g.allow() {
t.Error("second allow inside the gap must be rate-limited")
}
if g.allow() {
t.Error("third allow inside the gap must still be rate-limited")
}

time.Sleep(defaultExhaustionRefetchGap + 10*time.Millisecond)
if !g.allow() {
t.Error("allow after the gap elapses must pass again")
}
if g.allow() {
t.Error("post-recovery allow must re-arm the gate")
}
}
156 changes: 99 additions & 57 deletions backend/radiance.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import (
"github.com/getlantern/radiance/traces"
"github.com/getlantern/radiance/vpn"

"github.com/sagernet/sing-box/adapter"
lbA "github.com/getlantern/lantern-box/adapter"
"github.com/sagernet/sing-box/option"
)

Expand Down Expand Up @@ -75,8 +75,10 @@ type LocalBackend struct {
stopDataCap context.CancelFunc
dataCapMu sync.Mutex

stopURLTestListener context.CancelFunc
urlTestMu sync.Mutex
stopSelectionHistoryListener context.CancelFunc
selectionHistoryMu sync.Mutex

exhaustionGate exhaustionGate
}

type Options struct {
Expand Down Expand Up @@ -287,10 +289,10 @@ func (r *LocalBackend) Start() {
// ErrTunnelAlreadyConnected is the expected, non-error case while
// the VPN is up: setServers above already pushed the new outbounds
// (and any bandit URL overrides) into the running tunnel, and
// addOutbounds triggers an immediate URL test cycle for them via
// MutableURLTest.CheckOutbounds. The "offline" pre-warm path here
// is for the not-yet-connected case only — running both would
// duplicate work and conflict with the live URLTest selector.
// addOutbounds triggers an immediate probe cycle for them via
// MutableAutoSelect.CheckOutbounds. The "offline" pre-warm path
// here is for the not-yet-connected case only — running both
// would duplicate work and conflict with the live auto-select group.
slog.Error("Failed to run offline URL tests after config update", "error", err)
}
})
Expand Down Expand Up @@ -322,7 +324,10 @@ func (r *LocalBackend) startVPNStatusListeners() {
r.updateDataCapStream(evt.Status)
})
events.SubscribeContext(r.ctx, func(evt vpn.StatusUpdateEvent) {
r.updateURLTestListener(evt.Status)
r.updateSelectionHistoryListener(evt.Status)
})
events.SubscribeContext(r.ctx, func(vpn.ExhaustionEvent) {
r.refetchOnExhaustion()
})
events.SubscribeContext(r.ctx, func(evt vpn.StatusUpdateEvent) {
switch evt.Status {
Expand Down Expand Up @@ -417,7 +422,7 @@ func baseIssueAttachments() []string {
// UpdateConfig forces an immediate fetch of the latest configuration. It returns
// [config.ErrConfigFetchDisabled] if config fetching is disabled in settings.
func (r *LocalBackend) UpdateConfig() error {
return r.confHandler.Update()
return r.confHandler.Fetch()
}

// Features returns the features available in the current configuration, returned from the server in the
Expand Down Expand Up @@ -640,78 +645,79 @@ func (r *LocalBackend) RevokePrivateServerInvite(ip string, port int, accessToke
return r.srvManager.RevokePrivateServerInvite(ip, port, accessToken, inviteName)
}

// urlTestFlushInterval bounds how often URL test results are written back to the servers manager
// (and to disk). URL test cycles run on the order of minutes and notify per-result, so coalescing
// into a periodic flush avoids re-marshalling and re-writing the servers file for each parallel result.
const urlTestFlushInterval = 5 * time.Second
const selectionHistoryFlushInterval = 5 * time.Second

func (r *LocalBackend) updateURLTestListener(status vpn.VPNStatus) {
r.urlTestMu.Lock()
defer r.urlTestMu.Unlock()
// Status events are dispatched in unordered goroutines, so reacting to
// intermediate statuses (Connecting, Disconnecting, Restarting) risks a
// stale handler tearing down a listener a concurrent Connected handler
// just attached to the new tunnel.
func (r *LocalBackend) updateSelectionHistoryListener(status vpn.VPNStatus) {
r.selectionHistoryMu.Lock()
defer r.selectionHistoryMu.Unlock()
switch status {
case vpn.Connected:
if r.stopURLTestListener != nil {
r.stopURLTestListener()
r.stopURLTestListener = nil
if r.stopSelectionHistoryListener != nil {
r.stopSelectionHistoryListener()
r.stopSelectionHistoryListener = nil
}
storage := r.vpnClient.HistoryStorage()
if storage == nil {
return
}
ctx, cancel := context.WithCancel(r.ctx)
r.stopURLTestListener = cancel
r.stopSelectionHistoryListener = cancel
hook := make(chan struct{}, 1)
storage.SetHook(hook)
go r.runURLTestListener(ctx, storage, hook)
slog.Debug("Started URL test result listener")
storage.SetHook(func(string) {
// Per-tag granularity isn't useful — flushSelectionHistory
// iterates every server. Non-blocking send so storage
// writes never block on a slow flush.
select {
case hook <- struct{}{}:
default:
}
})
go r.runSelectionHistoryListener(ctx, storage, hook)
slog.Debug("Started selection history listener")
case vpn.Disconnected, vpn.ErrorStatus:
if r.stopURLTestListener != nil {
r.stopURLTestListener()
r.stopURLTestListener = nil
slog.Debug("Stopped URL test result listener")
if r.stopSelectionHistoryListener != nil {
r.stopSelectionHistoryListener()
r.stopSelectionHistoryListener = nil
slog.Debug("Stopped selection history listener")
}
}
}

// runURLTestListener coalesces per-result hook notifications into a periodic flush so the servers
// file isn't rewritten for each parallel URL test completion. A final flush runs on shutdown so any
// results that arrived since the last tick are persisted.
func (r *LocalBackend) runURLTestListener(ctx context.Context, storage vpn.URLTestHistoryStorage, hook <-chan struct{}) {
ticker := time.NewTicker(urlTestFlushInterval)
// runSelectionHistoryListener coalesces per-result hook notifications into a periodic flush so the
// servers file isn't rewritten for each parallel probe completion. A final flush runs on shutdown so
// any results that arrived since the last tick are persisted.
func (r *LocalBackend) runSelectionHistoryListener(ctx context.Context, storage vpn.AutoSelectHistoryStorage, hook <-chan struct{}) {
ticker := time.NewTicker(selectionHistoryFlushInterval)
defer ticker.Stop()
dirty := true // start dirty so we persist any results that arrived before the listener started
for {
select {
case <-ctx.Done():
if dirty {
r.flushURLTestResults(storage)
r.flushSelectionHistory(storage)
}
return
case <-hook:
dirty = true
case <-ticker.C:
if dirty {
r.flushURLTestResults(storage)
r.flushSelectionHistory(storage)
dirty = false
}
}
}
}

func (r *LocalBackend) flushURLTestResults(storage vpn.URLTestHistoryStorage) {
results := make(map[string]servers.URLTestResult)
func (r *LocalBackend) flushSelectionHistory(storage vpn.AutoSelectHistoryStorage) {
results := make(map[string]servers.SelectionHistory)
for _, srv := range r.srvManager.AllServers() {
if h := storage.LoadURLTestHistory(srv.Tag); h != nil {
results[srv.Tag] = servers.URLTestResult{Delay: h.Delay, Time: h.Time}
if h := storage.Load(srv.Tag); h != nil {
results[srv.Tag] = *h
}
}
if len(results) > 0 {
if err := r.srvManager.UpdateURLTestResults(results); err != nil {
slog.Warn("Failed to persist URL test results", "error", err)
if err := r.srvManager.UpdateSelectionHistory(results); err != nil {
slog.Warn("Failed to persist selection history", "error", err)
}
}
}
Expand Down Expand Up @@ -751,15 +757,14 @@ func (r *LocalBackend) getBoxOptions() vpn.BoxOptions {
if cfg != nil {
bOptions.Options = cfg.Options
bOptions.BanditURLOverrides = cfg.BanditURLOverrides
bOptions.BanditThroughputURL = cfg.BanditThroughputURL
if settings.GetBool(settings.SmartRoutingKey) {
bOptions.SmartRouting = cfg.SmartRouting
}
if settings.GetBool(settings.AdBlockKey) {
bOptions.AdBlock = cfg.AdBlock
}
}
seed := make(map[string]adapter.URLTestHistory)
seed := make(map[string]lbA.TagHistory)
for _, srv := range r.srvManager.AllServers() {
if !srv.IsLantern {
switch opts := srv.Options.(type) {
Expand All @@ -769,15 +774,12 @@ func (r *LocalBackend) getBoxOptions() vpn.BoxOptions {
bOptions.Options.Endpoints = append(bOptions.Options.Endpoints, opts)
}
}
if srv.URLTestResult != nil {
seed[srv.Tag] = adapter.URLTestHistory{
Time: srv.URLTestResult.Time,
Delay: srv.URLTestResult.Delay,
}
if srv.SelectionHistory != nil {
seed[srv.Tag] = *srv.SelectionHistory
}
}
if len(seed) > 0 {
bOptions.URLTestSeed = seed
bOptions.SelectionHistorySeed = seed
}
return bOptions
}
Expand Down Expand Up @@ -951,13 +953,21 @@ func (r *LocalBackend) RunOfflineURLTests() error {
return err
}
now := time.Now()
urlResults := make(map[string]servers.URLTestResult, len(results))
histories := make(map[string]servers.SelectionHistory, len(results))
for tag, delay := range results {
urlResults[tag] = servers.URLTestResult{Delay: delay, Time: now}
// Offline pre-warm produces only a single success delay per tag;
// shape it as a probe-success snapshot so the production tunnel's
// AutoSelectHistoryStorage can seed cold-start ranking from it
// without further translation.
histories[tag] = lbA.TagHistory{
LastSuccessDelayMs: uint32(delay),
LastOutcomeAt: now,
UpdatedAt: now,
}
}
if len(urlResults) > 0 {
if err := r.srvManager.UpdateURLTestResults(urlResults); err != nil {
slog.Warn("Failed to persist offline URL test results", "error", err)
if len(histories) > 0 {
if err := r.srvManager.UpdateSelectionHistory(histories); err != nil {
slog.Warn("Failed to persist offline selection history", "error", err)
}
selected, err := r.vpnClient.CurrentAutoSelectedServer()
if err != nil {
Expand All @@ -969,6 +979,38 @@ func (r *LocalBackend) RunOfflineURLTests() error {
return nil
}

// defaultExhaustionRefetchGap rate-limits exhaustion-driven refetches
// so a misbehaving config handler can't hammer /config-new.
var defaultExhaustionRefetchGap = time.Minute

// exhaustionGate rate-limits exhaustion-driven refetches. lastAt is
// recorded before the fetch runs so a failing fetcher doesn't
// tight-loop.
type exhaustionGate struct {
mu sync.Mutex
lastAt time.Time
}

func (g *exhaustionGate) allow() bool {
g.mu.Lock()
defer g.mu.Unlock()
now := time.Now()
if !g.lastAt.IsZero() && now.Sub(g.lastAt) < defaultExhaustionRefetchGap {
return false
}
g.lastAt = now
return true
}

func (r *LocalBackend) refetchOnExhaustion() {
if !r.exhaustionGate.allow() {
return
}
if err := r.confHandler.Fetch(); err != nil {
slog.Warn("MutableAutoSelect exhaustion refetch failed", "error", err)
}
}

//////////////////
// Split Tunnel //
/////////////////
Expand Down
8 changes: 0 additions & 8 deletions backend/radiance_test.go

This file was deleted.

Loading
Loading