From 9e6d28f8467eaca9a27912d82675a2c09e26012b Mon Sep 17 00:00:00 2001 From: Juliane Holzt Date: Thu, 28 May 2026 00:05:04 +0200 Subject: [PATCH] Correctly handle multi-packet PAT The old code did not handle PAT which do not fit into single packet correctly. This adds a PAT accumulator and fixes ReadPAT to use that. A test using a multi packet PAT (real-world data) and tests for the accumulator were added. --- psi/pat.go | 63 ++++++++++++++++--------- psi/pat_test.go | 120 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 162 insertions(+), 21 deletions(-) diff --git a/psi/pat.go b/psi/pat.go index 83e99cb..ab3d252 100644 --- a/psi/pat.go +++ b/psi/pat.go @@ -122,35 +122,56 @@ func (pat pat) SPTSpmtPID() (int, error) { return 0, errors.New("No programs in transport stream") } -// ReadPAT extracts a PAT from a reader of a TS stream. It will read until a -// PAT packet is found or EOF is reached. -// It returns a new PAT object parsed from the packet, if found, and otherwise -// returns an error. +// PatAccumulatorDoneFunc is a doneFunc that can be used for packet accumulation +// to create a PAT. It returns true once the accumulated bytes contain the +// complete PAT section(s). +func PatAccumulatorDoneFunc(b []byte) (bool, error) { + if len(b) < 1 { + return false, nil + } + + start := 1 + int(PointerField(b)) + if len(b) < start { + return false, nil + } + + sectionBytes := b[start:] + for len(sectionBytes) > 2 && sectionBytes[0] != 0xFF { + tableLength := sectionLength(sectionBytes) + if len(sectionBytes) < int(tableLength)+3 { + return false, nil + } + sectionBytes = sectionBytes[3+tableLength:] + } + + return true, nil +} + +// ReadPAT extracts a PAT from a reader of a TS stream. It will read until PAT +// packet(s) are found or EOF is reached. +// It returns a new PAT object parsed from the packet(s), if found, and +// otherwise returns an error. func ReadPAT(r io.Reader) (PAT, error) { var pkt packet.Packet - var pat PAT - for pat == nil { + + patAcc := packet.NewAccumulator(PatAccumulatorDoneFunc) + + for { if _, err := io.ReadFull(r, pkt[:]); err != nil { if err == io.EOF || err == io.ErrUnexpectedEOF { - break + return nil, gots.ErrPATNotFound } return nil, err } - isPat := packet.IsPat(&pkt) + if !packet.IsPat(&pkt) { + continue + } - if isPat { - pay, err := packet.Payload(&pkt) - if err != nil { - return nil, err - } - cp := make([]byte, len(pay)) - copy(cp, pay) - pat, err := NewPAT(cp) - if err != nil { - return nil, err - } - return pat, nil + _, err := patAcc.WritePacket(&pkt) + if err == gots.ErrAccumulatorDone { + return NewPAT(patAcc.Bytes()) + } else if err != nil { + return nil, err } } - return nil, gots.ErrPATNotFound } diff --git a/psi/pat_test.go b/psi/pat_test.go index cdc033c..d5efb19 100644 --- a/psi/pat_test.go +++ b/psi/pat_test.go @@ -29,6 +29,9 @@ import ( "encoding/hex" "reflect" "testing" + + "github.com/Comcast/gots/v3" + "github.com/Comcast/gots/v3/packet" ) var testData = []struct { @@ -136,3 +139,120 @@ func TestReadPATIncomplete(t *testing.T) { t.Errorf("Expected to get error reading PAT, but did not") } } + +func TestReadPATMultiplePackets(t *testing.T) { + bs, _ := hex.DecodeString( + "474000180000b0d9040fcb00000000e0102887f5182888f5222889f52c288af5" + + "36288bf5402896f5722897f57328a0e06428a1e06e28a2e07828a3e08228a4e0" + + "8c28a5e09628a6e0a028a8e0b428ace0c828ade0d228aee0dc28afe0e628b0e0" + + "f028b1e0fa28b2e10428b3e10e28b4e11828b5e12228b6e12728bae12c28bbe1" + + "3628bce14028bde14a28c0e19028c1e19a28c2e1a428c8e1f428c9e1fe28cae2" + + "0828cbe21228cce21c28cde22628cee23028cfe23a28d0e24428d3e247000019" + + "5828d4e26228d5e26c28d6e27628d7e28028d8e28a28d9e29428dae29e28dbe2" + + "a854869793ffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "ffffffffffffffffffffffffffffffffffffffffffffffff") + r := bytes.NewReader(bs) + pat, err := ReadPAT(r) + if err != nil { + t.Errorf("Unexpected error reading PAT: %v", err) + } + wantMap := map[int]int{ + 10375: 5400, 10376: 5410, 10377: 5420, 10378: 5430, 10379: 5440, + 10390: 5490, 10391: 5491, 10400: 100, 10401: 110, 10402: 120, + 10403: 130, 10404: 140, 10405: 150, 10406: 160, 10408: 180, + 10412: 200, 10413: 210, 10414: 220, 10415: 230, 10416: 240, + 10417: 250, 10418: 260, 10419: 270, 10420: 280, 10421: 290, + 10422: 295, 10426: 300, 10427: 310, 10428: 320, 10429: 330, + 10432: 400, 10433: 410, 10434: 420, 10440: 500, 10441: 510, + 10442: 520, 10443: 530, 10444: 540, 10445: 550, 10446: 560, + 10447: 570, 10448: 580, 10451: 600, 10452: 610, 10453: 620, + 10454: 630, 10455: 640, 10456: 650, 10457: 660, 10458: 670, + 10459: 680, + } + gotMap := pat.ProgramMap() + if !reflect.DeepEqual(wantMap, gotMap) { + t.Errorf("PAT read is invalid, did not have expected program map") + } +} + +func TestBuildPAT(t *testing.T) { + pkt := parseHexString("474000100000b00d0001c100000001e256f803e71bffffff" + + "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "ffffffffffffffffffffffffffffffff") + acc := packet.NewAccumulator(PatAccumulatorDoneFunc) + + _, err := acc.WritePacket(pkt) + if err != gots.ErrAccumulatorDone { + t.Errorf("Single packet PAT expected. This means your doneFunc is probably bad.") + } + + pat, err := NewPAT(acc.Bytes()) + if err != nil { + t.Error(err) + } + + wantMap := map[int]int{1: 598} + if !reflect.DeepEqual(wantMap, pat.ProgramMap()) { + t.Errorf("PAT program map does not match want:%v got:%v", wantMap, pat.ProgramMap()) + } +} + +func TestBuildMultiPacketPAT(t *testing.T) { + firstPacket := parseHexString("474000180000b0d9040fcb00000000e0102887f5182888f5222889f52c288af5" + + "36288bf5402896f5722897f57328a0e06428a1e06e28a2e07828a3e08228a4e0" + + "8c28a5e09628a6e0a028a8e0b428ace0c828ade0d228aee0dc28afe0e628b0e0" + + "f028b1e0fa28b2e10428b3e10e28b4e11828b5e12228b6e12728bae12c28bbe1" + + "3628bce14028bde14a28c0e19028c1e19a28c2e1a428c8e1f428c9e1fe28cae2" + + "0828cbe21228cce21c28cde22628cee23028cfe23a28d0e24428d3e2") + secondPacket := parseHexString("470000195828d4e26228d5e26c28d6e27628d7e28028d8e28a28d9e29428dae2" + + "9e28dbe2a854869793ffffffffffffffffffffffffffffffffffffffffffffff" + + "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "ffffffffffffffffffffffffffffffffffffffffffffffffffffffff") + + acc := packet.NewAccumulator(PatAccumulatorDoneFunc) + + _, err := acc.WritePacket(firstPacket) + if err == gots.ErrAccumulatorDone { + t.Error("Added first packet of multi-packet PAT and already indicating done. That's not right.") + } + + _, err = acc.WritePacket(secondPacket) + if err != gots.ErrAccumulatorDone { + t.Error("Added second and final packet of multi-packet PAT but indicating not done. That's not right.") + } + + pat, err := NewPAT(acc.Bytes()) + if err != nil { + t.Error(err) + } + + if got, want := pat.NumPrograms(), 52; got != want { + t.Errorf("Wrong number of programs, want %d got %d", want, got) + } +} + +func TestBuildPAT_ExpectsAnotherPacket(t *testing.T) { + // First packet of a multi-packet PAT on its own: the section_length spans + // into the next packet, so the doneFunc must not yet report done. + pkt := parseHexString("474000180000b0d9040fcb00000000e0102887f5182888f5222889f52c288af5" + + "36288bf5402896f5722897f57328a0e06428a1e06e28a2e07828a3e08228a4e0" + + "8c28a5e09628a6e0a028a8e0b428ace0c828ade0d228aee0dc28afe0e628b0e0" + + "f028b1e0fa28b2e10428b3e10e28b4e11828b5e12228b6e12728bae12c28bbe1" + + "3628bce14028bde14a28c0e19028c1e19a28c2e1a428c8e1f428c9e1fe28cae2" + + "0828cbe21228cce21c28cde22628cee23028cfe23a28d0e24428d3e2") + + acc := packet.NewAccumulator(PatAccumulatorDoneFunc) + _, err := acc.WritePacket(pkt) + if err == gots.ErrAccumulatorDone { + t.Errorf("Expected not done because not enough packets are present to create the PAT") + } +}