diff --git a/congestion/congestion.go b/congestion/congestion.go index 10787ea..8746ded 100644 --- a/congestion/congestion.go +++ b/congestion/congestion.go @@ -53,6 +53,9 @@ type Receiver interface { // SetNAKInterval sets the interval between two periodic NAK messages to the sender in microseconds. SetNAKInterval(nakInterval uint64) + + // ReorderTolerance returns the current dynamic reorder tolerance value. + ReorderTolerance() int } // SendStats are collected statistics from a sender @@ -121,4 +124,7 @@ type ReceiveStats struct { MbpsEstimatedLinkCapacity float64 PktLossRate float64 + + PktReorderTolerance int + PktReorderDistance int } diff --git a/congestion/live/fake.go b/congestion/live/fake.go index c98db1d..80dcd3b 100644 --- a/congestion/live/fake.go +++ b/congestion/live/fake.go @@ -180,3 +180,5 @@ func (r *fakeLiveReceive) SetNAKInterval(nakInterval uint64) { r.periodicNAKInterval = nakInterval } + +func (r *fakeLiveReceive) ReorderTolerance() int { return 0 } diff --git a/congestion/live/receive.go b/congestion/live/receive.go index 50776d9..bbf6f7a 100644 --- a/congestion/live/receive.go +++ b/congestion/live/receive.go @@ -12,11 +12,19 @@ import ( "github.com/datarhei/gosrt/packet" ) +// freshLossEntry represents a loss range whose NAK report is deferred by a TTL. +type freshLossEntry struct { + seqLo circular.Number + seqHi circular.Number + ttl int +} + // ReceiveConfig is the configuration for the liveRecv congestion control type ReceiveConfig struct { InitialSequenceNumber circular.Number PeriodicACKInterval uint64 // microseconds PeriodicNAKInterval uint64 // microseconds + MaxReorderTolerance int OnSendACK func(seq circular.Number, light bool) OnSendNAK func(list []circular.Number) OnDeliver func(p packet.Packet) @@ -46,6 +54,14 @@ type receiver struct { statistics congestion.ReceiveStats + // Adaptive reorder tolerance state (mirrors libsrt m_iReorderTolerance) + maxReorderTolerance int + reorderTolerance int + consecOrderedDelivery int + consecEarlyDelivery int + freshLoss []freshLossEntry + traceReorderDistance int + rate struct { last uint64 // microseconds period uint64 @@ -78,6 +94,9 @@ func NewReceiver(config ReceiveConfig) congestion.Receiver { avgPayloadSize: 1456, // 5.1.2. SRT's Default LiveCC Algorithm + maxReorderTolerance: config.MaxReorderTolerance, + reorderTolerance: config.MaxReorderTolerance, + sendACK: config.OnSendACK, sendNAK: config.OnSendNAK, deliver: config.OnDeliver, @@ -109,6 +128,8 @@ func (r *receiver) Stats() congestion.ReceiveStats { r.statistics.MbpsEstimatedRecvBandwidth = r.rate.bytesPerSecond * 8 / 1024 / 1024 r.statistics.MbpsEstimatedLinkCapacity = r.avgLinkCapacity * packet.MAX_PAYLOAD_SIZE * 8 / 1024 / 1024 r.statistics.PktLossRate = r.rate.pktLossRate + r.statistics.PktReorderTolerance = r.reorderTolerance + r.statistics.PktReorderDistance = r.traceReorderDistance return r.statistics } @@ -201,11 +222,14 @@ func (r *receiver) Push(pkt packet.Packet) { return } + wasSentInOrder := true + if pkt.Header().PacketSequenceNumber.Equals(r.maxSeenSequenceNumber.Inc()) { // In order, the packet we expected r.maxSeenSequenceNumber = pkt.Header().PacketSequenceNumber } else if pkt.Header().PacketSequenceNumber.Lte(r.maxSeenSequenceNumber) { // Out of order, is it a missing piece? put it in the correct position + inserted := false for e := r.packetList.Front(); e != nil; e = e.Next() { p := e.Value.(packet.Packet) @@ -214,7 +238,7 @@ func (r *receiver) Push(pkt packet.Packet) { r.statistics.PktDrop++ r.statistics.ByteDrop += pktLen - break + return } else if p.Header().PacketSequenceNumber.Gt(pkt.Header().PacketSequenceNumber) { // Late arrival, this fills a gap r.statistics.PktBuf++ @@ -224,23 +248,67 @@ func (r *receiver) Push(pkt packet.Packet) { r.statistics.ByteUnique += pktLen r.packetList.InsertBefore(pkt, e) + inserted = true break } } + if !inserted { + return + } + + // This is a belated packet (seq <= maxSeenSequenceNumber). Apply unlose logic. + r.unlose(pkt) + + // Belated original (not retransmitted) means out-of-order delivery + if !pkt.Header().RetransmittedPacketFlag { + wasSentInOrder = false + } + + // After unlose, handle ordered delivery counter and return + r.updateOrderedDelivery(wasSentInOrder) return } else { - // Too far ahead, there are some missing sequence numbers, immediate NAK report - // here we can prevent a possibly unnecessary NAK with SRTO_LOXXMAXTTL - r.sendNAK([]circular.Number{ - r.maxSeenSequenceNumber.Inc(), - pkt.Header().PacketSequenceNumber.Dec(), - }) + // Too far ahead, there are some missing sequence numbers + lossLen := uint64(pkt.Header().PacketSequenceNumber.Distance(r.maxSeenSequenceNumber)) + r.statistics.PktLoss += lossLen + r.statistics.ByteLoss += lossLen * uint64(r.avgPayloadSize) + + // Determine initial loss TTL based on adaptive reorder tolerance support + initialLossTTL := 0 + if r.maxReorderTolerance > 0 { + initialLossTTL = r.reorderTolerance + } - len := uint64(pkt.Header().PacketSequenceNumber.Distance(r.maxSeenSequenceNumber)) - r.statistics.PktLoss += len - r.statistics.ByteLoss += len * uint64(r.avgPayloadSize) + if initialLossTTL > 0 { + // Defer NAK: add to freshLoss with current tolerance as TTL + r.freshLoss = append(r.freshLoss, freshLossEntry{ + seqLo: r.maxSeenSequenceNumber.Inc(), + seqHi: pkt.Header().PacketSequenceNumber.Dec(), + ttl: initialLossTTL, + }) + + // Enforce freshLoss size limit - collect overflow NAKs to send outside lock + var overflowNAKs []circular.Number + for len(r.freshLoss) > 1000 { + // Force NAK on oldest entry + oldest := r.freshLoss[0] + r.freshLoss = r.freshLoss[1:] + overflowNAKs = append(overflowNAKs, oldest.seqLo, oldest.seqHi) + } + + // Send overflow NAKs outside the critical section below + if len(overflowNAKs) > 0 { + defer r.sendNAK(overflowNAKs) + } + } else { + // Immediate NAK report + r.sendNAK([]circular.Number{ + r.maxSeenSequenceNumber.Inc(), + pkt.Header().PacketSequenceNumber.Dec(), + }) + } r.maxSeenSequenceNumber = pkt.Header().PacketSequenceNumber } @@ -252,6 +320,9 @@ func (r *receiver) Push(pkt packet.Packet) { r.statistics.ByteUnique += pktLen r.packetList.PushBack(pkt) + + // Update ordered delivery counter for in-order or retransmitted packets + r.updateOrderedDelivery(wasSentInOrder) } func (r *receiver) periodicACK(now uint64) (ok bool, sequenceNumber circular.Number, lite bool) { @@ -308,6 +379,11 @@ func (r *receiver) periodicACK(now uint64) (ok bool, sequenceNumber circular.Num ok = true sequenceNumber = ackSequenceNumber.Inc() + // If ACK advanced past gaps (TLPKTDROP), revoke skipped sequences from freshLoss + if len(r.freshLoss) > 0 && ackSequenceNumber.Gt(r.lastACKSequenceNumber) { + r.freshLossRevoke(r.lastACKSequenceNumber.Inc(), ackSequenceNumber) + } + // Keep track of the last ACK's sequence number. With this we can faster ignore // packets that come in late that have a lower sequence number. r.lastACKSequenceNumber = ackSequenceNumber @@ -321,21 +397,20 @@ func (r *receiver) periodicACK(now uint64) (ok bool, sequenceNumber circular.Num } func (r *receiver) periodicNAK(now uint64) []circular.Number { - r.lock.RLock() - defer r.lock.RUnlock() + r.lock.Lock() + defer r.lock.Unlock() if now-r.lastPeriodicNAK < r.periodicNAKInterval { return nil } - list := []circular.Number{} + nakList := []circular.Number{} // Send a periodic NAK ackSequenceNumber := r.lastACKSequenceNumber - // Send a NAK for all gaps. - // Not all gaps might get announced because the size of the NAK packet is limited. + // Send a NAK for all gaps, but skip ranges that are still pending in freshLoss. for e := r.packetList.Front(); e != nil; e = e.Next() { p := e.Value.(packet.Packet) @@ -346,10 +421,14 @@ func (r *receiver) periodicNAK(now uint64) []circular.Number { // If this packet is not in sequence, we stop here and report that gap. if !p.Header().PacketSequenceNumber.Equals(ackSequenceNumber.Inc()) { - nackSequenceNumber := ackSequenceNumber.Inc() + gapLo := ackSequenceNumber.Inc() + gapHi := p.Header().PacketSequenceNumber.Dec() - list = append(list, nackSequenceNumber) - list = append(list, p.Header().PacketSequenceNumber.Dec()) + // Skip this gap if it overlaps with any freshLoss entry + if !r.isInFreshLoss(gapLo, gapHi) { + nakList = append(nakList, gapLo) + nakList = append(nakList, gapHi) + } } ackSequenceNumber = p.Header().PacketSequenceNumber @@ -357,7 +436,7 @@ func (r *receiver) periodicNAK(now uint64) []circular.Number { r.lastPeriodicNAK = now - return list + return nakList } func (r *receiver) Tick(now uint64) { @@ -365,10 +444,13 @@ func (r *receiver) Tick(now uint64) { r.sendACK(sequenceNumber, lite) } - if list := r.periodicNAK(now); len(list) != 0 { - r.sendNAK(list) + if nakList := r.periodicNAK(now); len(nakList) != 0 { + r.sendNAK(nakList) } + // Process freshLoss TTL expiry + r.processFreshLoss() + // Deliver packets whose PktTsbpdTime is ripe r.lock.Lock() removeList := make([]*list.Element, 0, r.packetList.Len()) @@ -421,6 +503,230 @@ func (r *receiver) SetNAKInterval(nakInterval uint64) { r.periodicNAKInterval = nakInterval } +func (r *receiver) ReorderTolerance() int { + r.lock.RLock() + defer r.lock.RUnlock() + + return r.reorderTolerance +} + +// unlose adjusts reorder tolerance for a belated packet. Mirrors libsrt CUDT::unlose(). +func (r *receiver) unlose(pkt packet.Packet) { + hasIncreasedTolerance := false + wasReordered := false + + if r.maxReorderTolerance > 0 { + // Original (not retransmitted) belated packet means reordering + wasReordered = !pkt.Header().RetransmittedPacketFlag + if wasReordered { + seqdiff := int(r.maxSeenSequenceNumber.Distance(pkt.Header().PacketSequenceNumber)) + if seqdiff > r.traceReorderDistance { + r.traceReorderDistance = seqdiff + } + if seqdiff > r.reorderTolerance { + newTolerance := seqdiff + if newTolerance > r.maxReorderTolerance { + newTolerance = r.maxReorderTolerance + } + r.reorderTolerance = newTolerance + hasIncreasedTolerance = true + } + } + } + + // Early return if adaptive reorder is not active (mirrors libsrt) + if r.maxReorderTolerance == 0 || r.reorderTolerance == 0 { + return + } + + // Remove this sequence from freshLoss + hadTTL := r.freshLossRemoveOne(pkt.Header().PacketSequenceNumber) + + if wasReordered { + r.consecOrderedDelivery = 0 + if hasIncreasedTolerance { + r.consecEarlyDelivery = 0 + } else if hadTTL > 2 { + r.consecEarlyDelivery++ + if r.consecEarlyDelivery >= 10 { + r.consecEarlyDelivery = 0 + if r.reorderTolerance > 0 { + r.reorderTolerance-- + } + } + } + } +} + +// updateOrderedDelivery tracks in-order delivery and decays tolerance after 50 consecutive. +func (r *receiver) updateOrderedDelivery(wasSentInOrder bool) { + if r.maxReorderTolerance == 0 { + return + } + + if wasSentInOrder { + r.consecOrderedDelivery++ + if r.consecOrderedDelivery >= 50 { + r.consecOrderedDelivery = 0 + if r.reorderTolerance > 0 { + r.reorderTolerance-- + } + } + } else { + r.consecOrderedDelivery = 0 + } +} + +// processFreshLoss sends NAK for expired freshLoss entries and decrements TTL of the rest. +func (r *receiver) processFreshLoss() { + r.lock.Lock() + + if len(r.freshLoss) == 0 { + r.lock.Unlock() + return + } + + var lossdata []circular.Number + + // Phase 1: collect entries with TTL <= 0 + expiredCount := 0 + for i := range r.freshLoss { + if r.freshLoss[i].ttl <= 0 { + lossdata = append(lossdata, r.freshLoss[i].seqLo, r.freshLoss[i].seqHi) + expiredCount = i + 1 + } else { + break + } + } + + // Remove expired entries + if expiredCount > 0 { + r.freshLoss = r.freshLoss[expiredCount:] + } + + // Phase 2: decrement TTL of remaining entries + for i := range r.freshLoss { + r.freshLoss[i].ttl-- + } + + r.lock.Unlock() + + // Send NAK for expired entries (outside lock to avoid deadlock with sendNAK) + if len(lossdata) > 0 { + r.sendNAK(lossdata) + } +} + +// freshLossRemoveOne removes a single sequence number from the freshLoss queue. +// Returns the TTL the entry had when removed, or 0 if not found. +func (r *receiver) freshLossRemoveOne(seq circular.Number) int { + for i := 0; i < len(r.freshLoss); i++ { + entry := &r.freshLoss[i] + + if seq.Lt(entry.seqLo) || seq.Gt(entry.seqHi) { + continue + } + + hadTTL := entry.ttl + + if entry.seqLo.Equals(entry.seqHi) { + // DELETE: single-element range + r.freshLoss = append(r.freshLoss[:i], r.freshLoss[i+1:]...) + } else if seq.Equals(entry.seqLo) { + // STRIPPED: remove from beginning + entry.seqLo = entry.seqLo.Inc() + } else if seq.Equals(entry.seqHi) { + // STRIPPED: remove from end + entry.seqHi = entry.seqHi.Dec() + } else { + // SPLIT: split into two ranges + newEntry := freshLossEntry{ + seqLo: seq.Inc(), + seqHi: entry.seqHi, + ttl: entry.ttl, + } + entry.seqHi = seq.Dec() + + // Insert newEntry after current position + r.freshLoss = append(r.freshLoss, freshLossEntry{}) + copy(r.freshLoss[i+2:], r.freshLoss[i+1:]) + r.freshLoss[i+1] = newEntry + } + + return hadTTL + } + + return 0 +} + +// freshLossRevoke removes a range [lo, hi] from the freshLoss queue. +// Used when packets are dropped (e.g., TLPKTDROP). +func (r *receiver) freshLossRevoke(lo, hi circular.Number) { + result := make([]freshLossEntry, 0, len(r.freshLoss)) + + for i := 0; i < len(r.freshLoss); i++ { + entry := r.freshLoss[i] + + // Past the revoke range: copy remaining entries and stop + if hi.Lt(entry.seqLo) { + result = append(result, r.freshLoss[i:]...) + break + } + + // Before the revoke range, keep entry as-is + if lo.Gt(entry.seqHi) { + result = append(result, entry) + continue + } + + // Full overlap: skip (delete) this entry + if lo.Lte(entry.seqLo) && hi.Gte(entry.seqHi) { + continue + } + + // Partial overlap from the left only: strip left side + if lo.Lte(entry.seqLo) { + entry.seqLo = hi.Inc() + result = append(result, entry) + continue + } + + // Partial overlap from the right only: strip right side + if hi.Gte(entry.seqHi) { + entry.seqHi = lo.Dec() + result = append(result, entry) + continue + } + + // Middle overlap: split into left [seqLo, lo-1] and right [hi+1, seqHi] + leftEntry := freshLossEntry{ + seqLo: entry.seqLo, + seqHi: lo.Dec(), + ttl: entry.ttl, + } + rightEntry := freshLossEntry{ + seqLo: hi.Inc(), + seqHi: entry.seqHi, + ttl: entry.ttl, + } + result = append(result, leftEntry, rightEntry) + } + + r.freshLoss = result +} + +// isInFreshLoss checks if a gap range [lo, hi] overlaps with any freshLoss entry. +func (r *receiver) isInFreshLoss(lo, hi circular.Number) bool { + for i := range r.freshLoss { + entry := &r.freshLoss[i] + // Overlap check: !(hi < entry.seqLo || lo > entry.seqHi) + if !hi.Lt(entry.seqLo) && !lo.Gt(entry.seqHi) { + return true + } + } + return false +} + func (r *receiver) String(t uint64) string { var b strings.Builder diff --git a/conn_request.go b/conn_request.go index 26c595b..5f1a362 100644 --- a/conn_request.go +++ b/conn_request.go @@ -192,7 +192,7 @@ func newConnRequest(ln *listener, p packet.Packet) *connRequest { } // Check the required SRT flags - if !cif.SRTHS.SRTFlags.TSBPDSND || !cif.SRTHS.SRTFlags.TSBPDRCV || !cif.SRTHS.SRTFlags.TLPKTDROP || !cif.SRTHS.SRTFlags.PERIODICNAK || !cif.SRTHS.SRTFlags.REXMITFLG { + if !cif.SRTHS.SRTFlags.TSBPDSND || !cif.SRTHS.SRTFlags.TSBPDRCV || !cif.SRTHS.SRTFlags.TLPKTDROP || !cif.SRTHS.SRTFlags.PERIODICNAK { cif.HandshakeType = packet.HandshakeType(REJ_ROGUE) ln.log("handshake:recv:error", func() string { return "not all required flags are set" }) p.MarshalCIF(cif) diff --git a/connection.go b/connection.go index 26901f0..4dfdead 100644 --- a/connection.go +++ b/connection.go @@ -308,6 +308,7 @@ func newSRTConn(config srtConnConfig) *srtConn { InitialSequenceNumber: c.initialPacketSequenceNumber, PeriodicACKInterval: 10_000, PeriodicNAKInterval: 20_000, + MaxReorderTolerance: int(c.config.LossMaxTTL), OnSendACK: c.sendACK, OnSendNAK: c.sendNAK, OnDeliver: c.deliver, @@ -917,13 +918,6 @@ func (c *srtConn) handleHSRequest(p packet.Packet) { return } - if !cif.SRTFlags.REXMITFLG { - c.log("control:recv:HSRes:error", func() string { return "REXMITFLG flag must be set" }) - c.close() - - return - } - // we as receiver don't need this cif.SRTFlags.TSBPDSND = false @@ -1012,13 +1006,6 @@ func (c *srtConn) handleHSResponse(p packet.Packet) { return } - if !cif.SRTFlags.REXMITFLG { - c.log("control:recv:HSRes:error", func() string { return "REXMITFLG flag must be set" }) - c.close() - - return - } - // These flag was introduced in HSv5 and should not be set in HSv4 if cif.SRTFlags.STREAM { c.log("control:recv:HSReq:error", func() string { return "STREAM flag is set" }) @@ -1494,7 +1481,7 @@ func (c *srtConn) Stats(s *Statistics) { MbpsSendRate: float64(s.Accumulated.ByteSent-previous.ByteSent) * 8 / 1024 / 1024 / (float64(interval) / 1000), MbpsRecvRate: float64(s.Accumulated.ByteRecv-previous.ByteRecv) * 8 / 1024 / 1024 / (float64(interval) / 1000), UsSndDuration: s.Accumulated.UsSndDuration - previous.UsSndDuration, - PktReorderDistance: 0, + PktReorderDistance: uint64(recv.PktReorderDistance), PktRecvBelated: s.Accumulated.PktRecvBelated - previous.PktRecvBelated, PktSndDrop: s.Accumulated.PktSendDrop - previous.PktSendDrop, PktRecvDrop: s.Accumulated.PktRecvDrop - previous.PktRecvDrop, @@ -1533,7 +1520,7 @@ func (c *srtConn) Stats(s *Statistics) { ByteRecvBuf: recv.ByteBuf, MsRecvBuf: recv.MsBuf, MsRecvTsbPdDelay: c.tsbpdDelay / 1000, - PktReorderTolerance: uint64(c.config.LossMaxTTL), + PktReorderTolerance: uint64(recv.PktReorderTolerance), PktRecvAvgBelatedTime: 0, PktSendLossRate: send.PktLossRate, PktRecvLossRate: recv.PktLossRate, diff --git a/dial.go b/dial.go index 09ca9d7..1a25310 100644 --- a/dial.go +++ b/dial.go @@ -446,7 +446,7 @@ func (dl *dialer) handleHandshake(p packet.Packet) { } // Check the required SRT flags - if !cif.SRTHS.SRTFlags.TSBPDSND || !cif.SRTHS.SRTFlags.TSBPDRCV || !cif.SRTHS.SRTFlags.TLPKTDROP || !cif.SRTHS.SRTFlags.PERIODICNAK || !cif.SRTHS.SRTFlags.REXMITFLG { + if !cif.SRTHS.SRTFlags.TSBPDSND || !cif.SRTHS.SRTFlags.TSBPDRCV || !cif.SRTHS.SRTFlags.TLPKTDROP || !cif.SRTHS.SRTFlags.PERIODICNAK { dl.sendShutdown(cif.SRTSocketId) dl.connChan <- connResponse{