Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 34 additions & 14 deletions img_tool/cmd/deploy/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,25 @@ func DeployWithExtras(ctx context.Context, rawRequest []byte, opts DeployOptions
}

vfsBuilder := deployvfs.NewBuilder(req).WithContainerRegistryOption(registry.WithAuthFromMultiKeychain())
vfsBuilder, err := configureBuilderFromEnv(vfsBuilder)
hasLazyStrategy := false
baseOps, err := req.BaseOperations()
if err != nil {
return fmt.Errorf("checking operations for lazy strategy: %w", err)
}
for _, op := range baseOps {
var strategy string
switch op.Command {
case "push":
strategy = req.Settings.PushStrategy
case "load":
strategy = req.Settings.LoadStrategy
}
if strategy == "lazy" {
hasLazyStrategy = true
break
}
}
vfsBuilder, err = configureBuilderFromEnv(vfsBuilder, hasLazyStrategy)
if err != nil {
return err
}
Expand Down Expand Up @@ -427,25 +445,27 @@ func credentialHelperInstance() credential.Helper {
return credential.NopHelper()
}

func configureBuilderFromEnv(builder *deployvfs.Builder) (*deployvfs.Builder, error) {
func configureBuilderFromEnv(builder *deployvfs.Builder, needsCAS bool) (*deployvfs.Builder, error) {
diskCachePath := os.Getenv("IMG_DISK_CACHE")
if diskCachePath != "" {
builder = builder.WithDiskCache(diskCachePath)
}

reapiEndpoint := os.Getenv("IMG_REAPI_ENDPOINT")
if reapiEndpoint != "" {
reapiInstanceName := os.Getenv("IMG_REAPI_INSTANCE_NAME")
credHelper := credentialHelperInstance()
grpcConn, err := protohelper.Client(reapiEndpoint, credHelper)
if err != nil {
return nil, fmt.Errorf("creating gRPC client for REAPI: %w", err)
}
casReader, err := cas.New(grpcConn, cas.WithInstanceName(reapiInstanceName))
if err != nil {
return nil, fmt.Errorf("creating CAS client: %w", err)
if needsCAS {
reapiEndpoint := os.Getenv("IMG_REAPI_ENDPOINT")
if reapiEndpoint != "" {
reapiInstanceName := os.Getenv("IMG_REAPI_INSTANCE_NAME")
credHelper := credentialHelperInstance()
grpcConn, err := protohelper.Client(reapiEndpoint, credHelper)
if err != nil {
return nil, fmt.Errorf("creating gRPC client for REAPI: %w", err)
}
casReader, err := cas.New(grpcConn, cas.WithInstanceName(reapiInstanceName))
if err != nil {
return nil, fmt.Errorf("creating CAS client: %w", err)
}
builder = builder.WithCASReader(casReader)
}
builder = builder.WithCASReader(casReader)
}

return builder, nil
Expand Down
5 changes: 4 additions & 1 deletion img_tool/cmd/deploy/persistentworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ type deployWorkerHandler struct {
func newDeployWorkerHandler(jobs int) *deployWorkerHandler {
baseBuilder := deployvfs.NewBuilder(api.DeployManifest{}).
WithContainerRegistryOption(registry.WithAuthFromMultiKeychain())
baseBuilder, err := configureBuilderFromEnv(baseBuilder)
// We set needsCAS to true unconditionally.
// The reason is that we just cannot know in advance whether a future work request
// wants to connect to the remote cache or not.
baseBuilder, err := configureBuilderFromEnv(baseBuilder, true /* needsCAS */)
if err != nil {
fmt.Fprintf(os.Stderr, "warning: failed to configure VFS from environment: %v\n", err)
}
Expand Down
99 changes: 64 additions & 35 deletions img_tool/pkg/deployvfs/deployvfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,9 +557,10 @@ func (b *Builder) layerBlob(operationIndex int, manifestIndex int, layerIndex in
// 2. explicit layer files (--layer flags)
// 3. runfiles tree
// 4. registry of base image (if base image is shallow, blob was marked as "missing blob" (exists remotely) and strategy allows it)
// 5. bazel disk cache (if configured via IMG_DISK_CACHE)
// 6. bazel remote cache (if configured via IMG_REAPI_ENDPOINT)
// 7. stub blob (cas_registry strategy where all blobs are assumed to already be in the remote CAS)
// 5. layer hints (local paths from BUILD_WORKSPACE_DIRECTORY, populated by lazy builds)
// 6. bazel disk cache (if configured via IMG_DISK_CACHE)
// 7. bazel remote cache (if configured via IMG_REAPI_ENDPOINT)
// 8. stub blob (cas_registry strategy where all blobs are assumed to already be in the remote CAS)

var sourceErrors []*BlobSourceError

Expand All @@ -583,6 +584,11 @@ func (b *Builder) layerBlob(operationIndex int, manifestIndex int, layerIndex in
} else {
sourceErrors = append(sourceErrors, err.(*BlobSourceError))
}
if entry, err := b.layerFromHints(desc); err == nil {
return entry, nil
} else {
sourceErrors = append(sourceErrors, err.(*BlobSourceError))
}
if entry, err := b.blobFromDiskCache(desc); err == nil {
return entry, nil
} else {
Expand Down Expand Up @@ -723,47 +729,50 @@ func (b *Builder) layerFromRegistry(pullInfo api.PullInfo, missingBlobs []string
return blobEntry{}, &BlobSourceError{Source: "base image registry", Digest: desc.Digest, Kind: BlobSourceBlobMissing, Message: "digest not in missing blobs list (layer is not from a shallow base image)"}
}

// layerFromCAS tries to find the layer using a two-step fallback strategy:
// 1. If layer hints are available, try to read from local paths in BUILD_WORKSPACE_DIRECTORY
// 2. Fall back to reading from the bazel remote cache
func (b *Builder) layerFromCAS(desc api.Descriptor) (blobEntry, error) {
if b.casReader == nil && b.layerHints == nil {
return blobEntry{}, &BlobSourceError{Source: "remote CAS", Digest: desc.Digest, Kind: BlobSourceUnconfigured, Message: "no CAS reader and no layer hints configured"}
// layerFromHints tries to find the layer from local paths provided by layer hints.
// Layer hints are local file paths from BUILD_WORKSPACE_DIRECTORY populated by lazy builds.
func (b *Builder) layerFromHints(desc api.Descriptor) (blobEntry, error) {
if b.layerHints == nil {
return blobEntry{}, &BlobSourceError{Source: "layer hints", Digest: desc.Digest, Kind: BlobSourceUnconfigured, Message: "no layer hints configured"}
}

// Get potential local paths from layer hints
var hintPaths []string
if b.layerHints != nil {
hintPaths = b.layerHints[desc.Digest]
hintPaths := b.layerHints[desc.Digest]
if len(hintPaths) == 0 {
return blobEntry{}, &BlobSourceError{Source: "layer hints", Digest: desc.Digest, Kind: BlobSourceBlobMissing, Message: "digest not in layer hints"}
}

stats := b.stats
return blobEntry{
Descriptor: desc,
Location: "remote_cache",
Location: "file",
stats: stats,
Opener: func() (io.ReadCloser, error) {
// First, try to open from local paths if we have hints
for _, localPath := range hintPaths {
if file, err := os.Open(localPath); err == nil {
// Successfully opened local file from layer hints
stats.BlobsFromLocalDisk.Add(1)
return file, nil
}
// If open failed, try the next path
}
return nil, fmt.Errorf("layer %s not found in any hint path", desc.Digest)
},
}, nil
}

// All local paths failed (or no hints), fall back to remote cache
casReader := b.casReader
if casReader == nil {
return nil, fmt.Errorf("blob with digest %s not found in local hints and no remote cache configured", desc.Digest)
}
// layerFromCAS tries to find the layer in the bazel remote cache.
func (b *Builder) layerFromCAS(desc api.Descriptor) (blobEntry, error) {
if b.casReader == nil {
return blobEntry{}, &BlobSourceError{Source: "remote CAS", Digest: desc.Digest, Kind: BlobSourceUnconfigured, Message: "no CAS reader configured"}
}
stats := b.stats
return blobEntry{
Descriptor: desc,
Location: "remote_cache",
stats: stats,
Opener: func() (io.ReadCloser, error) {
digest, err := digestFromDescriptor(desc)
if err != nil {
return nil, err
}
stats.BlobsFromRemoteCache.Add(1)
return casReader.ReaderForBlob(context.TODO(), digest)
return b.casReader.ReaderForBlob(context.TODO(), digest)
},
}, nil
}
Expand All @@ -786,33 +795,49 @@ type blobEntry struct {
}

// resolveManifestBlob resolves a manifest or index blob from available sources.
// Priority: OCI layouts → disk cache → remote CAS → runfiles sparse layout path.
// Priority: OCI layouts → runfiles sparse layout path → disk cache → remote CAS.
func (b *Builder) resolveManifestBlob(operationIndex int, desc api.Descriptor) blobEntry {
if entry, err := b.blobFromOCILayouts(desc); err == nil {
return entry
}
if entry, err := b.blobFromRunfilesSparseLayout(operationIndex, desc); err == nil {
return entry
}
if entry, err := b.blobFromDiskCache(desc); err == nil {
return entry
}
if entry, err := b.blobFromCAS(desc); err == nil {
return entry
}
return b.blobFromRunfilesSparseLayout(operationIndex, desc)
return blobEntry{
Descriptor: desc,
Opener: func() (io.ReadCloser, error) {
return nil, fmt.Errorf("manifest blob %s not found in any source (OCI layouts, runfiles, disk cache, remote CAS)", desc.Digest)
},
}
}

// resolveConfigBlob resolves a config blob from available sources.
// Priority: OCI layouts → disk cache → remote CAS → runfiles sparse layout path.
// Priority: OCI layouts → runfiles sparse layout path → disk cache → remote CAS.
func (b *Builder) resolveConfigBlob(operationIndex int, desc api.Descriptor) blobEntry {
if entry, err := b.blobFromOCILayouts(desc); err == nil {
return entry
}
if entry, err := b.blobFromRunfilesSparseLayout(operationIndex, desc); err == nil {
return entry
}
if entry, err := b.blobFromDiskCache(desc); err == nil {
return entry
}
if entry, err := b.blobFromCAS(desc); err == nil {
return entry
}
return b.blobFromRunfilesSparseLayout(operationIndex, desc)
return blobEntry{
Descriptor: desc,
Opener: func() (io.ReadCloser, error) {
return nil, fmt.Errorf("config blob %s not found in any source (OCI layouts, runfiles, disk cache, remote CAS)", desc.Digest)
},
}
}

// blobFromCAS tries to resolve a blob from the Bazel remote cache.
Expand Down Expand Up @@ -892,21 +917,25 @@ func (b *Builder) blobFromOCILayouts(desc api.Descriptor) (blobEntry, error) {
}

// blobFromRunfilesSparseLayout resolves a blob from the runfiles sparse layout tree.
func (b *Builder) blobFromRunfilesSparseLayout(operationIndex int, desc api.Descriptor) blobEntry {
func (b *Builder) blobFromRunfilesSparseLayout(operationIndex int, desc api.Descriptor) (blobEntry, error) {
runfilesPath := sparseLayoutBlobPath(operationIndex, desc.Digest)
fpath, err := b.rlocation(runfilesPath)
if err != nil {
return blobEntry{}, &BlobSourceError{Source: "runfiles", Digest: desc.Digest, Kind: BlobSourceOther, Message: fmt.Sprintf("rlocation(%s)", runfilesPath), Err: err}
}
if _, err := os.Stat(fpath); err != nil {
return blobEntry{}, &BlobSourceError{Source: "runfiles", Digest: desc.Digest, Kind: BlobSourceBlobMissing, Message: fpath, Err: err}
}
stats := b.stats
return blobEntry{
Descriptor: desc,
Location: "file",
stats: stats,
Opener: func() (io.ReadCloser, error) {
fpath, err := b.rlocation(sparseLayoutBlobPath(operationIndex, desc.Digest))
if err != nil {
return nil, err
}
stats.BlobsFromLocalDisk.Add(1)
return os.Open(fpath)
},
}
}, nil
}

// sparseLayoutBlobPathInDir returns the absolute path to a blob within a sparse layout directory.
Expand Down
Loading