diff --git a/config/config.go b/config/config.go index 0952e0d3173..c8432dd6d37 100644 --- a/config/config.go +++ b/config/config.go @@ -45,6 +45,8 @@ type Config struct { Import Import Version Version + OnDemandPinning OnDemandPinning + Internal Internal // experimental/unstable options Bitswap Bitswap diff --git a/config/experiments.go b/config/experiments.go index 6c43ac04f07..4cf3af86e76 100644 --- a/config/experiments.go +++ b/config/experiments.go @@ -10,6 +10,7 @@ type Experiments struct { OptimisticProvide bool OptimisticProvideJobsPoolSize int GatewayOverLibp2p bool `json:",omitempty"` + OnDemandPinningEnabled bool GraphsyncEnabled graphsyncEnabled `json:",omitempty"` AcceleratedDHTClient experimentalAcceleratedDHTClient `json:",omitempty"` diff --git a/config/ondemandpin.go b/config/ondemandpin.go new file mode 100644 index 00000000000..4b072e94d32 --- /dev/null +++ b/config/ondemandpin.go @@ -0,0 +1,20 @@ +package config + +import "time" + +const ( + DefaultOnDemandPinReplicationTarget = 5 + DefaultOnDemandPinCheckInterval = 10 * time.Minute + DefaultOnDemandPinUnpinGracePeriod = 24 * time.Hour +) + +type OnDemandPinning struct { + // Minimum providers desired in the DHT (excluding self). + ReplicationTarget OptionalInteger + + // How often the checker evaluates all registered CIDs. + CheckInterval OptionalDuration + + // How long replication must stay above target before unpinning. 
+ UnpinGracePeriod OptionalDuration +} diff --git a/core/commands/commands_test.go b/core/commands/commands_test.go index be85c742648..343a62207e0 100644 --- a/core/commands/commands_test.go +++ b/core/commands/commands_test.go @@ -167,6 +167,10 @@ func TestCommands(t *testing.T) { "/pin/remote/service/add", "/pin/remote/service/ls", "/pin/remote/service/rm", + "/pin/ondemand", + "/pin/ondemand/add", + "/pin/ondemand/rm", + "/pin/ondemand/ls", "/pin/rm", "/pin/update", "/pin/verify", diff --git a/core/commands/pin/ondemandpin.go b/core/commands/pin/ondemandpin.go new file mode 100644 index 00000000000..8e1635f93b1 --- /dev/null +++ b/core/commands/pin/ondemandpin.go @@ -0,0 +1,280 @@ +package pin + +import ( + "fmt" + "io" + "time" + + cmds "github.com/ipfs/go-ipfs-cmds" + "github.com/ipfs/kubo/config" + cmdenv "github.com/ipfs/kubo/core/commands/cmdenv" + "github.com/ipfs/kubo/core/commands/cmdutils" + "github.com/ipfs/kubo/ondemandpin" +) + +const onDemandLiveOptionName = "live" + +var onDemandPinCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "Manage on-demand pins.", + ShortDescription: ` +On-demand pins when few DHT providers exist in the routing table; unpins after +replication stays above target for a grace period. Requires config +Experimental.OnDemandPinningEnabled. 
+`, + }, + Subcommands: map[string]*cmds.Command{ + "add": addOnDemandPinCmd, + "rm": rmOnDemandPinCmd, + "ls": listOnDemandPinCmd, + }, +} + +type OnDemandPinOutput struct { + Cid string `json:"Cid"` +} + +var addOnDemandPinCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "Register CIDs for on-demand pinning.", + ShortDescription: `Registers CID(s) for on-demand pinning; checker pins when needed.`, + }, + Arguments: []cmds.Argument{ + cmds.StringArg("cid", true, true, "CID(s) to register."), + }, + Type: OnDemandPinOutput{}, + Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + n, err := cmdenv.GetNode(env) + if err != nil { + return err + } + + cfg, err := n.Repo.Config() + if err != nil { + return err + } + if !cfg.Experimental.OnDemandPinningEnabled { + return fmt.Errorf("on-demand pinning is not enabled; set Experimental.OnDemandPinningEnabled = true in config") + } + + store := n.OnDemandPinStore + + api, err := cmdenv.GetApi(env, req) + if err != nil { + return err + } + + for _, arg := range req.Arguments { + p, err := cmdutils.PathOrCidPath(arg) + if err != nil { + return fmt.Errorf("invalid CID or path %q: %w", arg, err) + } + + rp, _, err := api.ResolvePath(req.Context, p) + if err != nil { + return fmt.Errorf("resolving %q: %w", arg, err) + } + c := rp.RootCid() + + if err := store.Add(req.Context, c); err != nil { + return err + } + + if checker := n.OnDemandPinChecker; checker != nil { + checker.Enqueue(c) + } + + if err := res.Emit(&OnDemandPinOutput{ + Cid: c.String(), + }); err != nil { + return err + } + } + return nil + }, + Encoders: cmds.EncoderMap{ + cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *OnDemandPinOutput) error { + fmt.Fprintf(w, "registered %s for on-demand pinning\n", out.Cid) + return nil + }), + }, +} + +var rmOnDemandPinCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "Remove CIDs from on-demand pinning.", + ShortDescription: ` +Removes CID(s) 
from the registry. Checker-pinned content is unpinned. + +Works when on-demand pinning is disabled, to clear old registrations. +`, + }, + Arguments: []cmds.Argument{ + cmds.StringArg("cid", true, true, "CID(s) to remove."), + }, + Type: OnDemandPinOutput{}, + Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + n, err := cmdenv.GetNode(env) + if err != nil { + return err + } + + store := n.OnDemandPinStore + + api, err := cmdenv.GetApi(env, req) + if err != nil { + return err + } + + for _, arg := range req.Arguments { + p, err := cmdutils.PathOrCidPath(arg) + if err != nil { + return fmt.Errorf("invalid CID or path %q: %w", arg, err) + } + + rp, _, err := api.ResolvePath(req.Context, p) + if err != nil { + return fmt.Errorf("resolving %q: %w", arg, err) + } + c := rp.RootCid() + + isOurs, err := ondemandpin.PinHasName(req.Context, n.Pinning, c, ondemandpin.OnDemandPinName) + if err != nil { + return fmt.Errorf("checking pin state for %s: %w", c, err) + } + if isOurs { + if err := api.Pin().Rm(req.Context, rp); err != nil { + return fmt.Errorf("unpinning %s: %w", c, err) + } + } + + if err := store.Remove(req.Context, c); err != nil { + return err + } + + if err := res.Emit(&OnDemandPinOutput{ + Cid: c.String(), + }); err != nil { + return err + } + } + return nil + }, + Encoders: cmds.EncoderMap{ + cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *OnDemandPinOutput) error { + fmt.Fprintf(w, "removed %s from on-demand pinning\n", out.Cid) + return nil + }), + }, +} + +type OnDemandLsOutput struct { + Cid string `json:"Cid"` + PinnedByUs bool `json:"PinnedByUs"` + Providers *int `json:"Providers,omitempty"` + LastAboveTarget string `json:"LastAboveTarget,omitempty"` + CreatedAt string `json:"CreatedAt"` +} + +var listOnDemandPinCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "List on-demand pins.", + ShortDescription: ` +Lists CIDs registered for on-demand pinning with their current state. 
+Use --live to include real-time provider counts from the DHT. +`, + }, + Arguments: []cmds.Argument{ + cmds.StringArg("cid", false, true, "Optional CID(s) to filter."), + }, + Options: []cmds.Option{ + cmds.BoolOption(onDemandLiveOptionName, "l", "Perform live provider lookup."), + }, + Type: OnDemandLsOutput{}, + Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + n, err := cmdenv.GetNode(env) + if err != nil { + return err + } + + store := n.OnDemandPinStore + + live, _ := req.Options[onDemandLiveOptionName].(bool) + + var globalTarget int + if live { + cfg, err := n.Repo.Config() + if err != nil { + return err + } + globalTarget = int(cfg.OnDemandPinning.ReplicationTarget.WithDefault(config.DefaultOnDemandPinReplicationTarget)) + } + + var records []ondemandpin.Record + if len(req.Arguments) > 0 { + api, err := cmdenv.GetApi(env, req) + if err != nil { + return err + } + for _, arg := range req.Arguments { + p, err := cmdutils.PathOrCidPath(arg) + if err != nil { + return fmt.Errorf("invalid CID or path %q: %w", arg, err) + } + rp, _, err := api.ResolvePath(req.Context, p) + if err != nil { + return fmt.Errorf("resolving %q: %w", arg, err) + } + rec, err := store.Get(req.Context, rp.RootCid()) + if err != nil { + return err + } + records = append(records, *rec) + } + } else { + records, err = store.List(req.Context) + if err != nil { + return err + } + } + + for _, rec := range records { + out := OnDemandLsOutput{ + Cid: rec.Cid.String(), + PinnedByUs: rec.PinnedByUs, + CreatedAt: rec.CreatedAt.Format(time.RFC3339), + } + if !rec.LastAboveTarget.IsZero() { + out.LastAboveTarget = rec.LastAboveTarget.Format(time.RFC3339) + } + + if live && n.Routing != nil { + count := ondemandpin.CountProviders(req.Context, n.Routing, n.Identity, rec.Cid, globalTarget) + out.Providers = &count + } + + if err := res.Emit(&out); err != nil { + return err + } + } + return nil + }, + Encoders: cmds.EncoderMap{ + cmds.Text: 
cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *OnDemandLsOutput) error { + pinState := "not-pinned" + if out.PinnedByUs { + pinState = "pinned" + } + fmt.Fprintf(w, "%s", out.Cid) + if out.Providers != nil { + fmt.Fprintf(w, " providers=%d", *out.Providers) + } + fmt.Fprintf(w, " %s created=%s", pinState, out.CreatedAt) + if out.LastAboveTarget != "" { + fmt.Fprintf(w, " above-target-since=%s", out.LastAboveTarget) + } + fmt.Fprintln(w) + return nil + }), + }, +} diff --git a/core/commands/pin/pin.go b/core/commands/pin/pin.go index 3762684781b..24d2897ddce 100644 --- a/core/commands/pin/pin.go +++ b/core/commands/pin/pin.go @@ -38,6 +38,8 @@ var PinCmd = &cmds.Command{ "verify": verifyPinCmd, "update": updatePinCmd, "remote": remotePinCmd, + + "ondemand": onDemandPinCmd, }, } diff --git a/core/core.go b/core/core.go index 5f37c287116..44ec5d9e8cb 100644 --- a/core/core.go +++ b/core/core.go @@ -56,6 +56,7 @@ import ( "github.com/ipfs/kubo/core/node" "github.com/ipfs/kubo/core/node/libp2p" "github.com/ipfs/kubo/fuse/mount" + "github.com/ipfs/kubo/ondemandpin" "github.com/ipfs/kubo/p2p" "github.com/ipfs/kubo/repo" irouting "github.com/ipfs/kubo/routing" @@ -76,6 +77,9 @@ type IpfsNode struct { PrivateKey ic.PrivKey `optional:"true"` // the local node's private Key PNetFingerprint libp2p.PNetFingerprint `optional:"true"` // fingerprint of private network + OnDemandPinStore *ondemandpin.Store + OnDemandPinChecker *ondemandpin.Checker `optional:"true"` + // Services Peerstore pstore.Peerstore `optional:"true"` // storage for other Peer instances Blockstore bstore.GCBlockstore // the block store (lower level) diff --git a/core/node/groups.go b/core/node/groups.go index ab497e33b57..3113e87c7b7 100644 --- a/core/node/groups.go +++ b/core/node/groups.go @@ -350,6 +350,8 @@ func Online(bcfg *BuildCfg, cfg *config.Config, userResourceOverrides rcmgr.Part // Disabling is controlled by Provide.Enabled=false or setting Interval to 0. 
isProviderEnabled := cfg.Provide.Enabled.WithDefault(config.DefaultProvideEnabled) && cfg.Provide.DHT.Interval.WithDefault(config.DefaultProvideDHTInterval) != 0 + isOnDemandPinEnabled := cfg.Experimental.OnDemandPinningEnabled + return fx.Options( fx.Provide(BitswapOptions(cfg)), fx.Provide(Bitswap(isBitswapServerEnabled, isBitswapLibp2pEnabled, isHTTPRetrievalEnabled)), @@ -365,6 +367,8 @@ func Online(bcfg *BuildCfg, cfg *config.Config, userResourceOverrides rcmgr.Part LibP2P(bcfg, cfg, userResourceOverrides), OnlineProviders(isProviderEnabled, cfg), + + maybeProvide(OnDemandPinChecker(cfg.OnDemandPinning), isOnDemandPinEnabled), ) } @@ -458,6 +462,7 @@ func IPFS(ctx context.Context, bcfg *BuildCfg) fx.Option { fx.Provide(BlockService(cfg)), fx.Provide(Pinning(providerStrategy)), fx.Provide(Files(providerStrategy)), + fx.Provide(OnDemandPinStore), Core, ) } diff --git a/core/node/ondemandpin.go b/core/node/ondemandpin.go new file mode 100644 index 00000000000..f542e086a56 --- /dev/null +++ b/core/node/ondemandpin.go @@ -0,0 +1,168 @@ +package node + +import ( + "context" + "time" + + "github.com/dustin/go-humanize" + blockstore "github.com/ipfs/boxo/blockstore" + "github.com/ipfs/boxo/ipld/merkledag" + pin "github.com/ipfs/boxo/pinning/pinner" + "github.com/ipfs/go-cid" + format "github.com/ipfs/go-ipld-format" + peer "github.com/libp2p/go-libp2p/core/peer" + routing "github.com/libp2p/go-libp2p/core/routing" + "go.uber.org/fx" + + "github.com/ipfs/kubo/config" + "github.com/ipfs/kubo/core/node/helpers" + "github.com/ipfs/kubo/ondemandpin" + "github.com/ipfs/kubo/repo" +) + +// pinIdleTimeout cancels a pin when no new blocks arrive for this long. 
+const pinIdleTimeout = 2 * time.Minute + +type kuboPinService struct { + pinner pin.Pinner + dag format.DAGService + bs blockstore.GCBlockstore +} + +func (s *kuboPinService) Pin(ctx context.Context, c cid.Cid, name string) error { + defer s.bs.PinLock(ctx).Unlock(ctx) + + node, err := s.fetchRoot(ctx, c) + if err != nil { + return err + } + if err := s.pinDAG(ctx, node, name); err != nil { + return err + } + return s.pinner.Flush(ctx) +} + +func (s *kuboPinService) fetchRoot(ctx context.Context, c cid.Cid) (format.Node, error) { + ctx, cancel := context.WithTimeout(ctx, pinIdleTimeout) + defer cancel() + return s.dag.Get(ctx, c) +} + +// pinDAG recursively pins the DAG rooted at node. +// A background goroutine monitors block-fetching progress and cancels the operation if no new blocks arrive within pinIdleTimeout. +func (s *kuboPinService) pinDAG(ctx context.Context, node format.Node, name string) error { + tracker := new(merkledag.ProgressTracker) + trackerCtx := tracker.DeriveContext(ctx) + pinCtx, cancel := context.WithCancel(trackerCtx) + defer cancel() + + go watchPinProgress(pinCtx, cancel, tracker, pinIdleTimeout) + return s.pinner.Pin(pinCtx, node, true, name) +} + +func watchPinProgress(ctx context.Context, cancel context.CancelFunc, tracker *merkledag.ProgressTracker, timeout time.Duration) { + var last int + lastProgress := time.Now() + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if cur := tracker.Value(); cur != last { + last = cur + lastProgress = time.Now() + } else if time.Since(lastProgress) >= timeout { + cancel() + return + } + } + } +} + +func (s *kuboPinService) Unpin(ctx context.Context, c cid.Cid) error { + defer s.bs.PinLock(ctx).Unlock(ctx) + + if err := s.pinner.Unpin(ctx, c, true); err != nil { + return err + } + return s.pinner.Flush(ctx) +} + +func (s *kuboPinService) IsPinned(ctx context.Context, c cid.Cid) (bool, error) { + _, pinned, err := 
s.pinner.IsPinned(ctx, c) + return pinned, err +} + +func (s *kuboPinService) HasPinWithName(ctx context.Context, c cid.Cid, name string) (bool, error) { + return ondemandpin.PinHasName(ctx, s.pinner, c, name) +} + +type kuboStorageChecker struct { + repo repo.Repo +} + +func (s *kuboStorageChecker) StorageUsage(ctx context.Context) (uint64, uint64, error) { + cfg, err := s.repo.Config() + if err != nil { + return 0, 0, err + } + used, err := s.repo.GetStorageUsage(ctx) + if err != nil { + return 0, 0, err + } + if cfg.Datastore.StorageMax == "" { + return used, 0, nil + } + max, err := humanize.ParseBytes(cfg.Datastore.StorageMax) + if err != nil { + return 0, 0, err + } + wm := cfg.Datastore.StorageGCWatermark + if wm <= 0 || wm > 100 { + wm = 90 + } + return used, max * uint64(wm) / 100, nil +} + +func OnDemandPinStore(r repo.Repo) *ondemandpin.Store { + return ondemandpin.NewStore(r.Datastore()) +} + +func OnDemandPinChecker(cfg config.OnDemandPinning) func( + mctx helpers.MetricsCtx, + lc fx.Lifecycle, + r repo.Repo, + store *ondemandpin.Store, + pinner pin.Pinner, + cr routing.ContentRouting, + dag format.DAGService, + bs blockstore.GCBlockstore, + id peer.ID, +) *ondemandpin.Checker { + return func( + mctx helpers.MetricsCtx, + lc fx.Lifecycle, + r repo.Repo, + store *ondemandpin.Store, + pinner pin.Pinner, + cr routing.ContentRouting, + dag format.DAGService, + bs blockstore.GCBlockstore, + id peer.ID, + ) *ondemandpin.Checker { + pins := &kuboPinService{pinner: pinner, dag: dag, bs: bs} + storage := &kuboStorageChecker{repo: r} + checker := ondemandpin.NewChecker(store, pins, storage, cr, id, cfg) + ctx := helpers.LifecycleCtx(mctx, lc) + + lc.Append(fx.Hook{ + OnStart: func(context.Context) error { + go checker.Run(ctx) + return nil + }, + }) + return checker + } +} diff --git a/docs/changelogs/v0.42.md b/docs/changelogs/v0.42.md index 17167733cda..0598720ff6e 100644 --- a/docs/changelogs/v0.42.md +++ b/docs/changelogs/v0.42.md @@ -10,6 +10,7 @@ This 
release was brought to you by the [Shipyard](https://ipshipyard.com/) team. - [Overview](#overview) - [๐Ÿ”ฆ Highlights](#-highlights) + - [Experimental on-demand pinning](#experimental-on-demand-pinning) - [๐Ÿ“ Changelog](#-changelog) - [๐Ÿ‘จโ€๐Ÿ‘ฉโ€๐Ÿ‘งโ€๐Ÿ‘ฆ Contributors](#-contributors) @@ -17,6 +18,51 @@ This release was brought to you by the [Shipyard](https://ipshipyard.com/) team. ### ๐Ÿ”ฆ Highlights +#### Experimental on-demand pinning + +Automatically pins content when DHT provider counts fall below a configurable +replication target, and unpins once replication has been above target for a +grace period. Helps keeping critical data around, without wasting storage on +overly replicated CIDs. + +The feature is gated behind `Experimental.OnDemandPinningEnabled` and described +in [ipfs/specs#532](https://github.com/ipfs/specs/pull/532). + +```console +$ ipfs config --json Experimental.OnDemandPinningEnabled true +``` + +New CLI commands under `ipfs pin ondemand`: + +- `add` -- register CIDs for on-demand pinning +- `rm` -- deregister and unpin +- `ls` -- list registered CIDs (use `--live` for real-time DHT provider counts) + +Design highlights: + +- **Pin partitioning**: the checker needs to distinguish its pins from user + pins to avoid accidental deletion. This implementation uses boxo's pin name + field ("on-demand"). +- **Storage budget**: skips pinning when repo usage exceeds + `StorageMax * StorageGCWatermark`. +- **Idle timeout**: DAG fetches timeout after 2 minutes without receiving new + blocks, allowing large downloads while skipping dead records. +- **Provide after pin**: the checker publishes a DHT provider record after + pinning so other peers can discover the content on this node. +- **Sybil limitation**: provider counts come from DHT queries, which are + susceptible to Sybil manipulation. Documented as a known limitation. 
+ +Configuration at [`OnDemandPinning`](https://github.com/ipfs/kubo/blob/master/docs/config.md#ondemandpinning): + +| Option | Default | Description | +|---|---|---| +| `OnDemandPinning.ReplicationTarget` | `5` | Minimum providers in DHT (excluding self) | +| `OnDemandPinning.CheckInterval` | `"10m"` | How often the checker runs | +| `OnDemandPinning.UnpinGracePeriod` | `"24h"` | How long above target before unpinning | + +See [experimental features](https://github.com/ipfs/kubo/blob/master/docs/experimental-features.md#on-demand-pinning) +for full documentation. + ### ๐Ÿ“ Changelog ### ๐Ÿ‘จโ€๐Ÿ‘ฉโ€๐Ÿ‘งโ€๐Ÿ‘ฆ Contributors diff --git a/docs/config.md b/docs/config.md index 9582ac02041..88093e0af57 100644 --- a/docs/config.md +++ b/docs/config.md @@ -60,6 +60,7 @@ config file at runtime. - [`Discovery.MDNS.Interval`](#discoverymdnsinterval) - [`Experimental`](#experimental) - [`Experimental.Libp2pStreamMounting`](#experimentallibp2pstreammounting) + - [`Experimental.OnDemandPinningEnabled`](#experimentalondemandpinningenabled) - [`Gateway`](#gateway) - [`Gateway.NoFetch`](#gatewaynofetch) - [`Gateway.NoDNSLink`](#gatewaynodnslink) @@ -119,6 +120,11 @@ config file at runtime. - [`Mounts.IPNS`](#mountsipns) - [`Mounts.MFS`](#mountsmfs) - [`Mounts.FuseAllowOther`](#mountsfuseallowother) + - [`OnDemandPinning`](#ondemandpinning) + - [`OnDemandPinning.ReplicationTarget`](#ondemandpinningreplicationtarget) + - [`OnDemandPinning.CheckInterval`](#ondemandpinningcheckinterval) + - [`OnDemandPinning.UnpinGracePeriod`](#ondemandpinningunpingraceperiod) + - [`Pinning`](#pinning) - [`Pinning.RemoteServices`](#pinningremoteservices) - [`Pinning.RemoteServices: API`](#pinningremoteservices-api) @@ -1100,6 +1106,20 @@ in the [new mDNS implementation](https://github.com/libp2p/zeroconf#readme). Toggle and configure experimental features of Kubo. Experimental features are listed [here](./experimental-features.md). 
+### `Experimental.OnDemandPinningEnabled` + +Enables on-demand pinning. When enabled, the node runs a background checker +that periodically evaluates DHT provider counts for CIDs registered via +`ipfs pin ondemand add`. CIDs with fewer providers than the replication target +are pinned; pins are removed after replication stays above target for a grace +period (default 24h). + +See [`OnDemandPinning`](#ondemandpinning) for configuration. + +Default: `false` + +Type: `bool` + ### `Experimental.Libp2pStreamMounting` Enables the `ipfs p2p` commands for tunneling TCP connections through libp2p @@ -2029,6 +2049,38 @@ Default: `"5m"` Type: `duration` +## `OnDemandPinning` + +Configures the on-demand pinning system. Requires +[`Experimental.OnDemandPinningEnabled`](#experimentalondemandpinningenabled). + +### `OnDemandPinning.ReplicationTarget` + +The minimum number of providers desired in the DHT (excluding the local node). +When fewer providers are found, the node pins the content locally. + +Default: `5` + +Type: `optionalInteger` + +### `OnDemandPinning.CheckInterval` + +How often the background checker evaluates all on-demand pins. + +Default: `"10m"` + +Type: `optionalDuration` + +### `OnDemandPinning.UnpinGracePeriod` + +How long replication must stay above target before the local pin is removed. +This prevents thrashing when provider counts fluctuate near the target +boundary. + +Default: `"24h"` + +Type: `optionalDuration` + ## `Provide` Configures how your node advertises content to make it discoverable by other diff --git a/docs/experimental-features.md b/docs/experimental-features.md index c6eae93c981..e71a35b6f3c 100644 --- a/docs/experimental-features.md +++ b/docs/experimental-features.md @@ -28,6 +28,7 @@ the above issue. 
- [Noise](#noise) - [Optimistic Provide](#optimistic-provide) - [HTTP Gateway over Libp2p](#http-gateway-over-libp2p) +- [On-Demand Pinning](#on-demand-pinning) --- @@ -601,6 +602,67 @@ ipfs config --json Experimental.GatewayOverLibp2p true - [ ] Needs a mechanism for HTTP handler to signal supported features ([IPIP-425](https://github.com/ipfs/specs/pull/425)) - [ ] Needs an option for Kubo to detect peers that have it enabled and prefer HTTP transport before falling back to bitswap (and use CAR if peer supports dag-scope=entity from [IPIP-402](https://specs.ipfs.tech/ipips/ipip-0402/)) +## On-Demand Pinning + +### State + +Experimental, disabled by default. + +On-demand pinning lets a node automatically pin content when DHT provider +counts fall below a configurable replication target, and unpin when +replication recovers above target for a grace period. Under-replicated +content gets pinned; storage is freed once enough other providers exist. + +The feature consists of: + +- A **registry** of CIDs to monitor, managed via `ipfs pin ondemand add|rm|ls`. +- A **background checker** that periodically queries the DHT for each + registered CID and pins or unpins accordingly. +- **Pin partitioning**: the checker marks its pins with the name `"on-demand"` + to distinguish them from user-created pins and will never remove a pin it + did not create. + +When the checker pins a CID it also publishes a provider record to the DHT +so other peers can discover the content on this node. + +**Security consideration**: DHT provider counts can be gamed via Sybil +attacks. An attacker could inflate provider counts to trick nodes into +unpinning content that is not actually well-replicated. The grace period +provides partial mitigation by requiring fake provider records to be +sustained for its full duration (default 24 hours). 
+ +### How to enable + +``` +ipfs config --json Experimental.OnDemandPinningEnabled true +``` + +### Configuring + +See [`OnDemandPinning`](https://github.com/ipfs/kubo/blob/master/docs/config.md#ondemandpinning) +for tunable parameters: `ReplicationTarget`, `CheckInterval`, +and `UnpinGracePeriod`. + +### Basic usage + +```bash +# Register a CID for on-demand monitoring +ipfs pin ondemand add QmExample + +# List all registered CIDs (with live provider counts from DHT) +ipfs pin ondemand ls --live + +# Remove a CID from on-demand monitoring (also unpins if checker had pinned it) +ipfs pin ondemand rm QmExample +``` + +### Road to being a real feature + +- [ ] Needs more people to use and report on how well it works +- [ ] Needs integration tests in `test/cli/` +- [ ] Needs UI support in ipfs-webui / IPFS Desktop +- [ ] Evaluate Sybil resilience and whether additional mitigations are needed + ## Accelerated DHT Client This feature now lives at [`Routing.AcceleratedDHTClient`](https://github.com/ipfs/kubo/blob/master/docs/config.md#routingaccelerateddhtclient). diff --git a/ondemandpin/checker.go b/ondemandpin/checker.go new file mode 100644 index 00000000000..9b121029ea9 --- /dev/null +++ b/ondemandpin/checker.go @@ -0,0 +1,282 @@ +package ondemandpin + +import ( + "context" + "time" + + pin "github.com/ipfs/boxo/pinning/pinner" + "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log/v2" + "github.com/ipfs/kubo/config" + peer "github.com/libp2p/go-libp2p/core/peer" + routing "github.com/libp2p/go-libp2p/core/routing" +) + +var log = logging.Logger("ondemandpin") + +// OnDemandPinName is the pin name the checker uses when creating pins (Kubo specific. Other implementation may divert from this method). +// Pins carrying this name are considered managed by on-demand pinning and may be removed automatically when replication recovers. 
+const OnDemandPinName = "on-demand" + +// checkTimeout prevents hung DHT query or pin/unpin operation from blocking the checker indefinitely. +const checkTimeout = 5 * time.Minute + +type PinService interface { + Pin(ctx context.Context, c cid.Cid, name string) error + Unpin(ctx context.Context, c cid.Cid) error + IsPinned(ctx context.Context, c cid.Cid) (bool, error) + HasPinWithName(ctx context.Context, c cid.Cid, name string) (bool, error) +} + +type StorageChecker interface { + StorageUsage(ctx context.Context) (used, limit uint64, err error) +} + +type Checker struct { + store *Store + pins PinService + storage StorageChecker + routing routing.ContentRouting + selfID peer.ID + + replicationTarget int + checkInterval time.Duration + unpinGracePeriod time.Duration + + now func() time.Time + priorityCh chan cid.Cid +} + +func NewChecker( + store *Store, + pins PinService, + storage StorageChecker, + cr routing.ContentRouting, + selfID peer.ID, + cfg config.OnDemandPinning, +) *Checker { + return &Checker{ + store: store, + pins: pins, + storage: storage, + routing: cr, + selfID: selfID, + + replicationTarget: int(cfg.ReplicationTarget.WithDefault(config.DefaultOnDemandPinReplicationTarget)), + checkInterval: cfg.CheckInterval.WithDefault(config.DefaultOnDemandPinCheckInterval), + unpinGracePeriod: cfg.UnpinGracePeriod.WithDefault(config.DefaultOnDemandPinUnpinGracePeriod), + + now: time.Now, + priorityCh: make(chan cid.Cid, 64), + } +} + +func (c *Checker) Enqueue(ci cid.Cid) { + select { + case c.priorityCh <- ci: + default: + log.Warnw("priority queue full, CID will be checked in next regular cycle", "cid", ci) + } +} + +// Run blocks until ctx is cancelled. 
+func (c *Checker) Run(ctx context.Context) { + log.Info("on-demand pin checker started") + defer log.Info("on-demand pin checker stopped") + + ticker := time.NewTicker(c.checkInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case ci := <-c.priorityCh: + c.checkOne(ctx, ci) + case <-ticker.C: + c.checkAll(ctx) + } + } +} + +func (c *Checker) checkAll(ctx context.Context) { + records, err := c.store.List(ctx) + if err != nil { + log.Errorw("failed to list on-demand pins", "error", err) + return + } + + log.Infow("starting check cycle", "records", len(records)) + for _, rec := range records { + // Drain priority checks between records so Enqueue'd CIDs don't wait for a full sweep to complete. + select { + case ci := <-c.priorityCh: + c.checkOne(ctx, ci) + default: + } + if ctx.Err() != nil { + return + } + c.checkRecord(ctx, &rec) + } +} + +func (c *Checker) checkOne(ctx context.Context, ci cid.Cid) { + rec, err := c.store.Get(ctx, ci) + if err != nil { + log.Debugw("CID not in store, skipping", "cid", ci, "error", err) + return + } + c.checkRecord(ctx, rec) +} + +// checkRecord evaluates a single on-demand pin record in three phases: +// +// 1. Guard: if the CID already has an external pin (not created by us), skip it to avoid interfering with user-managed pins. +// We use PinService.IsPinned (any pin) here โ€” not Record.PinnedByUs (on-demand pins). +// 2. Under-replicated: if provider count is low, pin locally. +// 3. Well-replicated: if provider count is high for a full grace period, unpin (on-demand pins only). 
+func (c *Checker) checkRecord(ctx context.Context, rec *Record) { + ctx, cancel := context.WithTimeout(ctx, checkTimeout) + defer cancel() + + pinned, err := c.pins.IsPinned(ctx, rec.Cid) + if err != nil { + log.Errorw("failed to check pin state, skipping CID", "cid", rec.Cid, "error", err) + return + } + if !rec.PinnedByUs && pinned { + log.Debugw("skipping: CID has a user-managed pin", "cid", rec.Cid) + return + } + + count := CountProviders(ctx, c.routing, c.selfID, rec.Cid, c.replicationTarget) + log.Debugw("provider count", "cid", rec.Cid, "count", count, "target", c.replicationTarget, "pinnedByUs", rec.PinnedByUs) + + if count < c.replicationTarget { + c.handleUnderReplicated(ctx, rec, count, pinned) + } else { + c.handleWellReplicated(ctx, rec, count) + } +} + +// handleUnderReplicated pins the CID if it isn't already pinned. +func (c *Checker) handleUnderReplicated(ctx context.Context, rec *Record, count int, pinExists bool) { + if rec.PinnedByUs && pinExists { + if !rec.LastAboveTarget.IsZero() { + rec.LastAboveTarget = time.Time{} + c.saveRecord(ctx, rec) + } + return + } + + if rec.PinnedByUs { + log.Warnw("pin was removed externally, will re-pin", "cid", rec.Cid) + rec.PinnedByUs = false + } + + if !c.hasStorageBudget(ctx) { + log.Warnw("skipping pin: repo near storage limit", "cid", rec.Cid) + return + } + + if err := c.pins.Pin(ctx, rec.Cid, OnDemandPinName); err != nil { + log.Errorw("failed to pin", "cid", rec.Cid, "error", err) + return + } + rec.PinnedByUs = true + rec.LastAboveTarget = time.Time{} + log.Infow("pinned", "cid", rec.Cid, "providers", count, "target", c.replicationTarget) + + if err := c.routing.Provide(ctx, rec.Cid, true); err != nil { + log.Warnw("failed to provide after pin", "cid", rec.Cid, "error", err) + } + c.saveRecord(ctx, rec) +} + +// handleWellReplicated manages grace-period-then-unpin. 
+func (c *Checker) handleWellReplicated(ctx context.Context, rec *Record, count int) { + if !rec.PinnedByUs { + return + } + + if rec.LastAboveTarget.IsZero() { + rec.LastAboveTarget = c.now() + c.saveRecord(ctx, rec) + log.Debugw("grace period started", "cid", rec.Cid, "providers", count, "target", c.replicationTarget) + return + } + + if c.now().Sub(rec.LastAboveTarget) < c.unpinGracePeriod { + return + } + + hasOurPin, err := c.pins.HasPinWithName(ctx, rec.Cid, OnDemandPinName) + if err != nil { + log.Errorw("failed to check pin name, skipping unpin", "cid", rec.Cid, "error", err) + return + } + + if hasOurPin { + if err := c.pins.Unpin(ctx, rec.Cid); err != nil { + log.Errorw("failed to unpin", "cid", rec.Cid, "error", err) + return + } + log.Infow("unpinned", "cid", rec.Cid, "providers", count, "target", c.replicationTarget) + } else { + log.Infow("relinquishing management: pin name changed externally", "cid", rec.Cid) + } + + rec.PinnedByUs = false + rec.LastAboveTarget = time.Time{} + c.saveRecord(ctx, rec) +} + +func (c *Checker) saveRecord(ctx context.Context, rec *Record) { + if err := c.store.Update(ctx, rec); err != nil { + log.Errorw("failed to update record", "cid", rec.Cid, "error", err) + } +} + +func (c *Checker) hasStorageBudget(ctx context.Context) bool { + if c.storage == nil { + return true + } + used, limit, err := c.storage.StorageUsage(ctx) + if err != nil { + log.Warnw("failed to check storage usage, proceeding with pin", "error", err) + return true + } + if limit == 0 { + return true + } + return used < limit +} + +// CountProviders counts unique providers with target+1 results to ensure that, even if selfID appears, we still discover up to target. 
+// Asking for target+1 means self can occupy one result slot without hiding
+// the target-th distinct provider, so the < target comparison stays valid.
+// NOTE(review): if ctx expires mid-stream the count is partial, which can
+// only under-count (erring toward pinning) — confirm that is intended.
+func CountProviders(ctx context.Context, cr routing.ContentRouting, selfID peer.ID, c cid.Cid, target int) int {
+	ch := cr.FindProvidersAsync(ctx, c, target+1)
+
+	seen := make(map[peer.ID]struct{})
+	for pi := range ch {
+		if pi.ID == selfID {
+			continue
+		}
+		seen[pi.ID] = struct{}{}
+	}
+	return len(seen)
+}
+
+// PinHasName is used by checker (via PinService.HasPinWithName) and the rm command to identify pins managed by on-demand pinning.
+// Note: only recursive pins are inspected (pin.Recursive below); direct or
+// indirect pins never match.
+func PinHasName(ctx context.Context, p pin.Pinner, c cid.Cid, name string) (bool, error) {
+	results, err := p.CheckIfPinnedWithType(ctx, pin.Recursive, true, c)
+	if err != nil {
+		return false, err
+	}
+	for _, r := range results {
+		if r.Pinned() && r.Name == name {
+			return true, nil
+		}
+	}
+	return false, nil
+}
diff --git a/ondemandpin/checker_test.go b/ondemandpin/checker_test.go
new file mode 100644
index 00000000000..02ced95d2ee
--- /dev/null
+++ b/ondemandpin/checker_test.go
@@ -0,0 +1,165 @@
+package ondemandpin
+
+import (
+	"context"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/ipfs/go-cid"
+	"github.com/ipfs/go-datastore"
+	dssync "github.com/ipfs/go-datastore/sync"
+	"github.com/ipfs/kubo/config"
+	peer "github.com/libp2p/go-libp2p/core/peer"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+// fakeClock is a manually-advanced clock; the atomic counter makes it safe to
+// read from any goroutine.
+type fakeClock struct{ t atomic.Int64 }
+
+func newFakeClock() *fakeClock {
+	c := &fakeClock{}
+	c.t.Store(time.Now().UnixNano())
+	return c
+}
+
+func (c *fakeClock) Now() time.Time { return time.Unix(0, c.t.Load()) }
+func (c *fakeClock) Advance(d time.Duration) { c.t.Add(int64(d)) }
+
+// mockRouting serves canned provider lists per CID.
+type mockRouting struct {
+	mu        sync.Mutex
+	providers map[cid.Cid][]peer.AddrInfo
+}
+
+func newMockRouting() *mockRouting {
+	return &mockRouting{providers: make(map[cid.Cid][]peer.AddrInfo)}
+}
+
+// setProviders replaces the provider set for c.
+func (m *mockRouting) setProviders(c cid.Cid, peerIDs ...peer.ID) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	infos := make([]peer.AddrInfo, len(peerIDs))
+	for i, pid := range peerIDs {
+		infos[i] = peer.AddrInfo{ID: pid}
+	}
+	m.providers[c] = infos
+}
+
+// FindProvidersAsync streams at most limit providers, honoring ctx cancellation.
+func (m *mockRouting) FindProvidersAsync(ctx context.Context, c cid.Cid, limit int) <-chan peer.AddrInfo {
+	ch := make(chan peer.AddrInfo)
+	go func() {
+		defer close(ch)
+		m.mu.Lock()
+		provs := m.providers[c]
+		m.mu.Unlock()
+		for i, pi := range provs {
+			if i >= limit {
+				break
+			}
+			select {
+			case ch <- pi:
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+	return ch
+}
+
+func (m *mockRouting) Provide(context.Context, cid.Cid, bool) error { return nil }
+
+// mockPins is an in-memory cid->pin-name map. It is not goroutine-safe;
+// presumably the checker is exercised synchronously in these tests — confirm
+// before adding concurrent callers.
+type mockPins struct {
+	pinned map[cid.Cid]string
+}
+
+func newMockPins() *mockPins { return &mockPins{pinned: make(map[cid.Cid]string)} }
+
+func (m *mockPins) Pin(_ context.Context, c cid.Cid, name string) error {
+	m.pinned[c] = name
+	return nil
+}
+
+func (m *mockPins) Unpin(_ context.Context, c cid.Cid) error {
+	delete(m.pinned, c)
+	return nil
+}
+
+func (m *mockPins) IsPinned(_ context.Context, c cid.Cid) (bool, error) {
+	_, ok := m.pinned[c]
+	return ok, nil
+}
+
+func (m *mockPins) HasPinWithName(_ context.Context, c cid.Cid, name string) (bool, error) {
+	n, ok := m.pinned[c]
+	return ok && n == name, nil
+}
+
+// isPinned is a test-side convenience that ignores the pin name.
+func (m *mockPins) isPinned(c cid.Cid) bool {
+	_, ok := m.pinned[c]
+	return ok
+}
+
+// newTestChecker wires a Checker to in-memory fakes. The nil storage argument
+// disables the budget check, and the short grace period pairs with fakeClock
+// so tests never sleep.
+func newTestChecker(t *testing.T) (*Checker, *Store, *mockRouting, *mockPins, *fakeClock) {
+	t.Helper()
+	store := NewStore(dssync.MutexWrap(datastore.NewMapDatastore()))
+	r := newMockRouting()
+	p := newMockPins()
+	clock := newFakeClock()
+
+	checker := NewChecker(store, p, nil, r, peer.ID("self"), config.OnDemandPinning{})
+	checker.checkInterval = time.Minute
+	checker.unpinGracePeriod = 200 * time.Millisecond
+	checker.now = clock.Now
+
+	return checker, store, r, p, clock
+}
+
+// Under-replicated content gets pinned.
+func TestCheckerPinsBelowTarget(t *testing.T) {
+	ctx := context.Background()
+	checker, store, r, p, _ := newTestChecker(t)
+	c := testCID(t, "under-replicated")
+
+	require.NoError(t, store.Add(ctx, c))
+	r.setProviders(c, peer.ID("p1"), peer.ID("p2"))
+
+	checker.checkAll(ctx)
+
+	assert.True(t, p.isPinned(c))
+}
+
+// Well-replicated content is left alone.
+func TestCheckerDoesNotPinAboveTarget(t *testing.T) {
+	ctx := context.Background()
+	checker, store, r, p, _ := newTestChecker(t)
+	c := testCID(t, "well-replicated")
+
+	require.NoError(t, store.Add(ctx, c))
+	r.setProviders(c, peer.ID("p1"), peer.ID("p2"), peer.ID("p3"), peer.ID("p4"), peer.ID("p5"), peer.ID("p6"))
+
+	checker.checkAll(ctx)
+
+	assert.False(t, p.isPinned(c))
+}
+
+// Pinned content is unpinned only after the grace period expires.
+func TestCheckerUnpinsAfterGracePeriod(t *testing.T) {
+	ctx := context.Background()
+	checker, store, r, p, clock := newTestChecker(t)
+	c := testCID(t, "recovering")
+
+	require.NoError(t, store.Add(ctx, c))
+	r.setProviders(c, peer.ID("p1"))
+	checker.checkAll(ctx)
+	require.True(t, p.isPinned(c))
+
+	// Providers recover above target.
+	r.setProviders(c, peer.ID("p1"), peer.ID("p2"), peer.ID("p3"), peer.ID("p4"), peer.ID("p5"), peer.ID("p6"))
+	checker.checkAll(ctx)
+	assert.True(t, p.isPinned(c), "not yet past grace period")
+
+	clock.Advance(250 * time.Millisecond)
+	checker.checkAll(ctx)
+	assert.False(t, p.isPinned(c), "past grace period")
+}
diff --git a/ondemandpin/store.go b/ondemandpin/store.go
new file mode 100644
index 00000000000..0c50bb0be45
--- /dev/null
+++ b/ondemandpin/store.go
@@ -0,0 +1,143 @@
+package ondemandpin
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/ipfs/go-cid"
+	"github.com/ipfs/go-datastore"
+	"github.com/ipfs/go-datastore/query"
+)
+
+var (
+	ErrAlreadyRegistered = errors.New("CID is already registered for on-demand pinning")
+	ErrNotRegistered     = errors.New("CID is not registered for on-demand pinning")
+)
+
+// dsPrefix namespaces all on-demand pin records: keys are "/ondemand-pins/<cid>".
+const dsPrefix = "/ondemand-pins/"
+
+// Record is the persisted per-CID state for on-demand pinning, stored as JSON.
+type Record struct {
+	Cid cid.Cid `json:"Cid"`
+	// PinnedByUs tracks the on-demand pins only (does not include standard pins).
+	// NOTE(review): the JSON tag "IsPinned" no longer matches the field name —
+	// presumably kept for records written by earlier builds; confirm before
+	// renaming the tag.
+	PinnedByUs      bool      `json:"IsPinned"`
+	LastAboveTarget time.Time `json:"LastAboveTarget,omitempty"`
+	CreatedAt       time.Time `json:"CreatedAt"`
+}
+
+// Store persists Records in a datastore. mu serializes all access, so the
+// check-then-write sequences in Add/Remove are race-free; now supplies
+// CreatedAt timestamps (replaceable in tests).
+type Store struct {
+	ds  datastore.Batching
+	mu  sync.RWMutex
+	now func() time.Time
+}
+
+func NewStore(ds datastore.Batching) *Store {
+	return &Store{ds: ds, now: time.Now}
+}
+
+func dsKey(c cid.Cid) datastore.Key {
+	return datastore.NewKey(dsPrefix + c.String())
+}
+
+// Add registers c, returning ErrAlreadyRegistered (wrapped with the CID) if a
+// record already exists.
+func (s *Store) Add(ctx context.Context, c cid.Cid) error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	key := dsKey(c)
+
+	if has, err := s.ds.Has(ctx, key); err != nil {
+		return fmt.Errorf("checking existing record: %w", err)
+	} else if has {
+		return fmt.Errorf("%s: %w", c, ErrAlreadyRegistered)
+	}
+
+	rec := Record{
+		Cid:       c,
+		CreatedAt: s.now(),
+	}
+
+	return s.put(ctx, key, &rec)
+}
+
+// Remove deletes c's record, returning ErrNotRegistered (wrapped with the
+// CID) if none exists. The deletion is synced before returning.
+func (s *Store) Remove(ctx context.Context, c cid.Cid) error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	key := dsKey(c)
+	if has, err := s.ds.Has(ctx, key); err != nil {
+		return fmt.Errorf("checking record: %w", err)
+	} else if !has {
+		return fmt.Errorf("%s: %w", c, ErrNotRegistered)
+	}
+
+	if err := s.ds.Delete(ctx, key); err != nil {
+		return fmt.Errorf("deleting record: %w", err)
+	}
+	return s.ds.Sync(ctx, key)
+}
+
+func (s *Store) Get(ctx context.Context, c cid.Cid) (*Record, error) {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+
+	return s.get(ctx, dsKey(c))
+}
+
+// List returns every registered record (nil slice when empty). Order follows
+// the datastore's query iteration and should not be relied upon.
+func (s *Store) List(ctx context.Context) ([]Record, error) {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+
+	results, err := s.ds.Query(ctx, query.Query{
+		Prefix: dsPrefix,
+	})
+	if err != nil {
+		return nil, fmt.Errorf("querying on-demand pins: %w", err)
+	}
+	defer results.Close()
+
+	var records []Record
+	for result := range results.Next() {
+		if result.Error != nil {
+			return nil, fmt.Errorf("iterating on-demand pins: %w", result.Error)
+		}
+		var rec Record
+		if err := json.Unmarshal(result.Value, &rec); err != nil {
+			return nil, fmt.Errorf("unmarshaling record %s: %w", result.Key, err)
+		}
+		records = append(records, rec)
+	}
+	return records, nil
+}
+
+// Update overwrites rec's stored state (upserts: no existence check).
+func (s *Store) Update(ctx context.Context, rec *Record) error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	return s.put(ctx, dsKey(rec.Cid), rec)
+}
+
+// get loads and decodes one record; callers hold the appropriate lock.
+func (s *Store) get(ctx context.Context, key datastore.Key) (*Record, error) {
+	val, err := s.ds.Get(ctx, key)
+	if err != nil {
+		return nil, fmt.Errorf("getting record: %w", err)
+	}
+	var rec Record
+	if err := json.Unmarshal(val, &rec); err != nil {
+		return nil, fmt.Errorf("unmarshaling record: %w", err)
+	}
+	return &rec, nil
+}
+
+// put encodes and writes one record, syncing the key so the write is durable
+// before returning; callers hold the write lock.
+func (s *Store) put(ctx context.Context, key datastore.Key, rec *Record) error {
+	val, err := json.Marshal(rec)
+	if err != nil {
+		return fmt.Errorf("marshaling record: %w", err)
+	}
+	if err := s.ds.Put(ctx, key, val); err != nil {
+		return fmt.Errorf("storing record: %w", err)
+	}
+	return s.ds.Sync(ctx, key)
+}
diff --git a/ondemandpin/store_test.go b/ondemandpin/store_test.go
new file mode 100644
index 00000000000..7383889f085
--- /dev/null
+++ b/ondemandpin/store_test.go
@@ -0,0 +1,57 @@
+package ondemandpin
+
+import (
+	"context"
+	"testing"
+
+	"github.com/ipfs/go-cid"
+	"github.com/ipfs/go-datastore"
+	dssync "github.com/ipfs/go-datastore/sync"
+	mh "github.com/multiformats/go-multihash"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+// testCID derives a deterministic CIDv1 (raw codec, sha2-256) from data.
+func testCID(t *testing.T, data string) cid.Cid {
+	t.Helper()
+	h, err := mh.Sum([]byte(data), mh.SHA2_256, -1)
+	require.NoError(t, err)
+	return cid.NewCidV1(cid.Raw, h)
+}
+
+func newTestStore(t *testing.T) *Store {
+	t.Helper()
+	return NewStore(dssync.MutexWrap(datastore.NewMapDatastore()))
+}
+
+// Add, Get, Remove lifecycle works.
+func TestStoreRoundTrip(t *testing.T) {
+	ctx := context.Background()
+	s := newTestStore(t)
+	c := testCID(t, "hello")
+
+	require.NoError(t, s.Add(ctx, c))
+
+	// A freshly added record carries the CID and starts unpinned.
+	rec, err := s.Get(ctx, c)
+	require.NoError(t, err)
+	assert.Equal(t, c, rec.Cid)
+	assert.False(t, rec.PinnedByUs)
+
+	require.NoError(t, s.Remove(ctx, c))
+
+	// After removal the record is gone.
+	_, err = s.Get(ctx, c)
+	assert.Error(t, err)
+}
+
+// List returns all registered records.
+func TestStoreList(t *testing.T) {
+	ctx := context.Background()
+	s := newTestStore(t)
+
+	require.NoError(t, s.Add(ctx, testCID(t, "a")))
+	require.NoError(t, s.Add(ctx, testCID(t, "b")))
+
+	// Only the count is asserted; List's iteration order is unspecified.
+	records, err := s.List(ctx)
+	require.NoError(t, err)
+	assert.Len(t, records, 2)
+}