Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 31 additions & 10 deletions conn/bind_std.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,10 @@ again:

// putMessages resets every message in msgs and hands the slice back to
// s.msgsPool so it can be reused.
//
// Each message keeps only its Buffers slice (re-sliced to a single element)
// and its OOB slice; all other fields are reset to their zero values.
//
// NOTE(review): Buffers[:1] panics when cap(Buffers) is zero; presumably the
// pool only ever holds messages allocated with at least one buffer slot —
// confirm against the pool constructor.
func (s *StdNetBind) putMessages(msgs *[]ipv6.Message) {
	for i := range *msgs {
		// Non-coalesced write paths access only batch.msgs[i].Buffers[0],
		// but we append more during [coalesceMessages].
		// Leave index zero accessible:
		(*msgs)[i] = ipv6.Message{Buffers: (*msgs)[i].Buffers[:1], OOB: (*msgs)[i].OOB}
	}
	s.msgsPool.Put(msgs)
}
Expand Down Expand Up @@ -445,17 +448,34 @@ const (
maxIPv6PayloadLen = 1<<16 - 1 - 8

// This is a hard limit imposed by the kernel.
// As long as we use one fragment per datagram, this also serves as a
// limit for the number of fragments we can coalesce during scatter-gather writes.
//
// 64 is below the 1024 of IOV_MAX (Linux) or UIO_MAXIOV (BSD),
// and the 256 of WSABUF_MAX_COUNT (Windows).
//
// (2026-04) If we begin shipping datagrams in more than one fragment,
// an independent fragment count limit needs to be implemented.
udpSegmentMaxDatagrams = 64
)

type setGSOFunc func(control *[]byte, gsoSize uint16)

// coalesceMessages iterates 'bufs', setting and coalescing them in 'msgs'
// where possible while maintaining datagram order.
//
// It aggregates message components as a list of buffers without copying,
// and expects to be used only on Linux with scatter-gather writes via sendmmsg(2).
//
// Every msgs[i].Buffers must have a length of one. It panics if there are not
// enough msgs to coalesce all bufs.
func coalesceMessages(addr *net.UDPAddr, ep *StdNetEndpoint, bufs [][]byte, offset int, msgs []ipv6.Message, setGSO setGSOFunc) int {
var (
base = -1 // index of msg we are currently coalescing into
gsoSize int // segmentation size of msgs[base]
dgramCnt int // number of dgrams coalesced into msgs[base]
endBatch bool // tracking flag to start a new batch on next iteration of bufs
base = -1 // index of msg we are currently coalescing into
gsoSize int // segmentation size of msgs[base]
dgramCnt int // number of dgrams coalesced into msgs[base]
endBatch bool // tracking flag to start a new batch on next iteration of bufs
coalescedLen int // bytes coalesced into msgs[base]
)
maxPayloadLen := maxIPv4PayloadLen
if ep.DstIP().Is6() {
Expand All @@ -465,18 +485,18 @@ func coalesceMessages(addr *net.UDPAddr, ep *StdNetEndpoint, bufs [][]byte, offs
buf = buf[offset:]
if i > 0 {
msgLen := len(buf)
baseLenBefore := len(msgs[base].Buffers[0])
freeBaseCap := cap(msgs[base].Buffers[0]) - baseLenBefore
if msgLen+baseLenBefore <= maxPayloadLen &&
if msgLen+coalescedLen <= maxPayloadLen &&
msgLen <= gsoSize &&
msgLen <= freeBaseCap &&
dgramCnt < udpSegmentMaxDatagrams &&
!endBatch {
msgs[base].Buffers[0] = append(msgs[base].Buffers[0], buf...)
// msgs[base].Buffers[0] is set to the first buf of the message when a new base is set.
// This appends a struct iovec element in the underlying struct msghdr (scatter-gather).
msgs[base].Buffers = append(msgs[base].Buffers, buf)
if i == len(bufs)-1 {
setGSO(&msgs[base].OOB, uint16(gsoSize))
}
dgramCnt++
coalescedLen += msgLen
if msgLen < gsoSize {
// A smaller than gsoSize packet on the tail is legal, but
// it must end the batch.
Expand All @@ -497,6 +517,7 @@ func coalesceMessages(addr *net.UDPAddr, ep *StdNetEndpoint, bufs [][]byte, offs
msgs[base].Buffers[0] = buf
msgs[base].Addr = addr
dgramCnt = 1
coalescedLen = len(buf)
}
return base + 1
}
Expand Down
31 changes: 19 additions & 12 deletions conn/bind_std_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,19 @@ func mockSetGSOSize(control *[]byte, gsoSize uint16) {

func Test_coalesceMessages(t *testing.T) {
cases := []struct {
name string
buffs [][]byte
wantLens []int
name string
buffs [][]byte
// Each wantLens slice corresponds to the Buffers of a single coalesced message,
// and each int is the expected length of the corresponding Buffers[j].
wantLens [][]int
wantGSO []int
}{
{
name: "one message no coalesce",
buffs: [][]byte{
make([]byte, 1, 1),
},
wantLens: []int{1},
wantLens: [][]int{{1}},
wantGSO: []int{0},
},
{
Expand All @@ -53,7 +55,7 @@ func Test_coalesceMessages(t *testing.T) {
make([]byte, 1, 2),
make([]byte, 1, 1),
},
wantLens: []int{2},
wantLens: [][]int{{1, 1}},
wantGSO: []int{1},
},
{
Expand All @@ -62,7 +64,7 @@ func Test_coalesceMessages(t *testing.T) {
make([]byte, 2, 3),
make([]byte, 1, 1),
},
wantLens: []int{3},
wantLens: [][]int{{2, 1}},
wantGSO: []int{2},
},
{
Expand All @@ -72,7 +74,7 @@ func Test_coalesceMessages(t *testing.T) {
make([]byte, 1, 1),
make([]byte, 2, 2),
},
wantLens: []int{3, 2},
wantLens: [][]int{{2, 1}, {2}},
wantGSO: []int{2, 0},
},
{
Expand All @@ -82,8 +84,8 @@ func Test_coalesceMessages(t *testing.T) {
make([]byte, 2, 2),
make([]byte, 2, 2),
},
wantLens: []int{4, 2},
wantGSO: []int{2, 0},
wantLens: [][]int{{2, 2, 2}},
wantGSO: []int{2},
},
}

Expand All @@ -106,9 +108,14 @@ func Test_coalesceMessages(t *testing.T) {
if msgs[i].Addr != addr {
t.Errorf("msgs[%d].Addr != passed addr", i)
}
gotLen := len(msgs[i].Buffers[0])
if gotLen != tt.wantLens[i] {
t.Errorf("len(msgs[%d].Buffers[0]) %d != %d", i, gotLen, tt.wantLens[i])
if len(msgs[i].Buffers) != len(tt.wantLens[i]) {
t.Fatalf("len(msgs[%d].Buffers) %d != %d", i, len(msgs[i].Buffers), len(tt.wantLens[i]))
}
for j := range tt.wantLens[i] {
gotLen := len(msgs[i].Buffers[j])
if gotLen != tt.wantLens[i][j] {
t.Errorf("len(msgs[%d].Buffers[%d]) %d != %d", i, j, gotLen, tt.wantLens[i][j])
}
}
gotGSO, err := mockGetGSOSize(msgs[i].OOB)
if err != nil {
Expand Down
Loading