Skip to content
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions core/commands/dag/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ type DagStat struct {
Cid cid.Cid
Size uint64 `json:",omitempty"`
NumBlocks int64 `json:",omitempty"`
NumFiles int64 `json:",omitempty"` // UnixFS file nodes (actual files, including nested)
}

func (s *DagStat) String() string {
Expand Down Expand Up @@ -349,13 +350,15 @@ type DagStatSummary struct {
TotalSize uint64 `json:",omitempty"`
SharedSize uint64 `json:",omitempty"`
Ratio float32 `json:",omitempty"`
NumFiles int64 `json:",omitempty"` // total UnixFS file count across all DAGs
DagStatsArray []*DagStat `json:"DagStats,omitempty"`
}

func (s *DagStatSummary) String() string {
return fmt.Sprintf("Total Size: %d (%s)\nUnique Blocks: %d\nShared Size: %d (%s)\nRatio: %f",
return fmt.Sprintf("Total Size: %d (%s)\nUnique Blocks: %d\nTotal Files: %d\nShared Size: %d (%s)\nRatio: %f",
s.TotalSize, humanize.Bytes(s.TotalSize),
s.UniqueBlocks,
s.NumFiles,
s.SharedSize, humanize.Bytes(s.SharedSize),
s.Ratio)
}
Expand Down Expand Up @@ -405,15 +408,17 @@ Note: This command skips duplicate blocks in reporting both size and the number
csvWriter := csv.NewWriter(w)
csvWriter.Comma = '\t'
cidSpacing := len(event.DagStatsArray[0].Cid.String())
header := []string{fmt.Sprintf("%-*s", cidSpacing, "CID"), fmt.Sprintf("%-15s", "Blocks"), "Size"}
header := []string{fmt.Sprintf("%-*s", cidSpacing, "CID"), fmt.Sprintf("%-15s", "Blocks"), fmt.Sprintf("%-10s", "Files"), "Size"}
if err := csvWriter.Write(header); err != nil {
return err
}
for _, dagStat := range event.DagStatsArray {
numBlocksStr := fmt.Sprint(dagStat.NumBlocks)
numFilesStr := fmt.Sprint(dagStat.NumFiles)
err := csvWriter.Write([]string{
dagStat.Cid.String(),
fmt.Sprintf("%-15s", numBlocksStr),
fmt.Sprintf("%-10s", numFilesStr),
fmt.Sprint(dagStat.Size),
})
if err != nil {
Expand Down
50 changes: 40 additions & 10 deletions core/commands/dag/stat.go
Comment thread
ChayanDass marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/dustin/go-humanize"
mdag "github.com/ipfs/boxo/ipld/merkledag"
"github.com/ipfs/boxo/ipld/merkledag/traverse"
"github.com/ipfs/boxo/ipld/unixfs"
cid "github.com/ipfs/go-cid"
cmds "github.com/ipfs/go-ipfs-cmds"
"github.com/ipfs/kubo/core/commands/cmdenv"
Expand All @@ -25,6 +26,7 @@ func dagStat(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment)
if val, specified := req.Options[progressOptionName].(bool); specified {
progressive = val
}

api, err := cmdenv.GetApi(env, req)
if err != nil {
return err
Expand All @@ -33,6 +35,7 @@ func dagStat(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment)

cidSet := cid.NewSet()
dagStatSummary := &DagStatSummary{DagStatsArray: []*DagStat{}}

for _, a := range req.Arguments {
p, err := cmdutils.PathOrCidPath(a)
if err != nil {
Expand All @@ -50,24 +53,45 @@ func dagStat(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment)
if err != nil {
return err
}

dagstats := &DagStat{Cid: rp.RootCid()}
dagStatSummary.appendStats(dagstats)

chunkCids := cid.NewSet()
err = traverse.Traverse(obj, traverse.Options{
DAG: nodeGetter,
Order: traverse.DFSPre,
Func: func(current traverse.State) error {
currentNodeSize := uint64(len(current.Node.RawData()))
nd := current.Node
currentNodeSize := uint64(len(nd.RawData()))
dagstats.Size += currentNodeSize
dagstats.NumBlocks++
if !cidSet.Has(current.Node.Cid()) {

if pn, ok := nd.(*mdag.ProtoNode); ok {
if fsn, err := unixfs.FSNodeFromBytes(pn.Data()); err == nil {
if fsn.Type() == unixfs.TFile && len(pn.Links()) > 0 {
for _, l := range pn.Links() {
chunkCids.Add(l.Cid)
}
}
if !chunkCids.Has(nd.Cid()) {
// Count as a file only if not a chunk block
switch fsn.Type() {
case unixfs.TFile, unixfs.TRaw, unixfs.TSymlink, unixfs.TMetadata:
dagstats.NumFiles++
}
}
}
}

if !cidSet.Has(nd.Cid()) {
dagStatSummary.incrementTotalSize(currentNodeSize)
}
dagStatSummary.incrementRedundantSize(currentNodeSize)
cidSet.Add(current.Node.Cid())
cidSet.Add(nd.Cid())

if progressive {
if err := res.Emit(dagStatSummary); err != nil {
return err
}
return res.Emit(dagStatSummary)
}
return nil
},
Expand All @@ -82,10 +106,13 @@ func dagStat(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment)
dagStatSummary.UniqueBlocks = cidSet.Len()
dagStatSummary.calculateSummary()

if err := res.Emit(dagStatSummary); err != nil {
return err
var totalFiles int64
for _, s := range dagStatSummary.DagStatsArray {
totalFiles += s.NumFiles
}
return nil
dagStatSummary.NumFiles = totalFiles

return res.Emit(dagStatSummary)
}

func finishCLIStat(res cmds.Response, re cmds.ResponseEmitter) error {
Expand Down Expand Up @@ -122,7 +149,10 @@ func finishCLIStat(res cmds.Response, re cmds.ResponseEmitter) error {
totalBlocks += stat.NumBlocks
totalSize += stat.Size
}
fmt.Fprintf(os.Stderr, "Fetched/Processed %d blocks, %d bytes (%s)\r", totalBlocks, totalSize, humanize.Bytes(totalSize))
fmt.Fprintf(os.Stderr,
"Fetched/Processed %d blocks, %d bytes (%s)\r",
totalBlocks, totalSize, humanize.Bytes(totalSize),
)
}
default:
return e.TypeErr(out, v)
Expand Down
137 changes: 131 additions & 6 deletions core/commands/get.go
Comment thread
ChayanDass marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ import (
gotar "archive/tar"
"bufio"
"compress/gzip"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"os"
gopath "path"
"path/filepath"
Expand All @@ -15,11 +18,13 @@ import (
"github.com/ipfs/kubo/core/commands/cmdenv"
"github.com/ipfs/kubo/core/commands/cmdutils"
"github.com/ipfs/kubo/core/commands/e"
fsrepo "github.com/ipfs/kubo/repo/fsrepo"

"github.com/cheggaaa/pb"
"github.com/ipfs/boxo/files"
"github.com/ipfs/boxo/tar"
cmds "github.com/ipfs/go-ipfs-cmds"
manet "github.com/multiformats/go-multiaddr/net"
)

var ErrInvalidCompressionLevel = errors.New("compression level must be between 1 and 9")
Expand All @@ -29,6 +34,7 @@ const (
archiveOptionName = "archive"
compressOptionName = "compress"
compressionLevelOptionName = "compression-level"
getTotalBlocksKey = "_getTotalBlocks"
)

var GetCmd = &cmds.Command{
Expand Down Expand Up @@ -61,8 +67,29 @@ may also specify the level of compression by specifying '-l=<1-9>'.
cmds.BoolOption(progressOptionName, "p", "Stream progress data.").WithDefault(true),
},
PreRun: func(req *cmds.Request, env cmds.Environment) error {
_, err := getCompressOptions(req)
return err
if _, err := getCompressOptions(req); err != nil {
return err
}

progress, _ := req.Options[progressOptionName].(bool)
if !progress {
return nil
}

baseURL, err := getAPIBaseURL(env)
if err != nil {
return nil
}

rootCID, err := resolveRootCID(baseURL, req.Arguments[0])
if err != nil || rootCID == "" {
return nil
}

totalBlocks, files := fetchStatInfo(baseURL, rootCID)
req.Options[getTotalBlocksKey] = totalBlocks
printGetProgress(os.Stderr, rootCID, files, totalBlocks)
return nil
Comment thread
ChayanDass marked this conversation as resolved.
Outdated
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
ctx := req.Context
Expand Down Expand Up @@ -141,6 +168,10 @@ may also specify the level of compression by specifying '-l=<1-9>'.

archive, _ := req.Options[archiveOptionName].(bool)
progress, _ := req.Options[progressOptionName].(bool)
totalBlocks, _ := req.Options[getTotalBlocksKey].(int)
if totalBlocks <= 0 {
totalBlocks = 1
}

gw := getWriter{
Out: os.Stdout,
Expand All @@ -149,13 +180,93 @@ may also specify the level of compression by specifying '-l=<1-9>'.
Compression: cmplvl,
Size: int64(res.Length()),
Progress: progress,
TotalBlocks: totalBlocks,
}

return gw.Write(outReader, outPath)
},
},
}

// getAPIBaseURL resolves the local daemon's HTTP API base URL from the
// repo config root. Only TCP API multiaddrs are supported; unix sockets
// and other transports yield an error.
func getAPIBaseURL(env cmds.Environment) (string, error) {
	configRoot, err := cmdenv.GetConfigRoot(env)
	if err != nil {
		return "", err
	}
	apiAddr, err := fsrepo.APIAddr(configRoot)
	if err != nil {
		return "", err
	}
	network, host, err := manet.DialArgs(apiAddr)
	if err != nil {
		// Preserve the underlying error instead of masking it as an
		// unsupported-network condition.
		return "", fmt.Errorf("resolving API address: %w", err)
	}
	// The plain http.Post calls below only speak TCP.
	if network != "tcp" && network != "tcp4" && network != "tcp6" {
		return "", errors.New("unsupported network")
	}
	return "http://" + host, nil
}

func resolveRootCID(baseURL, pathStr string) (string, error) {
Comment thread
ChayanDass marked this conversation as resolved.
Outdated
resp, err := http.Post(baseURL+"/api/v0/dag/resolve?arg="+url.QueryEscape(pathStr), "", nil)
if err != nil {
return "", err
}
defer resp.Body.Close()
var out struct {
Cid interface{} `json:"Cid"`
}
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
return "", err
}
switch v := out.Cid.(type) {
case string:
return v, nil
case map[string]interface{}:
cid, _ := v["/"].(string)
return cid, nil
}
return "", nil
}

// fetchStatInfo calls dag/stat and returns totalBlocks and file count directly from NumFiles.
func fetchStatInfo(baseURL, rootCID string) (totalBlocks, numFiles int) {
resp, err := http.Post(
baseURL+"/api/v0/dag/stat?arg="+url.QueryEscape("/ipfs/"+rootCID)+"&progress=false",
"", nil,
)
if err != nil || resp.StatusCode != http.StatusOK {
return 1, 1
}
defer resp.Body.Close()

var out struct {
UniqueBlocks int `json:"UniqueBlocks"`
DagStats []struct {
NumBlocks int64 `json:"NumBlocks"`
NumFiles int64 `json:"NumFiles"`
} `json:"DagStats"`
}
if err := json.NewDecoder(resp.Body).Decode(&out); err == nil && len(out.DagStats) > 0 {
ds := out.DagStats[0]
totalBlocks = int(ds.NumBlocks)
numFiles = int(ds.NumFiles)
}
if totalBlocks <= 0 {
totalBlocks = max(out.UniqueBlocks, 1)
}
if numFiles <= 0 {
numFiles = 1
}
return
}

func printGetProgress(w io.Writer, cidStr string, numFiles, totalBlocks int) {
fmt.Fprintf(w, "\nFetching CID: %s\n", cidStr)
if numFiles == 1 {
fmt.Fprintf(w, "Items: 1 file | Blocks: %d\n\n", totalBlocks)
} else {
fmt.Fprintf(w, "Items: %d files | Blocks: %d\n\n", numFiles, totalBlocks)
}
}

type clearlineReader struct {
io.Reader
out io.Writer
Expand All @@ -164,8 +275,7 @@ type clearlineReader struct {
// Read proxies to the wrapped Reader and, once the stream reports EOF,
// erases the in-progress progress-bar line from r.out.
func (r *clearlineReader) Read(p []byte) (n int, err error) {
	n, err = r.Reader.Read(p)
	if err != io.EOF {
		return
	}
	// ANSI: clear the entire line, then return the cursor to column 0.
	fmt.Fprint(r.out, "\033[2K\r")
	return
}
Expand Down Expand Up @@ -210,6 +320,7 @@ type getWriter struct {
Compression int
Size int64
Progress bool
TotalBlocks int
}

func (gw *getWriter) Write(r io.Reader, fpath string) error {
Expand Down Expand Up @@ -258,10 +369,24 @@ func (gw *getWriter) writeExtracted(r io.Reader, fpath string) error {
var progressCb func(int64) int64
if gw.Progress {
bar := makeProgressBar(gw.Err, gw.Size)
totalBlocks := gw.TotalBlocks
fmt.Fprintf(gw.Err, "Blocks: 0 / %d\n", totalBlocks)
bar.Start()
defer bar.Finish()
defer bar.Set64(gw.Size)
progressCb = bar.Add64
defer func() {
bar.Set64(gw.Size)
fmt.Fprintf(gw.Err, "\033[A\033[2K\rBlocks: %d / %d\033[B", totalBlocks, totalBlocks)
}()
size := gw.Size
progressCb = func(delta int64) int64 {
total := bar.Add64(delta)
blocksEst := totalBlocks
if total < size && size > 0 {
blocksEst = int(int64(totalBlocks) * total / size)
}
fmt.Fprintf(gw.Err, "\033[A\033[2K\rBlocks: %d / %d\033[B", blocksEst, totalBlocks)
return total
}
}

extractor := &tar.Extractor{Path: fpath, Progress: progressCb}
Expand Down
Loading