From beb9bf4f42d7ff4b642da993a5f7418069b03cf9 Mon Sep 17 00:00:00 2001 From: hweawer Date: Wed, 28 Jan 2026 10:33:04 +0100 Subject: [PATCH] feat: Add ctx to the origin prefetch client --- mocks/origin/blobclient/client.go | 8 +++--- mocks/origin/blobclient/clusterclient.go | 8 +++--- origin/blobclient/client.go | 21 +++++++++++++--- origin/blobclient/cluster_client.go | 32 ++++++++++++++++++++---- origin/blobserver/server_test.go | 2 +- proxy/proxyserver/prefetch.go | 4 +-- proxy/proxyserver/server_test.go | 8 +++--- 7 files changed, 59 insertions(+), 24 deletions(-) diff --git a/mocks/origin/blobclient/client.go b/mocks/origin/blobclient/client.go index 26fd496f9..bc72b2c71 100644 --- a/mocks/origin/blobclient/client.go +++ b/mocks/origin/blobclient/client.go @@ -187,17 +187,17 @@ func (mr *MockClientMockRecorder) OverwriteMetaInfo(d, pieceLength any) *gomock. } // PrefetchBlob mocks base method. -func (m *MockClient) PrefetchBlob(namespace string, d core.Digest) error { +func (m *MockClient) PrefetchBlob(ctx context.Context, namespace string, d core.Digest) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "PrefetchBlob", namespace, d) + ret := m.ctrl.Call(m, "PrefetchBlob", ctx, namespace, d) ret0, _ := ret[0].(error) return ret0 } // PrefetchBlob indicates an expected call of PrefetchBlob. -func (mr *MockClientMockRecorder) PrefetchBlob(namespace, d any) *gomock.Call { +func (mr *MockClientMockRecorder) PrefetchBlob(ctx, namespace, d any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PrefetchBlob", reflect.TypeOf((*MockClient)(nil).PrefetchBlob), namespace, d) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PrefetchBlob", reflect.TypeOf((*MockClient)(nil).PrefetchBlob), ctx, namespace, d) } // ReplicateToRemote mocks base method. diff --git a/mocks/origin/blobclient/clusterclient.go b/mocks/origin/blobclient/clusterclient.go index 0d48e600b..f44ecd38c 100644 --- a/mocks/origin/blobclient/clusterclient.go +++ b/mocks/origin/blobclient/clusterclient.go @@ -115,17 +115,17 @@ func (mr *MockClusterClientMockRecorder) Owners(d any) *gomock.Call { } // PrefetchBlob mocks base method. -func (m *MockClusterClient) PrefetchBlob(namespace string, d core.Digest) error { +func (m *MockClusterClient) PrefetchBlob(ctx context.Context, namespace string, d core.Digest) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "PrefetchBlob", namespace, d) + ret := m.ctrl.Call(m, "PrefetchBlob", ctx, namespace, d) ret0, _ := ret[0].(error) return ret0 } // PrefetchBlob indicates an expected call of PrefetchBlob. -func (mr *MockClusterClientMockRecorder) PrefetchBlob(namespace, d any) *gomock.Call { +func (mr *MockClusterClientMockRecorder) PrefetchBlob(ctx, namespace, d any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PrefetchBlob", reflect.TypeOf((*MockClusterClient)(nil).PrefetchBlob), namespace, d) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PrefetchBlob", reflect.TypeOf((*MockClusterClient)(nil).PrefetchBlob), ctx, namespace, d) } // ReplicateToRemote mocks base method. diff --git a/origin/blobclient/client.go b/origin/blobclient/client.go index 679cbf31c..7e05cc024 100644 --- a/origin/blobclient/client.go +++ b/origin/blobclient/client.go @@ -60,7 +60,7 @@ type Client interface { DuplicateUploadBlob(namespace string, d core.Digest, blob io.Reader, delay time.Duration) error DownloadBlob(ctx context.Context, namespace string, d core.Digest, dst io.Writer) error - PrefetchBlob(namespace string, d core.Digest) error + PrefetchBlob(ctx context.Context, namespace string, d core.Digest) error ReplicateToRemote(namespace string, d core.Digest, remoteDNS string) error @@ -270,15 +270,30 @@ func (c *HTTPClient) DownloadBlob(ctx context.Context, namespace string, d core. // PrefetchBlob is an asynchronous, idempotent operation that preheats the origin's cache with the given blob. // If the blob is not present, it is downloaded asynchronously. If the blob is present, this is a no-op. -func (c *HTTPClient) PrefetchBlob(namespace string, d core.Digest) error { +func (c *HTTPClient) PrefetchBlob(ctx context.Context, namespace string, d core.Digest) error { + ctx, span := c.tracer.Start(ctx, "blobclient.prefetch_blob", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("component", "origin-client"), + attribute.String("operation", "prefetch_blob"), + attribute.String("namespace", namespace), + attribute.String("blob.digest", d.Hex()), + ), + ) + defer span.End() + r, err := httputil.Post( fmt.Sprintf("http://%s/namespace/%s/blobs/%s/prefetch", c.addr, url.PathEscape(namespace), d), httputil.SendAcceptedCodes(http.StatusOK, http.StatusAccepted), - httputil.SendTLS(c.tls)) + httputil.SendTLS(c.tls), + httputil.SendTracingContext(ctx)) if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, "prefetch failed") return err } defer closers.Close(r.Body) + span.SetStatus(codes.Ok, "prefetch triggered") return nil } diff --git a/origin/blobclient/cluster_client.go b/origin/blobclient/cluster_client.go index 6caaaf488..50230ef8c 100644 --- a/origin/blobclient/cluster_client.go +++ b/origin/blobclient/cluster_client.go @@ -90,7 +90,7 @@ type ClusterClient interface { CheckReadiness() error UploadBlob(ctx context.Context, namespace string, d core.Digest, blob io.ReadSeeker) error DownloadBlob(ctx context.Context, namespace string, d core.Digest, dst io.Writer) error - PrefetchBlob(namespace string, d core.Digest) error + PrefetchBlob(ctx context.Context, namespace string, d core.Digest) error GetMetaInfo(namespace string, d core.Digest) (*core.MetaInfo, error) Stat(namespace string, d core.Digest) (*core.BlobInfo, error) OverwriteMetaInfo(d core.Digest, pieceLength int64) error @@ -278,27 +278,49 @@ func (c *clusterClient) DownloadBlob(ctx context.Context, namespace string, d co // PrefetchBlob preheats a blob in the origin cluster for downloading. // Check [Client].PrefetchBlob's comment for more info. -func (c *clusterClient) PrefetchBlob(namespace string, d core.Digest) error { +func (c *clusterClient) PrefetchBlob(ctx context.Context, namespace string, d core.Digest) error { + ctx, span := otel.Tracer("kraken-origin-cluster").Start(ctx, "cluster.prefetch_blob", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("component", "origin-cluster-client"), + attribute.String("namespace", namespace), + attribute.String("blob.digest", d.Hex()), + ), + ) + defer span.End() + + logger := log.WithTraceContext(ctx).With("namespace", namespace, "digest", d.Hex()) + logger.Debug("Starting blob prefetch from origin cluster") + clients, err := c.resolver.Resolve(d) if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, "resolve clients failed") return fmt.Errorf("resolve clients: %w", err) } var errs []error for _, client := range clients { - err = client.PrefetchBlob(namespace, d) + err = client.PrefetchBlob(ctx, namespace, d) if err == nil { + span.SetStatus(codes.Ok, "prefetch completed") return nil } if httputil.IsNotFound(err) { // no need to iterate over other origins - return ErrBlobNotFound + err = ErrBlobNotFound + span.RecordError(err) + span.SetStatus(codes.Error, "blob not found") + return err } errs = append(errs, err) } - return fmt.Errorf("all origins unavailable: %w", errors.Join(errs...)) + err = fmt.Errorf("all origins unavailable: %w", errors.Join(errs...)) + span.RecordError(err) + span.SetStatus(codes.Error, "prefetch failed") + return err } // Owners returns the origin peers which own d. diff --git a/origin/blobserver/server_test.go b/origin/blobserver/server_test.go index 9ccc0ae2f..af937ef04 100644 --- a/origin/blobserver/server_test.go +++ b/origin/blobserver/server_test.go @@ -209,7 +209,7 @@ func TestPrefetchHandler(t *testing.T) { ensureHasBlob(t, cp.Provide(s.host), namespace, blob) - err := cp.Provide(master1).PrefetchBlob(namespace, blob.Digest) + err := cp.Provide(master1).PrefetchBlob(context.Background(), namespace, blob.Digest) require.NoError(err) } diff --git a/proxy/proxyserver/prefetch.go b/proxy/proxyserver/prefetch.go index 6dbba6a3b..111da1d58 100644 --- a/proxy/proxyserver/prefetch.go +++ b/proxy/proxyserver/prefetch.go @@ -476,8 +476,6 @@ func (ph *PrefetchHandler) triggerPrefetchBlobs(input *prefetchInput) error { ) defer span.End() - _ = ctx // PrefetchBlob doesn't accept context yet - var wg sync.WaitGroup var mu sync.Mutex var errList []error @@ -490,7 +488,7 @@ func (ph *PrefetchHandler) triggerPrefetchBlobs(input *prefetchInput) error { wg.Add(1) go func(digest core.Digest) { defer wg.Done() - err := ph.clusterClient.PrefetchBlob(input.namespace, digest) + err := ph.clusterClient.PrefetchBlob(ctx, input.namespace, digest) if err != nil { mu.Lock() errList = append(errList, fmt.Errorf("digest %q, namespace %q, blob prefetch failure: %w", digest, input.namespace, err)) diff --git a/proxy/proxyserver/server_test.go b/proxy/proxyserver/server_test.go index 4cdbd0f4a..8c1cae131 100644 --- a/proxy/proxyserver/server_test.go +++ b/proxy/proxyserver/server_test.go @@ -234,8 +234,8 @@ func TestPrefetchV2(t *testing.T) { mocks.tagClient.EXPECT().Get(tagRequest).Return(manifest, nil) mocks.originClient.EXPECT().DownloadBlob(gomock.Any(), namespace, manifest, mockutil.MatchWriter(bs)).Return(nil) - mocks.originClient.EXPECT().PrefetchBlob(namespace, layers[1]).Return(nil) - mocks.originClient.EXPECT().PrefetchBlob(namespace, layers[2]).Return(nil) + mocks.originClient.EXPECT().PrefetchBlob(gomock.Any(), namespace, layers[1]).Return(nil) + mocks.originClient.EXPECT().PrefetchBlob(gomock.Any(), namespace, layers[2]).Return(nil) res, err := httputil.Post( fmt.Sprintf("http://%s/proxy/v2/registry/prefetch", addr), httputil.SendBody(bytes.NewReader(b))) @@ -278,8 +278,8 @@ func TestPrefetchV2OriginError(t *testing.T) { mocks.tagClient.EXPECT().Get(tagRequest).Return(manifest, nil) mocks.originClient.EXPECT().DownloadBlob(gomock.Any(), namespace, manifest, mockutil.MatchWriter(bs)).Return(nil) - mocks.originClient.EXPECT().PrefetchBlob(namespace, layers[1]).Return(errors.New("foo err")) - mocks.originClient.EXPECT().PrefetchBlob(namespace, layers[2]).Return(nil) + mocks.originClient.EXPECT().PrefetchBlob(gomock.Any(), namespace, layers[1]).Return(errors.New("foo err")) + mocks.originClient.EXPECT().PrefetchBlob(gomock.Any(), namespace, layers[2]).Return(nil) _, err := httputil.Post( fmt.Sprintf("http://%s/proxy/v2/registry/prefetch", addr), httputil.SendBody(bytes.NewReader(b)))