Skip to content
Open
169 changes: 169 additions & 0 deletions data/capture_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package data

import (
"bufio"
"encoding/binary"
"fmt"
"io"
"os"
"path/filepath"
Expand All @@ -12,11 +14,26 @@ import (
"github.com/matttproud/golang_protobuf_extensions/pbutil"
"github.com/pkg/errors"
v1 "go.viam.com/api/app/datasync/v1"
"google.golang.org/protobuf/encoding/protowire"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"

"go.viam.com/rdk/resource"
)

// Sentinel errors returned by BinaryPayloadReader. Callers should match them
// with errors.Is, since they may be wrapped with additional context.
var (
	// ErrNoBinaryField reports that a SensorData message carried no binary
	// payload field. This typically indicates a legacy camera.GetImages file
	// that stores tabular data in a BINARY_SENSOR-typed capture file.
	ErrNoBinaryField = errors.New("binary payload field not found in capture file")

	// ErrSensorMetadataTooLarge reports that the SensorMetadata field is larger
	// than the size cap, which points at a corrupt or unexpected file.
	ErrSensorMetadataTooLarge = errors.New("SensorMetadata field exceeds size limit")

	// ErrUnparsableBinaryCapture reports that the file cannot be streamed
	// because its wire format is not what the parser expects (e.g. an unknown
	// wire type).
	ErrUnparsableBinaryCapture = errors.New("capture file cannot be streamed due to unexpected wire format")
)

// TODO Data-343: Reorganize this into a more standard interface/package, and add tests.

const (
Expand Down Expand Up @@ -274,6 +291,158 @@ func SensorDataFromCaptureFile(f *CaptureFile) ([]*v1.SensorData, error) {
return ret, nil
}

// BinaryPayloadReader reads the next SensorData message from f without loading
// the binary payload into memory. It returns the SensorMetadata, payload size,
// and an io.Reader for streaming the payload.
//
// Binary capture files contain one SensorData message (WriteBinary
// writes one message per file), so this is typically called once. The readOffset
// is advanced past the message regardless, consistent with ReadNext.
// Returns io.EOF when no messages remain.
//
// Each SensorData entry in the capture file is stored as a length-prefixed
// protobuf message (standard protobuf framing). Rather than unmarshaling the
// full message into memory, this function hand-parses the wire format to locate
// the binary payload field and returns an io.SectionReader directly over that
// region of the file. This lets callers stream large payloads without buffering.
//
// Assumes proto field 1 (SensorMetadata) precedes proto field 3 (binary payload)
// within each SensorData message, which matches the encoding produced by our writers.
func (f *CaptureFile) BinaryPayloadReader() (*v1.SensorMetadata, int64, io.Reader, error) {
f.lock.Lock()
defer f.lock.Unlock()

// Seek to where we left off. seekOffset is captured before the seek so we can
// compute absolute file positions for the SectionReader later.
seekOffset := f.readOffset
if _, err := f.file.Seek(seekOffset, io.SeekStart); err != nil {
return nil, 0, nil, err
}

// The capture file is a sequence of length-delimited protobuf records. Each record
// is: [outerLen varint] [SensorData proto bytes].
// Read outerLen so we know where this message ends and the next begins.
varintCR := &countingByteReader{r: f.file}
outerLen, err := binary.ReadUvarint(varintCR)
if err != nil {
return nil, 0, nil, err // io.EOF means no more messages
}

// msgStart is the absolute file offset of the first byte of the SensorData fields
// (i.e., just past the outerLen varint). We use this later to anchor the SectionReader.
msgStart := seekOffset + varintCR.count

// Advance readOffset past this entire record so the next call starts at the right place,
// regardless of how much of the payload the caller actually consumes.
f.readOffset = msgStart + int64(outerLen)

// inner is bounded to exactly outerLen bytes so field parsing can never stray into
// the next record, even if the file is malformed.
inner := &countingByteReader{r: io.LimitReader(f.file, int64(outerLen))}

var sensorMeta *v1.SensorMetadata

// Walk the SensorData fields in wire order. Each field starts with a tag varint that
// encodes both the field number (upper bits) and the wire type (lower 3 bits), followed
// by the field value. We only need two fields:
// field 1 (BytesType) — SensorMetadata, decoded fully into memory (always small).
// field 3 (BytesType) — binary payload, returned as a SectionReader without buffering.
// All other fields are skipped so we remain forward-compatible with future additions.
for {
tagVal, err := binary.ReadUvarint(inner)
if err != nil {
if errors.Is(err, io.EOF) {
break
}
return nil, 0, nil, fmt.Errorf("reading SensorData field tag: %w", err)
}

// Protobuf tag encoding: upper bits are the field number, lower 3 bits are the wire type.
fieldNum := protowire.Number(tagVal >> 3)
wireType := protowire.Type(tagVal & 0x7)

// Skip any non-length-delimited field (varint, fixed32, fixed64) by consuming its
// fixed-width value and moving on.
if wireType != protowire.BytesType {
var skipErr error
switch wireType { //nolint:exhaustive
case protowire.VarintType:
_, skipErr = binary.ReadUvarint(inner)
case protowire.Fixed32Type:
_, skipErr = io.CopyN(io.Discard, inner, 4)
case protowire.Fixed64Type:
_, skipErr = io.CopyN(io.Discard, inner, 8)
default:
return nil, 0, nil, fmt.Errorf("%w: unsupported wire type %d for field %d", ErrUnparsableBinaryCapture, wireType, fieldNum)
}
if skipErr != nil {
return nil, 0, nil, fmt.Errorf("skipping field %d (wire type %d): %w", fieldNum, wireType, skipErr)
}
continue
}

// For BytesType fields the value is: [fieldLen varint] [fieldLen bytes].
fieldLen, err := binary.ReadUvarint(inner)
if err != nil {
return nil, 0, nil, fmt.Errorf("reading field length for SensorData field %d: %w", fieldNum, err)
}

switch fieldNum { //nolint:exhaustive
case 1: // SensorMetadata — small proto message, safe to read fully into memory.
const maxSensorMetaBytes = 1024 * 1024 // 1 MiB; metadata is always tiny in practice
if fieldLen > maxSensorMetaBytes {
return nil, 0, nil, fmt.Errorf("%w: %d bytes (limit %d)", ErrSensorMetadataTooLarge, fieldLen, maxSensorMetaBytes)
}
metaBytes := make([]byte, fieldLen)
Comment thread
n0nick marked this conversation as resolved.
if _, err := io.ReadFull(inner, metaBytes); err != nil {
return nil, 0, nil, fmt.Errorf("reading SensorMetadata bytes: %w", err)
}
sensorMeta = &v1.SensorMetadata{}
if err := proto.Unmarshal(metaBytes, sensorMeta); err != nil {
return nil, 0, nil, fmt.Errorf("unmarshaling SensorMetadata: %w", err)
}
case 3: // binary payload (SensorData.binary oneof field)
if sensorMeta == nil {
return nil, 0, nil, errors.New("binary payload field appeared before SensorMetadata in capture file")
}
// inner.count is the number of bytes consumed from msgStart so far, so
// msgStart+inner.count is the absolute file offset where the payload bytes begin.
// SectionReader lets the caller read directly from the file at that window.
return sensorMeta, int64(fieldLen), io.NewSectionReader(f.file, msgStart+inner.count, int64(fieldLen)), nil
Comment thread
n0nick marked this conversation as resolved.
default:
// Unknown field — skip the value bytes and continue.
if _, err := io.CopyN(io.Discard, inner, int64(fieldLen)); err != nil {
return nil, 0, nil, fmt.Errorf("skipping SensorData field %d: %w", fieldNum, err)
}
}
}

return nil, 0, nil, ErrNoBinaryField
}

// countingByteReader wraps an io.Reader and tracks the total number of bytes read.
// It implements io.ByteReader so it can be passed to binary.ReadUvarint.
type countingByteReader struct {
	r     io.Reader // underlying source of bytes
	count int64     // running total of bytes returned by r through this wrapper
}

// ReadByte reads a single byte from the underlying reader and adds it to count.
//
// The io.Reader contract permits Read to return (0, nil); such empty reads are
// retried a bounded number of times instead of being reported as a zero byte,
// which would silently corrupt varint decoding. If the reader keeps making no
// progress, io.ErrNoProgress is returned. If the underlying Read returns a byte
// together with an error, the byte is returned and the error is left for the
// next call to surface.
func (c *countingByteReader) ReadByte() (byte, error) {
	// maxEmptyReads caps retries of (0, nil) results so a misbehaving reader
	// cannot spin this loop forever.
	const maxEmptyReads = 100

	var b [1]byte
	for i := 0; i < maxEmptyReads; i++ {
		n, err := c.r.Read(b[:])
		c.count += int64(n)
		if n == 1 {
			return b[0], nil
		}
		if err != nil {
			return 0, err
		}
	}
	return 0, io.ErrNoProgress
}

// Read forwards to the underlying reader and adds the number of bytes it
// returned to count. The byte count and error are passed through unchanged.
func (c *countingByteReader) Read(p []byte) (int, error) {
	n, err := c.r.Read(p)
	c.count += int64(n)
	return n, err
}

// CaptureFilePathWithReplacedReservedChars returns the filepath with substitutions
// for reserved characters.
func CaptureFilePathWithReplacedReservedChars(filepath string) string {
Expand Down
142 changes: 142 additions & 0 deletions data/capture_file_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
package data

import (
"encoding/binary"
"fmt"
"io"
"os"
"path/filepath"
"testing"

"github.com/matttproud/golang_protobuf_extensions/pbutil"
v1 "go.viam.com/api/app/datasync/v1"
"go.viam.com/test"
"google.golang.org/protobuf/encoding/protowire"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"

"go.viam.com/rdk/protoutils"
Expand Down Expand Up @@ -148,6 +156,140 @@ func TestBuildCaptureMetadata(t *testing.T) {
}
}

// TestBinaryPayloadReader exercises CaptureFile.BinaryPayloadReader against capture
// files built both through the regular writer and from hand-assembled wire bytes.
// For each successful case it checks the metadata, reported length and streamed
// payload of every message in order, then expects io.EOF once all are consumed.
func TestBinaryPayloadReader(t *testing.T) {
	binaryMeta := &v1.DataCaptureMetadata{Type: v1.DataType_DATA_TYPE_BINARY_SENSOR}

	// fromMessages writes the given messages through NewCaptureFile/WriteNext, closes
	// the file (which renames it from .prog to .capture), and reopens the single
	// resulting file for reading.
	fromMessages := func(t *testing.T, msgs ...*v1.SensorData) *CaptureFile {
		t.Helper()
		tmp := t.TempDir()

		writer, err := NewCaptureFile(tmp, binaryMeta)
		test.That(t, err, test.ShouldBeNil)
		for i := range msgs {
			test.That(t, writer.WriteNext(msgs[i]), test.ShouldBeNil)
		}
		test.That(t, writer.Close(), test.ShouldBeNil)

		files, err := os.ReadDir(tmp)
		test.That(t, err, test.ShouldBeNil)
		test.That(t, len(files), test.ShouldEqual, 1)

		handle, err := os.Open(filepath.Join(tmp, files[0].Name()))
		test.That(t, err, test.ShouldBeNil)
		reader, err := ReadCaptureFile(handle)
		test.That(t, err, test.ShouldBeNil)
		return reader
	}

	// fromRawBody writes the capture metadata header followed by one length-prefixed
	// record whose bytes are exactly body, so a test can inject arbitrary wire-format
	// content.
	fromRawBody := func(t *testing.T, body []byte) *CaptureFile {
		t.Helper()
		handle, err := os.CreateTemp(t.TempDir(), "*"+CompletedCaptureFileExt)
		test.That(t, err, test.ShouldBeNil)

		_, err = pbutil.WriteDelimited(handle, binaryMeta)
		test.That(t, err, test.ShouldBeNil)

		// Record framing: uvarint length prefix followed by the raw body.
		prefix := make([]byte, binary.MaxVarintLen64)
		prefix = prefix[:binary.PutUvarint(prefix, uint64(len(body)))]
		_, err = handle.Write(append(prefix, body...))
		test.That(t, err, test.ShouldBeNil)

		_, err = handle.Seek(0, io.SeekStart)
		test.That(t, err, test.ShouldBeNil)

		reader, err := ReadCaptureFile(handle)
		test.That(t, err, test.ShouldBeNil)
		return reader
	}

	// binaryMsg builds a SensorData with empty metadata and the given binary payload.
	binaryMsg := func(payload []byte) *v1.SensorData {
		return &v1.SensorData{
			Metadata: &v1.SensorMetadata{},
			Data:     &v1.SensorData_Binary{Binary: payload},
		}
	}

	type testCase struct {
		name      string
		build     func(t *testing.T) *CaptureFile
		payloads  [][]byte
		expectErr error
	}

	cases := []testCase{
		{
			name: "single message",
			build: func(t *testing.T) *CaptureFile {
				return fromMessages(t, binaryMsg([]byte("single binary payload")))
			},
			payloads: [][]byte{[]byte("single binary payload")},
		},
		{
			name: "multiple messages",
			build: func(t *testing.T) *CaptureFile {
				var msgs []*v1.SensorData
				for i := 0; i < 5; i++ {
					msgs = append(msgs, binaryMsg([]byte(fmt.Sprintf("payload-%d", i))))
				}
				return fromMessages(t, msgs...)
			},
			payloads: [][]byte{
				[]byte("payload-0"),
				[]byte("payload-1"),
				[]byte("payload-2"),
				[]byte("payload-3"),
				[]byte("payload-4"),
			},
		},
		{
			name: "unknown non-bytes wire type field is skipped",
			build: func(t *testing.T) *CaptureFile {
				encodedMeta, err := proto.Marshal(&v1.SensorMetadata{})
				test.That(t, err, test.ShouldBeNil)

				// Field 1: SensorMetadata.
				raw := protowire.AppendTag(nil, 1, protowire.BytesType)
				raw = protowire.AppendBytes(raw, encodedMeta)
				// Field 99: unknown varint field that the reader must step over.
				raw = protowire.AppendTag(raw, 99, protowire.VarintType)
				raw = protowire.AppendVarint(raw, 42)
				// Field 3: the binary payload.
				raw = protowire.AppendTag(raw, 3, protowire.BytesType)
				raw = protowire.AppendBytes(raw, []byte("payload after unknown varint field"))
				return fromRawBody(t, raw)
			},
			payloads: [][]byte{[]byte("payload after unknown varint field")},
		},
		{
			name: "no binary field returns error",
			build: func(t *testing.T) *CaptureFile {
				return fromMessages(t, &v1.SensorData{
					Metadata: &v1.SensorMetadata{},
					Data:     &v1.SensorData_Struct{Struct: &structpb.Struct{}},
				})
			},
			expectErr: ErrNoBinaryField,
		},
	}

	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			captureFile := c.build(t)
			captureFile.Reset()

			// Error cases only need the first call to fail with the expected error.
			if c.expectErr != nil {
				_, _, _, err := captureFile.BinaryPayloadReader()
				test.That(t, err, test.ShouldBeError, c.expectErr)
				return
			}

			for i := 0; i < len(c.payloads); i++ {
				expected := c.payloads[i]

				meta, size, payload, err := captureFile.BinaryPayloadReader()
				test.That(t, err, test.ShouldBeNil)
				test.That(t, meta, test.ShouldNotBeNil)
				test.That(t, size, test.ShouldEqual, int64(len(expected)))

				actual, err := io.ReadAll(payload)
				test.That(t, err, test.ShouldBeNil)
				test.That(t, actual, test.ShouldResemble, expected)
			}

			// After the last expected message the reader must report a clean end of file.
			_, _, _, err := captureFile.BinaryPayloadReader()
			test.That(t, err, test.ShouldEqual, io.EOF)
		})
	}
}

// TestReadCorruptedFile ensures that if a file ends with invalid data (which can occur if a robot is killed uncleanly
// during a write, e.g. if the power is cut), the file is still successfully read up until that point.
func TestReadCorruptedFile(t *testing.T) {
Expand Down
Loading
Loading