Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions p2p/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,11 @@ func ping_loop() {
return
}
c.update(&response.Common) // update common information
latency := atomic.LoadInt64(&c.Latency)
topoH := atomic.LoadInt64(&c.TopoHeight)
if latency > 0 {
Peer_UpdateLatency(Address(c), latency, topoH)
}
}()
}
return true
Expand Down Expand Up @@ -666,10 +671,16 @@ func trigger_sync() {
clist = append(clist, value)
}

// sort the list random
// do random shuffling, can we get away with len/2 random shuffling
globals.Global_Random.Shuffle(len(clist), func(i, j int) {
clist[i], clist[j] = clist[j], clist[i]
// sort by height descending (furthest ahead first), then latency ascending (fastest among equal)
sort.SliceStable(clist, func(i, j int) bool {
hi := atomic.LoadInt64(&clist[i].Height)
hj := atomic.LoadInt64(&clist[j].Height)
if hi != hj {
return hi > hj
}
li := atomic.LoadInt64(&clist[i].Latency)
lj := atomic.LoadInt64(&clist[j].Latency)
return li < lj
})
Comment on lines +674 to 684
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Design note (non-blocking): the old code randomly shuffled sync partners; this deterministically sorts by height-desc → latency-asc and breaks on the first lagging peer. Net effect: many nodes will preferentially pull from the same highest+fastest peer, concentrating sync load on the best-connected nodes. It's bounded (one sync at a time per node) and is arguably the intended win, but it's a real behavioral shift from "spread load randomly." Flagging for a conscious sign-off — no change requested.


for _, connection := range clist {
Expand Down
65 changes: 54 additions & 11 deletions p2p/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,17 +427,60 @@ func maintain_seed_node_connection() {
return
case <-delay.C:
}
endpoint := ""
if globals.IsMainnet() { // choose mainnet seed node
r, _ := rand.Int(rand.Reader, big.NewInt(10240))
endpoint = config.Mainnet_seed_nodes[r.Int64()%int64(len(config.Mainnet_seed_nodes))]
} else { // choose testnet peer node
r, _ := rand.Int(rand.Reader, big.NewInt(10240))
endpoint = config.Testnet_seed_nodes[r.Int64()%int64(len(config.Testnet_seed_nodes))]
}
if endpoint != "" {
connect_with_endpoint(endpoint, sync_node)
//connect_with_endpoint(endpoint, true) // seed nodes always have sync mode

var seeds []string
if globals.IsMainnet() {
seeds = config.Mainnet_seed_nodes
} else {
seeds = config.Testnet_seed_nodes
}

// split seeds into known (have recent latency data) and unknown
now := uint64(time.Now().UTC().Unix())
type scoredSeed struct {
addr string
lat int64
}
var known []scoredSeed
var unknown []string

peer_mutex.Lock()
for _, s := range seeds {
if p, ok := peer_map[ParseIPNoError(s)]; ok &&
p.LastMeasured > 0 &&
p.LastLatency > 0 &&
(now-p.LastMeasured) < 24*3600 {

known = append(known, scoredSeed{s, p.LastLatency})
} else {
unknown = append(unknown, s)
}
}
peer_mutex.Unlock()

// known seeds: fastest first
sort.Slice(known, func(i, j int) bool {
return known[i].lat < known[j].lat
})

// unknown seeds: shuffle for variety
globals.Global_Random.Shuffle(len(unknown), func(i, j int) {
unknown[i], unknown[j] = unknown[j], unknown[i]
})

// build ordered list: known (fast first) + unknown (random)
var ordered []string
for _, s := range known {
ordered = append(ordered, s.addr)
}
ordered = append(ordered, unknown...)

// connect to first non-connected seed
for _, endpoint := range ordered {
if !IsAddressConnected(ParseIPNoError(endpoint)) {
connect_with_endpoint(endpoint, sync_node)
break
}
}
}
}
Expand Down
137 changes: 116 additions & 21 deletions p2p/peer_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ type Peer struct {
ConnectAfter uint64 `json:"connectafter"` // we should connect when the following timestamp passes
BlacklistBefore uint64 `json:"blacklistbefore"` // peer blacklisted till epoch , priority nodes are never blacklisted, 0 if not blacklist
GoodCount uint64 `json:"goodcount"` // how many times peer has been shared with us
SuccessCount uint64 `json:"successcount"` // outbound connection successes (for scoring)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doc nit: documented as "outbound connection successes", but since dispatch_test_handshake runs for both incoming (controller.go:589) and outgoing (controller.go:741) connections, Peer_SetSuccess now increments on inbound handshakes too. That matches the "both sides whitelist each other" intent — just worth syncing the comment so the next reader isn't misled.

Suggested change
SuccessCount uint64 `json:"successcount"` // outbound connection successes (for scoring)
SuccessCount uint64 `json:"successcount"` // successful handshakes, in or out (for scoring)

LastLatency int64 `json:"lastlatency"` // nanoseconds, from rtt_micro
LastTopoHeight int64 `json:"lasttopoheight"` // peer's topo height at measurement
LastMeasured uint64 `json:"lastmeasured"` // epoch seconds when latency captured
Version int `json:"version"` // version 1 is original C daemon peer, version 2 is golang p2p version
Whitelist bool `json:"whitelist"`
sync.Mutex
Expand Down Expand Up @@ -202,12 +206,42 @@ func Peer_SetSuccess(address string) {
}
peer_mutex.Lock()
defer peer_mutex.Unlock()
p.FailCount = 0 // fail count is zero again
p.ConnectAfter = 0
p.Whitelist = true
p.LastConnected = uint64(time.Now().UTC().Unix()) // set time when last connected
p.FailCount = 0 // fail count is zero again
p.ConnectAfter = 0
p.Whitelist = true
p.LastConnected = uint64(time.Now().UTC().Unix()) // set time when last connected
p.SuccessCount++

// logger.Infof("Setting peer as white listed")
// logger.Infof("Setting peer as white listed")
Comment on lines +209 to +215
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gofmt: this block is over-indented by one tab level. gofmt -l also flags p2p/peer_pool_test.go (struct field alignment). A single gofmt -w p2p/ cleans both. (Heads-up: controller.go also shows a gofmt diff, but that's a pre-existing misalignment in P2P_Shutdown you didn't touch — ignore it.)

Suggested change
p.FailCount = 0 // fail count is zero again
p.ConnectAfter = 0
p.Whitelist = true
p.LastConnected = uint64(time.Now().UTC().Unix()) // set time when last connected
p.SuccessCount++
// logger.Infof("Setting peer as white listed")
// logger.Infof("Setting peer as white listed")
p.FailCount = 0 // fail count is zero again
p.ConnectAfter = 0
p.Whitelist = true
p.LastConnected = uint64(time.Now().UTC().Unix()) // set time when last connected
p.SuccessCount++
// logger.Infof("Setting peer as white listed")

}

// Peer_UpdateLatency stores a latency sample and the topo height that came
// with it on the peer-list entry for address, and stamps the entry with the
// current UTC time in epoch seconds.
//
// address is passed through ParseIPNoError before the lookup. If
// GetPeerInList returns nil the call does nothing. The three fields are
// written while peer_mutex is held.
//
// Intended to be called from the ping path (after c.update(&response.Common)
// in ping_loop) when the connection's Latency is > 0, not from the handshake.
func Peer_UpdateLatency(address string, latencyNs int64, topoHeight int64) {
	peer := GetPeerInList(ParseIPNoError(address))
	if peer != nil {
		peer_mutex.Lock()
		defer peer_mutex.Unlock()

		peer.LastLatency, peer.LastTopoHeight = latencyNs, topoHeight
		peer.LastMeasured = uint64(time.Now().UTC().Unix())
	}
}

// peerScore computes a selection weight for a peer from its connection
// history plus a bonus for a recently measured low latency.
//
// p is the peer to score; now is the current time in epoch seconds.
//
// The base score is 10 per SuccessCount minus 50 per FailCount. If the peer
// has a positive LastLatency (nanoseconds) that was measured less than 24
// hours before now, a bonus of 10000/(latencyMs+1) is added. The bonus is
// all-or-nothing: it does not taper, it is simply dropped once the sample is
// 24 hours old.
func peerScore(p *Peer, now uint64) float64 {
	const maxSampleAge = 24 * 3600 // seconds a latency sample stays usable

	score := float64(p.SuccessCount*10) - float64(p.FailCount*50)

	if p.LastMeasured == 0 || p.LastLatency <= 0 {
		return score // no usable latency sample recorded
	}

	// Both timestamps are unsigned. If LastMeasured is later than now (for
	// example the local clock was stepped backwards), a plain subtraction
	// would wrap to a huge value and a fresh sample would look expired, so
	// treat that case as age zero.
	var age uint64
	if now > p.LastMeasured {
		age = now - p.LastMeasured
	}

	if age < maxSampleAge {
		latencyMs := float64(p.LastLatency) / 1e6 // ns -> ms
		score += 10000.0 / (latencyMs + 1.0)      // +1 avoids div-by-zero
	}
	return score
}

/*
Expand Down Expand Up @@ -240,32 +274,64 @@ func Peer_Delete(p *Peer) {
delete(peer_map, ParseIPNoError(p.Address))
}

// formatAge renders how long ago an epoch-seconds timestamp was, relative to
// the current UTC time, as a short human-readable string.
//
// A zero timestamp yields "-", a timestamp later than the current time yields
// "now". Otherwise the elapsed time is shown in the largest whole unit that
// fits, truncated: seconds below one minute, minutes below one hour, hours
// below one day, and days beyond that.
func formatAge(lastMeasured uint64) string {
	if lastMeasured == 0 {
		return "-"
	}

	elapsed := time.Now().UTC().Unix() - int64(lastMeasured)

	if elapsed < 0 {
		return "now"
	}
	if elapsed < 60 {
		return fmt.Sprintf("%ds ago", elapsed)
	}
	if elapsed < 3600 {
		return fmt.Sprintf("%dm ago", elapsed/60)
	}
	if elapsed < 86400 {
		return fmt.Sprintf("%dh ago", elapsed/3600)
	}
	return fmt.Sprintf("%dd ago", elapsed/86400)
}

// printLatency renders a peer's LastLatency for display.
//
// A positive value is divided by 1e6 (nanoseconds to milliseconds) and shown
// with one decimal place and an "ms" suffix; zero or negative values yield "-".
func printLatency(p *Peer) string {
	if latency := p.LastLatency; latency > 0 {
		return fmt.Sprintf("%.1fms", float64(latency)/1e6)
	}
	return "-"
}

// prints all the connection info to screen
func PeerList_Print() {
peer_mutex.Lock()
defer peer_mutex.Unlock()
fmt.Printf("Peer List\n")
fmt.Printf("%-22s %-6s %-4s %-5s %-7s %9s %3s\n", "Remote Addr", "Active", "Good", "Fail", " State", "Height", "DIR")
fmt.Printf("%-22s %-6s %4s %5s %4s %8s %8s\n", "Remote Addr", "Active", "Good", "Fail", "Succ", "Lat(ms)", "Age")

var list []*Peer
greycount := 0
for _, v := range peer_map {
if v.Whitelist { // only display white listed peer
if v.Whitelist {
list = append(list, v)
} else {
greycount++
}
}

// sort the list
sort.Slice(list, func(i, j int) bool { return list[i].Address < list[j].Address })

for i := range list {
connected := ""
if IsAddressConnected(ParseIPNoError(list[i].Address)) {
connected = "ACTIVE"
}
fmt.Printf("%-22s %-6s %4d %5d \n", list[i].Address, connected, list[i].GoodCount, list[i].FailCount)
fmt.Printf("%-22s %-6s %4d %5d %4d %8s %8s\n",
list[i].Address, connected,
list[i].GoodCount, list[i].FailCount,
list[i].SuccessCount,
printLatency(list[i]),
formatAge(list[i].LastMeasured))
}

fmt.Printf("\nWhitelist size %d\n", len(peer_map)-greycount)
Expand All @@ -289,24 +355,53 @@ func find_peer_to_connect(version int) *Peer {
peer_mutex.Lock()
defer peer_mutex.Unlock()

// first search the whitelisted ones
now := uint64(time.Now().UTC().Unix())

// Pass 1: weighted random among eligible whitelist peers (reservoir sampling)
var best *Peer
var totalWeight float64
for _, v := range peer_map {
if uint64(time.Now().Unix()) > v.BlacklistBefore && // if ip is blacklisted skip it
uint64(time.Now().Unix()) > v.ConnectAfter &&
!IsAddressConnected(ParseIPNoError(v.Address)) && v.Whitelist && !IsAddressInBanList(ParseIPNoError(v.Address)) {
v.ConnectAfter = uint64(time.Now().UTC().Unix()) + 10 // minimum 10 secs gap
return v
if now > v.BlacklistBefore &&
now > v.ConnectAfter &&
!IsAddressConnected(ParseIPNoError(v.Address)) &&
v.Whitelist &&
!IsAddressInBanList(ParseIPNoError(v.Address)) {

w := peerScore(v, now)
if w < 1 {
w = 1 // minimum weight 1 so all eligible peers have a chance
}
totalWeight += w
if globals.Global_Random.Float64()*totalWeight < w {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pre-existing concurrency note (non-blocking): globals.Global_Random is a single *math/rand.Rand, which isn't safe for concurrent use. These Float64() calls are fine — they run under peer_mutex. But trigger_sync and the seed-node code call Global_Random.Shuffle without that lock, concurrently. This race already existed before this PR (the old trigger_sync/seed code used .Shuffle the same way); you're adding call sites but no new class of bug. The crypto/rand-backed source makes real harm unlikely, and the -race suite won't catch it since the tests don't exercise concurrent Global_Random access. Worth a maintainer ticket someday, not this PR's job.

best = v
}
}
}
// if we donot have any white listed, choose from the greylist
if best != nil {
best.ConnectAfter = now + 10 // minimum 10 secs gap
return best
}

// Pass 2: uniform random among eligible greylist peers (no latency data)
var greyBest *Peer
var greyCount float64
for _, v := range peer_map {
if uint64(time.Now().Unix()) > v.BlacklistBefore && // if ip is blacklisted skip it
uint64(time.Now().Unix()) > v.ConnectAfter &&
!IsAddressConnected(ParseIPNoError(v.Address)) && !v.Whitelist && !IsAddressInBanList(ParseIPNoError(v.Address)) {
v.ConnectAfter = uint64(time.Now().UTC().Unix()) + 10 // minimum 10 secs gap
return v
if now > v.BlacklistBefore &&
now > v.ConnectAfter &&
!IsAddressConnected(ParseIPNoError(v.Address)) &&
!v.Whitelist &&
!IsAddressInBanList(ParseIPNoError(v.Address)) {

greyCount++
if globals.Global_Random.Float64()*greyCount < 1 {
greyBest = v
}
}
}
if greyBest != nil {
greyBest.ConnectAfter = now + 10 // minimum 10 secs gap
return greyBest
}

return nil // if no peer found, return nil
}
Expand Down
Loading