Skip to content
124 changes: 124 additions & 0 deletions data/capture_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package data

import (
"bufio"
"encoding/binary"
"fmt"
"io"
"os"
"path/filepath"
Expand All @@ -12,6 +14,8 @@ import (
"github.com/matttproud/golang_protobuf_extensions/pbutil"
"github.com/pkg/errors"
v1 "go.viam.com/api/app/datasync/v1"
"google.golang.org/protobuf/encoding/protowire"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"

"go.viam.com/rdk/resource"
Expand Down Expand Up @@ -274,6 +278,126 @@ func SensorDataFromCaptureFile(f *CaptureFile) ([]*v1.SensorData, error) {
return ret, nil
}

// BinaryPayloadReader reads the next SensorData message from f without loading
// the binary payload into memory. It returns the SensorMetadata, payload size,
// and an io.Reader for streaming the payload.
//
// Successive calls advance through the file; call f.Reset() to restart.
// Returns io.EOF when no messages remain.
Comment thread
n0nick marked this conversation as resolved.
Outdated
//
// Assumes SensorMetadata (field 1) precedes the binary payload (field 3).
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what do field 1 field 3 mean here? I'm assuming proto SensorData message fields, if so let's clarify

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

generaly, there's a lot of proto magic here, so i think some paragraph explaining the internals of what's this doing would be helpful for future readers. also comment on magic numbers like L327-328

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

field 1 = sensormetadata and field 3 = binary payload.
true, added comments in this function, lmk if makes sense

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

much better, thank you 👍

func (f *CaptureFile) BinaryPayloadReader() (*v1.SensorMetadata, int64, io.Reader, error) {
f.lock.Lock()
defer f.lock.Unlock()

if err := f.writer.Flush(); err != nil {
Comment thread
n0nick marked this conversation as resolved.
Outdated
return nil, 0, nil, err
}

seekOffset := f.readOffset
if _, err := f.file.Seek(seekOffset, io.SeekStart); err != nil {
return nil, 0, nil, err
}

varintCR := &countingByteReader{r: f.file}

// Read the outer length-prefix varint to advance readOffset past the full message.
outerLen, err := binary.ReadUvarint(varintCR)
if err != nil {
return nil, 0, nil, err // io.EOF means no more messages
}
// msgStart is the absolute file offset of the first field in this SensorData message.
msgStart := seekOffset + varintCR.count
f.readOffset = msgStart + int64(outerLen)

// Bound field parsing to exactly outerLen bytes so we can't read into the next message.
inner := &countingByteReader{r: io.LimitReader(f.file, int64(outerLen))}

var sensorMeta *v1.SensorMetadata

for {
tagVal, err := binary.ReadUvarint(inner)
if err != nil {
if errors.Is(err, io.EOF) {
break
}
return nil, 0, nil, fmt.Errorf("reading SensorData field tag: %w", err)
}

fieldNum := protowire.Number(tagVal >> 3)
wireType := protowire.Type(tagVal & 0x7)

// Skip non-bytes wire type fields to remain forward-compatible with fields
// added by future server versions.
if wireType != protowire.BytesType {
var skipErr error
switch wireType { //nolint:exhaustive
case protowire.VarintType:
_, skipErr = binary.ReadUvarint(inner)
case protowire.Fixed32Type:
_, skipErr = io.CopyN(io.Discard, inner, 4)
case protowire.Fixed64Type:
_, skipErr = io.CopyN(io.Discard, inner, 8)
default:
return nil, 0, nil, fmt.Errorf("unsupported wire type %d for field %d in SensorData", wireType, fieldNum)
}
if skipErr != nil {
return nil, 0, nil, fmt.Errorf("skipping field %d (wire type %d): %w", fieldNum, wireType, skipErr)
}
continue
}

fieldLen, err := binary.ReadUvarint(inner)
if err != nil {
return nil, 0, nil, fmt.Errorf("reading field length for SensorData field %d: %w", fieldNum, err)
}

switch fieldNum { //nolint:exhaustive
case 1: // SensorMetadata
metaBytes := make([]byte, fieldLen)
Comment thread
n0nick marked this conversation as resolved.
if _, err := io.ReadFull(inner, metaBytes); err != nil {
return nil, 0, nil, fmt.Errorf("reading SensorMetadata bytes: %w", err)
}
sensorMeta = &v1.SensorMetadata{}
if err := proto.Unmarshal(metaBytes, sensorMeta); err != nil {
return nil, 0, nil, fmt.Errorf("unmarshaling SensorMetadata: %w", err)
}
case 3: // binary payload (SensorData.binary oneof field)
// inner.count bytes consumed since msgStart; binary data starts here.
return sensorMeta, int64(fieldLen), io.NewSectionReader(f.file, msgStart+inner.count, int64(fieldLen)), nil
Comment thread
n0nick marked this conversation as resolved.
default:
if _, err := io.CopyN(io.Discard, inner, int64(fieldLen)); err != nil {
return nil, 0, nil, fmt.Errorf("skipping SensorData field %d: %w", fieldNum, err)
}
}
}

return nil, 0, nil, errors.New("binary payload field not found in capture file")
}

// countingByteReader wraps an io.Reader and tracks the total number of bytes read.
// It implements io.ByteReader so it can be passed to binary.ReadUvarint.
type countingByteReader struct {
r io.Reader
count int64
}

func (c *countingByteReader) ReadByte() (byte, error) {
var b [1]byte
n, err := c.r.Read(b[:])
c.count += int64(n)
if n == 1 {
return b[0], nil
}
return 0, err
}

func (c *countingByteReader) Read(p []byte) (int, error) {
n, err := c.r.Read(p)
c.count += int64(n)
return n, err
}

// CaptureFilePathWithReplacedReservedChars returns the filepath with substitutions
// for reserved characters.
func CaptureFilePathWithReplacedReservedChars(filepath string) string {
Expand Down
130 changes: 130 additions & 0 deletions data/capture_file_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
package data

import (
"encoding/binary"
"fmt"
"io"
"os"
"testing"

"github.com/matttproud/golang_protobuf_extensions/pbutil"
v1 "go.viam.com/api/app/datasync/v1"
"go.viam.com/test"
"google.golang.org/protobuf/encoding/protowire"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"

"go.viam.com/rdk/protoutils"
Expand Down Expand Up @@ -148,6 +155,129 @@ func TestBuildCaptureMetadata(t *testing.T) {
}
}

func TestBinaryPayloadReader(t *testing.T) {
// writeRawCaptureFile builds a capture file with a manually constructed SensorData
// body, allowing tests to inject arbitrary wire-format bytes.
writeRawCaptureFile := func(t *testing.T, body []byte) *CaptureFile {
t.Helper()
f, err := os.CreateTemp(t.TempDir(), "*"+CompletedCaptureFileExt)
test.That(t, err, test.ShouldBeNil)
_, err = pbutil.WriteDelimited(f, &v1.DataCaptureMetadata{Type: v1.DataType_DATA_TYPE_BINARY_SENSOR})
test.That(t, err, test.ShouldBeNil)
var lenBuf [binary.MaxVarintLen64]byte
n := binary.PutUvarint(lenBuf[:], uint64(len(body)))
_, err = f.Write(lenBuf[:n])
test.That(t, err, test.ShouldBeNil)
_, err = f.Write(body)
test.That(t, err, test.ShouldBeNil)
_, err = f.Seek(0, io.SeekStart)
test.That(t, err, test.ShouldBeNil)
cf, err := ReadCaptureFile(f)
test.That(t, err, test.ShouldBeNil)
return cf
}

tcs := []struct {
name string
setup func(t *testing.T) *CaptureFile
wantPayloads [][]byte
wantErr bool
}{
{
name: "single message",
setup: func(t *testing.T) *CaptureFile {
cf, err := NewCaptureFile(t.TempDir(), &v1.DataCaptureMetadata{Type: v1.DataType_DATA_TYPE_BINARY_SENSOR})
test.That(t, err, test.ShouldBeNil)
err = cf.WriteNext(&v1.SensorData{
Metadata: &v1.SensorMetadata{},
Data: &v1.SensorData_Binary{Binary: []byte("single binary payload")},
})
test.That(t, err, test.ShouldBeNil)
return cf
},
wantPayloads: [][]byte{[]byte("single binary payload")},
},
{
name: "multiple messages",
setup: func(t *testing.T) *CaptureFile {
cf, err := NewCaptureFile(t.TempDir(), &v1.DataCaptureMetadata{Type: v1.DataType_DATA_TYPE_BINARY_SENSOR})
test.That(t, err, test.ShouldBeNil)
for i := 0; i < 5; i++ {
err = cf.WriteNext(&v1.SensorData{
Metadata: &v1.SensorMetadata{},
Data: &v1.SensorData_Binary{Binary: []byte(fmt.Sprintf("payload-%d", i))},
})
test.That(t, err, test.ShouldBeNil)
}
return cf
},
wantPayloads: [][]byte{
[]byte("payload-0"),
[]byte("payload-1"),
[]byte("payload-2"),
[]byte("payload-3"),
[]byte("payload-4"),
},
},
{
name: "unknown non-bytes wire type field is skipped",
setup: func(t *testing.T) *CaptureFile {
metaBytes, err := proto.Marshal(&v1.SensorMetadata{})
test.That(t, err, test.ShouldBeNil)
var body []byte
body = protowire.AppendTag(body, 1, protowire.BytesType)
body = protowire.AppendBytes(body, metaBytes)
// Unknown field 99 with varint wire type — must be skipped gracefully.
body = protowire.AppendTag(body, 99, protowire.VarintType)
body = protowire.AppendVarint(body, 42)
body = protowire.AppendTag(body, 3, protowire.BytesType)
body = protowire.AppendBytes(body, []byte("payload after unknown varint field"))
return writeRawCaptureFile(t, body)
},
wantPayloads: [][]byte{[]byte("payload after unknown varint field")},
},
{
name: "no binary field returns error",
setup: func(t *testing.T) *CaptureFile {
cf, err := NewCaptureFile(t.TempDir(), &v1.DataCaptureMetadata{Type: v1.DataType_DATA_TYPE_TABULAR_SENSOR})
test.That(t, err, test.ShouldBeNil)
err = cf.WriteNext(&v1.SensorData{
Metadata: &v1.SensorMetadata{},
Data: &v1.SensorData_Struct{Struct: &structpb.Struct{}},
})
test.That(t, err, test.ShouldBeNil)
return cf
},
wantErr: true,
},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
cf := tc.setup(t)
cf.Reset()

if tc.wantErr {
_, _, _, err := cf.BinaryPayloadReader()
test.That(t, err, test.ShouldNotBeNil)
return
}

for _, want := range tc.wantPayloads {
meta, payloadLen, r, err := cf.BinaryPayloadReader()
test.That(t, err, test.ShouldBeNil)
test.That(t, meta, test.ShouldNotBeNil)
test.That(t, payloadLen, test.ShouldEqual, int64(len(want)))
got, err := io.ReadAll(r)
test.That(t, err, test.ShouldBeNil)
test.That(t, got, test.ShouldResemble, want)
}
_, _, _, err := cf.BinaryPayloadReader()
test.That(t, err, test.ShouldEqual, io.EOF)
})
}
}

// TestReadCorruptedFile ensures that if a file ends with invalid data (which can occur if a robot is killed uncleanly
// during a write, e.g. if the power is cut), the file is still successfully read up until that point.
func TestReadCorruptedFile(t *testing.T) {
Expand Down
Loading
Loading