diff --git a/data/capture_file.go b/data/capture_file.go index 1f0636eadb3..1a21479ebfb 100644 --- a/data/capture_file.go +++ b/data/capture_file.go @@ -28,6 +28,7 @@ const ( CompletedCaptureFileExt = ".capture" readImage = "ReadImage" getAudio = "GetAudio" + getVideo = "GetVideo" // GetImages is used for getting simultaneous images from different imagers. GetImages = "GetImages" nextPointCloud = "NextPointCloud" diff --git a/data/collector_types.go b/data/collector_types.go index 659f5f3bb22..947505185eb 100644 --- a/data/collector_types.go +++ b/data/collector_types.go @@ -210,7 +210,7 @@ func (dt CaptureType) ToProto() datasyncPB.DataType { // MethodToCaptureType returns the DataType of the method. func MethodToCaptureType(methodName string) CaptureType { switch methodName { - case nextPointCloud, readImage, pointCloudMap, GetImages, captureAllFromCamera, getAudio: + case nextPointCloud, readImage, pointCloudMap, GetImages, captureAllFromCamera, getAudio, getVideo: return CaptureTypeBinary default: return CaptureTypeTabular @@ -388,6 +388,9 @@ func getFileExt(dataType CaptureType, methodName string, parameters map[string]i return ExtDefault } } + if methodName == getVideo { + return ExtMP4 + } if methodName == getAudio { switch parameters["codec"] { case rutils.CodecPCM16, rutils.CodecPCM32, rutils.CodecPCM32Float: diff --git a/services/datamanager/builtin/builtin.go b/services/datamanager/builtin/builtin.go index 84f6099f48a..e6d16184063 100644 --- a/services/datamanager/builtin/builtin.go +++ b/services/datamanager/builtin/builtin.go @@ -33,6 +33,7 @@ import ( "go.viam.com/rdk/services/datamanager/builtin/shared" datasync "go.viam.com/rdk/services/datamanager/builtin/sync" "go.viam.com/rdk/services/slam" + "go.viam.com/rdk/services/video" "go.viam.com/rdk/services/vision" "go.viam.com/rdk/utils" ) @@ -74,6 +75,7 @@ func init() { WeakDependencies: []resource.Matcher{ resource.TypeMatcher{Type: resource.APITypeComponentName}, resource.SubtypeMatcher{Subtype: slam.SubtypeName}, + resource.SubtypeMatcher{Subtype: video.SubtypeName}, resource.SubtypeMatcher{Subtype: vision.SubtypeName}, }, }) diff --git a/services/video/collectors.go b/services/video/collectors.go new file mode 100644 index 00000000000..2da78500960 --- /dev/null +++ b/services/video/collectors.go @@ -0,0 +1,92 @@ +package video + +import ( + "context" + "time" + + "google.golang.org/protobuf/types/known/anypb" + + "go.viam.com/rdk/data" +) + +type method int64 + +const ( + getVideo method = iota + doCommand +) + +func (m method) String() string { + switch m { + case getVideo: + return "GetVideo" + case doCommand: + return "DoCommand" + } + return "Unknown" +} + +func newGetVideoCollector(resource interface{}, params data.CollectorParams) (data.Collector, error) { + videoSvc, err := assertVideo(resource) + if err != nil { + return nil, err + } + + cFunc := data.CaptureFunc(func(ctx context.Context, _ map[string]*anypb.Any) (data.CaptureResult, error) { + timeRequested := time.Now() + var res data.CaptureResult + + startTime := timeRequested + endTime := startTime.Add(params.Interval) + + videoChan, err := videoSvc.GetVideo(ctx, startTime, endTime, "h264", "mp4", data.FromDMExtraMap) + if err != nil { + if data.IsNoCaptureToStoreError(err) { + return res, err + } + return res, data.NewFailedToReadError(params.ComponentName, getVideo.String(), err) + } + + var payload []byte + loop: + for { + select { + case <-ctx.Done(): + break loop + case chunk, ok := <-videoChan: + if !ok { + break loop + } + payload = append(payload, chunk.Data...) + } + } + + ts := data.Timestamps{ + TimeRequested: timeRequested, + TimeReceived: time.Now(), + } + return data.NewBinaryCaptureResult(ts, []data.Binary{{ + Payload: payload, + MimeType: "video/mp4", + }}), nil + }) + + return data.NewCollector(cFunc, params) +} + +func assertVideo(resource interface{}) (Service, error) { + v, ok := resource.(Service) + if !ok { + return nil, data.InvalidInterfaceErr(API) + } + return v, nil +} + +func newDoCommandCollector(resource interface{}, params data.CollectorParams) (data.Collector, error) { + videoSvc, err := assertVideo(resource) + if err != nil { + return nil, err + } + cFunc := data.NewDoCommandCaptureFunc(videoSvc, params) + return data.NewCollector(cFunc, params) +} diff --git a/services/video/export_collectors_test.go b/services/video/export_collectors_test.go new file mode 100644 index 00000000000..438eaa91223 --- /dev/null +++ b/services/video/export_collectors_test.go @@ -0,0 +1,8 @@ +// export_collectors_test.go adds functionality to the package that we only want to use and expose during testing. +package video + +// Exported variables for testing collectors, see unexported collectors for implementation details. +var ( + NewGetVideoCollector = newGetVideoCollector + NewDoCommandCollector = newDoCommandCollector +) diff --git a/services/video/video.go b/services/video/video.go index 251d935f478..daa9739199f 100644 --- a/services/video/video.go +++ b/services/video/video.go @@ -7,6 +7,7 @@ import ( servicepb "go.viam.com/api/service/video/v1" + "go.viam.com/rdk/data" "go.viam.com/rdk/resource" "go.viam.com/rdk/robot" ) @@ -18,6 +19,14 @@ func init() { RPCServiceDesc: &servicepb.VideoService_ServiceDesc, RPCClient: NewClientFromConn, }) + data.RegisterCollector(data.MethodMetadata{ + API: API, + MethodName: getVideo.String(), + }, newGetVideoCollector) + data.RegisterCollector(data.MethodMetadata{ + API: API, + MethodName: doCommand.String(), + }, newDoCommandCollector) } // Chunk defines a chunk of video data.