diff --git a/data/capture_file.go b/data/capture_file.go
index 96b44f5c062..bb92f9378e0 100644
--- a/data/capture_file.go
+++ b/data/capture_file.go
@@ -2,6 +2,8 @@ package data

 import (
 	"bufio"
+	"encoding/binary"
+	"fmt"
 	"io"
 	"os"
 	"path/filepath"
@@ -12,11 +14,26 @@ import (
 	"github.com/matttproud/golang_protobuf_extensions/pbutil"
 	"github.com/pkg/errors"
 	v1 "go.viam.com/api/app/datasync/v1"
+	"google.golang.org/protobuf/encoding/protowire"
+	"google.golang.org/protobuf/proto"
 	"google.golang.org/protobuf/types/known/anypb"

 	"go.viam.com/rdk/resource"
 )

+// ErrNoBinaryField is returned by BinaryPayloadReader when a SensorData message
+// contains no binary payload field. This typically indicates a legacy camera.GetImages
+// file that stores tabular data in a BINARY_SENSOR-typed capture file.
+var ErrNoBinaryField = errors.New("binary payload field not found in capture file")
+
+// ErrSensorMetadataTooLarge is returned by BinaryPayloadReader when the SensorMetadata
+// field exceeds the size cap, indicating a corrupt or unexpected file.
+var ErrSensorMetadataTooLarge = errors.New("SensorMetadata field exceeds size limit")
+
+// ErrUnparsableBinaryCapture is returned by BinaryPayloadReader when the file cannot
+// be streamed due to an unexpected wire format (e.g. unknown wire type).
+var ErrUnparsableBinaryCapture = errors.New("capture file cannot be streamed due to unexpected wire format")
+
 // TODO Data-343: Reorganize this into a more standard interface/package, and add tests.

 const (
@@ -274,6 +291,158 @@ func SensorDataFromCaptureFile(f *CaptureFile) ([]*v1.SensorData, error) {
 	return ret, nil
 }

+// BinaryPayloadReader reads the next SensorData message from f without loading
+// the binary payload into memory. It returns the SensorMetadata, payload size,
+// and an io.Reader for streaming the payload.
+//
+// Binary capture files contain one SensorData message (WriteBinary
+// writes one message per file), so this is typically called once. The readOffset
+// is advanced past the message regardless, consistent with ReadNext.
+// Returns io.EOF when no messages remain.
+//
+// Each SensorData entry in the capture file is stored as a length-prefixed
+// protobuf message (standard protobuf framing). Rather than unmarshaling the
+// full message into memory, this function hand-parses the wire format to locate
+// the binary payload field and returns an io.SectionReader directly over that
+// region of the file. This lets callers stream large payloads without buffering.
+//
+// Assumes proto field 1 (SensorMetadata) precedes proto field 3 (binary payload)
+// within each SensorData message, which matches the encoding produced by our writers.
+func (f *CaptureFile) BinaryPayloadReader() (*v1.SensorMetadata, int64, io.Reader, error) {
+	f.lock.Lock()
+	defer f.lock.Unlock()
+
+	// Seek to where we left off. seekOffset is captured before the seek so we can
+	// compute absolute file positions for the SectionReader later.
+	seekOffset := f.readOffset
+	if _, err := f.file.Seek(seekOffset, io.SeekStart); err != nil {
+		return nil, 0, nil, err
+	}
+
+	// The capture file is a sequence of length-delimited protobuf records. Each record
+	// is: [outerLen varint] [SensorData proto bytes].
+	// Read outerLen so we know where this message ends and the next begins.
+	varintCR := &countingByteReader{r: f.file}
+	outerLen, err := binary.ReadUvarint(varintCR)
+	if err != nil {
+		return nil, 0, nil, err // io.EOF means no more messages
+	}
+
+	// msgStart is the absolute file offset of the first byte of the SensorData fields
+	// (i.e., just past the outerLen varint). We use this later to anchor the SectionReader.
+	msgStart := seekOffset + varintCR.count
+
+	// Advance readOffset past this entire record so the next call starts at the right place,
+	// regardless of how much of the payload the caller actually consumes.
+	f.readOffset = msgStart + int64(outerLen)
+
+	// inner is bounded to exactly outerLen bytes so field parsing can never stray into
+	// the next record, even if the file is malformed.
+	inner := &countingByteReader{r: io.LimitReader(f.file, int64(outerLen))}
+
+	var sensorMeta *v1.SensorMetadata
+
+	// Walk the SensorData fields in wire order. Each field starts with a tag varint that
+	// encodes both the field number (upper bits) and the wire type (lower 3 bits), followed
+	// by the field value. We only need two fields:
+	//   field 1 (BytesType) — SensorMetadata, decoded fully into memory (always small).
+	//   field 3 (BytesType) — binary payload, returned as a SectionReader without buffering.
+	// All other fields are skipped so we remain forward-compatible with future additions.
+	for {
+		tagVal, err := binary.ReadUvarint(inner)
+		if err != nil {
+			if errors.Is(err, io.EOF) {
+				break
+			}
+			return nil, 0, nil, fmt.Errorf("reading SensorData field tag: %w", err)
+		}
+
+		// Protobuf tag encoding: upper bits are the field number, lower 3 bits are the wire type.
+		fieldNum := protowire.Number(tagVal >> 3)
+		wireType := protowire.Type(tagVal & 0x7)
+
+		// Field number 0 is never valid on the protobuf wire; hitting it means the record
+		// is corrupt, so fail fast instead of misinterpreting the bytes as a skippable field.
+		if fieldNum <= 0 {
+			return nil, 0, nil, fmt.Errorf("%w: invalid field number %d", ErrUnparsableBinaryCapture, fieldNum)
+		}
+
+		// Skip any non-length-delimited field (varint, fixed32, fixed64) by consuming its
+		// fixed-width value and moving on.
+		if wireType != protowire.BytesType {
+			var skipErr error
+			switch wireType { //nolint:exhaustive
+			case protowire.VarintType:
+				_, skipErr = binary.ReadUvarint(inner)
+			case protowire.Fixed32Type:
+				_, skipErr = io.CopyN(io.Discard, inner, 4)
+			case protowire.Fixed64Type:
+				_, skipErr = io.CopyN(io.Discard, inner, 8)
+			default:
+				return nil, 0, nil, fmt.Errorf("%w: unsupported wire type %d for field %d", ErrUnparsableBinaryCapture, wireType, fieldNum)
+			}
+			if skipErr != nil {
+				return nil, 0, nil, fmt.Errorf("skipping field %d (wire type %d): %w", fieldNum, wireType, skipErr)
+			}
+			continue
+		}
+
+		// For BytesType fields the value is: [fieldLen varint] [fieldLen bytes].
+		fieldLen, err := binary.ReadUvarint(inner)
+		if err != nil {
+			return nil, 0, nil, fmt.Errorf("reading field length for SensorData field %d: %w", fieldNum, err)
+		}
+
+		switch fieldNum { //nolint:exhaustive
+		case 1: // SensorMetadata — small proto message, safe to read fully into memory.
+			const maxSensorMetaBytes = 1024 * 1024 // 1 MiB; metadata is always tiny in practice
+			if fieldLen > maxSensorMetaBytes {
+				return nil, 0, nil, fmt.Errorf("%w: %d bytes (limit %d)", ErrSensorMetadataTooLarge, fieldLen, maxSensorMetaBytes)
+			}
+			metaBytes := make([]byte, fieldLen)
+			if _, err := io.ReadFull(inner, metaBytes); err != nil {
+				return nil, 0, nil, fmt.Errorf("reading SensorMetadata bytes: %w", err)
+			}
+			sensorMeta = &v1.SensorMetadata{}
+			if err := proto.Unmarshal(metaBytes, sensorMeta); err != nil {
+				return nil, 0, nil, fmt.Errorf("unmarshaling SensorMetadata: %w", err)
+			}
+		case 3: // binary payload (SensorData.binary oneof field)
+			if sensorMeta == nil {
+				return nil, 0, nil, errors.New("binary payload field appeared before SensorMetadata in capture file")
+			}
+			// inner.count is the number of bytes consumed from msgStart so far, so
+			// msgStart+inner.count is the absolute file offset where the payload bytes begin.
+			// SectionReader lets the caller read directly from the file at that window.
+			return sensorMeta, int64(fieldLen), io.NewSectionReader(f.file, msgStart+inner.count, int64(fieldLen)), nil
+		default:
+			// Unknown field — skip the value bytes and continue.
+			if _, err := io.CopyN(io.Discard, inner, int64(fieldLen)); err != nil {
+				return nil, 0, nil, fmt.Errorf("skipping SensorData field %d: %w", fieldNum, err)
+			}
+		}
+	}

+	return nil, 0, nil, ErrNoBinaryField
+}
+
+// countingByteReader wraps an io.Reader and tracks the total number of bytes read.
+// It implements io.ByteReader so it can be passed to binary.ReadUvarint.
+type countingByteReader struct {
+	r     io.Reader
+	count int64
+}
+
+// ReadByte reads exactly one byte and counts it. io.ReadFull is used so that an
+// underlying reader which legally returns (0, nil) cannot make binary.ReadUvarint
+// spin forever, and an error accompanying a short read is never silently dropped.
+// As with the original, a successful single-byte read returns a nil error even if
+// the stream ends immediately after it.
+func (c *countingByteReader) ReadByte() (byte, error) {
+	var b [1]byte
+	n, err := io.ReadFull(c.r, b[:])
+	c.count += int64(n)
+	if err != nil {
+		return 0, err
+	}
+	return b[0], nil
+}
+
+func (c *countingByteReader) Read(p []byte) (int, error) {
+	n, err := c.r.Read(p)
+	c.count += int64(n)
+	return n, err
+}
+
 // CaptureFilePathWithReplacedReservedChars returns the filepath with substitutions
 // for reserved characters.
 func CaptureFilePathWithReplacedReservedChars(filepath string) string {
diff --git a/data/capture_file_test.go b/data/capture_file_test.go
index 17553aa528b..a82d962ef12 100644
--- a/data/capture_file_test.go
+++ b/data/capture_file_test.go
@@ -1,10 +1,18 @@
 package data

 import (
+	"encoding/binary"
+	"fmt"
+	"io"
+	"os"
+	"path/filepath"
 	"testing"

+	"github.com/matttproud/golang_protobuf_extensions/pbutil"
 	v1 "go.viam.com/api/app/datasync/v1"
 	"go.viam.com/test"
+	"google.golang.org/protobuf/encoding/protowire"
+	"google.golang.org/protobuf/proto"
 	"google.golang.org/protobuf/types/known/structpb"

 	"go.viam.com/rdk/protoutils"
@@ -148,6 +156,140 @@ func TestBuildCaptureMetadata(t *testing.T) {
 	}
 }

+func TestBinaryPayloadReader(t *testing.T) {
+	// captureFileFromSensorData writes msgs to a new capture file, closes it (which
+	// renames it from .prog to .capture), then reopens it for reading.
+	captureFileFromSensorData := func(t *testing.T, msgs ...*v1.SensorData) *CaptureFile {
+		t.Helper()
+		dir := t.TempDir()
+		cf, err := NewCaptureFile(dir, &v1.DataCaptureMetadata{Type: v1.DataType_DATA_TYPE_BINARY_SENSOR})
+		test.That(t, err, test.ShouldBeNil)
+		for _, msg := range msgs {
+			test.That(t, cf.WriteNext(msg), test.ShouldBeNil)
+		}
+		test.That(t, cf.Close(), test.ShouldBeNil)
+		entries, err := os.ReadDir(dir)
+		test.That(t, err, test.ShouldBeNil)
+		test.That(t, len(entries), test.ShouldEqual, 1)
+		f, err := os.Open(filepath.Join(dir, entries[0].Name()))
+		test.That(t, err, test.ShouldBeNil)
+		readCF, err := ReadCaptureFile(f)
+		test.That(t, err, test.ShouldBeNil)
+		return readCF
+	}
+
+	// captureFileFromRawBytes builds a capture file with a manually constructed SensorData
+	// body, allowing tests to inject arbitrary wire-format bytes.
+	captureFileFromRawBytes := func(t *testing.T, body []byte) *CaptureFile {
+		t.Helper()
+		f, err := os.CreateTemp(t.TempDir(), "*"+CompletedCaptureFileExt)
+		test.That(t, err, test.ShouldBeNil)
+		_, err = pbutil.WriteDelimited(f, &v1.DataCaptureMetadata{Type: v1.DataType_DATA_TYPE_BINARY_SENSOR})
+		test.That(t, err, test.ShouldBeNil)
+		var lenBuf [binary.MaxVarintLen64]byte
+		n := binary.PutUvarint(lenBuf[:], uint64(len(body)))
+		_, err = f.Write(lenBuf[:n])
+		test.That(t, err, test.ShouldBeNil)
+		_, err = f.Write(body)
+		test.That(t, err, test.ShouldBeNil)
+		_, err = f.Seek(0, io.SeekStart)
+		test.That(t, err, test.ShouldBeNil)
+		cf, err := ReadCaptureFile(f)
+		test.That(t, err, test.ShouldBeNil)
+		return cf
+	}
+
+	tcs := []struct {
+		name         string
+		setup        func(t *testing.T) *CaptureFile
+		wantPayloads [][]byte
+		wantErr      error
+	}{
+		{
+			name: "single message",
+			setup: func(t *testing.T) *CaptureFile {
+				return captureFileFromSensorData(t, &v1.SensorData{
+					Metadata: &v1.SensorMetadata{},
+					Data:     &v1.SensorData_Binary{Binary: []byte("single binary payload")},
+				})
+			},
+			wantPayloads: [][]byte{[]byte("single binary payload")},
+		},
+		{
+			name: "multiple messages",
+			setup: func(t *testing.T) *CaptureFile {
+				msgs := make([]*v1.SensorData, 5)
+				for i := range msgs {
+					msgs[i] = &v1.SensorData{
+						Metadata: &v1.SensorMetadata{},
+						Data:     &v1.SensorData_Binary{Binary: []byte(fmt.Sprintf("payload-%d", i))},
+					}
+				}
+				return captureFileFromSensorData(t, msgs...)
+			},
+			wantPayloads: [][]byte{
+				[]byte("payload-0"),
+				[]byte("payload-1"),
+				[]byte("payload-2"),
+				[]byte("payload-3"),
+				[]byte("payload-4"),
+			},
+		},
+		{
+			name: "unknown non-bytes wire type field is skipped",
+			setup: func(t *testing.T) *CaptureFile {
+				metaBytes, err := proto.Marshal(&v1.SensorMetadata{})
+				test.That(t, err, test.ShouldBeNil)
+				var body []byte
+				body = protowire.AppendTag(body, 1, protowire.BytesType)
+				body = protowire.AppendBytes(body, metaBytes)
+				// Unknown field 99 with varint wire type — must be skipped gracefully.
+				body = protowire.AppendTag(body, 99, protowire.VarintType)
+				body = protowire.AppendVarint(body, 42)
+				body = protowire.AppendTag(body, 3, protowire.BytesType)
+				body = protowire.AppendBytes(body, []byte("payload after unknown varint field"))
+				return captureFileFromRawBytes(t, body)
+			},
+			wantPayloads: [][]byte{[]byte("payload after unknown varint field")},
+		},
+		{
+			name: "no binary field returns error",
+			setup: func(t *testing.T) *CaptureFile {
+				return captureFileFromSensorData(t, &v1.SensorData{
+					Metadata: &v1.SensorMetadata{},
+					Data:     &v1.SensorData_Struct{Struct: &structpb.Struct{}},
+				})
+			},
+			wantErr: ErrNoBinaryField,
+		},
+	}
+
+	for _, tc := range tcs {
+		t.Run(tc.name, func(t *testing.T) {
+			cf := tc.setup(t)
+			cf.Reset()
+
+			if tc.wantErr != nil {
+				_, _, _, err := cf.BinaryPayloadReader()
+				test.That(t, err, test.ShouldBeError, tc.wantErr)
+				return
+			}
+
+			for _, want := range tc.wantPayloads {
+				meta, payloadLen, r, err := cf.BinaryPayloadReader()
+				test.That(t, err, test.ShouldBeNil)
+				test.That(t, meta, test.ShouldNotBeNil)
+				test.That(t, payloadLen, test.ShouldEqual, int64(len(want)))
+				got, err := io.ReadAll(r)
+				test.That(t, err, test.ShouldBeNil)
+				test.That(t, got, test.ShouldResemble, want)
+			}
+			_, _, _, err := cf.BinaryPayloadReader()
+			test.That(t, err, test.ShouldEqual, io.EOF)
+		})
+	}
+}
+
 // TestReadCorruptedFile ensures that if a file ends with invalid data (which can occur if a robot is killed uncleanly
 // during a write, e.g. if the power is cut), the file is still successfully read up until that point.
 func TestReadCorruptedFile(t *testing.T) {
diff --git a/services/datamanager/builtin/sync/upload_data_capture_file.go b/services/datamanager/builtin/sync/upload_data_capture_file.go
index f965c2ef46e..16594adacb8 100644
--- a/services/datamanager/builtin/sync/upload_data_capture_file.go
+++ b/services/datamanager/builtin/sync/upload_data_capture_file.go
@@ -1,7 +1,9 @@
 package sync

 import (
+	"bytes"
 	"context"
+	"io"
 	"sync/atomic"

 	"github.com/docker/go-units"
@@ -45,6 +47,88 @@ func uploadDataCaptureFile(
 	logger.Debugf("preparing to upload data capture file: %s, size: %d", f.GetPath(), f.Size())

 	md := f.ReadMetadata()
+
+	if md.GetType() == datasyncPB.DataType_DATA_TYPE_BINARY_SENSOR {
+		n, err := uploadBinaryPayloads(ctx, f, conn, md, logger, bytesUploadingCounter)
+		switch {
+		case err == nil:
+			return n, nil
+		case errors.Is(err, data.ErrNoBinaryField):
+			// Legacy camera.GetImages file — tabular data in a BINARY_SENSOR-typed file.
+			// Fall through to in-memory path.
+		case errors.Is(err, data.ErrSensorMetadataTooLarge) || errors.Is(err, data.ErrUnparsableBinaryCapture):
+			// File cannot be streamed due to unexpected format. Fall back with a warning.
+			logger.Warnf("cannot stream binary payload for %s, falling back to in-memory upload: %v", f.GetPath(), err)
+		default:
+			return 0, err
+		}
+	}
+
+	return uploadFromMemory(ctx, f, conn, md, logger, bytesUploadingCounter)
+}
+
+// uploadBinaryPayloads streams each binary payload in f directly from disk without
+// loading it into memory.
+// Returns data.ErrNoBinaryField if the first message has no
+// binary field, which indicates a legacy camera.GetImages file that must be handled
+// by uploadFromMemory instead.
+func uploadBinaryPayloads(
+	ctx context.Context,
+	f *data.CaptureFile,
+	conn cloudConn,
+	md *datasyncPB.DataCaptureMetadata,
+	logger logging.Logger,
+	bytesUploadingCounter *atomic.Uint64,
+) (uint64, error) {
+	f.Reset()
+	uploadMD := uploadMetadata(conn.partID, md)
+	msgIdx := 0
+	for {
+		sensorMeta, payloadLen, r, err := f.BinaryPayloadReader()
+		// A truncated final record (e.g. the process was killed mid-write) surfaces as
+		// io.ErrUnexpectedEOF; treat it like a clean EOF so everything read before the
+		// corruption still syncs — the same read-up-to-corruption behavior the tabular
+		// path relies on (see TestReadCorruptedFile).
+		if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
+			break
+		}
+		if err != nil {
+			return 0, errors.Wrap(err, "reading binary payload from capture file")
+		}
+		clonedMD := proto.Clone(uploadMD).(*datasyncPB.UploadMetadata)
+		if payloadLen > MaxUnaryFileSize {
+			logger.Debugf("streaming large binary payload (%d bytes), message %d: %s", payloadLen, msgIdx, f.GetPath())
+			if err := uploadLargeBinaryFromReader(ctx, conn.client, clonedMD, sensorMeta, r, f.GetPath(),
+				logger, bytesUploadingCounter); err != nil {
+				return 0, err
+			}
+		} else {
+			logger.Debugf("uploading small binary payload (%d bytes), message %d: %s", payloadLen, msgIdx, f.GetPath())
+			payload, err := io.ReadAll(r)
+			if err != nil {
+				return 0, errors.Wrap(err, "reading binary payload into memory")
+			}
+			sd := &datasyncPB.SensorData{
+				Metadata: sensorMeta,
+				Data:     &datasyncPB.SensorData_Binary{Binary: payload},
+			}
+			if err := uploadBinarySensorData(ctx, conn.client, clonedMD, sd, bytesUploadingCounter); err != nil {
+				return 0, err
+			}
+		}
+		msgIdx++
+	}
+	if msgIdx == 0 {
+		return 0, data.ErrNoBinaryField
+	}
+	return uint64(f.Size()), nil
+}
+
+// uploadFromMemory loads all sensor data from f into memory and uploads it. It handles
+// tabular files and legacy camera.GetImages binary files.
+func uploadFromMemory(
+	ctx context.Context,
+	f *data.CaptureFile,
+	conn cloudConn,
+	md *datasyncPB.DataCaptureMetadata,
+	logger logging.Logger,
+	bytesUploadingCounter *atomic.Uint64,
+) (uint64, error) {
 	sensorData, err := data.SensorDataFromCaptureFile(f)
 	if err != nil {
 		return 0, errors.Wrap(err, "error reading sensor data")
 	}
@@ -338,7 +422,7 @@ func uploadLargeBinarySensorData(
 	}

 	// Then call the function to send the rest.
-	if err := sendStreamingDCRequests(ctx, c, sd.GetBinary(), path, logger, bytesUploadingCounter); err != nil {
+	if err := sendStreamingDCRequests(ctx, c, bytes.NewReader(sd.GetBinary()), path, logger, bytesUploadingCounter); err != nil {
 		return errors.Wrap(err, "StreamingDataCaptureUpload failed to sync")
 	}

@@ -349,51 +433,88 @@
 	return nil
 }

+// uploadLargeBinaryFromReader streams the binary payload from r without loading it
+// into memory. sensorMeta may be nil if the capture file had no SensorMetadata.
+func uploadLargeBinaryFromReader(
+	ctx context.Context,
+	client datasyncPB.DataSyncServiceClient,
+	md *datasyncPB.UploadMetadata,
+	sensorMeta *datasyncPB.SensorMetadata,
+	r io.Reader,
+	path string,
+	logger logging.Logger,
+	bytesUploadingCounter *atomic.Uint64,
+) error {
+	c, err := client.StreamingDataCaptureUpload(ctx)
+	if err != nil {
+		return errors.Wrap(err, "error creating StreamingDataCaptureUpload client")
+	}
+
+	fileExtensionFromMimeType := getFileExtFromStringMimeType(md.GetMimeType())
+	if fileExtensionFromMimeType != "" {
+		md.FileExtension = fileExtensionFromMimeType
+	}
+
+	if err := c.Send(&datasyncPB.StreamingDataCaptureUploadRequest{
+		UploadPacket: &datasyncPB.StreamingDataCaptureUploadRequest_Metadata{
+			Metadata: &datasyncPB.DataCaptureUploadMetadata{
+				UploadMetadata: md,
+				SensorMetadata: sensorMeta,
+			},
+		},
+	}); err != nil {
+		return errors.Wrap(err, "StreamingDataCaptureUpload failed sending metadata")
+	}
+
+	if err := sendStreamingDCRequests(ctx, c, r, path, logger, bytesUploadingCounter); err != nil {
+		return errors.Wrap(err, "StreamingDataCaptureUpload failed to sync")
+	}
+
+	if _, err = c.CloseAndRecv(); err != nil {
+		return errors.Wrap(err, "StreamingDataCaptureUpload CloseAndRecv failed")
+	}
+	return nil
+}
+
 func sendStreamingDCRequests(
 	ctx context.Context,
 	stream datasyncPB.DataSyncService_StreamingDataCaptureUploadClient,
-	contents []byte,
+	r io.Reader,
 	path string,
 	logger logging.Logger,
 	bytesUploadingCounter *atomic.Uint64,
 ) error {
-	// Loop until there is no more content to send.
+	buf := make([]byte, UploadChunkSize)
 	chunkCount := 0
-	for i := 0; i < len(contents); i += UploadChunkSize {
+	for {
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
 		default:
-			// Get the next chunk from contents.
-			end := i + UploadChunkSize
-			if end > len(contents) {
-				end = len(contents)
+			n, err := io.ReadFull(r, buf)
+			if n > 0 {
+				uploadReq := &datasyncPB.StreamingDataCaptureUploadRequest{
+					UploadPacket: &datasyncPB.StreamingDataCaptureUploadRequest_Data{
+						Data: buf[:n],
+					},
+				}
+				logger.Debugf("datasync.StreamingDataCaptureUpload sending chunk %d for file: %s", chunkCount, path)
+				if sendErr := stream.Send(uploadReq); sendErr != nil {
+					return sendErr
+				}
+				if bytesUploadingCounter != nil {
+					bytesUploadingCounter.Add(uint64(n))
+				}
+				chunkCount++
 			}
-			chunk := contents[i:end]
-
-			// Build request with contents.
-			uploadReq := &datasyncPB.StreamingDataCaptureUploadRequest{
-				UploadPacket: &datasyncPB.StreamingDataCaptureUploadRequest_Data{
-					Data: chunk,
-				},
+			if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
+				return nil
 			}
-
-			// Send request
-			logger.Debugf("datasync.StreamingDataCaptureUpload sending chunk %d starting at byte index %d for file: %s", chunkCount, i, path)
-			if err := stream.Send(uploadReq); err != nil {
+			if err != nil {
 				return err
 			}
-
-			// Update byte counter after successful chunk upload.
-			if bytesUploadingCounter != nil {
-				bytesUploadingCounter.Add(uint64(len(chunk)))
-			}
-
-			chunkCount++
 		}
 	}
-
-	return nil
 }

 func getFileExtFromImageMimeType(mimeType string) string {