diff --git a/conn/bind_std.go b/conn/bind_std.go index fc0563456..c13891e67 100644 --- a/conn/bind_std.go +++ b/conn/bind_std.go @@ -204,7 +204,10 @@ again: func (s *StdNetBind) putMessages(msgs *[]ipv6.Message) { for i := range *msgs { - (*msgs)[i] = ipv6.Message{Buffers: (*msgs)[i].Buffers, OOB: (*msgs)[i].OOB} + // Non coalesced write paths access only batch.msgs[i].Buffers[0], + // but we append more during [coalesceMessages]. + // Leave index zero accessible: + (*msgs)[i] = ipv6.Message{Buffers: (*msgs)[i].Buffers[:1], OOB: (*msgs)[i].OOB} } s.msgsPool.Put(msgs) } @@ -445,17 +448,34 @@ const ( maxIPv6PayloadLen = 1<<16 - 1 - 8 // This is a hard limit imposed by the kernel. + // As long as we use one fragment per datagram, this also serves as a + // limit for the number of fragments we can coalesce during scatter-gather writes. + // + // 64 is below the 1024 of IOV_MAX (Linux) or UIO_MAXIOV (BSD), + // and the 256 of WSABUF_MAX_COUNT (Windows). + // + // (2026-04) If we begin shipping datagrams in more than one fragment, + // an independent fragment count limit needs to be implemented. udpSegmentMaxDatagrams = 64 ) type setGSOFunc func(control *[]byte, gsoSize uint16) +// coalesceMessages iterates 'buffs', setting and coalescing them in 'msgs' +// where possible while maintaining datagram order. +// +// It aggregates message components as a list of buffers without copying, +// and expects to be used only on Linux with scatter-gather writes via sendmmsg(2). +// +// All msgs[i].Buffers len must be one. Will panic if there is not enough msgs +// to coalesce all buffs. func coalesceMessages(addr *net.UDPAddr, ep *StdNetEndpoint, bufs [][]byte, offset int, msgs []ipv6.Message, setGSO setGSOFunc) int { var ( - base = -1 // index of msg we are currently coalescing into - gsoSize int // segmentation size of msgs[base] - dgramCnt int // number of dgrams coalesced into msgs[base] - endBatch bool // tracking flag to start a new batch on next iteration of bufs + base = -1 // index of msg we are currently coalescing into + gsoSize int // segmentation size of msgs[base] + dgramCnt int // number of dgrams coalesced into msgs[base] + endBatch bool // tracking flag to start a new batch on next iteration of bufs + coalescedLen int // bytes coalesced into msgs[base] ) maxPayloadLen := maxIPv4PayloadLen if ep.DstIP().Is6() { @@ -465,18 +485,18 @@ func coalesceMessages(addr *net.UDPAddr, ep *StdNetEndpoint, bufs [][]byte, offs buf = buf[offset:] if i > 0 { msgLen := len(buf) - baseLenBefore := len(msgs[base].Buffers[0]) - freeBaseCap := cap(msgs[base].Buffers[0]) - baseLenBefore - if msgLen+baseLenBefore <= maxPayloadLen && + if msgLen+coalescedLen <= maxPayloadLen && msgLen <= gsoSize && - msgLen <= freeBaseCap && dgramCnt < udpSegmentMaxDatagrams && !endBatch { - msgs[base].Buffers[0] = append(msgs[base].Buffers[0], buf...) + // msgs[base].Buffers[0] is set to buf[i] when a new base is set. + // This appends a struct iovec element in the underlying struct msghdr (scatter-gather). + msgs[base].Buffers = append(msgs[base].Buffers, buf) if i == len(bufs)-1 { setGSO(&msgs[base].OOB, uint16(gsoSize)) } dgramCnt++ + coalescedLen += msgLen if msgLen < gsoSize { // A smaller than gsoSize packet on the tail is legal, but // it must end the batch. @@ -497,6 +517,7 @@ func coalesceMessages(addr *net.UDPAddr, ep *StdNetEndpoint, bufs [][]byte, offs msgs[base].Buffers[0] = buf msgs[base].Addr = addr dgramCnt = 1 + coalescedLen = len(buf) } return base + 1 } diff --git a/conn/bind_std_test.go b/conn/bind_std_test.go index 77af0d925..254952f0a 100644 --- a/conn/bind_std_test.go +++ b/conn/bind_std_test.go @@ -34,9 +34,11 @@ func mockSetGSOSize(control *[]byte, gsoSize uint16) { func Test_coalesceMessages(t *testing.T) { cases := []struct { - name string - buffs [][]byte - wantLens []int + name string + buffs [][]byte + // Each wantLens slice corresponds to the Buffers of a single coalesced message, + // and each int is the expected length of the corresponding Buffer[i]. + wantLens [][]int wantGSO []int }{ { @@ -44,7 +46,7 @@ func Test_coalesceMessages(t *testing.T) { buffs: [][]byte{ make([]byte, 1, 1), }, - wantLens: []int{1}, + wantLens: [][]int{{1}}, wantGSO: []int{0}, }, { @@ -53,7 +55,7 @@ func Test_coalesceMessages(t *testing.T) { make([]byte, 1, 2), make([]byte, 1, 1), }, - wantLens: []int{2}, + wantLens: [][]int{{1, 1}}, wantGSO: []int{1}, }, { @@ -62,7 +64,7 @@ func Test_coalesceMessages(t *testing.T) { make([]byte, 2, 3), make([]byte, 1, 1), }, - wantLens: []int{3}, + wantLens: [][]int{{2, 1}}, wantGSO: []int{2}, }, { @@ -72,7 +74,7 @@ func Test_coalesceMessages(t *testing.T) { make([]byte, 1, 1), make([]byte, 2, 2), }, - wantLens: []int{3, 2}, + wantLens: [][]int{{2, 1}, {2}}, wantGSO: []int{2, 0}, }, { @@ -82,8 +84,8 @@ func Test_coalesceMessages(t *testing.T) { make([]byte, 2, 2), make([]byte, 2, 2), }, - wantLens: []int{4, 2}, - wantGSO: []int{2, 0}, + wantLens: [][]int{{2, 2, 2}}, + wantGSO: []int{2}, }, } @@ -106,9 +108,14 @@ func Test_coalesceMessages(t *testing.T) { if msgs[i].Addr != addr { t.Errorf("msgs[%d].Addr != passed addr", i) } - gotLen := len(msgs[i].Buffers[0]) - if gotLen != tt.wantLens[i] { - t.Errorf("len(msgs[%d].Buffers[0]) %d != %d", i, gotLen, tt.wantLens[i]) + if len(msgs[i].Buffers) != len(tt.wantLens[i]) { + t.Fatalf("len(msgs[%d].Buffers) %d != %d", i, len(msgs[i].Buffers), len(tt.wantLens[i])) + } + for j := range tt.wantLens[i] { + gotLen := len(msgs[i].Buffers[j]) + if gotLen != tt.wantLens[i][j] { + t.Errorf("len(msgs[%d].Buffers[%d]) %d != %d", i, j, gotLen, tt.wantLens[i][j]) + } } gotGSO, err := mockGetGSOSize(msgs[i].OOB) if err != nil {