Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions data/capture_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const (
CompletedCaptureFileExt = ".capture"
readImage = "ReadImage"
getAudio = "GetAudio"
getVideo = "GetVideo"
// GetImages is used for getting simultaneous images from different imagers.
GetImages = "GetImages"
nextPointCloud = "NextPointCloud"
Expand Down
5 changes: 4 additions & 1 deletion data/collector_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (dt CaptureType) ToProto() datasyncPB.DataType {
// MethodToCaptureType returns the DataType of the method.
func MethodToCaptureType(methodName string) CaptureType {
switch methodName {
case nextPointCloud, readImage, pointCloudMap, GetImages, captureAllFromCamera, getAudio:
case nextPointCloud, readImage, pointCloudMap, GetImages, captureAllFromCamera, getAudio, getVideo:
return CaptureTypeBinary
default:
return CaptureTypeTabular
Expand Down Expand Up @@ -388,6 +388,9 @@ func getFileExt(dataType CaptureType, methodName string, parameters map[string]i
return ExtDefault
}
}
if methodName == getVideo {
return ExtMP4
}
if methodName == getAudio {
switch parameters["codec"] {
case rutils.CodecPCM16, rutils.CodecPCM32, rutils.CodecPCM32Float:
Expand Down
2 changes: 2 additions & 0 deletions services/datamanager/builtin/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"go.viam.com/rdk/services/datamanager/builtin/shared"
datasync "go.viam.com/rdk/services/datamanager/builtin/sync"
"go.viam.com/rdk/services/slam"
"go.viam.com/rdk/services/video"
"go.viam.com/rdk/services/vision"
"go.viam.com/rdk/utils"
)
Expand Down Expand Up @@ -74,6 +75,7 @@ func init() {
WeakDependencies: []resource.Matcher{
resource.TypeMatcher{Type: resource.APITypeComponentName},
resource.SubtypeMatcher{Subtype: slam.SubtypeName},
resource.SubtypeMatcher{Subtype: video.SubtypeName},
resource.SubtypeMatcher{Subtype: vision.SubtypeName},
},
})
Expand Down
92 changes: 92 additions & 0 deletions services/video/collectors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package video

import (
"context"
"time"

"google.golang.org/protobuf/types/known/anypb"

"go.viam.com/rdk/data"
)

type method int64

const (
getVideo method = iota
doCommand
)

func (m method) String() string {
switch m {
case getVideo:
return "GetVideo"
case doCommand:
return "DoCommand"
}
return "Unknown"
}

func newGetVideoCollector(resource interface{}, params data.CollectorParams) (data.Collector, error) {
videoSvc, err := assertVideo(resource)
if err != nil {
return nil, err
}

cFunc := data.CaptureFunc(func(ctx context.Context, _ map[string]*anypb.Any) (data.CaptureResult, error) {
timeRequested := time.Now()
var res data.CaptureResult

startTime := timeRequested
endTime := startTime.Add(params.Interval)

videoChan, err := videoSvc.GetVideo(ctx, startTime, endTime, "h264", "mp4", data.FromDMExtraMap)
if err != nil {
if data.IsNoCaptureToStoreError(err) {
return res, err
}
return res, data.NewFailedToReadError(params.ComponentName, getVideo.String(), err)
}

var payload []byte
loop:
for {
select {
case <-ctx.Done():
break loop
case chunk, ok := <-videoChan:
if !ok {
break loop
}
payload = append(payload, chunk.Data...)
}
}

ts := data.Timestamps{
TimeRequested: timeRequested,
TimeReceived: time.Now(),
}
return data.NewBinaryCaptureResult(ts, []data.Binary{{
Payload: payload,
MimeType: "video/mp4",
}}), nil
})

return data.NewCollector(cFunc, params)
}

func assertVideo(resource interface{}) (Service, error) {
v, ok := resource.(Service)
if !ok {
return nil, data.InvalidInterfaceErr(API)
}
return v, nil
}

func newDoCommandCollector(resource interface{}, params data.CollectorParams) (data.Collector, error) {
videoSvc, err := assertVideo(resource)
if err != nil {
return nil, err
}
cFunc := data.NewDoCommandCaptureFunc(videoSvc, params)
return data.NewCollector(cFunc, params)
}
8 changes: 8 additions & 0 deletions services/video/export_collectors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// export_collectors_test.go adds functionality to the package that we only want to use and expose during testing.
package video

// Exported variables for testing collectors, see unexported collectors for implementation details.
var (
NewGetVideoCollector = newGetVideoCollector
NewDoCommandCollector = newDoCommandCollector
)
9 changes: 9 additions & 0 deletions services/video/video.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

servicepb "go.viam.com/api/service/video/v1"

"go.viam.com/rdk/data"
"go.viam.com/rdk/resource"
"go.viam.com/rdk/robot"
)
Expand All @@ -18,6 +19,14 @@ func init() {
RPCServiceDesc: &servicepb.VideoService_ServiceDesc,
RPCClient: NewClientFromConn,
})
data.RegisterCollector(data.MethodMetadata{
API: API,
MethodName: getVideo.String(),
}, newGetVideoCollector)
data.RegisterCollector(data.MethodMetadata{
API: API,
MethodName: doCommand.String(),
}, newDoCommandCollector)
}

// Chunk defines a chunk of video data.
Expand Down
Loading