Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/torrent/scheduler/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (m *stateMocks) newTorrent() storage.Torrent {
mi := core.MetaInfoFixture()

m.metainfoClient.EXPECT().
Download(_testNamespace, mi.Digest()).
Download(gomock.Any(), _testNamespace, mi.Digest()).
Return(mi, nil)

t, err := m.torrentArchive.CreateTorrent(_testNamespace, mi.Digest())
Expand Down
24 changes: 13 additions & 11 deletions lib/torrent/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"testing"
"time"

"github.com/golang/mock/gomock"

"github.com/uber/kraken/core"
"github.com/uber/kraken/lib/hashring"
"github.com/uber/kraken/lib/hostlist"
Expand Down Expand Up @@ -47,7 +49,7 @@ func TestDownloadTorrentWithSeederAndLeecher(t *testing.T) {
namespace := core.TagFixture()

mocks.metaInfoClient.EXPECT().Download(
namespace, blob.Digest).Return(blob.MetaInfo, nil).Times(2)
gomock.Any(), namespace, blob.Digest).Return(blob.MetaInfo, nil).Times(2)

seeder.writeTorrent(namespace, blob)
require.NoError(seeder.scheduler.Download(namespace, blob.Digest))
Expand All @@ -73,7 +75,7 @@ func TestDownloadManyTorrentsWithSeederAndLeecher(t *testing.T) {
blob := core.NewBlobFixture()

mocks.metaInfoClient.EXPECT().Download(
namespace, blob.Digest).Return(blob.MetaInfo, nil).Times(2)
gomock.Any(), namespace, blob.Digest).Return(blob.MetaInfo, nil).Times(2)

wg.Add(1)
go func() {
Expand Down Expand Up @@ -108,7 +110,7 @@ func TestDownloadManyTorrentsWithSeederAndManyLeechers(t *testing.T) {
blobs[i] = blob

mocks.metaInfoClient.EXPECT().Download(
namespace, blob.Digest).Return(blob.MetaInfo, nil).Times(6)
gomock.Any(), namespace, blob.Digest).Return(blob.MetaInfo, nil).Times(6)

seeder.writeTorrent(namespace, blob)
require.NoError(seeder.scheduler.Download(namespace, blob.Digest))
Expand Down Expand Up @@ -143,7 +145,7 @@ func TestDownloadTorrentWhenPeersAllHaveDifferentPiece(t *testing.T) {
blob := core.SizedBlobFixture(uint64(len(peers)*pieceLength), uint64(pieceLength))

mocks.metaInfoClient.EXPECT().Download(
namespace, blob.Digest).Return(blob.MetaInfo, nil).Times(len(peers))
gomock.Any(), namespace, blob.Digest).Return(blob.MetaInfo, nil).Times(len(peers))

var wg sync.WaitGroup
for i, p := range peers {
Expand Down Expand Up @@ -178,7 +180,7 @@ func TestSeederTTI(t *testing.T) {
namespace := core.TagFixture()

mocks.metaInfoClient.EXPECT().Download(
namespace, blob.Digest).Return(blob.MetaInfo, nil).Times(2)
gomock.Any(), namespace, blob.Digest).Return(blob.MetaInfo, nil).Times(2)

clk := clock.NewMock()
w := newEventWatcher()
Expand Down Expand Up @@ -230,7 +232,7 @@ func TestLeecherTTI(t *testing.T) {
blob := core.NewBlobFixture()
namespace := core.TagFixture()

mocks.metaInfoClient.EXPECT().Download(namespace, blob.Digest).Return(blob.MetaInfo, nil)
mocks.metaInfoClient.EXPECT().Download(gomock.Any(), namespace, blob.Digest).Return(blob.MetaInfo, nil)

p := mocks.newPeer(config, withEventLoop(w), withClock(clk))
errc := make(chan error)
Expand Down Expand Up @@ -260,7 +262,7 @@ func TestMultipleDownloadsForSameTorrentSucceed(t *testing.T) {

// Allow any number of downloads due to concurrency below.
mocks.metaInfoClient.EXPECT().Download(
namespace, blob.Digest).Return(blob.MetaInfo, nil).AnyTimes()
gomock.Any(), namespace, blob.Digest).Return(blob.MetaInfo, nil).AnyTimes()

config := configFixture()

Expand Down Expand Up @@ -319,7 +321,7 @@ func TestNetworkEvents(t *testing.T) {
namespace := core.TagFixture()

mocks.metaInfoClient.EXPECT().Download(
namespace, blob.Digest).Return(blob.MetaInfo, nil).Times(2)
gomock.Any(), namespace, blob.Digest).Return(blob.MetaInfo, nil).Times(2)

seeder.writeTorrent(namespace, blob)
require.NoError(seeder.scheduler.Download(namespace, blob.Digest))
Expand Down Expand Up @@ -373,7 +375,7 @@ func TestPullInactiveTorrent(t *testing.T) {
namespace := core.TagFixture()

mocks.metaInfoClient.EXPECT().Download(
namespace, blob.Digest).Return(blob.MetaInfo, nil).Times(2)
gomock.Any(), namespace, blob.Digest).Return(blob.MetaInfo, nil).Times(2)

seeder := mocks.newPeer(config)

Expand Down Expand Up @@ -407,7 +409,7 @@ func TestSchedulerReload(t *testing.T) {
blob := core.NewBlobFixture()

mocks.metaInfoClient.EXPECT().Download(
namespace, blob.Digest).Return(blob.MetaInfo, nil).Times(2)
gomock.Any(), namespace, blob.Digest).Return(blob.MetaInfo, nil).Times(2)

seeder.writeTorrent(namespace, blob)
require.NoError(seeder.scheduler.Download(namespace, blob.Digest))
Expand Down Expand Up @@ -440,7 +442,7 @@ func TestSchedulerRemoveTorrent(t *testing.T) {
namespace := core.TagFixture()

mocks.metaInfoClient.EXPECT().Download(
namespace, blob.Digest).Return(blob.MetaInfo, nil)
gomock.Any(), namespace, blob.Digest).Return(blob.MetaInfo, nil)

errc := make(chan error)
go func() { errc <- p.scheduler.Download(namespace, blob.Digest) }()
Expand Down
36 changes: 32 additions & 4 deletions lib/torrent/storage/agentstorage/torrent_archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package agentstorage

import (
"context"
"fmt"
"os"

Expand All @@ -25,6 +26,11 @@ import (
"github.com/uber/kraken/lib/store/metadata"
"github.com/uber/kraken/lib/torrent/storage"
"github.com/uber/kraken/tracker/metainfoclient"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)

// TorrentArchive is capable of initializing torrents in the download directory
Expand All @@ -33,19 +39,25 @@ type TorrentArchive struct {
stats tally.Scope
cads *store.CADownloadStore
metaInfoClient metainfoclient.Client
tracer trace.Tracer
}

// NewTorrentArchive creates a new TorrentArchive.
func NewTorrentArchive(
stats tally.Scope,
cads *store.CADownloadStore,
mic metainfoclient.Client) *TorrentArchive {

mic metainfoclient.Client,
) *TorrentArchive {
stats = stats.Tagged(map[string]string{
"module": "agenttorrentarchive",
})

return &TorrentArchive{stats, cads, mic}
return &TorrentArchive{
stats: stats,
cads: cads,
metaInfoClient: mic,
tracer: otel.Tracer("kraken-agent-storage"),
}
}

// Stat returns TorrentInfo for the given digest. Returns os.ErrNotExist if the
Expand Down Expand Up @@ -74,15 +86,31 @@ func (a *TorrentArchive) Stat(namespace string, d core.Digest) (*storage.Torrent
func (a *TorrentArchive) CreateTorrent(namespace string, d core.Digest) (storage.Torrent, error) {
var tm metadata.TorrentMeta
if err := a.cads.Any().GetMetadata(d.Hex(), &tm); os.IsNotExist(err) {
ctx, span := a.tracer.Start(context.Background(), "agent.download_metainfo",
trace.WithSpanKind(trace.SpanKindClient),
trace.WithAttributes(
attribute.String("component", "agent-storage"),
attribute.String("operation", "download_metainfo"),
attribute.String("namespace", namespace),
attribute.String("blob.digest", d.Hex()),
),
)
defer span.End()

downloadTimer := a.stats.Timer("metainfo_download").Start()
mi, err := a.metaInfoClient.Download(namespace, d)
mi, err := a.metaInfoClient.Download(ctx, namespace, d)
if err != nil {
if err == metainfoclient.ErrNotFound {
span.RecordError(err)
span.SetStatus(codes.Error, "metainfo not found")
return nil, storage.ErrNotFound
}
span.RecordError(err)
span.SetStatus(codes.Error, "download metainfo failed")
return nil, fmt.Errorf("download metainfo: %s", err)
}
downloadTimer.Stop()
span.SetStatus(codes.Ok, "metainfo downloaded")

// There's a race condition here, but it's "okay"... Basically, we could
// initialize a download file with metainfo that is rejected by file store,
Expand Down
12 changes: 6 additions & 6 deletions lib/torrent/storage/agentstorage/torrent_archive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestTorrentArchiveStatBitfield(t *testing.T) {
blob := core.SizedBlobFixture(4, 1)
mi := blob.MetaInfo

mocks.metaInfoClient.EXPECT().Download(namespace, mi.Digest()).Return(mi, nil).Times(1)
mocks.metaInfoClient.EXPECT().Download(gomock.Any(), namespace, mi.Digest()).Return(mi, nil).Times(1)

tor, err := archive.CreateTorrent(namespace, mi.Digest())
require.NoError(err)
Expand Down Expand Up @@ -107,7 +107,7 @@ func TestTorrentArchiveCreateTorrent(t *testing.T) {
mi := core.MetaInfoFixture()
namespace := core.TagFixture()

mocks.metaInfoClient.EXPECT().Download(namespace, mi.Digest()).Return(mi, nil)
mocks.metaInfoClient.EXPECT().Download(gomock.Any(), namespace, mi.Digest()).Return(mi, nil)

tor, err := archive.CreateTorrent(namespace, mi.Digest())
require.NoError(err)
Expand Down Expand Up @@ -135,7 +135,7 @@ func TestTorrentArchiveCreateTorrentNotFound(t *testing.T) {
mi := core.MetaInfoFixture()
namespace := core.TagFixture()

mocks.metaInfoClient.EXPECT().Download(namespace, mi.Digest()).Return(nil, metainfoclient.ErrNotFound)
mocks.metaInfoClient.EXPECT().Download(gomock.Any(), namespace, mi.Digest()).Return(nil, metainfoclient.ErrNotFound)

_, err := archive.CreateTorrent(namespace, mi.Digest())
require.Equal(storage.ErrNotFound, err)
Expand All @@ -152,7 +152,7 @@ func TestTorrentArchiveDeleteTorrent(t *testing.T) {
mi := core.MetaInfoFixture()
namespace := core.TagFixture()

mocks.metaInfoClient.EXPECT().Download(namespace, mi.Digest()).Return(mi, nil)
mocks.metaInfoClient.EXPECT().Download(gomock.Any(), namespace, mi.Digest()).Return(mi, nil)

tor, err := archive.CreateTorrent(namespace, mi.Digest())
require.NoError(err)
Expand All @@ -176,7 +176,7 @@ func TestTorrentArchiveConcurrentGet(t *testing.T) {
namespace := core.TagFixture()

// Allow any times for concurrency below.
mocks.metaInfoClient.EXPECT().Download(namespace, mi.Digest()).Return(mi, nil).AnyTimes()
mocks.metaInfoClient.EXPECT().Download(gomock.Any(), namespace, mi.Digest()).Return(mi, nil).AnyTimes()

var wg sync.WaitGroup
for i := 0; i < 50; i++ {
Expand Down Expand Up @@ -206,7 +206,7 @@ func TestTorrentArchiveGetTorrent(t *testing.T) {
_, err := archive.GetTorrent(namespace, mi.Digest())
require.Error(err)

mocks.metaInfoClient.EXPECT().Download(namespace, mi.Digest()).Return(mi, nil)
mocks.metaInfoClient.EXPECT().Download(gomock.Any(), namespace, mi.Digest()).Return(mi, nil)

_, err = archive.CreateTorrent(namespace, mi.Digest())
require.NoError(err)
Expand Down
8 changes: 4 additions & 4 deletions mocks/origin/blobclient/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions mocks/origin/blobclient/clusterclient.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions mocks/tracker/metainfoclient/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions nginx/config/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ server {
access_log {{.access_log_path}};
error_log {{.error_log_path}};

proxy_set_header traceparent $http_traceparent;
proxy_set_header tracestate $http_tracestate;
proxy_set_header jaeger-debug-id $http_jaeger_debug_id;

{{healthEndpoint "tracker"}}

location / {
Expand Down
25 changes: 22 additions & 3 deletions origin/blobclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type Client interface {
Stat(namespace string, d core.Digest) (*core.BlobInfo, error)
StatLocal(namespace string, d core.Digest) (*core.BlobInfo, error)

GetMetaInfo(namespace string, d core.Digest) (*core.MetaInfo, error)
GetMetaInfo(ctx context.Context, namespace string, d core.Digest) (*core.MetaInfo, error)
OverwriteMetaInfo(d core.Digest, pieceLength int64) error

UploadBlob(ctx context.Context, namespace string, d core.Digest, blob io.Reader) error
Expand Down Expand Up @@ -312,24 +312,43 @@ func (c *HTTPClient) ReplicateToRemote(namespace string, d core.Digest, remoteDN
// (i.e. still downloading), returns a 202 httputil.StatusError, indicating that
// the request should be retried later. If no blob exists for d, returns a 404
// httputil.StatusError.
func (c *HTTPClient) GetMetaInfo(namespace string, d core.Digest) (*core.MetaInfo, error) {
func (c *HTTPClient) GetMetaInfo(ctx context.Context, namespace string, d core.Digest) (*core.MetaInfo, error) {
ctx, span := c.tracer.Start(ctx, "blobclient.get_metainfo",
trace.WithSpanKind(trace.SpanKindClient),
trace.WithAttributes(
attribute.String("component", "origin-client"),
attribute.String("operation", "get_metainfo"),
attribute.String("namespace", namespace),
attribute.String("blob.digest", d.Hex()),
),
)
defer span.End()

r, err := httputil.Get(
fmt.Sprintf("http://%s/internal/namespace/%s/blobs/%s/metainfo",
c.addr, url.PathEscape(namespace), d),
httputil.SendTimeout(15*time.Second),
httputil.SendTLS(c.tls))
httputil.SendTLS(c.tls),
httputil.SendTracingContext(ctx))
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "get metainfo failed")
return nil, err
}
defer closers.Close(r.Body)
raw, err := io.ReadAll(r.Body)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "read body failed")
return nil, fmt.Errorf("read body: %s", err)
}
mi, err := core.DeserializeMetaInfo(raw)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "deserialize metainfo failed")
return nil, fmt.Errorf("deserialize metainfo: %s", err)
}
span.SetStatus(codes.Ok, "metainfo retrieved")
return mi, nil
}

Expand Down
Loading
Loading