Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ import (
"io"
)

type contextKey int

const (
// ActionDigestSizeBytesKey carries the action digest size_bytes to the proxy,
// which needs it to issue a valid GetActionResult request.
ActionDigestSizeBytesKey contextKey = iota
)

// EntryKind describes the kind of cache entry
type EntryKind int

Expand Down
55 changes: 47 additions & 8 deletions cache/grpcproxy/grpcproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,16 +166,24 @@
resourceName := fmt.Sprintf(template, uuid.New().String(), item.Hash, item.LogicalSize)

firstIteration := true
writeOffset := int64(0)
for {
n, err := item.Rc.Read(buf)
if err != nil && err != io.EOF {
logResponse(r.errorLogger, "Write", err.Error(), item.Kind, item.Hash)
n, readErr := item.Rc.Read(buf)
if readErr != nil && readErr != io.EOF {
logResponse(r.errorLogger, "Write", readErr.Error(), item.Kind, item.Hash)
err := stream.CloseSend()
if err != nil {
logResponse(r.errorLogger, "Write", err.Error(), item.Kind, item.Hash)
}
return
}

// The ByteStream Write protocol requires finish_write=true on the
// last WriteRequest. We set it when the reader signals EOF, whether
// that comes with the last data chunk (n>0, readErr==io.EOF) or as
// a standalone termination (n==0, readErr==io.EOF).
finishWrite := readErr == io.EOF

if n > 0 {
rn := ""
if firstIteration {
Expand All @@ -184,15 +192,40 @@
}
req := &bs.WriteRequest{
ResourceName: rn,
WriteOffset: writeOffset,
Data: buf[:n],
FinishWrite: finishWrite,
}
err := stream.Send(req)
if err != nil {
if err := stream.Send(req); err != nil {
logResponse(r.errorLogger, "Write", err.Error(), item.Kind, item.Hash)
return
}
} else {
_, err = stream.CloseAndRecv()
writeOffset += int64(n)
}

if finishWrite {
if n == 0 {
// All data was sent in previous iterations without FinishWrite.
// io.Reader may return (n>0, nil) for the last chunk then
// (0, io.EOF) on the next call. Send a zero-data terminal
// message with the correct write_offset so the server sees
// finish_write=true at the right position.
rn := ""
if firstIteration {
firstIteration = false

Check failure on line 215 in cache/grpcproxy/grpcproxy.go

View workflow job for this annotation

GitHub Actions / golangci-lint

ineffectual assignment to firstIteration (ineffassign)
rn = resourceName
}
req := &bs.WriteRequest{
ResourceName: rn,
WriteOffset: writeOffset,
FinishWrite: true,
}
if err := stream.Send(req); err != nil {
logResponse(r.errorLogger, "Write", err.Error(), item.Kind, item.Hash)
return
}
}
_, err := stream.CloseAndRecv()
if err != nil {
logResponse(r.errorLogger, "Write", err.Error(), item.Kind, item.Hash)
return
Expand Down Expand Up @@ -264,9 +297,15 @@
// is enabled. We can treat them as AC in this scope
fallthrough
case cache.AC:
actionDigestSize := int64(-1)
if v := ctx.Value(cache.ActionDigestSizeBytesKey); v != nil {
if sz, ok := v.(int64); ok {
actionDigestSize = sz
}
}
digest := pb.Digest{
Hash: hash,
SizeBytes: -1,
SizeBytes: actionDigestSize,
}

req := &pb.GetActionResultRequest{ActionDigest: &digest}
Expand Down
4 changes: 4 additions & 0 deletions server/grpc_ac.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ func (s *grpcServer) GetActionResult(ctx context.Context,
// checked by the the disk cache.
const unknownActionResultSize = -1

// Thread the action digest size_bytes to the proxy via context; the
// Proxy.Get interface only carries the (unknown) ActionResult size.
ctx = context.WithValue(ctx, cache.ActionDigestSizeBytesKey, req.ActionDigest.SizeBytes)

if !s.depsCheck {
logPrefix = "GRPC AC GET NODEPSCHECK"

Expand Down
Loading