Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 94 additions & 5 deletions core/commands/get.go
Comment thread
ChayanDass marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ import (
gopath "path"
"path/filepath"
"strings"
"sync/atomic"

"github.com/ipfs/kubo/core/commands/cmdenv"
"github.com/ipfs/kubo/core/commands/cmdutils"
"github.com/ipfs/kubo/core/commands/e"

"github.com/cheggaaa/pb"
"github.com/dustin/go-humanize"
"github.com/ipfs/boxo/files"
"github.com/ipfs/boxo/tar"
cmds "github.com/ipfs/go-ipfs-cmds"
Expand Down Expand Up @@ -91,6 +93,11 @@ may also specify the level of compression by specifying '-l=<1-9>'.
return err
}

var numBlocks int64
if st, err := api.Dag().Stat(ctx, p); err == nil {
numBlocks = st.NumBlocks
}

res.SetLength(uint64(size))

archive, _ := req.Options[archiveOptionName].(bool)
Expand All @@ -116,7 +123,11 @@ may also specify the level of compression by specifying '-l=<1-9>'.
res.SetContentType("application/x-tar")
}

return res.Emit(reader)
return res.Emit(&getResponse{
Reader: reader,
RootCID: p.String(),
NumBlocks: numBlocks,
})
},
PostRun: cmds.PostRunMap{
cmds.CLI: func(res cmds.Response, re cmds.ResponseEmitter) error {
Expand All @@ -127,8 +138,16 @@ may also specify the level of compression by specifying '-l=<1-9>'.
return err
}

outReader, ok := v.(io.Reader)
if !ok {
var outReader io.Reader
var rootCID string
var numBlocks int64
if gr, ok := v.(*getResponse); ok {
outReader = gr.Reader
rootCID = gr.RootCID
numBlocks = gr.NumBlocks
} else if r, ok := v.(io.Reader); ok {
outReader = r
} else {
return e.New(e.TypeErr(outReader, v))
}

Expand All @@ -142,20 +161,42 @@ may also specify the level of compression by specifying '-l=<1-9>'.
archive, _ := req.Options[archiveOptionName].(bool)
progress, _ := req.Options[progressOptionName].(bool)

if progress && rootCID != "" {
cidOnly := rootCID
if i := strings.LastIndex(rootCID, "/"); i >= 0 {
cidOnly = rootCID[i+1:]
}
fmt.Fprintf(os.Stderr, "Fetching %s\n", cidOnly)
payloadSize := int64(res.Length())
if numBlocks > 0 {
fmt.Fprintf(os.Stderr, " Blocks: %d | Size: %s\n", numBlocks, humanize.IBytes(uint64(payloadSize)))
} else if payloadSize > 0 {
fmt.Fprintf(os.Stderr, " Size: %s\n", humanize.IBytes(uint64(payloadSize)))
}
fmt.Fprint(os.Stderr, "\n")
}

gw := getWriter{
Out: os.Stdout,
Err: os.Stderr,
Archive: archive,
Compression: cmplvl,
Size: int64(res.Length()),
Progress: progress,
NumBlocks: numBlocks,
}

return gw.Write(outReader, outPath)
},
},
}

type getResponse struct {
io.Reader
RootCID string
NumBlocks int64
}

type clearlineReader struct {
io.Reader
out io.Writer
Expand All @@ -164,8 +205,51 @@ type clearlineReader struct {
func (r *clearlineReader) Read(p []byte) (n int, err error) {
n, err = r.Reader.Read(p)
if err == io.EOF {
// callback
fmt.Fprintf(r.out, "\033[2K\r") // clear progress bar line on EOF
fmt.Fprint(r.out, "\033[2K\r")
}
return
}

type blockTrackingReader struct {
io.Reader
bar *pb.ProgressBar
totalBlocks int64
blocksRead int64 // atomic
bytesRead int64 // atomic
blockSize int64 // = totalSize / totalBlocks
}

func newBlockTrackingReader(r io.Reader, totalSize, totalBlocks int64, bar *pb.ProgressBar) *blockTrackingReader {
blockSize := int64(1)
if totalBlocks > 0 && totalSize > 0 {
blockSize = totalSize / totalBlocks
if blockSize < 1 {
blockSize = 1
}
}
return &blockTrackingReader{
Reader: r,
bar: bar,
totalBlocks: totalBlocks,
blockSize: blockSize,
}
}

func (r *blockTrackingReader) Read(p []byte) (n int, err error) {
n, err = r.Reader.Read(p)
if n > 0 && r.totalBlocks > 0 {
newBytes := atomic.AddInt64(&r.bytesRead, int64(n))
newBlocks := newBytes / r.blockSize
if newBlocks > r.totalBlocks {
newBlocks = r.totalBlocks
}
old := atomic.SwapInt64(&r.blocksRead, newBlocks)
if old != newBlocks {
r.bar.Prefix(fmt.Sprintf(" block %d / %d ", newBlocks, r.totalBlocks))
}
}
if err == io.EOF && r.totalBlocks > 0 {
r.bar.Prefix(fmt.Sprintf(" block %d / %d ", r.totalBlocks, r.totalBlocks))
}
return
}
Expand Down Expand Up @@ -210,6 +294,7 @@ type getWriter struct {
Compression int
Size int64
Progress bool
NumBlocks int64
}

func (gw *getWriter) Write(r io.Reader, fpath string) error {
Expand Down Expand Up @@ -258,6 +343,10 @@ func (gw *getWriter) writeExtracted(r io.Reader, fpath string) error {
var progressCb func(int64) int64
if gw.Progress {
bar := makeProgressBar(gw.Err, gw.Size)
if gw.NumBlocks > 0 {
bar.Prefix(fmt.Sprintf(" block 0 / %d ", gw.NumBlocks))
r = newBlockTrackingReader(r, gw.Size, gw.NumBlocks, bar)
}
bar.Start()
defer bar.Finish()
defer bar.Set64(gw.Size)
Expand Down
38 changes: 36 additions & 2 deletions core/coreapi/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,19 @@ package coreapi

import (
"context"
"fmt"

dag "github.com/ipfs/boxo/ipld/merkledag"
"github.com/ipfs/boxo/ipld/merkledag/traverse"
"github.com/ipfs/boxo/path"
pin "github.com/ipfs/boxo/pinning/pinner"
cid "github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
coreiface "github.com/ipfs/kubo/core/coreiface"
"github.com/ipfs/kubo/tracing"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/ipfs/kubo/tracing"
ipld "github.com/ipfs/go-ipld-format"
)

type dagAPI struct {
Expand Down Expand Up @@ -68,6 +72,36 @@ func (api *dagAPI) Session(ctx context.Context) ipld.NodeGetter {
return dag.NewSession(ctx, api.DAGService)
}

func (api *dagAPI) Stat(ctx context.Context, p path.Path) (*coreiface.DagStatResult, error) {
rp, remainder, err := api.core.ResolvePath(ctx, p)
if err != nil {
return nil, err
}
if len(remainder) > 0 {
return nil, fmt.Errorf("cannot return size for anything other than a DAG with a root CID")
}
nodeGetter := dag.NewSession(ctx, api.DAGService)
obj, err := nodeGetter.Get(ctx, rp.RootCid())
if err != nil {
return nil, err
}
result := &coreiface.DagStatResult{}
err = traverse.Traverse(obj, traverse.Options{
DAG: nodeGetter,
Order: traverse.DFSPre,
Func: func(current traverse.State) error {
result.Size += uint64(len(current.Node.RawData()))
result.NumBlocks++
return nil
},
SkipDuplicates: true,
})
if err != nil {
return nil, fmt.Errorf("error traversing DAG: %w", err)
}
return result, nil
}

var (
_ ipld.DAGService = (*dagAPI)(nil)
_ dag.SessionMaker = (*dagAPI)(nil)
Expand Down
12 changes: 12 additions & 0 deletions core/coreiface/dag.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,25 @@
package iface

import (
"context"

"github.com/ipfs/boxo/path"
ipld "github.com/ipfs/go-ipld-format"
)

// DagStatResult is the result of DAG Stat: size and block count for a single root.
type DagStatResult struct {
NumBlocks int64
Size uint64
}

// APIDagService extends ipld.DAGService
type APIDagService interface {
ipld.DAGService

// Pinning returns special NodeAdder which recursively pins added nodes
Pinning() ipld.NodeAdder

// Stat walks the DAG from the given path and returns total size and block count.
Stat(ctx context.Context, p path.Path) (*DagStatResult, error)
}