Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions MODULE.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use_repo(
"com_github_aws_aws_sdk_go_v2_service_sts",
"com_github_bazelbuild_buildtools",
"com_github_bazelbuild_remote_apis",
"com_github_buildbarn_go_cdc",
"com_github_buildbarn_go_sha256tree",
"com_github_fxtlabs_primes",
"com_github_go_jose_go_jose_v3",
Expand Down
4 changes: 2 additions & 2 deletions cmd/bb_copy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func main() {

grpcClientFactory := grpc.NewBaseClientFactory(grpc.BaseClientDialer, nil, nil, nil)

blobAccessCreator := blobstore_configuration.NewCASBlobAccessCreator(
blobAccessCreator := blobstore_configuration.NewCSBlobAccessCreator(
grpcClientFactory,
int(configuration.MaximumMessageSizeBytes),
bb_zstd.NewPoolFromConfiguration(nil),
Expand All @@ -68,7 +68,7 @@ func main() {
configuration.Replicator,
source.BlobAccess,
sink,
blobstore_configuration.NewCASBlobReplicatorCreator(grpcClientFactory),
blobstore_configuration.NewCSBlobReplicatorCreator(grpcClientFactory),
)
if err != nil {
return util.StatusWrap(err, "Failed to create replicator")
Expand Down
4 changes: 2 additions & 2 deletions cmd/bb_replicator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func main() {
return util.StatusWrap(err, "Failed to apply global configuration options")
}

blobAccessCreator := blobstore_configuration.NewCASBlobAccessCreator(
blobAccessCreator := blobstore_configuration.NewCSBlobAccessCreator(
grpcClientFactory,
int(configuration.MaximumMessageSizeBytes),
bb_zstd.NewPoolFromConfiguration(nil),
Expand All @@ -59,7 +59,7 @@ func main() {
configuration.Replicator,
source.BlobAccess,
sink,
blobstore_configuration.NewCASBlobReplicatorCreator(grpcClientFactory),
blobstore_configuration.NewCSBlobReplicatorCreator(grpcClientFactory),
)
if err != nil {
return util.StatusWrap(err, "Failed to create replicator")
Expand Down
4 changes: 4 additions & 0 deletions cmd/bb_storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,14 @@ go_library(
"//pkg/auth",
"//pkg/auth/configuration",
"//pkg/blobstore",
"//pkg/blobstore/cdc",
"//pkg/blobstore/configuration",
"//pkg/blobstore/grpcservers",
"//pkg/builder",
"//pkg/capabilities",
"//pkg/clock",
"//pkg/digest",
"//pkg/eviction",
"//pkg/global",
"//pkg/grpc",
"//pkg/program",
Expand Down
88 changes: 75 additions & 13 deletions cmd/bb_storage/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@ import (
"github.com/buildbarn/bb-storage/pkg/auth"
auth_configuration "github.com/buildbarn/bb-storage/pkg/auth/configuration"
"github.com/buildbarn/bb-storage/pkg/blobstore"
"github.com/buildbarn/bb-storage/pkg/blobstore/cdc"
blobstore_configuration "github.com/buildbarn/bb-storage/pkg/blobstore/configuration"
"github.com/buildbarn/bb-storage/pkg/blobstore/grpcservers"
"github.com/buildbarn/bb-storage/pkg/builder"
"github.com/buildbarn/bb-storage/pkg/capabilities"
"github.com/buildbarn/bb-storage/pkg/clock"
"github.com/buildbarn/bb-storage/pkg/digest"
"github.com/buildbarn/bb-storage/pkg/eviction"
"github.com/buildbarn/bb-storage/pkg/global"
bb_grpc "github.com/buildbarn/bb-storage/pkg/grpc"
"github.com/buildbarn/bb-storage/pkg/program"
Expand Down Expand Up @@ -54,34 +58,88 @@ func main() {
var cacheCapabilitiesAuthorizers []auth.Authorizer

// Content Addressable Storage (CAS).
var contentAddressableStorageInfo *blobstore_configuration.BlobAccessInfo
var contentAddressableStorage blobstore.BlobAccess
var contentAddressableStorageKeyFormat digest.KeyFormat
var chunkListStorage blobstore.BlobAccess
if configuration.ContentAddressableStorage != nil {
info, authorizedBackend, allAuthorizers, err := newScannableBlobAccess(
casConfiguration := configuration.ContentAddressableStorage
if casConfiguration.ChunkStorage == nil {
return status.Error(codes.InvalidArgument, "The Chunk Storage is a mandatory part of the Content Addressable Storage.")
}
if casConfiguration.ChunkListStorage == nil {
return status.Error(codes.InvalidArgument, "The Chunk List Storage is a mandatory part of the Content Addressable Storage.")
}
Comment on lines +66 to +71

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this really a necessary requirement? I can imagine that for frontends that implement the full REv2 API it's necessary that both are provided. But for individual shards of our storage backends there is no requirement that each node provides both a CS and CLS.


var parameterCache *cdc.TTLCache[cdc.Parameters]
if casConfiguration.ContentDefinedChunkingParameterCache != nil {
parameterCacheConfiguraiton := casConfiguration.ContentDefinedChunkingParameterCache

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo: Configuraiton

evictionSet, err := eviction.NewSetFromConfiguration[string](parameterCacheConfiguraiton.CacheReplacementPolicy)
if err != nil {
return err
}
parameterCache = cdc.NewTTLCache[cdc.Parameters](

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is a cache that maps instance names to CDC parameters, right? I don't think it's correct to create such a cache globally. If you use DemultiplexingBlobAccess, you can rewrite instance names. Wouldn't that lead to potential collisions?

Also in the case of centralized storage nodes I don't think it makes sense to have any caching of CDC parameters. There's nothing to cache, as the storage node would just a single configuration globally. It's only the gRPC client backend that needs a cache.

Maybe better to just extend BlobAccess to have a new GetCDCParameters() method or something? Then let individual storage backends be responsible for caching this information (or not).

clock.SystemClock,
evictionSet,
int(parameterCacheConfiguraiton.GetCacheSize()),
parameterCacheConfiguraiton.CacheDuration.AsDuration(),

@EdSchouten EdSchouten Jun 22, 2026

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing .CacheDuration.CheckValid()?

)
}

// Create the Chunk Storage (CS).
chunkStorageInfo, chunkStorage, allAuthorizers, err := newScannableBlobAccess(
dependenciesGroup,
configuration.ContentAddressableStorage,
blobstore_configuration.NewCASBlobAccessCreator(
casConfiguration.ChunkStorage,
blobstore_configuration.NewCSBlobAccessCreator(
grpcClientFactory,
int(configuration.MaximumMessageSizeBytes),
zstdPool,
),
grpcClientFactory,
)
if err != nil {
return util.StatusWrap(err, "Failed to create Content Addressable Storage")
return util.StatusWrap(err, "Failed to create Content Addressable Storage: Failed to create Chunk Storage")

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Failed to create Chunk Storage for Content Addressable Storage"?

}
cacheCapabilitiesProviders = append(
cacheCapabilitiesProviders,
info.BlobAccess,
chunkStorageInfo.BlobAccess,
capabilities.NewStaticProvider(&remoteexecution.ServerCapabilities{
CacheCapabilities: &remoteexecution.CacheCapabilities{
SupportedCompressors: configuration.SupportedCompressors,
},
}),
)
cacheCapabilitiesAuthorizers = append(cacheCapabilitiesAuthorizers, allAuthorizers...)
contentAddressableStorageInfo = &info
contentAddressableStorage = authorizedBackend

// Create the Chunk List Storage (CLS).
chunkListStorageInfo, authorizedChunkListStorage, allAuthorizers, err := newScannableBlobAccess(
dependenciesGroup,
casConfiguration.ChunkListStorage,
blobstore_configuration.NewCLSBlobAccessCreator(
&chunkStorageInfo,
grpcClientFactory,
int(configuration.MaximumMessageSizeBytes),
parameterCache,
),
grpcClientFactory,
)
if err != nil {
return util.StatusWrap(err, "Failed to create Content Addressable Storage: Failed to create Chunk List Storage")
}
chunkListStorage = authorizedChunkListStorage
cacheCapabilitiesProviders = append(cacheCapabilitiesProviders, chunkListStorageInfo.BlobAccess)
cacheCapabilitiesAuthorizers = append(cacheCapabilitiesAuthorizers, allAuthorizers...)

cdcParameterProvider := cdc.NewParameterProviderFromCapabilitiesProvider(
authorizedChunkListStorage,
int(configuration.MaximumMessageSizeBytes),
)

if parameterCache != nil {
cdcParameterProvider = cdc.NewCachingParameterProvider(cdcParameterProvider, parameterCache)
}

contentAddressableStorage = cdc.NewCasChunkingBlobAccess(chunkStorage, authorizedChunkListStorage, cdcParameterProvider, int(configuration.MaximumMessageSizeBytes))
contentAddressableStorageKeyFormat = chunkStorageInfo.DigestKeyFormat.Combine(chunkListStorageInfo.DigestKeyFormat)
}

// Action Cache (AC).
Expand All @@ -91,7 +149,8 @@ func main() {
dependenciesGroup,
configuration.ActionCache,
blobstore_configuration.NewACBlobAccessCreator(
contentAddressableStorageInfo,
contentAddressableStorage,
contentAddressableStorageKeyFormat,
grpcClientFactory,
int(configuration.MaximumMessageSizeBytes),
),
Expand Down Expand Up @@ -193,18 +252,21 @@ func main() {
configuration.GrpcServers,
func(s grpc.ServiceRegistrar) {
if contentAddressableStorage != nil {
contentAddressableStorageServer := grpcservers.NewContentAddressableStorageServer(
contentAddressableStorage,
chunkListStorage,
configuration.MaximumMessageSizeBytes,
)
remoteexecution.RegisterContentAddressableStorageServer(
s,
grpcservers.NewContentAddressableStorageServer(
contentAddressableStorage,
configuration.MaximumMessageSizeBytes,
),
contentAddressableStorageServer,
)
bytestream.RegisterByteStreamServer(
s,
grpcservers.NewByteStreamServer(
contentAddressableStorage,
1<<16,
int(configuration.MaximumMessageSizeBytes),
zstdPool,
),
)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ require (
github.com/bazelbuild/buildtools v0.0.0-20260527135131-3b47c424ecf5
github.com/bazelbuild/remote-apis v0.0.0-20260331222004-becdd8f9ff81
github.com/bazelbuild/rules_go v0.60.0
github.com/buildbarn/go-cdc v0.0.9
github.com/buildbarn/go-sha256tree v0.0.0-20250310211320-0f70f20e855b
github.com/fxtlabs/primes v0.0.0-20150821004651-dad82d10a449
github.com/go-jose/go-jose/v3 v3.0.5
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ github.com/bazelbuild/rules_go v0.60.0/go.mod h1:CYcohJVxs4n7eftbC39GCqaEJm3E1EM
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/buildbarn/go-cdc v0.0.9 h1:bWfgn92ed8Oo2zZKJdMAfB0APGz7Q8zvnqUn3hPuihM=
github.com/buildbarn/go-cdc v0.0.9/go.mod h1:KUMqSMvoRlby3uak9aKIvgz3KgNqwm2CMUoVX1EDr8k=
github.com/buildbarn/go-sha256tree v0.0.0-20250310211320-0f70f20e855b h1:IKUxixGBm9UxobU7c248z0BF0ojG19uoSLz8MFZM/KA=
github.com/buildbarn/go-sha256tree v0.0.0-20250310211320-0f70f20e855b/go.mod h1:e7g3/yWApcg+PpDqd4eQEEV8pexQmfCgK3frP+1Wuvk=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
Expand Down
1 change: 1 addition & 0 deletions pkg/blobstore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"authorizing_blob_access.go",
"blob_access.go",
"cas_read_buffer_factory.go",
"cls_read_buffer_factory.go",
"deadline_enforcing_blob_access.go",
"demultiplexing_blob_access.go",
"empty_blob_injecting_blob_access.go",
Expand Down
3 changes: 3 additions & 0 deletions pkg/blobstore/buffer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go_library(
srcs = [
"buffer.go",
"cas_buffer_with_background_task.go",
"cas_chunk_concatenating_buffer.go",
"cas_chunk_reader_buffer.go",
"cas_cloned_buffer.go",
"cas_error_handling_buffer.go",
Expand Down Expand Up @@ -44,6 +45,7 @@ go_library(
go_test(
name = "buffer_test",
srcs = [
"buffer_benchmark_test.go",
"cas_buffer_with_background_task_test.go",
"error_handler_test.go",
"example_test.go",
Expand All @@ -60,6 +62,7 @@ go_test(
deps = [
":buffer",
"//internal/mock",
"//pkg/blobstore",
"//pkg/digest",
"//pkg/testutil",
"@bazel_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto",
Expand Down
Loading